You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/28 03:19:43 UTC

[2/5] carbondata git commit: [CARBONDATA-2165]Remove spark in carbon-hadoop module

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java
new file mode 100644
index 0000000..69d2a3b
--- /dev/null
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java
@@ -0,0 +1,761 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.reader.CarbonHeaderReader;
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.GenericQueryType;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.intf.RowImpl;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.format.BlockletHeader;
+import org.apache.carbondata.format.FileHeader;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.InputMetricsStats;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
+import org.apache.spark.sql.types.CalendarIntervalType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * Stream record reader
+ */
+public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
+  // vector reader
+  private boolean isVectorReader;
+
+  // metadata
+  private CarbonTable carbonTable;
+  private CarbonColumn[] storageColumns;
+  private boolean[] isRequired;
+  private DataType[] measureDataTypes;
+  private int dimensionCount;
+  private int measureCount;
+
+  // input
+  private FileSplit fileSplit;
+  private Configuration hadoopConf;
+  private StreamBlockletReader input;
+  private boolean isFirstRow = true;
+  private QueryModel model;
+
+  // decode data
+  private BitSet allNonNull;
+  private boolean[] isNoDictColumn;
+  private DirectDictionaryGenerator[] directDictionaryGenerators;
+  private CacheProvider cacheProvider;
+  private Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache;
+  private GenericQueryType[] queryTypes;
+
+  // vectorized reader
+  private StructType outputSchema;
+  private ColumnarBatch columnarBatch;
+  private boolean isFinished = false;
+
+  // filter
+  private FilterExecuter filter;
+  private boolean[] isFilterRequired;
+  private Object[] filterValues;
+  private RowIntf filterRow;
+  private int[] filterMap;
+
+  // output
+  private CarbonColumn[] projection;
+  private boolean[] isProjectionRequired;
+  private int[] projectionMap;
+  private Object[] outputValues;
+  private InternalRow outputRow;
+
+  // empty project, null filter
+  private boolean skipScanData;
+
+  // return raw row for handoff
+  private boolean useRawRow = false;
+
+  // InputMetricsStats
+  private InputMetricsStats inputMetricsStats;
+
+  @Override public void initialize(InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    // input
+    if (split instanceof CarbonInputSplit) {
+      fileSplit = (CarbonInputSplit) split;
+    } else if (split instanceof CarbonMultiBlockSplit) {
+      fileSplit = ((CarbonMultiBlockSplit) split).getAllSplits().get(0);
+    } else {
+      fileSplit = (FileSplit) split;
+    }
+
+    // metadata
+    hadoopConf = context.getConfiguration();
+    if (model == null) {
+      CarbonTableInputFormat format = new CarbonTableInputFormat<Object>();
+      model = format.createQueryModel(split, context);
+    }
+    carbonTable = model.getTable();
+    List<CarbonDimension> dimensions =
+        carbonTable.getDimensionByTableName(carbonTable.getTableName());
+    dimensionCount = dimensions.size();
+    List<CarbonMeasure> measures =
+        carbonTable.getMeasureByTableName(carbonTable.getTableName());
+    measureCount = measures.size();
+    List<CarbonColumn> carbonColumnList =
+        carbonTable.getStreamStorageOrderColumn(carbonTable.getTableName());
+    storageColumns = carbonColumnList.toArray(new CarbonColumn[carbonColumnList.size()]);
+    isNoDictColumn = CarbonDataProcessorUtil.getNoDictionaryMapping(storageColumns);
+    directDictionaryGenerators = new DirectDictionaryGenerator[storageColumns.length];
+    for (int i = 0; i < storageColumns.length; i++) {
+      if (storageColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+        directDictionaryGenerators[i] = DirectDictionaryKeyGeneratorFactory
+            .getDirectDictionaryGenerator(storageColumns[i].getDataType());
+      }
+    }
+    measureDataTypes = new DataType[measureCount];
+    for (int i = 0; i < measureCount; i++) {
+      measureDataTypes[i] = storageColumns[dimensionCount + i].getDataType();
+    }
+
+    // decode data
+    allNonNull = new BitSet(storageColumns.length);
+    projection = model.getProjectionColumns();
+
+    isRequired = new boolean[storageColumns.length];
+    boolean[] isFiltlerDimensions = model.getIsFilterDimensions();
+    boolean[] isFiltlerMeasures = model.getIsFilterMeasures();
+    isFilterRequired = new boolean[storageColumns.length];
+    filterMap = new int[storageColumns.length];
+    for (int i = 0; i < storageColumns.length; i++) {
+      if (storageColumns[i].isDimension()) {
+        if (isFiltlerDimensions[storageColumns[i].getOrdinal()]) {
+          isRequired[i] = true;
+          isFilterRequired[i] = true;
+          filterMap[i] = storageColumns[i].getOrdinal();
+        }
+      } else {
+        if (isFiltlerMeasures[storageColumns[i].getOrdinal()]) {
+          isRequired[i] = true;
+          isFilterRequired[i] = true;
+          filterMap[i] = carbonTable.getDimensionOrdinalMax() + storageColumns[i].getOrdinal();
+        }
+      }
+    }
+
+    isProjectionRequired = new boolean[storageColumns.length];
+    projectionMap = new int[storageColumns.length];
+    for (int i = 0; i < storageColumns.length; i++) {
+      for (int j = 0; j < projection.length; j++) {
+        if (storageColumns[i].getColName().equals(projection[j].getColName())) {
+          isRequired[i] = true;
+          isProjectionRequired[i] = true;
+          projectionMap[i] = j;
+          break;
+        }
+      }
+    }
+
+    // initialize filter
+    if (null != model.getFilterExpressionResolverTree()) {
+      initializeFilter();
+    } else if (projection.length == 0) {
+      skipScanData = true;
+    }
+
+  }
+
+  private void initializeFilter() {
+
+    List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil
+        .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()),
+            carbonTable.getMeasureByTableName(carbonTable.getTableName()));
+    int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()];
+    for (int i = 0; i < dimLensWithComplex.length; i++) {
+      dimLensWithComplex[i] = Integer.MAX_VALUE;
+    }
+
+    int[] dictionaryColumnCardinality =
+        CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList);
+    SegmentProperties segmentProperties =
+        new SegmentProperties(wrapperColumnSchemaList, dictionaryColumnCardinality);
+    Map<Integer, GenericQueryType> complexDimensionInfoMap = new HashMap<>();
+
+    FilterResolverIntf resolverIntf = model.getFilterExpressionResolverTree();
+    filter = FilterUtil.getFilterExecuterTree(resolverIntf, segmentProperties,
+        complexDimensionInfoMap);
+    // for row filter, we need update column index
+    FilterUtil.updateIndexOfColumnExpression(resolverIntf.getFilterExpression(),
+        carbonTable.getDimensionOrdinalMax());
+
+  }
+
+  public void setQueryModel(QueryModel model) {
+    this.model = model;
+  }
+
+  private byte[] getSyncMarker(String filePath) throws IOException {
+    CarbonHeaderReader headerReader = new CarbonHeaderReader(filePath);
+    FileHeader header = headerReader.readHeader();
+    return header.getSync_marker();
+  }
+
+  public void setUseRawRow(boolean useRawRow) {
+    this.useRawRow = useRawRow;
+  }
+
+  private void initializeAtFirstRow() throws IOException {
+    filterValues = new Object[carbonTable.getDimensionOrdinalMax() + measureCount];
+    filterRow = new RowImpl();
+    filterRow.setValues(filterValues);
+
+    outputValues = new Object[projection.length];
+    outputRow = new GenericInternalRow(outputValues);
+
+    Path file = fileSplit.getPath();
+
+    byte[] syncMarker = getSyncMarker(file.toString());
+
+    FileSystem fs = file.getFileSystem(hadoopConf);
+
+    int bufferSize = Integer.parseInt(hadoopConf.get(CarbonStreamInputFormat.READ_BUFFER_SIZE,
+        CarbonStreamInputFormat.READ_BUFFER_SIZE_DEFAULT));
+
+    FSDataInputStream fileIn = fs.open(file, bufferSize);
+    fileIn.seek(fileSplit.getStart());
+    input = new StreamBlockletReader(syncMarker, fileIn, fileSplit.getLength(),
+        fileSplit.getStart() == 0);
+
+    cacheProvider = CacheProvider.getInstance();
+    cache = cacheProvider.createCache(CacheType.FORWARD_DICTIONARY);
+    queryTypes = CarbonStreamInputFormat.getComplexDimensions(carbonTable, storageColumns, cache);
+
+    outputSchema = new StructType((StructField[])
+        DataTypeUtil.getDataTypeConverter().convertCarbonSchemaToSparkSchema(projection));
+  }
+
+  @Override public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (isFirstRow) {
+      isFirstRow = false;
+      initializeAtFirstRow();
+    }
+    if (isFinished) {
+      return false;
+    }
+
+    if (isVectorReader) {
+      return nextColumnarBatch();
+    }
+
+    return nextRow();
+  }
+
+  /**
+   * for vector reader, check next columnar batch
+   */
+  private boolean nextColumnarBatch() throws IOException {
+    boolean hasNext;
+    boolean scanMore = false;
+    do {
+      // move to the next blocklet
+      hasNext = input.nextBlocklet();
+      if (hasNext) {
+        // read blocklet header
+        BlockletHeader header = input.readBlockletHeader();
+        if (isScanRequired(header)) {
+          scanMore = !scanBlockletAndFillVector(header);
+        } else {
+          input.skipBlockletData(true);
+          scanMore = true;
+        }
+      } else {
+        isFinished = true;
+        scanMore = false;
+      }
+    } while (scanMore);
+    return hasNext;
+  }
+
+  /**
+   * check next Row
+   */
+  private boolean nextRow() throws IOException {
+    // read row one by one
+    try {
+      boolean hasNext;
+      boolean scanMore = false;
+      do {
+        hasNext = input.hasNext();
+        if (hasNext) {
+          if (skipScanData) {
+            input.nextRow();
+            scanMore = false;
+          } else {
+            if (useRawRow) {
+              // read raw row for streaming handoff which does not require decode raw row
+              readRawRowFromStream();
+            } else {
+              readRowFromStream();
+            }
+            if (null != filter) {
+              scanMore = !filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax());
+            } else {
+              scanMore = false;
+            }
+          }
+        } else {
+          if (input.nextBlocklet()) {
+            BlockletHeader header = input.readBlockletHeader();
+            if (isScanRequired(header)) {
+              if (skipScanData) {
+                input.skipBlockletData(false);
+              } else {
+                input.readBlockletData(header);
+              }
+            } else {
+              input.skipBlockletData(true);
+            }
+            scanMore = true;
+          } else {
+            isFinished = true;
+            scanMore = false;
+          }
+        }
+      } while (scanMore);
+      return hasNext;
+    } catch (FilterUnsupportedException e) {
+      throw new IOException("Failed to filter row in detail reader", e);
+    }
+  }
+
+  @Override public Void getCurrentKey() throws IOException, InterruptedException {
+    return null;
+  }
+
+  @Override public Object getCurrentValue() throws IOException, InterruptedException {
+    if (isVectorReader) {
+      int value = columnarBatch.numValidRows();
+      if (inputMetricsStats != null) {
+        inputMetricsStats.incrementRecordRead((long) value);
+      }
+
+      return columnarBatch;
+    }
+
+    if (inputMetricsStats != null) {
+      inputMetricsStats.incrementRecordRead(1L);
+    }
+
+    return outputRow;
+  }
+
+  private boolean isScanRequired(BlockletHeader header) {
+    // TODO require to implement min-max index
+    if (null == filter) {
+      return true;
+    }
+    return true;
+  }
+
+  private boolean scanBlockletAndFillVector(BlockletHeader header) throws IOException {
+    // if filter is null and output projection is empty, use the row number of blocklet header
+    if (skipScanData) {
+      int rowNums = header.getBlocklet_info().getNum_rows();
+      columnarBatch = ColumnarBatch.allocate(outputSchema, MemoryMode.OFF_HEAP, rowNums);
+      columnarBatch.setNumRows(rowNums);
+      input.skipBlockletData(true);
+      return rowNums > 0;
+    }
+
+    input.readBlockletData(header);
+    columnarBatch = ColumnarBatch.allocate(outputSchema, MemoryMode.OFF_HEAP, input.getRowNums());
+    int rowNum = 0;
+    if (null == filter) {
+      while (input.hasNext()) {
+        readRowFromStream();
+        putRowToColumnBatch(rowNum++);
+      }
+    } else {
+      try {
+        while (input.hasNext()) {
+          readRowFromStream();
+          if (filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax())) {
+            putRowToColumnBatch(rowNum++);
+          }
+        }
+      } catch (FilterUnsupportedException e) {
+        throw new IOException("Failed to filter row in vector reader", e);
+      }
+    }
+    columnarBatch.setNumRows(rowNum);
+    return rowNum > 0;
+  }
+
+  private void readRowFromStream() {
+    input.nextRow();
+    short nullLen = input.readShort();
+    BitSet nullBitSet = allNonNull;
+    if (nullLen > 0) {
+      nullBitSet = BitSet.valueOf(input.readBytes(nullLen));
+    }
+    int colCount = 0;
+    // primitive type dimension
+    for (; colCount < isNoDictColumn.length; colCount++) {
+      if (nullBitSet.get(colCount)) {
+        if (isFilterRequired[colCount]) {
+          filterValues[filterMap[colCount]] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
+        }
+        if (isProjectionRequired[colCount]) {
+          outputValues[projectionMap[colCount]] = null;
+        }
+      } else {
+        if (isNoDictColumn[colCount]) {
+          int v = input.readShort();
+          if (isRequired[colCount]) {
+            byte[] b = input.readBytes(v);
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = b;
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] =
+                  DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(b,
+                      storageColumns[colCount].getDataType());
+            }
+          } else {
+            input.skipBytes(v);
+          }
+        } else if (null != directDictionaryGenerators[colCount]) {
+          if (isRequired[colCount]) {
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = input.copy(4);
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] =
+                  directDictionaryGenerators[colCount].getValueFromSurrogate(input.readInt());
+            } else {
+              input.skipBytes(4);
+            }
+          } else {
+            input.skipBytes(4);
+          }
+        } else {
+          if (isRequired[colCount]) {
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = input.copy(4);
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] = input.readInt();
+            } else {
+              input.skipBytes(4);
+            }
+          } else {
+            input.skipBytes(4);
+          }
+        }
+      }
+    }
+    // complex type dimension
+    for (; colCount < dimensionCount; colCount++) {
+      if (nullBitSet.get(colCount)) {
+        if (isFilterRequired[colCount]) {
+          filterValues[filterMap[colCount]] = null;
+        }
+        if (isProjectionRequired[colCount]) {
+          outputValues[projectionMap[colCount]] = null;
+        }
+      } else {
+        short v = input.readShort();
+        if (isRequired[colCount]) {
+          byte[] b = input.readBytes(v);
+          if (isFilterRequired[colCount]) {
+            filterValues[filterMap[colCount]] = b;
+          }
+          if (isProjectionRequired[colCount]) {
+            outputValues[projectionMap[colCount]] = queryTypes[colCount]
+                .getDataBasedOnDataTypeFromSurrogates(ByteBuffer.wrap(b));
+          }
+        } else {
+          input.skipBytes(v);
+        }
+      }
+    }
+    // measure
+    DataType dataType;
+    for (int msrCount = 0; msrCount < measureCount; msrCount++, colCount++) {
+      if (nullBitSet.get(colCount)) {
+        if (isFilterRequired[colCount]) {
+          filterValues[filterMap[colCount]] = null;
+        }
+        if (isProjectionRequired[colCount]) {
+          outputValues[projectionMap[colCount]] = null;
+        }
+      } else {
+        dataType = measureDataTypes[msrCount];
+        if (dataType == DataTypes.BOOLEAN) {
+          if (isRequired[colCount]) {
+            boolean v = input.readBoolean();
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = v;
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] = v;
+            }
+          } else {
+            input.skipBytes(1);
+          }
+        } else if (dataType == DataTypes.SHORT) {
+          if (isRequired[colCount]) {
+            short v = input.readShort();
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = v;
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] = v;
+            }
+          } else {
+            input.skipBytes(2);
+          }
+        } else if (dataType == DataTypes.INT) {
+          if (isRequired[colCount]) {
+            int v = input.readInt();
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = v;
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] = v;
+            }
+          } else {
+            input.skipBytes(4);
+          }
+        } else if (dataType == DataTypes.LONG) {
+          if (isRequired[colCount]) {
+            long v = input.readLong();
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = v;
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] = v;
+            }
+          } else {
+            input.skipBytes(8);
+          }
+        } else if (dataType == DataTypes.DOUBLE) {
+          if (isRequired[colCount]) {
+            double v = input.readDouble();
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = v;
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] = v;
+            }
+          } else {
+            input.skipBytes(8);
+          }
+        } else if (DataTypes.isDecimal(dataType)) {
+          int len = input.readShort();
+          if (isRequired[colCount]) {
+            BigDecimal v = DataTypeUtil.byteToBigDecimal(input.readBytes(len));
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = v;
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] =
+                  DataTypeUtil.getDataTypeConverter().convertFromBigDecimalToDecimal(v);
+            }
+          } else {
+            input.skipBytes(len);
+          }
+        }
+      }
+    }
+  }
+
+  private void readRawRowFromStream() {
+    input.nextRow();
+    short nullLen = input.readShort();
+    BitSet nullBitSet = allNonNull;
+    if (nullLen > 0) {
+      nullBitSet = BitSet.valueOf(input.readBytes(nullLen));
+    }
+    int colCount = 0;
+    // primitive type dimension
+    for (; colCount < isNoDictColumn.length; colCount++) {
+      if (nullBitSet.get(colCount)) {
+        outputValues[colCount] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
+      } else {
+        if (isNoDictColumn[colCount]) {
+          int v = input.readShort();
+          outputValues[colCount] = input.readBytes(v);
+        } else {
+          outputValues[colCount] = input.readInt();
+        }
+      }
+    }
+    // complex type dimension
+    for (; colCount < dimensionCount; colCount++) {
+      if (nullBitSet.get(colCount)) {
+        outputValues[colCount] = null;
+      } else {
+        short v = input.readShort();
+        outputValues[colCount] = input.readBytes(v);
+      }
+    }
+    // measure
+    DataType dataType;
+    for (int msrCount = 0; msrCount < measureCount; msrCount++, colCount++) {
+      if (nullBitSet.get(colCount)) {
+        outputValues[colCount] = null;
+      } else {
+        dataType = measureDataTypes[msrCount];
+        if (dataType == DataTypes.BOOLEAN) {
+          outputValues[colCount] = input.readBoolean();
+        } else if (dataType == DataTypes.SHORT) {
+          outputValues[colCount] = input.readShort();
+        } else if (dataType == DataTypes.INT) {
+          outputValues[colCount] = input.readInt();
+        } else if (dataType == DataTypes.LONG) {
+          outputValues[colCount] = input.readLong();
+        } else if (dataType == DataTypes.DOUBLE) {
+          outputValues[colCount] = input.readDouble();
+        } else if (DataTypes.isDecimal(dataType)) {
+          int len = input.readShort();
+          outputValues[colCount] = DataTypeUtil.byteToBigDecimal(input.readBytes(len));
+        }
+      }
+    }
+  }
+
+  private void putRowToColumnBatch(int rowId) {
+    for (int i = 0; i < projection.length; i++) {
+      Object value = outputValues[i];
+      ColumnVector col = columnarBatch.column(i);
+      org.apache.spark.sql.types.DataType t = col.dataType();
+      if (null == value) {
+        col.putNull(rowId);
+      } else {
+        if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
+          col.putBoolean(rowId, (boolean)value);
+        } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
+          col.putByte(rowId, (byte) value);
+        } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
+          col.putShort(rowId, (short) value);
+        } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
+          col.putInt(rowId, (int) value);
+        } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
+          col.putLong(rowId, (long) value);
+        } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
+          col.putFloat(rowId, (float) value);
+        } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
+          col.putDouble(rowId, (double) value);
+        } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
+          UTF8String v = (UTF8String) value;
+          col.putByteArray(rowId, v.getBytes());
+        } else if (t instanceof org.apache.spark.sql.types.DecimalType) {
+          DecimalType dt = (DecimalType)t;
+          Decimal d = Decimal.fromDecimal(value);
+          if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
+            col.putInt(rowId, (int)d.toUnscaledLong());
+          } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
+            col.putLong(rowId, d.toUnscaledLong());
+          } else {
+            final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
+            byte[] bytes = integer.toByteArray();
+            col.putByteArray(rowId, bytes, 0, bytes.length);
+          }
+        } else if (t instanceof CalendarIntervalType) {
+          CalendarInterval c = (CalendarInterval) value;
+          col.getChildColumn(0).putInt(rowId, c.months);
+          col.getChildColumn(1).putLong(rowId, c.microseconds);
+        } else if (t instanceof org.apache.spark.sql.types.DateType) {
+          col.putInt(rowId, (int) value);
+        } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
+          col.putLong(rowId, (long) value);
+        }
+      }
+    }
+  }
+
+  @Override public float getProgress() throws IOException, InterruptedException {
+    return 0;
+  }
+
+  public void setVectorReader(boolean isVectorReader) {
+    this.isVectorReader = isVectorReader;
+  }
+
+  public void setInputMetricsStats(InputMetricsStats inputMetricsStats) {
+    this.inputMetricsStats = inputMetricsStats;
+  }
+
+  @Override public void close() throws IOException {
+    if (null != input) {
+      input.close();
+    }
+    if (null != columnarBatch) {
+      columnarBatch.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
new file mode 100644
index 0000000..4e555d3
--- /dev/null
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.format.FileHeader;
+import org.apache.carbondata.processing.loading.BadRecordsLogger;
+import org.apache.carbondata.processing.loading.BadRecordsLoggerProvider;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.DataLoadProcessBuilder;
+import org.apache.carbondata.processing.loading.converter.RowConverter;
+import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.processing.loading.parser.RowParser;
+import org.apache.carbondata.processing.loading.parser.impl.RowParserImpl;
+import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskID;
+
+/**
+ * Stream record writer
+ */
+public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonStreamRecordWriter.class.getName());
+
+  // basic info
+  private Configuration hadoopConf;
+  private CarbonLoadModel carbonLoadModel;
+  private CarbonDataLoadConfiguration configuration;
+  private CarbonTable carbonTable;
+  private int maxRowNums;
+  private int maxCacheSize;
+
+  // parser and converter
+  private RowParser rowParser;
+  private BadRecordsLogger badRecordLogger;
+  private RowConverter converter;
+  private CarbonRow currentRow = new CarbonRow(null);
+
+  // encoder
+  private DataField[] dataFields;
+  private BitSet nullBitSet;
+  private boolean[] isNoDictionaryDimensionColumn;
+  private int dimensionWithComplexCount;
+  private int measureCount;
+  private DataType[] measureDataTypes;
+  private StreamBlockletWriter output = null;
+
+  // data write
+  private String segmentDir;
+  private String fileName;
+  private DataOutputStream outputStream;
+  private boolean isFirstRow = true;
+  private boolean hasException = false;
+
+  CarbonStreamRecordWriter(TaskAttemptContext job) throws IOException {
+    initialize(job);
+  }
+
+  public CarbonStreamRecordWriter(TaskAttemptContext job, CarbonLoadModel carbonLoadModel)
+      throws IOException {
+    this.carbonLoadModel = carbonLoadModel;
+    initialize(job);
+  }
+
+  private void initialize(TaskAttemptContext job) throws IOException {
+    // set basic information
+    hadoopConf = job.getConfiguration();
+    if (carbonLoadModel == null) {
+      carbonLoadModel = CarbonStreamOutputFormat.getCarbonLoadModel(hadoopConf);
+      if (carbonLoadModel == null) {
+        throw new IOException(
+            "CarbonStreamRecordWriter require configuration: mapreduce.output.carbon.load.model");
+      }
+    }
+    String segmentId = CarbonStreamOutputFormat.getSegmentId(hadoopConf);
+    carbonLoadModel.setSegmentId(segmentId);
+    carbonTable = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable();
+    long taskNo = TaskID.forName(hadoopConf.get("mapred.tip.id")).getId();
+    carbonLoadModel.setTaskNo("" + taskNo);
+    configuration = DataLoadProcessBuilder.createConfiguration(carbonLoadModel);
+    maxRowNums = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_STREAM_BLOCKLET_ROW_NUMS,
+        CarbonStreamOutputFormat.CARBON_STREAM_BLOCKLET_ROW_NUMS_DEFAULT) - 1;
+    maxCacheSize = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_STREAM_CACHE_SIZE,
+        CarbonStreamOutputFormat.CARBON_STREAM_CACHE_SIZE_DEFAULT);
+
+    segmentDir = CarbonTablePath.getSegmentPath(
+        carbonTable.getAbsoluteTableIdentifier().getTablePath(), segmentId);
+    fileName = CarbonTablePath.getCarbonDataFileName(0, taskNo, 0, 0, "0");
+  }
+
+  private void initializeAtFirstRow() throws IOException, InterruptedException {
+
+    // initialize metadata
+    isNoDictionaryDimensionColumn =
+        CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields());
+    dimensionWithComplexCount = configuration.getDimensionCount();
+    measureCount = configuration.getMeasureCount();
+    dataFields = configuration.getDataFields();
+    measureDataTypes = new DataType[measureCount];
+    for (int i = 0; i < measureCount; i++) {
+      measureDataTypes[i] =
+          dataFields[dimensionWithComplexCount + i].getColumn().getDataType();
+    }
+
+    // initialize parser and converter
+    rowParser = new RowParserImpl(dataFields, configuration);
+    badRecordLogger = BadRecordsLoggerProvider.createBadRecordLogger(configuration);
+    converter = new RowConverterImpl(configuration.getDataFields(), configuration, badRecordLogger);
+    configuration.setCardinalityFinder(converter);
+    converter.initialize();
+
+    // initialize encoder
+    nullBitSet = new BitSet(dataFields.length);
+    int rowBufferSize = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_ENCODER_ROW_BUFFER_SIZE,
+        CarbonStreamOutputFormat.CARBON_ENCODER_ROW_BUFFER_SIZE_DEFAULT);
+    output = new StreamBlockletWriter(maxCacheSize, maxRowNums, rowBufferSize);
+
+    // initialize data writer
+    String filePath = segmentDir + File.separator + fileName;
+    FileFactory.FileType fileType = FileFactory.getFileType(filePath);
+    CarbonFile carbonFile = FileFactory.getCarbonFile(filePath, fileType);
+    if (carbonFile.exists()) {
+      // if the file is existed, use the append api
+      outputStream = FileFactory.getDataOutputStreamUsingAppend(filePath, fileType);
+    } else {
+      // IF the file is not existed, use the create api
+      outputStream = FileFactory.getDataOutputStream(filePath, fileType);
+      writeFileHeader();
+    }
+
+    isFirstRow = false;
+  }
+
+  @Override public void write(Void key, Object value) throws IOException, InterruptedException {
+    if (isFirstRow) {
+      initializeAtFirstRow();
+    }
+
+    // parse and convert row
+    currentRow.setData(rowParser.parseRow((Object[]) value));
+    converter.convert(currentRow);
+
+    // null bit set
+    nullBitSet.clear();
+    for (int i = 0; i < dataFields.length; i++) {
+      if (null == currentRow.getObject(i)) {
+        nullBitSet.set(i);
+      }
+    }
+    output.nextRow();
+    byte[] b = nullBitSet.toByteArray();
+    output.writeShort(b.length);
+    if (b.length > 0) {
+      output.writeBytes(b);
+    }
+    int dimCount = 0;
+    Object columnValue;
+
+    // primitive type dimension
+    for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) {
+      columnValue = currentRow.getObject(dimCount);
+      if (null != columnValue) {
+        if (isNoDictionaryDimensionColumn[dimCount]) {
+          byte[] col = (byte[]) columnValue;
+          output.writeShort(col.length);
+          output.writeBytes(col);
+        } else {
+          output.writeInt((int) columnValue);
+        }
+      }
+    }
+    // complex type dimension
+    for (; dimCount < dimensionWithComplexCount; dimCount++) {
+      columnValue = currentRow.getObject(dimCount);
+      if (null != columnValue) {
+        byte[] col = (byte[]) columnValue;
+        output.writeShort(col.length);
+        output.writeBytes(col);
+      }
+    }
+    // measure
+    DataType dataType;
+    for (int msrCount = 0; msrCount < measureCount; msrCount++) {
+      columnValue = currentRow.getObject(dimCount + msrCount);
+      if (null != columnValue) {
+        dataType = measureDataTypes[msrCount];
+        if (dataType == DataTypes.BOOLEAN) {
+          output.writeBoolean((boolean) columnValue);
+        } else if (dataType == DataTypes.SHORT) {
+          output.writeShort((short) columnValue);
+        } else if (dataType == DataTypes.INT) {
+          output.writeInt((int) columnValue);
+        } else if (dataType == DataTypes.LONG) {
+          output.writeLong((long) columnValue);
+        } else if (dataType == DataTypes.DOUBLE) {
+          output.writeDouble((double) columnValue);
+        } else if (DataTypes.isDecimal(dataType)) {
+          BigDecimal val = (BigDecimal) columnValue;
+          byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
+          output.writeShort(bigDecimalInBytes.length);
+          output.writeBytes(bigDecimalInBytes);
+        } else {
+          String msg =
+              "unsupported data type:" + dataFields[dimCount + msrCount].getColumn().getDataType()
+                  .getName();
+          LOGGER.error(msg);
+          throw new IOException(msg);
+        }
+      }
+    }
+
+    if (output.isFull()) {
+      appendBlockletToDataFile();
+    }
+  }
+
+  private void writeFileHeader() throws IOException {
+    List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil
+        .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()),
+            carbonTable.getMeasureByTableName(carbonTable.getTableName()));
+    int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()];
+    for (int i = 0; i < dimLensWithComplex.length; i++) {
+      dimLensWithComplex[i] = Integer.MAX_VALUE;
+    }
+    int[] dictionaryColumnCardinality =
+        CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList);
+    List<Integer> cardinality = new ArrayList<>();
+    List<org.apache.carbondata.format.ColumnSchema> columnSchemaList = AbstractFactDataWriter
+        .getColumnSchemaListAndCardinality(cardinality, dictionaryColumnCardinality,
+            wrapperColumnSchemaList);
+    FileHeader fileHeader =
+        CarbonMetadataUtil.getFileHeader(true, columnSchemaList, System.currentTimeMillis());
+    fileHeader.setIs_footer_present(false);
+    fileHeader.setIs_splitable(true);
+    fileHeader.setSync_marker(CarbonStreamOutputFormat.CARBON_SYNC_MARKER);
+    outputStream.write(CarbonUtil.getByteArray(fileHeader));
+  }
+
+  /**
+   * write a blocklet to file
+   */
+  private void appendBlockletToDataFile() throws IOException {
+    if (output.getRowIndex() == -1) {
+      return;
+    }
+    output.apppendBlocklet(outputStream);
+    outputStream.flush();
+    // reset data
+    output.reset();
+  }
+
+  @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+    try {
+      // append remain buffer data
+      if (!hasException && !isFirstRow) {
+        appendBlockletToDataFile();
+        converter.finish();
+      }
+    } finally {
+      // close resource
+      CarbonUtil.closeStreams(outputStream);
+      if (output != null) {
+        output.close();
+      }
+      if (badRecordLogger != null) {
+        badRecordLogger.closeStreams();
+      }
+    }
+  }
+
+  public String getSegmentDir() {
+    return segmentDir;
+  }
+
+  public String getFileName() {
+    return fileName;
+  }
+
+  public void setHasException(boolean hasException) {
+    this.hasException = hasException;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java
new file mode 100644
index 0000000..43fe6ed
--- /dev/null
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.format.BlockletHeader;
+
+/**
+ * stream blocklet reader
+ */
+public class StreamBlockletReader {
+
+  private byte[] buffer;
+  private int offset;
+  private final byte[] syncMarker;
+  private final byte[] syncBuffer;
+  private final int syncLen;
+  private long pos = 0;
+  private final InputStream in;
+  private final long limitStart;
+  private final long limitEnd;
+  private boolean isAlreadySync = false;
+  private Compressor compressor = CompressorFactory.getInstance().getCompressor();
+  private int rowNums = 0;
+  private int rowIndex = 0;
+  private boolean isHeaderPresent;
+
+  StreamBlockletReader(byte[] syncMarker, InputStream in, long limit, boolean isHeaderPresent) {
+    this.syncMarker = syncMarker;
+    syncLen = syncMarker.length;
+    syncBuffer = new byte[syncLen];
+    this.in = in;
+    limitStart = limit;
+    limitEnd = limitStart + syncLen;
+    this.isHeaderPresent = isHeaderPresent;
+  }
+
+  private void ensureCapacity(int capacity) {
+    if (buffer == null || capacity > buffer.length) {
+      buffer = new byte[capacity];
+    }
+  }
+
+  /**
+   * find the first position of sync_marker in input stream
+   */
+  private boolean sync() throws IOException {
+    if (!readBytesFromStream(syncBuffer, 0, syncLen)) {
+      return false;
+    }
+    boolean skipHeader = false;
+    for (int i = 0; i < limitStart; i++) {
+      int j = 0;
+      for (; j < syncLen; j++) {
+        if (syncMarker[j] != syncBuffer[(i + j) % syncLen]) break;
+      }
+      if (syncLen == j) {
+        if (isHeaderPresent) {
+          if (skipHeader) {
+            return true;
+          } else {
+            skipHeader = true;
+          }
+        } else {
+          return true;
+        }
+      }
+      int value = in.read();
+      if (-1 == value) {
+        return false;
+      }
+      syncBuffer[i % syncLen] = (byte) value;
+      pos++;
+    }
+    return false;
+  }
+
+  BlockletHeader readBlockletHeader() throws IOException {
+    int len = readIntFromStream();
+    byte[] b = new byte[len];
+    if (!readBytesFromStream(b, 0, len)) {
+      throw new EOFException("Failed to read blocklet header");
+    }
+    BlockletHeader header = CarbonUtil.readBlockletHeader(b);
+    rowNums = header.getBlocklet_info().getNum_rows();
+    rowIndex = 0;
+    return header;
+  }
+
+  void readBlockletData(BlockletHeader header) throws IOException {
+    ensureCapacity(header.getBlocklet_length());
+    offset = 0;
+    int len = readIntFromStream();
+    byte[] b = new byte[len];
+    if (!readBytesFromStream(b, 0, len)) {
+      throw new EOFException("Failed to read blocklet data");
+    }
+    compressor.rawUncompress(b, buffer);
+  }
+
+  void skipBlockletData(boolean reset) throws IOException {
+    int len = readIntFromStream();
+    skip(len);
+    pos += len;
+    if (reset) {
+      this.rowNums = 0;
+      this.rowIndex = 0;
+    }
+  }
+
+  private void skip(int len) throws IOException {
+    long remaining = len;
+    do {
+      long skipLen = in.skip(remaining);
+      remaining -= skipLen;
+    } while (remaining > 0);
+  }
+
+  /**
+   * find the next blocklet
+   */
+  boolean nextBlocklet() throws IOException {
+    if (pos >= limitStart) {
+      return false;
+    }
+    if (isAlreadySync) {
+      if (!readBytesFromStream(syncBuffer, 0, syncLen)) {
+        return false;
+      }
+    } else {
+      isAlreadySync = true;
+      if (!sync()) {
+        return false;
+      }
+    }
+
+    return pos < limitEnd;
+  }
+
+  boolean hasNext() throws IOException {
+    return rowIndex < rowNums;
+  }
+
+  void nextRow() {
+    rowIndex++;
+  }
+
+  int readIntFromStream() throws IOException {
+    int ch1 = in.read();
+    int ch2 = in.read();
+    int ch3 = in.read();
+    int ch4 = in.read();
+    if ((ch1 | ch2 | ch3 | ch4) < 0) throw new EOFException();
+    pos += 4;
+    return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
+  }
+
+  /**
+   * Reads <code>len</code> bytes of data from the input stream into
+   * an array of bytes.
+   * @return <code>true</code> if reading data successfully, or
+   * <code>false</code> if there is no more data because the end of the stream has been reached.
+   */
+  boolean readBytesFromStream(byte[] b, int offset, int len) throws IOException {
+    int readLen = in.read(b, offset, len);
+    if (readLen < 0) {
+      return false;
+    }
+    pos += readLen;
+    if (readLen < len) {
+      return readBytesFromStream(b, offset + readLen, len - readLen);
+    } else {
+      return true;
+    }
+  }
+
+  boolean readBoolean() {
+    return (buffer[offset++]) != 0;
+  }
+
+  short readShort() {
+    short v =  (short) ((buffer[offset + 1] & 255) +
+        ((buffer[offset]) << 8));
+    offset += 2;
+    return v;
+  }
+
+  byte[] copy(int len) {
+    byte[] b = new byte[len];
+    System.arraycopy(buffer, offset, b, 0, len);
+    return b;
+  }
+
+  int readInt() {
+    int v = ((buffer[offset + 3] & 255) +
+        ((buffer[offset + 2] & 255) << 8) +
+        ((buffer[offset + 1] & 255) << 16) +
+        ((buffer[offset]) << 24));
+    offset += 4;
+    return v;
+  }
+
+  long readLong() {
+    long v = ((long)(buffer[offset + 7] & 255)) +
+        ((long) (buffer[offset + 6] & 255) << 8) +
+        ((long) (buffer[offset + 5] & 255) << 16) +
+        ((long) (buffer[offset + 4] & 255) << 24) +
+        ((long) (buffer[offset + 3] & 255) << 32) +
+        ((long) (buffer[offset + 2] & 255) << 40) +
+        ((long) (buffer[offset + 1] & 255) << 48) +
+        ((long) (buffer[offset]) << 56);
+    offset += 8;
+    return v;
+  }
+
+  double readDouble() {
+    return Double.longBitsToDouble(readLong());
+  }
+
+  byte[] readBytes(int len) {
+    byte[] b = new byte[len];
+    System.arraycopy(buffer, offset, b, 0, len);
+    offset += len;
+    return b;
+  }
+
+  void skipBytes(int len) {
+    offset += len;
+  }
+
+  int getRowNums() {
+    return rowNums;
+  }
+
+  void close() {
+    CarbonUtil.closeStreams(in);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java
new file mode 100644
index 0000000..509e2aa
--- /dev/null
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.format.BlockletHeader;
+import org.apache.carbondata.format.BlockletInfo;
+import org.apache.carbondata.format.MutationType;
+
+/**
+ * stream blocklet writer
+ */
+public class StreamBlockletWriter {
+  private byte[] buffer;
+  private int maxSize;
+  private int maxRowNum;
+  private int rowSize;
+  private int count = 0;
+  private int rowIndex = -1;
+  private Compressor compressor = CompressorFactory.getInstance().getCompressor();
+
+  StreamBlockletWriter(int maxSize, int maxRowNum, int rowSize) {
+    buffer = new byte[maxSize];
+    this.maxSize = maxSize;
+    this.maxRowNum = maxRowNum;
+    this.rowSize = rowSize;
+  }
+
+  private void ensureCapacity(int space) {
+    int newcount = space + count;
+    if (newcount > buffer.length) {
+      byte[] newbuf = new byte[Math.max(newcount, buffer.length + rowSize)];
+      System.arraycopy(buffer, 0, newbuf, 0, count);
+      buffer = newbuf;
+    }
+  }
+
+  void reset() {
+    count = 0;
+    rowIndex = -1;
+  }
+
+  byte[] getBytes() {
+    return buffer;
+  }
+
+  int getCount() {
+    return count;
+  }
+
+  int getRowIndex() {
+    return rowIndex;
+  }
+
+  void nextRow() {
+    rowIndex++;
+  }
+
+  boolean isFull() {
+    return rowIndex == maxRowNum || count >= maxSize;
+  }
+
+  void writeBoolean(boolean val) {
+    ensureCapacity(1);
+    buffer[count] = (byte) (val ? 1 : 0);
+    count += 1;
+  }
+
+  void writeShort(int val) {
+    ensureCapacity(2);
+    buffer[count + 1] = (byte) (val);
+    buffer[count] = (byte) (val >>> 8);
+    count += 2;
+  }
+
+  void writeInt(int val) {
+    ensureCapacity(4);
+    buffer[count + 3] = (byte) (val);
+    buffer[count + 2] = (byte) (val >>> 8);
+    buffer[count + 1] = (byte) (val >>> 16);
+    buffer[count] = (byte) (val >>> 24);
+    count += 4;
+  }
+
+  void writeLong(long val) {
+    ensureCapacity(8);
+    buffer[count + 7] = (byte) (val);
+    buffer[count + 6] = (byte) (val >>> 8);
+    buffer[count + 5] = (byte) (val >>> 16);
+    buffer[count + 4] = (byte) (val >>> 24);
+    buffer[count + 3] = (byte) (val >>> 32);
+    buffer[count + 2] = (byte) (val >>> 40);
+    buffer[count + 1] = (byte) (val >>> 48);
+    buffer[count] = (byte) (val >>> 56);
+    count += 8;
+  }
+
+  void writeDouble(double val) {
+    writeLong(Double.doubleToLongBits(val));
+  }
+
+  void writeBytes(byte[] b) {
+    writeBytes(b, 0, b.length);
+  }
+
+  void writeBytes(byte[] b, int off, int len) {
+    ensureCapacity(len);
+    System.arraycopy(b, off, buffer, count, len);
+    count += len;
+  }
+
+  void apppendBlocklet(DataOutputStream outputStream) throws IOException {
+    outputStream.write(CarbonStreamOutputFormat.CARBON_SYNC_MARKER);
+
+    BlockletInfo blockletInfo = new BlockletInfo();
+    blockletInfo.setNum_rows(getRowIndex() + 1);
+    BlockletHeader blockletHeader = new BlockletHeader();
+    blockletHeader.setBlocklet_length(getCount());
+    blockletHeader.setMutation(MutationType.INSERT);
+    blockletHeader.setBlocklet_info(blockletInfo);
+    byte[] headerBytes = CarbonUtil.getByteArray(blockletHeader);
+    outputStream.writeInt(headerBytes.length);
+    outputStream.write(headerBytes);
+
+    byte[] compressed = compressor.compressByte(getBytes(), getCount());
+    outputStream.writeInt(compressed.length);
+    outputStream.write(compressed);
+  }
+
+  void close() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
index 8c9889d..9e83924 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -42,8 +42,8 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.CarbonIndexFileWriter;
 import org.apache.carbondata.format.BlockIndex;
 import org.apache.carbondata.format.BlockletIndex;
-import org.apache.carbondata.hadoop.streaming.CarbonStreamRecordWriter;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.streaming.CarbonStreamRecordWriter;
 
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonSparkStreamingListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonSparkStreamingListener.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonSparkStreamingListener.scala
deleted file mode 100644
index 6d1fa45..0000000
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonSparkStreamingListener.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.streaming
-
-import org.apache.spark.scheduler.SparkListener
-import org.apache.spark.scheduler.SparkListenerApplicationEnd
-
-class CarbonSparkStreamingListener extends SparkListener {
-
-  /**
-   * When Spark Streaming App stops, remove all locks for stream table.
-   */
-  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
-    CarbonStreamSparkStreaming.cleanAllLockAfterStop()
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala
deleted file mode 100644
index 4aa1517..0000000
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.streaming
-
-import java.util
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink
-import org.apache.spark.sql.execution.streaming.Sink
-import org.apache.spark.sql.SaveMode
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.streaming.Time
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-
-/**
- * Interface used to write stream data to stream table
- * when integrate with Spark Streaming.
- *
- * NOTE: Current integration with Spark Streaming is an alpha feature.
- */
-class CarbonStreamSparkStreamingWriter(val sparkSession: SparkSession,
-    val carbonTable: CarbonTable,
-    val configuration: Configuration) {
-
-  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  private var isInitialize: Boolean = false
-
-  private var lock: ICarbonLock = null
-  private var carbonAppendableStreamSink: Sink = null
-
-  /**
-   * Acquired the lock for stream table
-   */
-  def lockStreamTable(): Unit = {
-    lock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-      LockUsage.STREAMING_LOCK)
-    if (lock.lockWithRetries()) {
-      LOGGER.info("Acquired the lock for stream table: " +
-                  carbonTable.getDatabaseName + "." +
-                  carbonTable.getTableName)
-    } else {
-      LOGGER.error("Not able to acquire the lock for stream table:" +
-                   carbonTable.getDatabaseName + "." + carbonTable.getTableName)
-      throw new InterruptedException(
-        "Not able to acquire the lock for stream table: " + carbonTable.getDatabaseName + "." +
-        carbonTable.getTableName)
-    }
-  }
-
-  /**
-   * unlock for stream table
-   */
-  def unLockStreamTable(): Unit = {
-    if (null != lock) {
-      lock.unlock()
-      LOGGER.info("unlock for stream table: " +
-                  carbonTable.getDatabaseName + "." +
-                  carbonTable.getTableName)
-    }
-  }
-
-  def initialize(): Unit = {
-    carbonAppendableStreamSink = StreamSinkFactory.createStreamTableSink(
-      sparkSession,
-      configuration,
-      carbonTable,
-      extraOptions.toMap).asInstanceOf[CarbonAppendableStreamSink]
-
-    lockStreamTable()
-
-    isInitialize = true
-  }
-
-  def writeStreamData(dataFrame: DataFrame, time: Time): Unit = {
-    if (!isInitialize) {
-      initialize()
-    }
-    carbonAppendableStreamSink.addBatch(time.milliseconds, dataFrame)
-  }
-
-  private val extraOptions = new scala.collection.mutable.HashMap[String, String]
-  private var mode: SaveMode = SaveMode.ErrorIfExists
-
-  this.option("dbName", carbonTable.getDatabaseName)
-  this.option("tableName", carbonTable.getTableName)
-
-  /**
-   * Specifies the behavior when data or table already exists. Options include:
-   *   - `SaveMode.Overwrite`: overwrite the existing data.
-   *   - `SaveMode.Append`: append the data.
-   *   - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
-   *   - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime.
-   */
-  def mode(saveMode: SaveMode): CarbonStreamSparkStreamingWriter = {
-    if (mode == SaveMode.ErrorIfExists) {
-      mode = saveMode
-    }
-    this
-  }
-
-  /**
-   * Specifies the behavior when data or table already exists. Options include:
-   *   - `overwrite`: overwrite the existing data.
-   *   - `append`: append the data.
-   *   - `ignore`: ignore the operation (i.e. no-op).
-   *   - `error or default`: default option, throw an exception at runtime.
-   */
-  def mode(saveMode: String): CarbonStreamSparkStreamingWriter = {
-    if (mode == SaveMode.ErrorIfExists) {
-      mode = saveMode.toLowerCase(util.Locale.ROOT) match {
-        case "overwrite" => SaveMode.Overwrite
-        case "append" => SaveMode.Append
-        case "ignore" => SaveMode.Ignore
-        case "error" | "default" => SaveMode.ErrorIfExists
-        case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +
-          "Accepted save modes are 'overwrite', 'append', 'ignore', 'error'.")
-      }
-    }
-    this
-  }
-
-  /**
-   * Adds an output option
-   */
-  def option(key: String, value: String): CarbonStreamSparkStreamingWriter = {
-    if (!extraOptions.contains(key)) {
-      extraOptions += (key -> value)
-    }
-    this
-  }
-
-  /**
-   * Adds an output option
-   */
-  def option(key: String, value: Boolean): CarbonStreamSparkStreamingWriter =
-    option(key, value.toString)
-
-  /**
-   * Adds an output option
-   */
-  def option(key: String, value: Long): CarbonStreamSparkStreamingWriter =
-    option(key, value.toString)
-
-  /**
-   * Adds an output option
-   */
-  def option(key: String, value: Double): CarbonStreamSparkStreamingWriter =
-    option(key, value.toString)
-}
-
-object CarbonStreamSparkStreaming {
-
-  @transient private val tableMap =
-    new util.HashMap[String, CarbonStreamSparkStreamingWriter]()
-
-  def getTableMap: util.Map[String, CarbonStreamSparkStreamingWriter] = tableMap
-
-  /**
-   * remove all stream lock.
-   */
-  def cleanAllLockAfterStop(): Unit = {
-    tableMap.asScala.values.foreach { writer => writer.unLockStreamTable() }
-    tableMap.clear()
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
deleted file mode 100644
index 4df04b9..0000000
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ /dev/null
@@ -1,436 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.streaming
-
-import java.text.SimpleDateFormat
-import java.util
-import java.util.Date
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapreduce.{Job, TaskAttemptID, TaskType}
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.SparkSession
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.datamap.Segment
-import org.apache.carbondata.core.datastore.block.SegmentProperties
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
-import org.apache.carbondata.core.metadata.CarbonMetadata
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
-import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection}
-import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
-import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
-import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostStatusUpdateEvent, LoadTablePreStatusUpdateEvent}
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.processing.merger.{CompactionResultSortProcessor, CompactionType}
-import org.apache.carbondata.processing.util.CarbonLoaderUtil
-import org.apache.carbondata.spark.{HandoffResult, HandoffResultImpl}
-import org.apache.carbondata.spark.rdd.CarbonRDD
-import org.apache.carbondata.spark.util.CommonUtil
-
-
-/**
- * partition of the handoff segment
- */
-class HandoffPartition(
-    val rddId: Int,
-    val idx: Int,
-    @transient val inputSplit: CarbonInputSplit
-) extends Partition {
-
-  val split = new SerializableWritable[CarbonInputSplit](inputSplit)
-
-  override val index: Int = idx
-
-  override def hashCode(): Int = 41 * (41 + rddId) + idx
-}
-
-/**
- * package the record reader of the handoff segment to RawResultIterator
- */
-class StreamingRawResultIterator(
-    recordReader: CarbonStreamRecordReader
-) extends RawResultIterator(null, null, null) {
-
-  override def hasNext: Boolean = {
-    recordReader.nextKeyValue()
-  }
-
-  override def next(): Array[Object] = {
-    val rowTmp = recordReader
-      .getCurrentValue
-      .asInstanceOf[GenericInternalRow]
-      .values
-      .asInstanceOf[Array[Object]]
-    val row = new Array[Object](rowTmp.length)
-    System.arraycopy(rowTmp, 0, row, 0, rowTmp.length)
-    row
-  }
-}
-
-/**
- * execute streaming segment handoff
- */
-class StreamHandoffRDD[K, V](
-    sc: SparkContext,
-    result: HandoffResult[K, V],
-    carbonLoadModel: CarbonLoadModel,
-    handOffSegmentId: String
-) extends CarbonRDD[(K, V)](sc, Nil, sc.hadoopConfiguration) {
-
-  private val jobTrackerId: String = {
-    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
-    formatter.format(new Date())
-  }
-
-  override def internalCompute(
-      split: Partition,
-      context: TaskContext
-  ): Iterator[(K, V)] = {
-    carbonLoadModel.setTaskNo("" + split.index)
-    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-    CarbonMetadata.getInstance().addCarbonTable(carbonTable)
-    // the input iterator is using raw row
-    val iteratorList = prepareInputIterator(split, carbonTable)
-
-    CommonUtil.setTempStoreLocation(split.index, carbonLoadModel, true, false)
-    // use CompactionResultSortProcessor to sort data dan write to columnar files
-    val processor = prepareHandoffProcessor(carbonTable)
-    val status = processor.execute(iteratorList)
-
-    new Iterator[(K, V)] {
-      private var finished = false
-
-      override def hasNext: Boolean = {
-        !finished
-      }
-
-      override def next(): (K, V) = {
-        finished = true
-        result.getKey("" + split.index, status)
-      }
-    }
-  }
-
-  /**
-   * prepare input iterator by basing CarbonStreamRecordReader
-   */
-  private def prepareInputIterator(
-      split: Partition,
-      carbonTable: CarbonTable
-  ): util.ArrayList[RawResultIterator] = {
-    val inputSplit = split.asInstanceOf[HandoffPartition].split.value
-    val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
-    val hadoopConf = new Configuration()
-    CarbonInputFormat.setDatabaseName(hadoopConf, carbonTable.getDatabaseName)
-    CarbonInputFormat.setTableName(hadoopConf, carbonTable.getTableName)
-    CarbonInputFormat.setTablePath(hadoopConf, carbonTable.getTablePath)
-    val projection = new CarbonProjection
-    val dataFields = carbonTable.getStreamStorageOrderColumn(carbonTable.getTableName)
-    (0 until dataFields.size()).foreach { index =>
-      projection.addColumn(dataFields.get(index).getColName)
-    }
-    CarbonInputFormat.setColumnProjection(hadoopConf, projection)
-    CarbonInputFormat.setTableInfo(hadoopConf, carbonTable.getTableInfo)
-    val attemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
-    val format = new CarbonTableInputFormat[Array[Object]]()
-    val model = format.createQueryModel(inputSplit, attemptContext)
-    val inputFormat = new CarbonStreamInputFormat
-    val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
-      .asInstanceOf[CarbonStreamRecordReader]
-    streamReader.setVectorReader(false)
-    streamReader.setQueryModel(model)
-    streamReader.setUseRawRow(true)
-    streamReader.initialize(inputSplit, attemptContext)
-    val iteratorList = new util.ArrayList[RawResultIterator](1)
-    iteratorList.add(new StreamingRawResultIterator(streamReader))
-    iteratorList
-  }
-
-  private def prepareHandoffProcessor(
-      carbonTable: CarbonTable
-  ): CompactionResultSortProcessor = {
-    val wrapperColumnSchemaList = CarbonUtil.getColumnSchemaList(
-      carbonTable.getDimensionByTableName(carbonTable.getTableName),
-      carbonTable.getMeasureByTableName(carbonTable.getTableName))
-    val dimLensWithComplex =
-      (0 until wrapperColumnSchemaList.size()).map(_ => Integer.MAX_VALUE).toArray
-    val dictionaryColumnCardinality =
-      CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList)
-    val segmentProperties =
-      new SegmentProperties(wrapperColumnSchemaList, dictionaryColumnCardinality)
-
-    new CompactionResultSortProcessor(
-      carbonLoadModel,
-      carbonTable,
-      segmentProperties,
-      CompactionType.STREAMING,
-      carbonTable.getTableName,
-      null
-    )
-  }
-
-  /**
-   * get the partitions of the handoff segment
-   */
-  override protected def getPartitions: Array[Partition] = {
-    val job = Job.getInstance(FileFactory.getConfiguration)
-    val inputFormat = new CarbonTableInputFormat[Array[Object]]()
-    val segmentList = new util.ArrayList[Segment](1)
-    segmentList.add(Segment.toSegment(handOffSegmentId))
-    val splits = inputFormat.getSplitsOfStreaming(
-      job,
-      carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getAbsoluteTableIdentifier,
-      segmentList
-    )
-
-    (0 until splits.size()).map { index =>
-      new HandoffPartition(id, index, splits.get(index).asInstanceOf[CarbonInputSplit])
-    }.toArray[Partition]
-  }
-}
-
-object StreamHandoffRDD {
-
-  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  def iterateStreamingHandoff(
-      carbonLoadModel: CarbonLoadModel,
-      sparkSession: SparkSession
-  ): Unit = {
-    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-    val identifier = carbonTable.getAbsoluteTableIdentifier
-    var continueHandoff = false
-    // require handoff lock on table
-    val lock = CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.HANDOFF_LOCK)
-    try {
-      if (lock.lockWithRetries()) {
-        LOGGER.info("Acquired the handoff lock for table" +
-                    s" ${ carbonTable.getDatabaseName }.${ carbonTable.getTableName }")
-        // handoff streaming segment one by one
-        do {
-          val segmentStatusManager = new SegmentStatusManager(identifier)
-          var loadMetadataDetails: Array[LoadMetadataDetails] = null
-          // lock table to read table status file
-          val statusLock = segmentStatusManager.getTableStatusLock
-          try {
-            if (statusLock.lockWithRetries()) {
-              loadMetadataDetails = SegmentStatusManager.readLoadMetadata(
-                CarbonTablePath.getMetadataPath(identifier.getTablePath))
-            }
-          } finally {
-            if (null != statusLock) {
-              statusLock.unlock()
-            }
-          }
-          if (null != loadMetadataDetails) {
-            val streamSegments =
-              loadMetadataDetails.filter(_.getSegmentStatus == SegmentStatus.STREAMING_FINISH)
-
-            continueHandoff = streamSegments.length > 0
-            if (continueHandoff) {
-              // handoff a streaming segment
-              val loadMetadataDetail = streamSegments(0)
-              executeStreamingHandoff(
-                carbonLoadModel,
-                sparkSession,
-                loadMetadataDetail.getLoadName
-              )
-            }
-          } else {
-            continueHandoff = false
-          }
-        } while (continueHandoff)
-      }
-    } finally {
-      if (null != lock) {
-        lock.unlock()
-      }
-    }
-  }
-
-  /**
-   * start new thread to execute stream segment handoff
-   */
-  def startStreamingHandoffThread(
-      carbonLoadModel: CarbonLoadModel,
-      sparkSession: SparkSession,
-      isDDL: Boolean
-  ): Unit = {
-    if (isDDL) {
-      iterateStreamingHandoff(carbonLoadModel, sparkSession)
-    } else {
-      // start a new thread to execute streaming segment handoff
-      val handoffThread = new Thread() {
-        override def run(): Unit = {
-          iterateStreamingHandoff(carbonLoadModel, sparkSession)
-        }
-      }
-      handoffThread.start()
-    }
-  }
-
-  /**
-   * invoke StreamHandoffRDD to handoff a streaming segment to a columnar segment
-   */
-  def executeStreamingHandoff(
-      carbonLoadModel: CarbonLoadModel,
-      sparkSession: SparkSession,
-      handoffSegmenId: String
-  ): Unit = {
-    var loadStatus = SegmentStatus.SUCCESS
-    var errorMessage: String = "Handoff failure"
-    try {
-      // generate new columnar segment
-      val newMetaEntry = new LoadMetadataDetails
-      carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
-      CarbonLoaderUtil.populateNewLoadMetaEntry(
-        newMetaEntry,
-        SegmentStatus.INSERT_IN_PROGRESS,
-        carbonLoadModel.getFactTimeStamp,
-        false)
-      val operationContext = new OperationContext()
-      val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
-        new LoadTablePreStatusUpdateEvent(
-          carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getCarbonTableIdentifier,
-          carbonLoadModel)
-      OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)
-
-      CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, carbonLoadModel, true, false)
-      val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent =
-        new LoadTablePostStatusUpdateEvent(carbonLoadModel)
-      OperationListenerBus.getInstance()
-        .fireEvent(loadTablePostStatusUpdateEvent, operationContext)
-      // convert a streaming segment to columnar segment
-      val status = new StreamHandoffRDD(
-        sparkSession.sparkContext,
-        new HandoffResultImpl(),
-        carbonLoadModel,
-        handoffSegmenId).collect()
-
-      status.foreach { x =>
-        if (!x._2) {
-          loadStatus = SegmentStatus.LOAD_FAILURE
-        }
-      }
-    } catch {
-      case ex: Exception =>
-        loadStatus = SegmentStatus.LOAD_FAILURE
-        errorMessage = errorMessage + ": " + ex.getCause.getMessage
-        LOGGER.error(errorMessage)
-        LOGGER.error(ex, s"Handoff failed on streaming segment $handoffSegmenId")
-    }
-
-    if (loadStatus == SegmentStatus.LOAD_FAILURE) {
-      CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel)
-      LOGGER.info("********starting clean up**********")
-      CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
-      LOGGER.info("********clean up done**********")
-      LOGGER.audit(s"Handoff is failed for " +
-                   s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-      LOGGER.error("Cannot write load metadata file as handoff failed")
-      throw new Exception(errorMessage)
-    }
-
-    if (loadStatus == SegmentStatus.SUCCESS) {
-      val done = updateLoadMetadata(handoffSegmenId, carbonLoadModel)
-      if (!done) {
-        val errorMessage = "Handoff failed due to failure in table status updation."
-        LOGGER.audit("Handoff is failed for " +
-                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-        LOGGER.error("Handoff failed due to failure in table status updation.")
-        throw new Exception(errorMessage)
-      }
-      done
-    }
-
-  }
-
-  /**
-   * update streaming segment and new columnar segment
-   */
-  private def updateLoadMetadata(
-      handoffSegmentId: String,
-      loadModel: CarbonLoadModel
-  ): Boolean = {
-    var status = false
-    val metaDataFilepath = loadModel.getCarbonDataLoadSchema.getCarbonTable.getMetadataPath
-    val identifier = loadModel.getCarbonDataLoadSchema.getCarbonTable.getAbsoluteTableIdentifier
-    val metadataPath = CarbonTablePath.getMetadataPath(identifier.getTablePath)
-    val fileType = FileFactory.getFileType(metadataPath)
-    if (!FileFactory.isFileExist(metadataPath, fileType)) {
-      FileFactory.mkdirs(metadataPath, fileType)
-    }
-    val tableStatusPath = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath)
-    val segmentStatusManager = new SegmentStatusManager(identifier)
-    val carbonLock = segmentStatusManager.getTableStatusLock
-    try {
-      if (carbonLock.lockWithRetries()) {
-        LOGGER.info(
-          "Acquired lock for table" + loadModel.getDatabaseName() + "." + loadModel.getTableName()
-          + " for table status updation")
-        val listOfLoadFolderDetailsArray =
-          SegmentStatusManager.readLoadMetadata(metaDataFilepath)
-
-        // update new columnar segment to success status
-        val newSegment =
-          listOfLoadFolderDetailsArray.find(_.getLoadName.equals(loadModel.getSegmentId))
-        if (newSegment.isEmpty) {
-          throw new Exception("Failed to update table status for new segment")
-        } else {
-          newSegment.get.setSegmentStatus(SegmentStatus.SUCCESS)
-          newSegment.get.setLoadEndTime(System.currentTimeMillis())
-        }
-
-        // update streaming segment to compacted status
-        val streamSegment =
-          listOfLoadFolderDetailsArray.find(_.getLoadName.equals(handoffSegmentId))
-        if (streamSegment.isEmpty) {
-          throw new Exception("Failed to update table status for streaming segment")
-        } else {
-          streamSegment.get.setSegmentStatus(SegmentStatus.COMPACTED)
-          streamSegment.get.setMergedLoadName(loadModel.getSegmentId)
-        }
-
-        // refresh table status file
-        SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray)
-        status = true
-      } else {
-        LOGGER.error("Not able to acquire the lock for Table status updation for table " + loadModel
-          .getDatabaseName() + "." + loadModel.getTableName())
-      }
-    } finally {
-      if (carbonLock.unlock()) {
-        LOGGER.info("Table unlocked successfully after table status updation" +
-                    loadModel.getDatabaseName() + "." + loadModel.getTableName())
-      } else {
-        LOGGER.error("Unable to unlock Table lock for table" + loadModel.getDatabaseName() +
-                     "." + loadModel.getTableName() + " during table status updation")
-      }
-    }
-    status
-  }
-}