You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/13 22:11:50 UTC

[24/49] carbondata git commit: [CARBONDATA-1572][Streaming] Support streaming ingest and query

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormat.java
new file mode 100644
index 0000000..b10bc8b
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormat.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.streaming;
+
+import java.io.IOException;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.scan.complextypes.ArrayQueryType;
+import org.apache.carbondata.core.scan.complextypes.PrimitiveQueryType;
+import org.apache.carbondata.core.scan.complextypes.StructQueryType;
+import org.apache.carbondata.core.scan.filter.GenericQueryType;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonStorePath;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/**
+ * Stream input format
+ */
+public class CarbonStreamInputFormat extends FileInputFormat<Void, Object> {
+
+  public static final String READ_BUFFER_SIZE = "carbon.stream.read.buffer.size";
+  public static final String READ_BUFFER_SIZE_DEFAULT = "65536";
+
+  @Override public RecordReader<Void, Object> createRecordReader(InputSplit split,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    return new CarbonStreamRecordReader();
+  }
+
+  public static GenericQueryType[] getComplexDimensions(CarbonTable carbontable,
+      CarbonColumn[] carbonColumns, Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache)
+      throws IOException {
+    GenericQueryType[] queryTypes = new GenericQueryType[carbonColumns.length];
+    for (int i = 0; i < carbonColumns.length; i++) {
+      if (carbonColumns[i].isComplex()) {
+        if (carbonColumns[i].getDataType() == DataTypes.ARRAY) {
+          queryTypes[i] = new ArrayQueryType(carbonColumns[i].getColName(),
+              carbonColumns[i].getColName(), i);
+        } else if (carbonColumns[i].getDataType() == DataTypes.STRUCT) {
+          queryTypes[i] = new StructQueryType(carbonColumns[i].getColName(),
+              carbonColumns[i].getColName(), i);
+        } else {
+          throw new UnsupportedOperationException(
+              carbonColumns[i].getDataType().getName() + " is not supported");
+        }
+
+        fillChildren(carbontable, queryTypes[i], (CarbonDimension) carbonColumns[i], i, cache);
+      }
+    }
+
+    return queryTypes;
+  }
+
+  private static void fillChildren(CarbonTable carbontable, GenericQueryType parentQueryType,
+      CarbonDimension dimension, int parentBlockIndex,
+      Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache) throws IOException {
+    for (int i = 0; i < dimension.getNumberOfChild(); i++) {
+      CarbonDimension child = dimension.getListOfChildDimensions().get(i);
+      DataType dataType = child.getDataType();
+      GenericQueryType queryType = null;
+      if (dataType == DataTypes.ARRAY) {
+        queryType =
+            new ArrayQueryType(child.getColName(), dimension.getColName(), ++parentBlockIndex);
+
+      } else if (dataType == DataTypes.STRUCT) {
+        queryType =
+            new StructQueryType(child.getColName(), dimension.getColName(), ++parentBlockIndex);
+        parentQueryType.addChildren(queryType);
+      } else {
+        boolean isDirectDictionary =
+            CarbonUtil.hasEncoding(child.getEncoder(), Encoding.DIRECT_DICTIONARY);
+        DictionaryColumnUniqueIdentifier dictionarIdentifier =
+            new DictionaryColumnUniqueIdentifier(carbontable.getCarbonTableIdentifier(),
+                child.getColumnIdentifier(), child.getDataType(),
+                CarbonStorePath.getCarbonTablePath(carbontable.getAbsoluteTableIdentifier()));
+
+        queryType =
+            new PrimitiveQueryType(child.getColName(), dimension.getColName(), ++parentBlockIndex,
+                child.getDataType(), 4, cache.get(dictionarIdentifier),
+                isDirectDictionary);
+      }
+      parentQueryType.addChildren(queryType);
+      if (child.getNumberOfChild() > 0) {
+        fillChildren(carbontable, queryType, child, parentBlockIndex, cache);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java
new file mode 100644
index 0000000..1c21504
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.streaming;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ * Stream output format
+ */
+public class CarbonStreamOutputFormat extends FileOutputFormat<Void, Object> {
+
+  static final byte[] CARBON_SYNC_MARKER =
+      "@carbondata_sync".getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+
+  public static final String CARBON_ENCODER_ROW_BUFFER_SIZE = "carbon.stream.row.buffer.size";
+
+  public static final int CARBON_ENCODER_ROW_BUFFER_SIZE_DEFAULT = 1024;
+
+  public static final String CARBON_STREAM_BLOCKLET_ROW_NUMS = "carbon.stream.blocklet.row.nums";
+
+  public static final int CARBON_STREAM_BLOCKLET_ROW_NUMS_DEFAULT = 32000;
+
+  public static final String CARBON_STREAM_CACHE_SIZE = "carbon.stream.cache.size";
+
+  public static final int CARBON_STREAM_CACHE_SIZE_DEFAULT = 32 * 1024 * 1024;
+
+  private static final String LOAD_Model = "mapreduce.output.carbon.load.model";
+
+  @Override public RecordWriter<Void, Object> getRecordWriter(TaskAttemptContext job)
+      throws IOException, InterruptedException {
+    return new CarbonStreamRecordWriter(job);
+  }
+
+  public static void setCarbonLoadModel(Configuration hadoopConf, CarbonLoadModel carbonLoadModel)
+      throws IOException {
+    if (carbonLoadModel != null) {
+      hadoopConf.set(LOAD_Model, ObjectSerializationUtil.convertObjectToString(carbonLoadModel));
+    }
+  }
+
+  public static CarbonLoadModel getCarbonLoadModel(Configuration hadoopConf) throws IOException {
+    String value = hadoopConf.get(LOAD_Model);
+    if (value == null) {
+      return null;
+    } else {
+      return (CarbonLoadModel) ObjectSerializationUtil.convertStringToObject(value);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
new file mode 100644
index 0000000..1ff0fa7
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
@@ -0,0 +1,676 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.streaming;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.reader.CarbonHeaderReader;
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.GenericQueryType;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.intf.RowImpl;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.format.BlockletHeader;
+import org.apache.carbondata.format.FileHeader;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.hadoop.util.CarbonTypeUtil;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
+import org.apache.spark.sql.types.CalendarIntervalType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * Stream record reader
+ */
+public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
+  // vector reader
+  private boolean isVectorReader;
+
+  // metadata
+  private CarbonTable carbonTable;
+  private CarbonColumn[] storageColumns;
+  private boolean[] isRequired;
+  private int[] measureDataTypes;
+  private int dimensionCount;
+  private int measureCount;
+
+  // input
+  private FileSplit fileSplit;
+  private Configuration hadoopConf;
+  private StreamBlockletReader input;
+  private boolean isFirstRow = true;
+  private QueryModel model;
+
+  // decode data
+  private BitSet allNonNull;
+  private boolean[] isNoDictColumn;
+  private DirectDictionaryGenerator[] directDictionaryGenerators;
+  private CacheProvider cacheProvider;
+  private Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache;
+  private GenericQueryType[] queryTypes;
+
+  // vectorized reader
+  private StructType outputSchema;
+  private ColumnarBatch columnarBatch;
+  private boolean isFinished = false;
+
+  // filter
+  private FilterExecuter filter;
+  private boolean[] isFilterRequired;
+  private Object[] filterValues;
+  private RowIntf filterRow;
+  private int[] filterMap;
+
+  // output
+  private CarbonColumn[] projection;
+  private boolean[] isProjectionRequired;
+  private int[] projectionMap;
+  private Object[] outputValues;
+  private InternalRow outputRow;
+
+  // empty project, null filter
+  private boolean skipScanData;
+
+  @Override public void initialize(InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    // input
+    if (split instanceof CarbonInputSplit) {
+      fileSplit = (CarbonInputSplit) split;
+    } else if (split instanceof CarbonMultiBlockSplit) {
+      fileSplit = ((CarbonMultiBlockSplit) split).getAllSplits().get(0);
+    } else {
+      fileSplit = (FileSplit) split;
+    }
+
+    // metadata
+    hadoopConf = context.getConfiguration();
+    if (model == null) {
+      CarbonTableInputFormat format = new CarbonTableInputFormat<Object>();
+      model = format.getQueryModel(split, context);
+    }
+    carbonTable = model.getTable();
+    List<CarbonDimension> dimensions =
+        carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+    dimensionCount = dimensions.size();
+    List<CarbonMeasure> measures =
+        carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+    measureCount = measures.size();
+    List<CarbonColumn> carbonColumnList =
+        carbonTable.getStreamStorageOrderColumn(carbonTable.getFactTableName());
+    storageColumns = carbonColumnList.toArray(new CarbonColumn[carbonColumnList.size()]);
+    isNoDictColumn = CarbonDataProcessorUtil.getNoDictionaryMapping(storageColumns);
+    directDictionaryGenerators = new DirectDictionaryGenerator[storageColumns.length];
+    for (int i = 0; i < storageColumns.length; i++) {
+      if (storageColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+        directDictionaryGenerators[i] = DirectDictionaryKeyGeneratorFactory
+            .getDirectDictionaryGenerator(storageColumns[i].getDataType());
+      }
+    }
+    measureDataTypes = new int[measureCount];
+    for (int i = 0; i < measureCount; i++) {
+      measureDataTypes[i] = storageColumns[dimensionCount + i].getDataType().getId();
+    }
+
+    // decode data
+    allNonNull = new BitSet(storageColumns.length);
+    projection = model.getProjectionColumns();
+
+    isRequired = new boolean[storageColumns.length];
+    boolean[] isFiltlerDimensions = model.getIsFilterDimensions();
+    boolean[] isFiltlerMeasures = model.getIsFilterMeasures();
+    isFilterRequired = new boolean[storageColumns.length];
+    filterMap = new int[storageColumns.length];
+    for (int i = 0; i < storageColumns.length; i++) {
+      if (storageColumns[i].isDimension()) {
+        if (isFiltlerDimensions[storageColumns[i].getOrdinal()]) {
+          isRequired[i] = true;
+          isFilterRequired[i] = true;
+          filterMap[i] = storageColumns[i].getOrdinal();
+        }
+      } else {
+        if (isFiltlerMeasures[storageColumns[i].getOrdinal()]) {
+          isRequired[i] = true;
+          isFilterRequired[i] = true;
+          filterMap[i] = carbonTable.getDimensionOrdinalMax() + storageColumns[i].getOrdinal();
+        }
+      }
+    }
+
+    isProjectionRequired = new boolean[storageColumns.length];
+    projectionMap = new int[storageColumns.length];
+    for (int i = 0; i < storageColumns.length; i++) {
+      for (int j = 0; j < projection.length; j++) {
+        if (storageColumns[i].getColName().equals(projection[j].getColName())) {
+          isRequired[i] = true;
+          isProjectionRequired[i] = true;
+          projectionMap[i] = j;
+          break;
+        }
+      }
+    }
+
+    // initialize filter
+    if (null != model.getFilterExpressionResolverTree()) {
+      initializeFilter();
+    } else if (projection.length == 0) {
+      skipScanData = true;
+    }
+
+  }
+
+  private void initializeFilter() {
+
+    List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil
+        .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getFactTableName()),
+            carbonTable.getMeasureByTableName(carbonTable.getFactTableName()));
+    int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()];
+    for (int i = 0; i < dimLensWithComplex.length; i++) {
+      dimLensWithComplex[i] = Integer.MAX_VALUE;
+    }
+
+    int[] dictionaryColumnCardinality =
+        CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList);
+    SegmentProperties segmentProperties =
+        new SegmentProperties(wrapperColumnSchemaList, dictionaryColumnCardinality);
+    Map<Integer, GenericQueryType> complexDimensionInfoMap = new HashMap<>();
+
+    FilterResolverIntf resolverIntf = model.getFilterExpressionResolverTree();
+    filter = FilterUtil.getFilterExecuterTree(resolverIntf, segmentProperties,
+        complexDimensionInfoMap);
+    // for row filter, we need update column index
+    FilterUtil.updateIndexOfColumnExpression(resolverIntf.getFilterExpression(),
+        carbonTable.getDimensionOrdinalMax());
+
+  }
+
+  public void setQueryModel(QueryModel model) {
+    this.model = model;
+  }
+
+  private byte[] getSyncMarker(String filePath) throws IOException {
+    CarbonHeaderReader headerReader = new CarbonHeaderReader(filePath);
+    FileHeader header = headerReader.readHeader();
+    return header.getSync_marker();
+  }
+
+  private void initializeAtFirstRow() throws IOException {
+    filterValues = new Object[carbonTable.getDimensionOrdinalMax() + measureCount];
+    filterRow = new RowImpl();
+    filterRow.setValues(filterValues);
+
+    outputValues = new Object[projection.length];
+    outputRow = new GenericInternalRow(outputValues);
+
+    Path file = fileSplit.getPath();
+
+    byte[] syncMarker = getSyncMarker(file.toUri().getPath());
+
+    FileSystem fs = file.getFileSystem(hadoopConf);
+
+    int bufferSize = Integer.parseInt(hadoopConf.get(CarbonStreamInputFormat.READ_BUFFER_SIZE,
+        CarbonStreamInputFormat.READ_BUFFER_SIZE_DEFAULT));
+
+    FSDataInputStream fileIn = fs.open(file, bufferSize);
+    fileIn.seek(fileSplit.getStart());
+    input = new StreamBlockletReader(syncMarker, fileIn, fileSplit.getLength(),
+        fileSplit.getStart() == 0);
+
+    cacheProvider = CacheProvider.getInstance();
+    cache = cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, carbonTable.getStorePath());
+    queryTypes = CarbonStreamInputFormat.getComplexDimensions(carbonTable, storageColumns, cache);
+
+    outputSchema = new StructType(CarbonTypeUtil.convertCarbonSchemaToSparkSchema(projection));
+  }
+
+  @Override public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (isFirstRow) {
+      isFirstRow = false;
+      initializeAtFirstRow();
+    }
+    if (isFinished) {
+      return false;
+    }
+
+    if (isVectorReader) {
+      return nextColumnarBatch();
+    }
+
+    return nextRow();
+  }
+
+  /**
+   * for vector reader, check next columnar batch
+   */
+  private boolean nextColumnarBatch() throws IOException {
+    boolean hasNext;
+    boolean scanMore = false;
+    do {
+      // move to the next blocklet
+      hasNext = input.nextBlocklet();
+      if (hasNext) {
+        // read blocklet header
+        BlockletHeader header = input.readBlockletHeader();
+        if (isScanRequired(header)) {
+          scanMore = !scanBlockletAndFillVector(header);
+        } else {
+          input.skipBlockletData(true);
+          scanMore = true;
+        }
+      } else {
+        isFinished = true;
+        scanMore = false;
+      }
+    } while (scanMore);
+    return hasNext;
+  }
+
+  /**
+   * check next Row
+   */
+  private boolean nextRow() throws IOException {
+    // read row one by one
+    try {
+      boolean hasNext;
+      boolean scanMore = false;
+      do {
+        hasNext = input.hasNext();
+        if (hasNext) {
+          if (skipScanData) {
+            input.nextRow();
+            scanMore = false;
+          } else {
+            readRowFromStream();
+            if (null != filter) {
+              scanMore = !filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax());
+            } else {
+              scanMore = false;
+            }
+          }
+        } else {
+          if (input.nextBlocklet()) {
+            BlockletHeader header = input.readBlockletHeader();
+            if (isScanRequired(header)) {
+              if (skipScanData) {
+                input.skipBlockletData(false);
+              } else {
+                input.readBlockletData(header);
+              }
+            } else {
+              input.skipBlockletData(true);
+            }
+            scanMore = true;
+          } else {
+            isFinished = true;
+            scanMore = false;
+          }
+        }
+      } while (scanMore);
+      return hasNext;
+    } catch (FilterUnsupportedException e) {
+      throw new IOException("Failed to filter row in detail reader", e);
+    }
+  }
+
+  @Override public Void getCurrentKey() throws IOException, InterruptedException {
+    return null;
+  }
+
+  @Override public Object getCurrentValue() throws IOException, InterruptedException {
+    if (isVectorReader) {
+      return columnarBatch;
+    }
+    return outputRow;
+  }
+
+  private boolean isScanRequired(BlockletHeader header) {
+    // TODO require to implement min-max index
+    if (null == filter) {
+      return true;
+    }
+    return true;
+  }
+
+  private boolean scanBlockletAndFillVector(BlockletHeader header) throws IOException {
+    // if filter is null and output projection is empty, use the row number of blocklet header
+    if (skipScanData) {
+      int rowNums = header.getBlocklet_info().getNum_rows();
+      columnarBatch = ColumnarBatch.allocate(outputSchema, MemoryMode.OFF_HEAP, rowNums);
+      columnarBatch.setNumRows(rowNums);
+      input.skipBlockletData(true);
+      return rowNums > 0;
+    }
+
+    input.readBlockletData(header);
+    columnarBatch = ColumnarBatch.allocate(outputSchema, MemoryMode.OFF_HEAP, input.getRowNums());
+    int rowNum = 0;
+    if (null == filter) {
+      while (input.hasNext()) {
+        readRowFromStream();
+        putRowToColumnBatch(rowNum++);
+      }
+    } else {
+      try {
+        while (input.hasNext()) {
+          readRowFromStream();
+          if (filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax())) {
+            putRowToColumnBatch(rowNum++);
+          }
+        }
+      } catch (FilterUnsupportedException e) {
+        throw new IOException("Failed to filter row in vector reader", e);
+      }
+    }
+    columnarBatch.setNumRows(rowNum);
+    return rowNum > 0;
+  }
+
+  private void readRowFromStream() {
+    input.nextRow();
+    short nullLen = input.readShort();
+    BitSet nullBitSet = allNonNull;
+    if (nullLen > 0) {
+      nullBitSet = BitSet.valueOf(input.readBytes(nullLen));
+    }
+    int colCount = 0;
+    // primitive type dimension
+    for (; colCount < isNoDictColumn.length; colCount++) {
+      if (nullBitSet.get(colCount)) {
+        if (isFilterRequired[colCount]) {
+          filterValues[filterMap[colCount]] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
+        }
+        if (isProjectionRequired[colCount]) {
+          outputValues[projectionMap[colCount]] = null;
+        }
+      } else {
+        if (isNoDictColumn[colCount]) {
+          int v = input.readShort();
+          if (isRequired[colCount]) {
+            byte[] b = input.readBytes(v);
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = b;
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] =
+                  DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(b,
+                      storageColumns[colCount].getDataType());
+            }
+          } else {
+            input.skipBytes(v);
+          }
+        } else if (null != directDictionaryGenerators[colCount]) {
+          if (isRequired[colCount]) {
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = input.copy(4);
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] =
+                  directDictionaryGenerators[colCount].getValueFromSurrogate(input.readInt());
+            } else {
+              input.skipBytes(4);
+            }
+          } else {
+            input.skipBytes(4);
+          }
+        } else {
+          if (isRequired[colCount]) {
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = input.copy(4);
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] = input.readInt();
+            } else {
+              input.skipBytes(4);
+            }
+          } else {
+            input.skipBytes(4);
+          }
+        }
+      }
+    }
+    // complex type dimension
+    for (; colCount < dimensionCount; colCount++) {
+      if (nullBitSet.get(colCount)) {
+        if (isFilterRequired[colCount]) {
+          filterValues[filterMap[colCount]] = null;
+        }
+        if (isProjectionRequired[colCount]) {
+          outputValues[projectionMap[colCount]] = null;
+        }
+      } else {
+        short v = input.readShort();
+        if (isRequired[colCount]) {
+          byte[] b = input.readBytes(v);
+          if (isFilterRequired[colCount]) {
+            filterValues[filterMap[colCount]] = b;
+          }
+          if (isProjectionRequired[colCount]) {
+            outputValues[projectionMap[colCount]] = queryTypes[colCount]
+                .getDataBasedOnDataTypeFromSurrogates(ByteBuffer.wrap(b));
+          }
+        } else {
+          input.skipBytes(v);
+        }
+      }
+    }
+    // measure
+    int dataType;
+    for (int msrCount = 0; msrCount < measureCount; msrCount++, colCount++) {
+      if (nullBitSet.get(colCount)) {
+        if (isFilterRequired[colCount]) {
+          filterValues[filterMap[colCount]] = null;
+        }
+        if (isProjectionRequired[colCount]) {
+          outputValues[projectionMap[colCount]] = null;
+        }
+      } else {
+        dataType = measureDataTypes[msrCount];
+        if (dataType == DataTypes.BOOLEAN_TYPE_ID) {
+          if (isRequired[colCount]) {
+            boolean v = input.readBoolean();
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = v;
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] = v;
+            }
+          } else {
+            input.skipBytes(1);
+          }
+        } else if (dataType == DataTypes.SHORT_TYPE_ID) {
+          if (isRequired[colCount]) {
+            short v = input.readShort();
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = v;
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] = v;
+            }
+          } else {
+            input.skipBytes(2);
+          }
+        } else if (dataType == DataTypes.INT_TYPE_ID) {
+          if (isRequired[colCount]) {
+            int v = input.readInt();
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = v;
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] = v;
+            }
+          } else {
+            input.skipBytes(4);
+          }
+        } else if (dataType == DataTypes.LONG_TYPE_ID) {
+          if (isRequired[colCount]) {
+            long v = input.readLong();
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = v;
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] = v;
+            }
+          } else {
+            input.skipBytes(8);
+          }
+        } else if (dataType == DataTypes.DOUBLE_TYPE_ID) {
+          if (isRequired[colCount]) {
+            double v = input.readDouble();
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = v;
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] = v;
+            }
+          } else {
+            input.skipBytes(8);
+          }
+        } else if (dataType == DataTypes.DECIMAL_TYPE_ID) {
+          int len = input.readShort();
+          if (isRequired[colCount]) {
+            BigDecimal v = DataTypeUtil.byteToBigDecimal(input.readBytes(len));
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = v;
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] = v;
+            }
+          } else {
+            input.skipBytes(len);
+          }
+        }
+      }
+    }
+  }
+
+  private void putRowToColumnBatch(int rowId) {
+    for (int i = 0; i < projection.length; i++) {
+      Object value = outputValues[i];
+      ColumnVector col = columnarBatch.column(i);
+      DataType t = col.dataType();
+      if (null == value) {
+        col.putNull(rowId);
+      } else {
+        if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
+          col.putBoolean(rowId, (boolean)value);
+        } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
+          col.putByte(rowId, (byte) value);
+        } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
+          col.putShort(rowId, (short) value);
+        } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
+          col.putInt(rowId, (int) value);
+        } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
+          col.putLong(rowId, (long) value);
+        } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
+          col.putFloat(rowId, (float) value);
+        } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
+          col.putDouble(rowId, (double) value);
+        } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
+          UTF8String v = (UTF8String) value;
+          col.putByteArray(rowId, v.getBytes());
+        } else if (t instanceof DecimalType) {
+          DecimalType dt = (DecimalType)t;
+          Decimal d = (Decimal) value;
+          if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
+            col.putInt(rowId, (int)d.toUnscaledLong());
+          } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
+            col.putLong(rowId, d.toUnscaledLong());
+          } else {
+            final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
+            byte[] bytes = integer.toByteArray();
+            col.putByteArray(rowId, bytes, 0, bytes.length);
+          }
+        } else if (t instanceof CalendarIntervalType) {
+          CalendarInterval c = (CalendarInterval) value;
+          col.getChildColumn(0).putInt(rowId, c.months);
+          col.getChildColumn(1).putLong(rowId, c.microseconds);
+        } else if (t instanceof DateType) {
+          col.putInt(rowId, (int) value);
+        } else if (t instanceof TimestampType) {
+          col.putLong(rowId, (long) value);
+        }
+      }
+    }
+  }
+
+  @Override public float getProgress() throws IOException, InterruptedException {
+    return 0;
+  }
+
+  public void setVectorReader(boolean isVectorReader) {
+    this.isVectorReader = isVectorReader;
+  }
+
+  @Override public void close() throws IOException {
+    if (null != input) {
+      input.close();
+    }
+    if (null != columnarBatch) {
+      columnarBatch.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
new file mode 100644
index 0000000..8d7a2e3
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.streaming;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.path.CarbonStorePath;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.format.FileHeader;
+import org.apache.carbondata.processing.loading.BadRecordsLogger;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.DataLoadProcessBuilder;
+import org.apache.carbondata.processing.loading.converter.RowConverter;
+import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.processing.loading.parser.RowParser;
+import org.apache.carbondata.processing.loading.parser.impl.RowParserImpl;
+import org.apache.carbondata.processing.loading.steps.DataConverterProcessorStepImpl;
+import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskID;
+
+/**
+ * Stream record writer
+ */
+public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonStreamRecordWriter.class.getName());
+
+  // basic info
+  private Configuration hadoopConf;
+  private CarbonDataLoadConfiguration configuration;
+  private CarbonTable carbonTable;
+  private int maxRowNums;
+  private int maxCacheSize;
+
+  // parser and converter
+  private RowParser rowParser;
+  private RowConverter converter;
+  private CarbonRow currentRow = new CarbonRow(null);
+
+  // encoder
+  private DataField[] dataFields;
+  private BitSet nullBitSet;
+  private boolean[] isNoDictionaryDimensionColumn;
+  private int dimensionWithComplexCount;
+  private int measureCount;
+  private int[] measureDataTypes;
+  private StreamBlockletWriter output = null;
+
+  // data write
+  private String segmentDir;
+  private String fileName;
+  private DataOutputStream outputStream;
+  private boolean isFirstRow = true;
+  private boolean hasException = false;
+
+  CarbonStreamRecordWriter(TaskAttemptContext job) throws IOException {
+    initialize(job);
+  }
+
+  private void initialize(TaskAttemptContext job) throws IOException {
+    // set basic information
+    hadoopConf = job.getConfiguration();
+    CarbonLoadModel carbonLoadModel = CarbonStreamOutputFormat.getCarbonLoadModel(hadoopConf);
+    if (carbonLoadModel == null) {
+      throw new IOException(
+          "CarbonStreamRecordWriter require configuration: mapreduce.output.carbon.load.model");
+    }
+    carbonTable = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable();
+    int taskNo = TaskID.forName(hadoopConf.get("mapred.tip.id")).getId();
+    carbonLoadModel.setTaskNo("" + taskNo);
+    configuration = DataLoadProcessBuilder.createConfiguration(carbonLoadModel);
+    maxRowNums = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_STREAM_BLOCKLET_ROW_NUMS,
+        CarbonStreamOutputFormat.CARBON_STREAM_BLOCKLET_ROW_NUMS_DEFAULT) - 1;
+    maxCacheSize = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_STREAM_CACHE_SIZE,
+        CarbonStreamOutputFormat.CARBON_STREAM_CACHE_SIZE_DEFAULT);
+
+    CarbonTablePath tablePath =
+        CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier());
+    segmentDir = tablePath.getSegmentDir("0", carbonLoadModel.getSegmentId());
+    fileName = CarbonTablePath.getCarbonDataFileName(0, taskNo, 0, 0, "0");
+  }
+
+  private void initializeAtFirstRow() throws IOException, InterruptedException {
+    isFirstRow = false;
+
+    // initialize metadata
+    isNoDictionaryDimensionColumn =
+        CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields());
+    dimensionWithComplexCount = configuration.getDimensionCount();
+    measureCount = configuration.getMeasureCount();
+    dataFields = configuration.getDataFields();
+    measureDataTypes = new int[measureCount];
+    for (int i = 0; i < measureCount; i++) {
+      measureDataTypes[i] =
+          dataFields[dimensionWithComplexCount + i].getColumn().getDataType().getId();
+    }
+
+    // initialize parser and converter
+    rowParser = new RowParserImpl(dataFields, configuration);
+    BadRecordsLogger badRecordLogger =
+        DataConverterProcessorStepImpl.createBadRecordLogger(configuration);
+    converter = new RowConverterImpl(configuration.getDataFields(), configuration, badRecordLogger);
+    configuration.setCardinalityFinder(converter);
+    converter.initialize();
+
+    // initialize encoder
+    nullBitSet = new BitSet(dataFields.length);
+    int rowBufferSize = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_ENCODER_ROW_BUFFER_SIZE,
+        CarbonStreamOutputFormat.CARBON_ENCODER_ROW_BUFFER_SIZE_DEFAULT);
+    output = new StreamBlockletWriter(maxCacheSize, maxRowNums, rowBufferSize);
+
+    // initialize data writer
+    String filePath = segmentDir + File.separator + fileName;
+    FileFactory.FileType fileType = FileFactory.getFileType(filePath);
+    CarbonFile carbonFile = FileFactory.getCarbonFile(filePath, fileType);
+    if (carbonFile.exists()) {
+      // if the file is existed, use the append api
+      outputStream = FileFactory.getDataOutputStreamUsingAppend(filePath, fileType);
+    } else {
+      // IF the file is not existed, use the create api
+      outputStream = FileFactory.getDataOutputStream(filePath, fileType);
+      writeFileHeader();
+    }
+  }
+
+  @Override public void write(Void key, Object value) throws IOException, InterruptedException {
+    if (isFirstRow) {
+      initializeAtFirstRow();
+    }
+
+    // parse and convert row
+    currentRow.setData(rowParser.parseRow((Object[]) value));
+    converter.convert(currentRow);
+
+    // null bit set
+    nullBitSet.clear();
+    for (int i = 0; i < dataFields.length; i++) {
+      if (null == currentRow.getObject(i)) {
+        nullBitSet.set(i);
+      }
+    }
+    output.nextRow();
+    byte[] b = nullBitSet.toByteArray();
+    output.writeShort(b.length);
+    if (b.length > 0) {
+      output.writeBytes(b);
+    }
+    int dimCount = 0;
+    Object columnValue;
+
+    // primitive type dimension
+    for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) {
+      columnValue = currentRow.getObject(dimCount);
+      if (null != columnValue) {
+        if (isNoDictionaryDimensionColumn[dimCount]) {
+          byte[] col = (byte[]) columnValue;
+          output.writeShort(col.length);
+          output.writeBytes(col);
+        } else {
+          output.writeInt((int) columnValue);
+        }
+      }
+    }
+    // complex type dimension
+    for (; dimCount < dimensionWithComplexCount; dimCount++) {
+      columnValue = currentRow.getObject(dimCount);
+      if (null != columnValue) {
+        byte[] col = (byte[]) columnValue;
+        output.writeShort(col.length);
+        output.writeBytes(col);
+      }
+    }
+    // measure
+    int dataType;
+    for (int msrCount = 0; msrCount < measureCount; msrCount++) {
+      columnValue = currentRow.getObject(dimCount + msrCount);
+      if (null != columnValue) {
+        dataType = measureDataTypes[msrCount];
+        if (dataType == DataTypes.BOOLEAN_TYPE_ID) {
+          output.writeBoolean((boolean) columnValue);
+        } else if (dataType == DataTypes.SHORT_TYPE_ID) {
+          output.writeShort((short) columnValue);
+        } else if (dataType == DataTypes.INT_TYPE_ID) {
+          output.writeInt((int) columnValue);
+        } else if (dataType == DataTypes.LONG_TYPE_ID) {
+          output.writeLong((long) columnValue);
+        } else if (dataType == DataTypes.DOUBLE_TYPE_ID) {
+          output.writeDouble((double) columnValue);
+        } else if (dataType == DataTypes.DECIMAL_TYPE_ID) {
+          BigDecimal val = (BigDecimal) columnValue;
+          byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
+          output.writeShort(bigDecimalInBytes.length);
+          output.writeBytes(bigDecimalInBytes);
+        } else {
+          String msg =
+              "unsupported data type:" + dataFields[dimCount + msrCount].getColumn().getDataType()
+                  .getName();
+          LOGGER.error(msg);
+          throw new IOException(msg);
+        }
+      }
+    }
+
+    if (output.isFull()) {
+      appendBlockletToDataFile();
+    }
+  }
+
+  private void writeFileHeader() throws IOException {
+    List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil
+        .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getFactTableName()),
+            carbonTable.getMeasureByTableName(carbonTable.getFactTableName()));
+    int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()];
+    for (int i = 0; i < dimLensWithComplex.length; i++) {
+      dimLensWithComplex[i] = Integer.MAX_VALUE;
+    }
+    int[] dictionaryColumnCardinality =
+        CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList);
+    List<Integer> cardinality = new ArrayList<>();
+    List<org.apache.carbondata.format.ColumnSchema> columnSchemaList = AbstractFactDataWriter
+        .getColumnSchemaListAndCardinality(cardinality, dictionaryColumnCardinality,
+            wrapperColumnSchemaList);
+    FileHeader fileHeader =
+        CarbonMetadataUtil.getFileHeader(true, columnSchemaList, System.currentTimeMillis());
+    fileHeader.setIs_footer_present(false);
+    fileHeader.setIs_splitable(true);
+    fileHeader.setSync_marker(CarbonStreamOutputFormat.CARBON_SYNC_MARKER);
+    outputStream.write(CarbonUtil.getByteArray(fileHeader));
+  }
+
+  /**
+   * write a blocklet to file
+   */
+  private void appendBlockletToDataFile() throws IOException {
+    if (output.getRowIndex() == -1) {
+      return;
+    }
+    output.apppendBlocklet(outputStream);
+    outputStream.flush();
+    // reset data
+    output.reset();
+  }
+
+  @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+    try {
+      // append remain buffer data
+      if (!hasException) {
+        appendBlockletToDataFile();
+        converter.finish();
+      }
+    } finally {
+      // close resource
+      CarbonUtil.closeStreams(outputStream);
+      output.close();
+    }
+  }
+
+  public String getSegmentDir() {
+    return segmentDir;
+  }
+
+  public String getFileName() {
+    return fileName;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java
new file mode 100644
index 0000000..eafb142
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.streaming;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.format.BlockletHeader;
+
+/**
+ * stream blocklet reader
+ */
+public class StreamBlockletReader {
+
+  private byte[] buffer;
+  private int offset;
+  private final byte[] syncMarker;
+  private final byte[] syncBuffer;
+  private final int syncLen;
+  private long pos = 0;
+  private final InputStream in;
+  private final long limitStart;
+  private final long limitEnd;
+  private boolean isAlreadySync = false;
+  private Compressor compressor = CompressorFactory.getInstance().getCompressor();
+  private int rowNums = 0;
+  private int rowIndex = 0;
+  private boolean isHeaderPresent;
+
+  StreamBlockletReader(byte[] syncMarker, InputStream in, long limit, boolean isHeaderPresent) {
+    this.syncMarker = syncMarker;
+    this.syncLen = syncMarker.length;
+    this.syncBuffer = new byte[syncMarker.length];
+    this.in = in;
+    this.limitStart = limit;
+    this.limitEnd = limitStart + syncLen;
+    this.isHeaderPresent = isHeaderPresent;
+  }
+
+  private void ensureCapacity(int capacity) {
+    if (buffer == null || capacity > buffer.length) {
+      buffer = new byte[capacity];
+    }
+  }
+
+  /**
+   * find the first position of sync_marker in input stream
+   */
+  private boolean sync() throws IOException {
+    int len = in.read(syncBuffer);
+    if (len < syncLen) {
+      return false;
+    }
+    pos += syncLen;
+    boolean skipHeader = false;
+    for (int i = 0; i < limitStart; i++) {
+      int j = 0;
+      for (; j < syncLen; j++) {
+        if (syncMarker[j] != syncBuffer[(i + j) % syncLen]) break;
+      }
+      if (syncLen == j) {
+        if (isHeaderPresent) {
+          if (skipHeader) {
+            return true;
+          } else {
+            skipHeader = true;
+          }
+        } else {
+          return true;
+        }
+      }
+      int value = in.read();
+      if (-1 == value) {
+        return false;
+      }
+      syncBuffer[i % syncLen] = (byte) value;
+      pos++;
+    }
+    return false;
+  }
+
+  BlockletHeader readBlockletHeader() throws IOException {
+    int len = readIntFromStream();
+    byte[] b = new byte[len];
+    readBytesFromStream(b);
+    BlockletHeader header = CarbonUtil.readBlockletHeader(b);
+    rowNums = header.getBlocklet_info().getNum_rows();
+    rowIndex = 0;
+    return header;
+  }
+
+  void readBlockletData(BlockletHeader header) throws IOException {
+    ensureCapacity(header.getBlocklet_length());
+    offset = 0;
+    int len = readIntFromStream();
+    byte[] b = new byte[len];
+    readBytesFromStream(b);
+    compressor.rawUncompress(b, buffer);
+  }
+
+  void skipBlockletData(boolean reset) throws IOException {
+    int len = readIntFromStream();
+    skip(len);
+    pos += len;
+    if (reset) {
+      this.rowNums = 0;
+      this.rowIndex = 0;
+    }
+  }
+
+  private void skip(int len) throws IOException {
+    long remaining = len;
+    do {
+      long skipLen = in.skip(remaining);
+      remaining -= skipLen;
+    } while (remaining > 0);
+  }
+
+  /**
+   * find the next blocklet
+   */
+  boolean nextBlocklet() throws IOException {
+    if (pos >= limitStart) {
+      return false;
+    }
+    if (isAlreadySync) {
+      int v = in.read(syncBuffer);
+      if (v < syncLen) {
+        return false;
+      }
+      pos += syncLen;
+    } else {
+      isAlreadySync = true;
+      if (!sync()) {
+        return false;
+      }
+    }
+
+    return pos < limitEnd;
+  }
+
+  boolean hasNext() throws IOException {
+    return rowIndex < rowNums;
+  }
+
+  void nextRow() {
+    rowIndex++;
+  }
+
+  int readIntFromStream() throws IOException {
+    int ch1 = in.read();
+    int ch2 = in.read();
+    int ch3 = in.read();
+    int ch4 = in.read();
+    if ((ch1 | ch2 | ch3 | ch4) < 0) throw new EOFException();
+    pos += 4;
+    return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
+  }
+
+  void readBytesFromStream(byte[] b) throws IOException {
+    int len = in.read(b, 0, b.length);
+    if (len < b.length) {
+      throw new EOFException();
+    }
+    pos += b.length;
+  }
+
+  boolean readBoolean() {
+    return (buffer[offset++]) != 0;
+  }
+
+  short readShort() {
+    short v =  (short) ((buffer[offset + 1] & 255) +
+        ((buffer[offset]) << 8));
+    offset += 2;
+    return v;
+  }
+
+  byte[] copy(int len) {
+    byte[] b = new byte[len];
+    System.arraycopy(buffer, offset, b, 0, len);
+    return b;
+  }
+
+  int readInt() {
+    int v = ((buffer[offset + 3] & 255) +
+        ((buffer[offset + 2] & 255) << 8) +
+        ((buffer[offset + 1] & 255) << 16) +
+        ((buffer[offset]) << 24));
+    offset += 4;
+    return v;
+  }
+
+  long readLong() {
+    long v = ((long)(buffer[offset + 7] & 255)) +
+        ((long) (buffer[offset + 6] & 255) << 8) +
+        ((long) (buffer[offset + 5] & 255) << 16) +
+        ((long) (buffer[offset + 4] & 255) << 24) +
+        ((long) (buffer[offset + 3] & 255) << 32) +
+        ((long) (buffer[offset + 2] & 255) << 40) +
+        ((long) (buffer[offset + 1] & 255) << 48) +
+        ((long) (buffer[offset]) << 56);
+    offset += 8;
+    return v;
+  }
+
+  double readDouble() {
+    return Double.longBitsToDouble(readLong());
+  }
+
+  byte[] readBytes(int len) {
+    byte[] b = new byte[len];
+    System.arraycopy(buffer, offset, b, 0, len);
+    offset += len;
+    return b;
+  }
+
+  void skipBytes(int len) {
+    offset += len;
+  }
+
+  int getRowNums() {
+    return rowNums;
+  }
+
+  void close() {
+    CarbonUtil.closeStreams(in);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletWriter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletWriter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletWriter.java
new file mode 100644
index 0000000..a0328b3
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletWriter.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.streaming;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.format.BlockletHeader;
+import org.apache.carbondata.format.BlockletInfo;
+import org.apache.carbondata.format.MutationType;
+
+/**
+ * stream blocklet writer
+ */
+public class StreamBlockletWriter {
+  private byte[] buffer;
+  private int maxSize;
+  private int maxRowNum;
+  private int rowSize;
+  private int count = 0;
+  private int rowIndex = -1;
+  private Compressor compressor = CompressorFactory.getInstance().getCompressor();
+
+  StreamBlockletWriter(int maxSize, int maxRowNum, int rowSize) {
+    buffer = new byte[maxSize];
+    this.maxSize = maxSize;
+    this.maxRowNum = maxRowNum;
+    this.rowSize = rowSize;
+  }
+
+  private void ensureCapacity(int space) {
+    int newcount = space + count;
+    if (newcount > buffer.length) {
+      byte[] newbuf = new byte[Math.max(newcount, buffer.length + rowSize)];
+      System.arraycopy(buffer, 0, newbuf, 0, count);
+      buffer = newbuf;
+    }
+  }
+
+  void reset() {
+    count = 0;
+    rowIndex = -1;
+  }
+
+  byte[] getBytes() {
+    return buffer;
+  }
+
+  int getCount() {
+    return count;
+  }
+
+  int getRowIndex() {
+    return rowIndex;
+  }
+
+  void nextRow() {
+    rowIndex++;
+  }
+
+  boolean isFull() {
+    return rowIndex == maxRowNum || count >= maxSize;
+  }
+
+  void writeBoolean(boolean val) {
+    ensureCapacity(1);
+    buffer[count] = (byte) (val ? 1 : 0);
+    count += 1;
+  }
+
+  void writeShort(int val) {
+    ensureCapacity(2);
+    buffer[count + 1] = (byte) (val);
+    buffer[count] = (byte) (val >>> 8);
+    count += 2;
+  }
+
+  void writeInt(int val) {
+    ensureCapacity(4);
+    buffer[count + 3] = (byte) (val);
+    buffer[count + 2] = (byte) (val >>> 8);
+    buffer[count + 1] = (byte) (val >>> 16);
+    buffer[count] = (byte) (val >>> 24);
+    count += 4;
+  }
+
+  void writeLong(long val) {
+    ensureCapacity(8);
+    buffer[count + 7] = (byte) (val);
+    buffer[count + 6] = (byte) (val >>> 8);
+    buffer[count + 5] = (byte) (val >>> 16);
+    buffer[count + 4] = (byte) (val >>> 24);
+    buffer[count + 3] = (byte) (val >>> 32);
+    buffer[count + 2] = (byte) (val >>> 40);
+    buffer[count + 1] = (byte) (val >>> 48);
+    buffer[count] = (byte) (val >>> 56);
+    count += 8;
+  }
+
+  void writeDouble(double val) {
+    writeLong(Double.doubleToLongBits(val));
+  }
+
+  void writeBytes(byte[] b) {
+    writeBytes(b, 0, b.length);
+  }
+
+  void writeBytes(byte[] b, int off, int len) {
+    ensureCapacity(len);
+    System.arraycopy(b, off, buffer, count, len);
+    count += len;
+  }
+
+  void apppendBlocklet(DataOutputStream outputStream) throws IOException {
+    outputStream.write(CarbonStreamOutputFormat.CARBON_SYNC_MARKER);
+
+    BlockletInfo blockletInfo = new BlockletInfo();
+    blockletInfo.setNum_rows(getRowIndex() + 1);
+    BlockletHeader blockletHeader = new BlockletHeader();
+    blockletHeader.setBlocklet_length(getCount());
+    blockletHeader.setMutation(MutationType.INSERT);
+    blockletHeader.setBlocklet_info(blockletInfo);
+    byte[] headerBytes = CarbonUtil.getByteArray(blockletHeader);
+    outputStream.writeInt(headerBytes.length);
+    outputStream.write(headerBytes);
+
+    byte[] compressed = compressor.compressByte(getBytes(), getCount());
+    outputStream.writeInt(compressed.length);
+    outputStream.write(compressed);
+  }
+
+  void close() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index a559cc4..b4444be 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -109,12 +109,14 @@ public class CarbonInputFormatUtil {
     plan.addDimension(queryDimension);
   }
 
-  public static void processFilterExpression(Expression filterExpression, CarbonTable carbonTable) {
+  public static void processFilterExpression(Expression filterExpression, CarbonTable carbonTable,
+      boolean[] isFilterDimensions, boolean[] isFilterMeasures) {
     List<CarbonDimension> dimensions =
         carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
     List<CarbonMeasure> measures =
         carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
-    QueryModel.processFilterExpression(filterExpression, dimensions, measures);
+    QueryModel.processFilterExpression(filterExpression, dimensions, measures,
+        isFilterDimensions, isFilterMeasures);
 
     if (null != filterExpression) {
       // Optimize Filter Expression and fit RANGE filters is conditions apply.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonTypeUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonTypeUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonTypeUtil.java
new file mode 100644
index 0000000..395015e
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonTypeUtil.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.util;
+
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.StructField;
+
+public class CarbonTypeUtil {
+
+  public static org.apache.spark.sql.types.DataType convertCarbonToSparkDataType(
+      DataType carbonDataType) {
+    if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.STRING) {
+      return DataTypes.StringType;
+    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT) {
+      return DataTypes.ShortType;
+    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.INT) {
+      return DataTypes.IntegerType;
+    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.LONG) {
+      return DataTypes.LongType;
+    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DOUBLE) {
+      return DataTypes.DoubleType;
+    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.BOOLEAN) {
+      return DataTypes.BooleanType;
+    } else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(carbonDataType)) {
+      return DataTypes.createDecimalType();
+    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.TIMESTAMP) {
+      return DataTypes.TimestampType;
+    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DATE) {
+      return DataTypes.DateType;
+    } else {
+      return null;
+    }
+  }
+
+  public static StructField[] convertCarbonSchemaToSparkSchema(CarbonColumn[] carbonColumns) {
+    StructField[] fields = new StructField[carbonColumns.length];
+    for (int i = 0; i < carbonColumns.length; i++) {
+      CarbonColumn carbonColumn = carbonColumns[i];
+      if (carbonColumn.isDimension()) {
+        if (carbonColumn.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+          DirectDictionaryGenerator generator = DirectDictionaryKeyGeneratorFactory
+              .getDirectDictionaryGenerator(carbonColumn.getDataType());
+          fields[i] = new StructField(carbonColumn.getColName(),
+              CarbonTypeUtil.convertCarbonToSparkDataType(generator.getReturnType()), true, null);
+        } else if (!carbonColumn.hasEncoding(Encoding.DICTIONARY)) {
+          fields[i] = new StructField(carbonColumn.getColName(),
+              CarbonTypeUtil.convertCarbonToSparkDataType(carbonColumn.getDataType()), true, null);
+        } else if (carbonColumn.isComplex()) {
+          fields[i] = new StructField(carbonColumn.getColName(),
+              CarbonTypeUtil.convertCarbonToSparkDataType(carbonColumn.getDataType()), true, null);
+        } else {
+          fields[i] = new StructField(carbonColumn.getColName(), CarbonTypeUtil
+              .convertCarbonToSparkDataType(
+                  org.apache.carbondata.core.metadata.datatype.DataTypes.INT), true, null);
+        }
+      } else if (carbonColumn.isMeasure()) {
+        DataType dataType = carbonColumn.getDataType();
+        if (dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.BOOLEAN
+            || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT
+            || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.INT
+            || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.LONG) {
+          fields[i] = new StructField(carbonColumn.getColName(),
+              CarbonTypeUtil.convertCarbonToSparkDataType(dataType), true, null);
+        } else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(dataType)) {
+          CarbonMeasure measure = (CarbonMeasure) carbonColumn;
+          fields[i] = new StructField(carbonColumn.getColName(),
+              new DecimalType(measure.getPrecision(), measure.getScale()), true, null);
+        } else {
+          fields[i] = new StructField(carbonColumn.getColName(), CarbonTypeUtil
+              .convertCarbonToSparkDataType(
+                  org.apache.carbondata.core.metadata.datatype.DataTypes.DOUBLE), true, null);
+        }
+      }
+    }
+    return fields;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
index ea90bbf..29d8d03 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
@@ -116,6 +116,45 @@ public class StoreCreator {
     return absoluteTableIdentifier;
   }
 
+  public static CarbonLoadModel buildCarbonLoadModel(CarbonTable table, String factFilePath) {
+    CarbonDataLoadSchema schema = new CarbonDataLoadSchema(table);
+    CarbonLoadModel loadModel = new CarbonLoadModel();
+    loadModel.setCarbonDataLoadSchema(schema);
+    loadModel.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());
+    loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
+    loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
+    loadModel.setFactFilePath(factFilePath);
+    loadModel.setLoadMetadataDetails(new ArrayList<LoadMetadataDetails>());
+    loadModel.setStorePath(absoluteTableIdentifier.getStorePath());
+    loadModel.setDateFormat(null);
+    loadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
+        CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+        CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS));
+    loadModel.setDefaultDateFormat(CarbonProperties.getInstance().getProperty(
+        CarbonCommonConstants.CARBON_DATE_FORMAT,
+        CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT));
+    loadModel
+        .setSerializationNullFormat(
+            TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName() + "," + "\\N");
+    loadModel
+        .setBadRecordsLoggerEnable(
+            TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName() + "," + "false");
+    loadModel
+        .setBadRecordsAction(
+            TableOptionConstant.BAD_RECORDS_ACTION.getName() + "," + "FORCE");
+    loadModel
+        .setIsEmptyDataBadRecord(
+            DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + "false");
+    loadModel.setCsvHeader("ID,date,country,name,phonetype,serialname,salary");
+    loadModel.setCsvHeaderColumns(loadModel.getCsvHeader().split(","));
+    loadModel.setTaskNo("0");
+    loadModel.setSegmentId("0");
+    loadModel.setPartitionId("0");
+    loadModel.setFactTimeStamp(System.currentTimeMillis());
+    loadModel.setMaxColumns("10");
+    return loadModel;
+  }
+
   /**
    * Create store without any restructure
    */
@@ -131,42 +170,7 @@ public class StoreCreator {
 
       CarbonTable table = createTable();
       writeDictionary(factFilePath, table);
-      CarbonDataLoadSchema schema = new CarbonDataLoadSchema(table);
-      CarbonLoadModel loadModel = new CarbonLoadModel();
-      String partitionId = "0";
-      loadModel.setCarbonDataLoadSchema(schema);
-      loadModel.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());
-      loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
-      loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
-      loadModel.setFactFilePath(factFilePath);
-      loadModel.setLoadMetadataDetails(new ArrayList<LoadMetadataDetails>());
-      loadModel.setStorePath(absoluteTableIdentifier.getStorePath());
-      loadModel.setDateFormat(null);
-      loadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
-          CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
-          CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS));
-      loadModel.setDefaultDateFormat(CarbonProperties.getInstance().getProperty(
-          CarbonCommonConstants.CARBON_DATE_FORMAT,
-          CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT));
-      loadModel
-          .setSerializationNullFormat(
-              TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName() + "," + "\\N");
-      loadModel
-          .setBadRecordsLoggerEnable(
-              TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName() + "," + "false");
-      loadModel
-          .setBadRecordsAction(
-              TableOptionConstant.BAD_RECORDS_ACTION.getName() + "," + "FORCE");
-      loadModel
-          .setIsEmptyDataBadRecord(
-              DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + "false");
-      loadModel.setCsvHeader("ID,date,country,name,phonetype,serialname,salary");
-      loadModel.setCsvHeaderColumns(loadModel.getCsvHeader().split(","));
-      loadModel.setTaskNo("0");
-      loadModel.setSegmentId("0");
-      loadModel.setPartitionId("0");
-      loadModel.setFactTimeStamp(System.currentTimeMillis());
-      loadModel.setMaxColumns("10");
+      CarbonLoadModel loadModel = buildCarbonLoadModel(table, factFilePath);
 
       executeGraph(loadModel, absoluteTableIdentifier.getStorePath());
 
@@ -176,7 +180,7 @@ public class StoreCreator {
 
   }
 
-  private static CarbonTable createTable() throws IOException {
+  public static CarbonTable createTable() throws IOException {
     TableInfo tableInfo = new TableInfo();
     tableInfo.setStorePath(absoluteTableIdentifier.getStorePath());
     tableInfo.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
index 2e840c0..4cbc692 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
@@ -135,7 +135,7 @@ public class MapredCarbonInputFormat extends CarbonInputFormat<ArrayWritable>
         QueryModel.createModel(identifier, queryPlan, carbonTable, new DataTypeConverterImpl());
     // set the filter to the query model in order to filter blocklet before scan
     Expression filter = getFilterPredicates(configuration);
-    CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
+    CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, null, null);
     FilterResolverIntf filterIntf =
         CarbonInputFormatUtil.resolveFilter(filter, identifier, tableProvider);
     queryModel.setFilterExpressionResolverTree(filterIntf);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java
index f129474..5e3e5b7 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java
@@ -42,6 +42,7 @@ import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.hadoop.AbstractRecordReader;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.util.CarbonTypeUtil;
 
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index fc34127..7ec6b7b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -21,6 +21,8 @@ import java.text.SimpleDateFormat
 import java.util.{ArrayList, Date, List}
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 import scala.util.Random
 
 import org.apache.hadoop.conf.Configuration
@@ -38,9 +40,11 @@ import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.model.QueryModel
 import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants, QueryStatisticsRecorder}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, TaskMetricsMap}
+import org.apache.carbondata.core.statusmanager.FileFormat
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, DataTypeUtil, TaskMetricsMap}
 import org.apache.carbondata.hadoop._
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
+import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.InitInputMetrics
 import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl
@@ -82,11 +86,45 @@ class CarbonScanRDD(
 
     // get splits
     val splits = format.getSplits(job)
-    val result = distributeSplits(splits)
-    result
+
+    // separate split
+    // 1. for batch splits, invoke distributeSplits method to create partitions
+    // 2. for stream splits, create partition for each split by default
+    val columnarSplits = new ArrayList[InputSplit]()
+    val streamSplits = new ArrayBuffer[InputSplit]()
+    splits.asScala.foreach { split =>
+      val carbonInputSplit = split.asInstanceOf[CarbonInputSplit]
+      if (FileFormat.rowformat == carbonInputSplit.getFileFormat) {
+        streamSplits += split
+      } else {
+        columnarSplits.add(split)
+      }
+    }
+    val batchPartitions = distributeColumnarSplits(columnarSplits)
+    if (streamSplits.isEmpty) {
+      batchPartitions.toArray
+    } else {
+      val index = batchPartitions.length
+      val streamPartitions: mutable.Buffer[Partition] =
+        streamSplits.zipWithIndex.map { splitWithIndex =>
+          val multiBlockSplit =
+            new CarbonMultiBlockSplit(identifier,
+              Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava,
+              splitWithIndex._1.getLocations,
+              FileFormat.rowformat)
+          new CarbonSparkPartition(id, splitWithIndex._2 + index, multiBlockSplit)
+        }
+      if (batchPartitions.isEmpty) {
+        streamPartitions.toArray
+      } else {
+        // should keep the order by index of partition
+        batchPartitions.appendAll(streamPartitions)
+        batchPartitions.toArray
+      }
+    }
   }
 
-  private def distributeSplits(splits: List[InputSplit]): Array[Partition] = {
+  private def distributeColumnarSplits(splits: List[InputSplit]): mutable.Buffer[Partition] = {
     // this function distributes the split based on following logic:
     // 1. based on data locality, to make split balanced on all available nodes
     // 2. if the number of split for one
@@ -190,7 +228,7 @@ class CarbonScanRDD(
          | no.of.nodes: $noOfNodes,
          | parallelism: $parallelism
        """.stripMargin)
-    result.toArray(new Array[Partition](result.size()))
+    result.asScala
   }
 
   override def internalCompute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
@@ -210,20 +248,34 @@ class CarbonScanRDD(
     inputMetricsStats.initBytesReadCallback(context, inputSplit)
     val iterator = if (inputSplit.getAllSplits.size() > 0) {
       val model = format.getQueryModel(inputSplit, attemptContext)
-      val reader = {
-        if (vectorReader) {
-          val carbonRecordReader = createVectorizedCarbonRecordReader(model, inputMetricsStats)
-          if (carbonRecordReader == null) {
-            new CarbonRecordReader(model,
-              format.getReadSupportClass(attemptContext.getConfiguration), inputMetricsStats)
+      // get RecordReader by FileFormat
+      val reader: RecordReader[Void, Object] = inputSplit.getFileFormat match {
+        case FileFormat.rowformat =>
+          // create record reader for row format
+          DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
+          val inputFormat = new CarbonStreamInputFormat
+          val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
+            .asInstanceOf[CarbonStreamRecordReader]
+          streamReader.setVectorReader(vectorReader)
+          model.setStatisticsRecorder(
+            CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId))
+          streamReader.setQueryModel(model)
+          streamReader
+        case _ =>
+          // create record reader for CarbonData file format
+          if (vectorReader) {
+            val carbonRecordReader = createVectorizedCarbonRecordReader(model, inputMetricsStats)
+            if (carbonRecordReader == null) {
+              new CarbonRecordReader(model,
+                format.getReadSupportClass(attemptContext.getConfiguration), inputMetricsStats)
+            } else {
+              carbonRecordReader
+            }
           } else {
-            carbonRecordReader
+            new CarbonRecordReader(model,
+              format.getReadSupportClass(attemptContext.getConfiguration),
+              inputMetricsStats)
           }
-        } else {
-          new CarbonRecordReader(model,
-            format.getReadSupportClass(attemptContext.getConfiguration),
-            inputMetricsStats)
-        }
       }
 
       reader.initialize(inputSplit, attemptContext)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/integration/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml
index ed01728..18e37ad 100644
--- a/integration/spark2/pom.xml
+++ b/integration/spark2/pom.xml
@@ -36,7 +36,7 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.carbondata</groupId>
-      <artifactId>carbondata-spark-common</artifactId>
+      <artifactId>carbondata-streaming</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index d5adc2f..10336eb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -23,9 +23,11 @@ import org.apache.spark.CarbonInputMetrics
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.command.management.LoadTableByInsertCommand
+import org.apache.spark.sql.execution.streaming.Sink
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.optimizer.CarbonFilters
-import org.apache.spark.sql.sources.{BaseRelation, Filter, InsertableRelation}
+import org.apache.spark.sql.sources.{BaseRelation, Filter, InsertableRelation, StreamSinkProvider}
+import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -36,6 +38,7 @@ import org.apache.carbondata.core.scan.expression.logical.AndExpression
 import org.apache.carbondata.core.util.{CarbonSessionInfo, ThreadLocalSessionInfo}
 import org.apache.carbondata.hadoop.CarbonProjection
 import org.apache.carbondata.spark.rdd.CarbonScanRDD
+import org.apache.carbondata.streaming.StreamSinkFactory
 
 case class CarbonDatasourceHadoopRelation(
     sparkSession: SparkSession,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index eeca8b8..6020eee 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.SparkSession.Builder
+import org.apache.spark.sql.execution.streaming.CarbonStreamingQueryListener
 import org.apache.spark.sql.hive.CarbonSessionState
 import org.apache.spark.sql.internal.{SessionState, SharedState}
 import org.apache.spark.util.Utils
@@ -168,6 +169,7 @@ object CarbonSession {
             SparkSession.sqlListener.set(null)
           }
         })
+        session.streams.addListener(new CarbonStreamingQueryListener(session))
       }
 
       return session