Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/28 03:19:46 UTC

[5/5] carbondata git commit: [CARBONDATA-2165] Remove Spark dependency from carbon-hadoop module

[CARBONDATA-2165] Remove Spark dependency from carbon-hadoop module

1. The streaming RecordReader and related classes are moved to the carbon-streaming module.
2. RDD-related classes are moved to the carbon-spark2 module.

This closes #2074


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c723947a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c723947a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c723947a

Branch: refs/heads/master
Commit: c723947a79332c66175f5a33cf57f08fe70fe1a9
Parents: 2e1ddb5
Author: Jacky Li <ja...@qq.com>
Authored: Sat Mar 17 18:13:08 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Mar 28 11:19:23 2018 +0800

----------------------------------------------------------------------
 .../carbondata/core/util/DataTypeConverter.java |   3 +
 .../core/util/DataTypeConverterImpl.java        |   5 +
 hadoop/CARBON_HADOOPLogResource.properties      |  18 -
 hadoop/pom.xml                                  |   7 -
 .../readsupport/impl/RawDataReadSupport.java    |  42 -
 .../streaming/CarbonStreamInputFormat.java      | 115 ---
 .../streaming/CarbonStreamOutputFormat.java     |  87 ---
 .../streaming/CarbonStreamRecordReader.java     | 759 ------------------
 .../streaming/CarbonStreamRecordWriter.java     | 325 --------
 .../hadoop/streaming/StreamBlockletReader.java  | 259 -------
 .../hadoop/streaming/StreamBlockletWriter.java  | 152 ----
 .../hadoop/testutil/StoreCreator.java           | 495 ++++++++++++
 .../carbondata/hadoop/util/CarbonTypeUtil.java  | 101 ---
 .../hadoop/ft/CarbonTableInputFormatTest.java   |   6 +-
 .../hadoop/ft/CarbonTableOutputFormatTest.java  |   2 +-
 .../streaming/CarbonStreamInputFormatTest.java  |  99 ---
 .../streaming/CarbonStreamOutputFormatTest.java | 121 ---
 .../hadoop/test/util/StoreCreator.java          | 492 ------------
 integration/spark-common/pom.xml                |   2 +-
 .../spark/util/SparkDataTypeConverterImpl.java  |  81 ++
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |   3 +-
 .../carbondata/spark/rdd/StreamHandoffRDD.scala | 435 +++++++++++
 .../carbondata/spark/util/CarbonScalaUtil.scala |  52 +-
 .../CarbonSparkStreamingListener.scala          |  30 +
 .../streaming/CarbonStreamSparkStreaming.scala  | 184 +++++
 .../CarbonStreamingQueryListener.scala          |  77 ++
 .../streaming/StreamSinkFactory.scala           | 236 ++++++
 .../streaming/CarbonAppendableStreamSink.scala  | 362 +++++++++
 .../spark/sql/test/TestQueryExecutor.scala      |   4 +-
 integration/spark2/pom.xml                      |   2 +-
 .../org/apache/spark/sql/CarbonSession.scala    |   2 +-
 .../CarbonAlterTableCompactionCommand.scala     |   3 +-
 streaming/pom.xml                               |   9 +-
 .../streaming/CarbonStreamInputFormat.java      | 115 +++
 .../streaming/CarbonStreamOutputFormat.java     |  87 +++
 .../streaming/CarbonStreamRecordReader.java     | 761 +++++++++++++++++++
 .../streaming/CarbonStreamRecordWriter.java     | 325 ++++++++
 .../streaming/StreamBlockletReader.java         | 259 +++++++
 .../streaming/StreamBlockletWriter.java         | 152 ++++
 .../streaming/segment/StreamSegment.java        |   2 +-
 .../CarbonSparkStreamingListener.scala          |  31 -
 .../streaming/CarbonStreamSparkStreaming.scala  | 187 -----
 .../carbondata/streaming/StreamHandoffRDD.scala | 436 -----------
 .../streaming/StreamSinkFactory.scala           | 236 ------
 .../streaming/parser/FieldConverter.scala       |  95 +++
 .../streaming/parser/RowStreamParserImp.scala   |   7 +-
 .../streaming/CarbonAppendableStreamSink.scala  | 362 ---------
 .../CarbonStreamingQueryListener.scala          |  77 --
 .../streaming/CarbonStreamInputFormatTest.java  |  99 +++
 .../streaming/CarbonStreamOutputFormatTest.java | 121 +++
 50 files changed, 3948 insertions(+), 3974 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverter.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverter.java
index 7c63860..474493a 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverter.java
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.core.util;
 
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+
 public interface DataTypeConverter {
 
   Object convertFromStringToDecimal(Object data);
@@ -31,4 +33,5 @@ public interface DataTypeConverter {
   Object wrapWithGenericArrayData(Object data);
   Object wrapWithGenericRow(Object[] fields);
 
+  Object[] convertCarbonSchemaToSparkSchema(CarbonColumn[] carbonColumns);
 }
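
The new interface method lets the Spark integration modules supply the Carbon-to-Spark schema conversion without carbondata-core depending on Spark. Below is a minimal sketch of how a Spark-side implementation could satisfy it; the class name ExampleSparkDataTypeConverter and the blanket StringType mapping are illustrative assumptions, not the SparkDataTypeConverterImpl that this commit adds to spark-common.

import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.util.DataTypeConverterImpl;

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;

public class ExampleSparkDataTypeConverter extends DataTypeConverterImpl {

  // Return Spark StructFields as Object[] so that the core interface stays Spark-free.
  @Override
  public Object[] convertCarbonSchemaToSparkSchema(CarbonColumn[] carbonColumns) {
    StructField[] fields = new StructField[carbonColumns.length];
    for (int i = 0; i < carbonColumns.length; i++) {
      // Illustrative only: every column is mapped to StringType here; a real
      // converter would switch on carbonColumns[i].getDataType().
      fields[i] = DataTypes.createStructField(
          carbonColumns[i].getColName(), DataTypes.StringType, true);
    }
    return fields;
  }
}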

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java
index ea5740d..a4f571e 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java
@@ -21,6 +21,7 @@ import java.io.Serializable;
 import java.math.BigDecimal;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 
 public class DataTypeConverterImpl implements DataTypeConverter, Serializable {
 
@@ -91,4 +92,8 @@ public class DataTypeConverterImpl implements DataTypeConverter, Serializable {
     return fields;
   }
 
+  @Override
+  public Object[] convertCarbonSchemaToSparkSchema(CarbonColumn[] carbonColumns) {
+    throw new UnsupportedOperationException();
+  }
 }
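
In the core module the conversion is deliberately unsupported, since carbondata-core cannot reference Spark types; Spark code paths are expected to pass a Spark-aware converter instead (the file listing above adds SparkDataTypeConverterImpl to spark-common for this purpose). A small sketch of the resulting fallback behaviour, with the class name ConverterFallbackExample assumed for illustration:

import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.util.DataTypeConverter;
import org.apache.carbondata.core.util.DataTypeConverterImpl;

public class ConverterFallbackExample {
  public static void main(String[] args) {
    DataTypeConverter converter = new DataTypeConverterImpl();
    try {
      converter.convertCarbonSchemaToSparkSchema(new CarbonColumn[0]);
    } catch (UnsupportedOperationException e) {
      // Expected: only the Spark-side converters implement the schema conversion.
      System.out.println("core converter does not provide a Spark schema");
    }
  }
}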

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/CARBON_HADOOPLogResource.properties
----------------------------------------------------------------------
diff --git a/hadoop/CARBON_HADOOPLogResource.properties b/hadoop/CARBON_HADOOPLogResource.properties
deleted file mode 100644
index 135a578..0000000
--- a/hadoop/CARBON_HADOOPLogResource.properties
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-#
-carbon.hadoop = {0}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop/pom.xml b/hadoop/pom.xml
index 916b9db..41e2822 100644
--- a/hadoop/pom.xml
+++ b/hadoop/pom.xml
@@ -40,10 +40,6 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-sql_${scala.binary.version}</artifactId>
-    </dependency>
-    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
@@ -55,9 +51,6 @@
     <resources>
       <resource>
         <directory>.</directory>
-        <includes>
-          <include>CARBON_HADOOPLogResource.properties</include>
-        </includes>
       </resource>
     </resources>
     <plugins>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java
deleted file mode 100644
index b2cd450..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.hadoop.readsupport.impl;
-
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
-
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
-
-public class RawDataReadSupport implements CarbonReadSupport<InternalRow> {
-
-  @Override
-  public void initialize(CarbonColumn[] carbonColumns, CarbonTable carbonTable) { }
-
-  /**
-   * return column data as InternalRow
-   *
-   * @param data column data
-   */
-  @Override
-  public InternalRow readRow(Object[] data) {
-    return new GenericInternalRow(data);
-  }
-
-  @Override public void close() { }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormat.java
deleted file mode 100644
index a6e9563..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormat.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.hadoop.streaming;
-
-import java.io.IOException;
-
-import org.apache.carbondata.core.cache.Cache;
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.encoder.Encoding;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.scan.complextypes.ArrayQueryType;
-import org.apache.carbondata.core.scan.complextypes.PrimitiveQueryType;
-import org.apache.carbondata.core.scan.complextypes.StructQueryType;
-import org.apache.carbondata.core.scan.filter.GenericQueryType;
-import org.apache.carbondata.core.util.CarbonUtil;
-
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-/**
- * Stream input format
- */
-public class CarbonStreamInputFormat extends FileInputFormat<Void, Object> {
-
-  public static final String READ_BUFFER_SIZE = "carbon.stream.read.buffer.size";
-  public static final String READ_BUFFER_SIZE_DEFAULT = "65536";
-
-  @Override public RecordReader<Void, Object> createRecordReader(InputSplit split,
-      TaskAttemptContext context) throws IOException, InterruptedException {
-    return new CarbonStreamRecordReader();
-  }
-
-  public static GenericQueryType[] getComplexDimensions(CarbonTable carbontable,
-      CarbonColumn[] carbonColumns, Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache)
-      throws IOException {
-    GenericQueryType[] queryTypes = new GenericQueryType[carbonColumns.length];
-    for (int i = 0; i < carbonColumns.length; i++) {
-      if (carbonColumns[i].isComplex()) {
-        if (DataTypes.isArrayType(carbonColumns[i].getDataType())) {
-          queryTypes[i] = new ArrayQueryType(carbonColumns[i].getColName(),
-              carbonColumns[i].getColName(), i);
-        } else if (DataTypes.isStructType(carbonColumns[i].getDataType())) {
-          queryTypes[i] = new StructQueryType(carbonColumns[i].getColName(),
-              carbonColumns[i].getColName(), i);
-        } else {
-          throw new UnsupportedOperationException(
-              carbonColumns[i].getDataType().getName() + " is not supported");
-        }
-
-        fillChildren(carbontable, queryTypes[i], (CarbonDimension) carbonColumns[i], i, cache);
-      }
-    }
-
-    return queryTypes;
-  }
-
-  private static void fillChildren(CarbonTable carbontable, GenericQueryType parentQueryType,
-      CarbonDimension dimension, int parentBlockIndex,
-      Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache) throws IOException {
-    for (int i = 0; i < dimension.getNumberOfChild(); i++) {
-      CarbonDimension child = dimension.getListOfChildDimensions().get(i);
-      DataType dataType = child.getDataType();
-      GenericQueryType queryType = null;
-      if (DataTypes.isArrayType(dataType)) {
-        queryType =
-            new ArrayQueryType(child.getColName(), dimension.getColName(), ++parentBlockIndex);
-
-      } else if (DataTypes.isStructType(dataType)) {
-        queryType =
-            new StructQueryType(child.getColName(), dimension.getColName(), ++parentBlockIndex);
-        parentQueryType.addChildren(queryType);
-      } else {
-        boolean isDirectDictionary =
-            CarbonUtil.hasEncoding(child.getEncoder(), Encoding.DIRECT_DICTIONARY);
-        String dictionaryPath = carbontable.getTableInfo().getFactTable().getTableProperties()
-            .get(CarbonCommonConstants.DICTIONARY_PATH);
-        DictionaryColumnUniqueIdentifier dictionarIdentifier =
-            new DictionaryColumnUniqueIdentifier(carbontable.getAbsoluteTableIdentifier(),
-                child.getColumnIdentifier(), child.getDataType(), dictionaryPath);
-
-        queryType =
-            new PrimitiveQueryType(child.getColName(), dimension.getColName(), ++parentBlockIndex,
-                child.getDataType(), 4, cache.get(dictionarIdentifier),
-                isDirectDictionary);
-      }
-      parentQueryType.addChildren(queryType);
-      if (child.getNumberOfChild() > 0) {
-        fillChildren(carbontable, queryType, child, parentBlockIndex, cache);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java
deleted file mode 100644
index 2599fa7..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.hadoop.streaming;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-/**
- * Stream output format
- */
-public class CarbonStreamOutputFormat extends FileOutputFormat<Void, Object> {
-
-  static final byte[] CARBON_SYNC_MARKER =
-      "@carbondata_sync".getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
-
-  public static final String CARBON_ENCODER_ROW_BUFFER_SIZE = "carbon.stream.row.buffer.size";
-
-  public static final int CARBON_ENCODER_ROW_BUFFER_SIZE_DEFAULT = 1024;
-
-  public static final String CARBON_STREAM_BLOCKLET_ROW_NUMS = "carbon.stream.blocklet.row.nums";
-
-  public static final int CARBON_STREAM_BLOCKLET_ROW_NUMS_DEFAULT = 32000;
-
-  public static final String CARBON_STREAM_CACHE_SIZE = "carbon.stream.cache.size";
-
-  public static final int CARBON_STREAM_CACHE_SIZE_DEFAULT = 32 * 1024 * 1024;
-
-  private static final String LOAD_Model = "mapreduce.output.carbon.load.model";
-
-  private static final String SEGMENT_ID = "carbon.segment.id";
-
-  @Override public RecordWriter<Void, Object> getRecordWriter(TaskAttemptContext job)
-      throws IOException, InterruptedException {
-    return new CarbonStreamRecordWriter(job);
-  }
-
-  public static void setCarbonLoadModel(Configuration hadoopConf, CarbonLoadModel carbonLoadModel)
-      throws IOException {
-    if (carbonLoadModel != null) {
-      hadoopConf.set(LOAD_Model, ObjectSerializationUtil.convertObjectToString(carbonLoadModel));
-    }
-  }
-
-  public static CarbonLoadModel getCarbonLoadModel(Configuration hadoopConf) throws IOException {
-    String value = hadoopConf.get(LOAD_Model);
-    if (value == null) {
-      return null;
-    } else {
-      return (CarbonLoadModel) ObjectSerializationUtil.convertStringToObject(value);
-    }
-  }
-
-  public static void setSegmentId(Configuration hadoopConf, String segmentId) throws IOException {
-    if (segmentId != null) {
-      hadoopConf.set(SEGMENT_ID, segmentId);
-    }
-  }
-
-  public static String getSegmentId(Configuration hadoopConf) throws IOException {
-    return hadoopConf.get(SEGMENT_ID);
-  }
-
-}
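
CarbonStreamOutputFormat moves to the streaming module unchanged (the same 87 lines reappear under streaming/ in the listing above); only its location changes. As orientation, here is a hedged sketch of setting its tuning properties on a Hadoop Configuration. The property keys and default values are taken from the deleted copy above; the class name StreamOutputConfigExample is illustrative, and the import of CarbonStreamOutputFormat is omitted because its new package path is truncated in the file listing.

import org.apache.hadoop.conf.Configuration;

public class StreamOutputConfigExample {
  public static Configuration configure() {
    Configuration hadoopConf = new Configuration();
    // Keys and defaults as defined by CarbonStreamOutputFormat (see the deleted copy above).
    hadoopConf.setInt("carbon.stream.row.buffer.size", 1024);         // encoder row buffer size
    hadoopConf.setInt("carbon.stream.blocklet.row.nums", 32000);      // rows per blocklet
    hadoopConf.setInt("carbon.stream.cache.size", 32 * 1024 * 1024);  // write cache size in bytes
    return hadoopConf;
  }
}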

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
deleted file mode 100644
index 1e227c4..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
+++ /dev/null
@@ -1,759 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.hadoop.streaming;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-import java.util.BitSet;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.core.cache.Cache;
-import org.apache.carbondata.core.cache.CacheProvider;
-import org.apache.carbondata.core.cache.CacheType;
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.encoder.Encoding;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.reader.CarbonHeaderReader;
-import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
-import org.apache.carbondata.core.scan.filter.FilterUtil;
-import org.apache.carbondata.core.scan.filter.GenericQueryType;
-import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
-import org.apache.carbondata.core.scan.filter.intf.RowImpl;
-import org.apache.carbondata.core.scan.filter.intf.RowIntf;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.format.BlockletHeader;
-import org.apache.carbondata.format.FileHeader;
-import org.apache.carbondata.hadoop.CarbonInputSplit;
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
-import org.apache.carbondata.hadoop.InputMetricsStats;
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
-import org.apache.carbondata.hadoop.util.CarbonTypeUtil;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.spark.memory.MemoryMode;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
-import org.apache.spark.sql.execution.vectorized.ColumnVector;
-import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
-import org.apache.spark.sql.types.CalendarIntervalType;
-import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.sql.types.DecimalType;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.unsafe.types.CalendarInterval;
-import org.apache.spark.unsafe.types.UTF8String;
-
-/**
- * Stream record reader
- */
-public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
-  // vector reader
-  private boolean isVectorReader;
-
-  // metadata
-  private CarbonTable carbonTable;
-  private CarbonColumn[] storageColumns;
-  private boolean[] isRequired;
-  private DataType[] measureDataTypes;
-  private int dimensionCount;
-  private int measureCount;
-
-  // input
-  private FileSplit fileSplit;
-  private Configuration hadoopConf;
-  private StreamBlockletReader input;
-  private boolean isFirstRow = true;
-  private QueryModel model;
-
-  // decode data
-  private BitSet allNonNull;
-  private boolean[] isNoDictColumn;
-  private DirectDictionaryGenerator[] directDictionaryGenerators;
-  private CacheProvider cacheProvider;
-  private Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache;
-  private GenericQueryType[] queryTypes;
-
-  // vectorized reader
-  private StructType outputSchema;
-  private ColumnarBatch columnarBatch;
-  private boolean isFinished = false;
-
-  // filter
-  private FilterExecuter filter;
-  private boolean[] isFilterRequired;
-  private Object[] filterValues;
-  private RowIntf filterRow;
-  private int[] filterMap;
-
-  // output
-  private CarbonColumn[] projection;
-  private boolean[] isProjectionRequired;
-  private int[] projectionMap;
-  private Object[] outputValues;
-  private InternalRow outputRow;
-
-  // empty project, null filter
-  private boolean skipScanData;
-
-  // return raw row for handoff
-  private boolean useRawRow = false;
-
-  // InputMetricsStats
-  private InputMetricsStats inputMetricsStats;
-
-  @Override public void initialize(InputSplit split, TaskAttemptContext context)
-      throws IOException, InterruptedException {
-    // input
-    if (split instanceof CarbonInputSplit) {
-      fileSplit = (CarbonInputSplit) split;
-    } else if (split instanceof CarbonMultiBlockSplit) {
-      fileSplit = ((CarbonMultiBlockSplit) split).getAllSplits().get(0);
-    } else {
-      fileSplit = (FileSplit) split;
-    }
-
-    // metadata
-    hadoopConf = context.getConfiguration();
-    if (model == null) {
-      CarbonTableInputFormat format = new CarbonTableInputFormat<Object>();
-      model = format.createQueryModel(split, context);
-    }
-    carbonTable = model.getTable();
-    List<CarbonDimension> dimensions =
-        carbonTable.getDimensionByTableName(carbonTable.getTableName());
-    dimensionCount = dimensions.size();
-    List<CarbonMeasure> measures =
-        carbonTable.getMeasureByTableName(carbonTable.getTableName());
-    measureCount = measures.size();
-    List<CarbonColumn> carbonColumnList =
-        carbonTable.getStreamStorageOrderColumn(carbonTable.getTableName());
-    storageColumns = carbonColumnList.toArray(new CarbonColumn[carbonColumnList.size()]);
-    isNoDictColumn = CarbonDataProcessorUtil.getNoDictionaryMapping(storageColumns);
-    directDictionaryGenerators = new DirectDictionaryGenerator[storageColumns.length];
-    for (int i = 0; i < storageColumns.length; i++) {
-      if (storageColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {
-        directDictionaryGenerators[i] = DirectDictionaryKeyGeneratorFactory
-            .getDirectDictionaryGenerator(storageColumns[i].getDataType());
-      }
-    }
-    measureDataTypes = new DataType[measureCount];
-    for (int i = 0; i < measureCount; i++) {
-      measureDataTypes[i] = storageColumns[dimensionCount + i].getDataType();
-    }
-
-    // decode data
-    allNonNull = new BitSet(storageColumns.length);
-    projection = model.getProjectionColumns();
-
-    isRequired = new boolean[storageColumns.length];
-    boolean[] isFiltlerDimensions = model.getIsFilterDimensions();
-    boolean[] isFiltlerMeasures = model.getIsFilterMeasures();
-    isFilterRequired = new boolean[storageColumns.length];
-    filterMap = new int[storageColumns.length];
-    for (int i = 0; i < storageColumns.length; i++) {
-      if (storageColumns[i].isDimension()) {
-        if (isFiltlerDimensions[storageColumns[i].getOrdinal()]) {
-          isRequired[i] = true;
-          isFilterRequired[i] = true;
-          filterMap[i] = storageColumns[i].getOrdinal();
-        }
-      } else {
-        if (isFiltlerMeasures[storageColumns[i].getOrdinal()]) {
-          isRequired[i] = true;
-          isFilterRequired[i] = true;
-          filterMap[i] = carbonTable.getDimensionOrdinalMax() + storageColumns[i].getOrdinal();
-        }
-      }
-    }
-
-    isProjectionRequired = new boolean[storageColumns.length];
-    projectionMap = new int[storageColumns.length];
-    for (int i = 0; i < storageColumns.length; i++) {
-      for (int j = 0; j < projection.length; j++) {
-        if (storageColumns[i].getColName().equals(projection[j].getColName())) {
-          isRequired[i] = true;
-          isProjectionRequired[i] = true;
-          projectionMap[i] = j;
-          break;
-        }
-      }
-    }
-
-    // initialize filter
-    if (null != model.getFilterExpressionResolverTree()) {
-      initializeFilter();
-    } else if (projection.length == 0) {
-      skipScanData = true;
-    }
-
-  }
-
-  private void initializeFilter() {
-
-    List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil
-        .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()),
-            carbonTable.getMeasureByTableName(carbonTable.getTableName()));
-    int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()];
-    for (int i = 0; i < dimLensWithComplex.length; i++) {
-      dimLensWithComplex[i] = Integer.MAX_VALUE;
-    }
-
-    int[] dictionaryColumnCardinality =
-        CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList);
-    SegmentProperties segmentProperties =
-        new SegmentProperties(wrapperColumnSchemaList, dictionaryColumnCardinality);
-    Map<Integer, GenericQueryType> complexDimensionInfoMap = new HashMap<>();
-
-    FilterResolverIntf resolverIntf = model.getFilterExpressionResolverTree();
-    filter = FilterUtil.getFilterExecuterTree(resolverIntf, segmentProperties,
-        complexDimensionInfoMap);
-    // for row filter, we need update column index
-    FilterUtil.updateIndexOfColumnExpression(resolverIntf.getFilterExpression(),
-        carbonTable.getDimensionOrdinalMax());
-
-  }
-
-  public void setQueryModel(QueryModel model) {
-    this.model = model;
-  }
-
-  private byte[] getSyncMarker(String filePath) throws IOException {
-    CarbonHeaderReader headerReader = new CarbonHeaderReader(filePath);
-    FileHeader header = headerReader.readHeader();
-    return header.getSync_marker();
-  }
-
-  public void setUseRawRow(boolean useRawRow) {
-    this.useRawRow = useRawRow;
-  }
-
-  private void initializeAtFirstRow() throws IOException {
-    filterValues = new Object[carbonTable.getDimensionOrdinalMax() + measureCount];
-    filterRow = new RowImpl();
-    filterRow.setValues(filterValues);
-
-    outputValues = new Object[projection.length];
-    outputRow = new GenericInternalRow(outputValues);
-
-    Path file = fileSplit.getPath();
-
-    byte[] syncMarker = getSyncMarker(file.toString());
-
-    FileSystem fs = file.getFileSystem(hadoopConf);
-
-    int bufferSize = Integer.parseInt(hadoopConf.get(CarbonStreamInputFormat.READ_BUFFER_SIZE,
-        CarbonStreamInputFormat.READ_BUFFER_SIZE_DEFAULT));
-
-    FSDataInputStream fileIn = fs.open(file, bufferSize);
-    fileIn.seek(fileSplit.getStart());
-    input = new StreamBlockletReader(syncMarker, fileIn, fileSplit.getLength(),
-        fileSplit.getStart() == 0);
-
-    cacheProvider = CacheProvider.getInstance();
-    cache = cacheProvider.createCache(CacheType.FORWARD_DICTIONARY);
-    queryTypes = CarbonStreamInputFormat.getComplexDimensions(carbonTable, storageColumns, cache);
-
-    outputSchema = new StructType(CarbonTypeUtil.convertCarbonSchemaToSparkSchema(projection));
-  }
-
-  @Override public boolean nextKeyValue() throws IOException, InterruptedException {
-    if (isFirstRow) {
-      isFirstRow = false;
-      initializeAtFirstRow();
-    }
-    if (isFinished) {
-      return false;
-    }
-
-    if (isVectorReader) {
-      return nextColumnarBatch();
-    }
-
-    return nextRow();
-  }
-
-  /**
-   * for vector reader, check next columnar batch
-   */
-  private boolean nextColumnarBatch() throws IOException {
-    boolean hasNext;
-    boolean scanMore = false;
-    do {
-      // move to the next blocklet
-      hasNext = input.nextBlocklet();
-      if (hasNext) {
-        // read blocklet header
-        BlockletHeader header = input.readBlockletHeader();
-        if (isScanRequired(header)) {
-          scanMore = !scanBlockletAndFillVector(header);
-        } else {
-          input.skipBlockletData(true);
-          scanMore = true;
-        }
-      } else {
-        isFinished = true;
-        scanMore = false;
-      }
-    } while (scanMore);
-    return hasNext;
-  }
-
-  /**
-   * check next Row
-   */
-  private boolean nextRow() throws IOException {
-    // read row one by one
-    try {
-      boolean hasNext;
-      boolean scanMore = false;
-      do {
-        hasNext = input.hasNext();
-        if (hasNext) {
-          if (skipScanData) {
-            input.nextRow();
-            scanMore = false;
-          } else {
-            if (useRawRow) {
-              // read raw row for streaming handoff which does not require decode raw row
-              readRawRowFromStream();
-            } else {
-              readRowFromStream();
-            }
-            if (null != filter) {
-              scanMore = !filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax());
-            } else {
-              scanMore = false;
-            }
-          }
-        } else {
-          if (input.nextBlocklet()) {
-            BlockletHeader header = input.readBlockletHeader();
-            if (isScanRequired(header)) {
-              if (skipScanData) {
-                input.skipBlockletData(false);
-              } else {
-                input.readBlockletData(header);
-              }
-            } else {
-              input.skipBlockletData(true);
-            }
-            scanMore = true;
-          } else {
-            isFinished = true;
-            scanMore = false;
-          }
-        }
-      } while (scanMore);
-      return hasNext;
-    } catch (FilterUnsupportedException e) {
-      throw new IOException("Failed to filter row in detail reader", e);
-    }
-  }
-
-  @Override public Void getCurrentKey() throws IOException, InterruptedException {
-    return null;
-  }
-
-  @Override public Object getCurrentValue() throws IOException, InterruptedException {
-    if (isVectorReader) {
-      int value = columnarBatch.numValidRows();
-      if (inputMetricsStats != null) {
-        inputMetricsStats.incrementRecordRead((long) value);
-      }
-
-      return columnarBatch;
-    }
-
-    if (inputMetricsStats != null) {
-      inputMetricsStats.incrementRecordRead(1L);
-    }
-
-    return outputRow;
-  }
-
-  private boolean isScanRequired(BlockletHeader header) {
-    // TODO require to implement min-max index
-    if (null == filter) {
-      return true;
-    }
-    return true;
-  }
-
-  private boolean scanBlockletAndFillVector(BlockletHeader header) throws IOException {
-    // if filter is null and output projection is empty, use the row number of blocklet header
-    if (skipScanData) {
-      int rowNums = header.getBlocklet_info().getNum_rows();
-      columnarBatch = ColumnarBatch.allocate(outputSchema, MemoryMode.OFF_HEAP, rowNums);
-      columnarBatch.setNumRows(rowNums);
-      input.skipBlockletData(true);
-      return rowNums > 0;
-    }
-
-    input.readBlockletData(header);
-    columnarBatch = ColumnarBatch.allocate(outputSchema, MemoryMode.OFF_HEAP, input.getRowNums());
-    int rowNum = 0;
-    if (null == filter) {
-      while (input.hasNext()) {
-        readRowFromStream();
-        putRowToColumnBatch(rowNum++);
-      }
-    } else {
-      try {
-        while (input.hasNext()) {
-          readRowFromStream();
-          if (filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax())) {
-            putRowToColumnBatch(rowNum++);
-          }
-        }
-      } catch (FilterUnsupportedException e) {
-        throw new IOException("Failed to filter row in vector reader", e);
-      }
-    }
-    columnarBatch.setNumRows(rowNum);
-    return rowNum > 0;
-  }
-
-  private void readRowFromStream() {
-    input.nextRow();
-    short nullLen = input.readShort();
-    BitSet nullBitSet = allNonNull;
-    if (nullLen > 0) {
-      nullBitSet = BitSet.valueOf(input.readBytes(nullLen));
-    }
-    int colCount = 0;
-    // primitive type dimension
-    for (; colCount < isNoDictColumn.length; colCount++) {
-      if (nullBitSet.get(colCount)) {
-        if (isFilterRequired[colCount]) {
-          filterValues[filterMap[colCount]] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
-        }
-        if (isProjectionRequired[colCount]) {
-          outputValues[projectionMap[colCount]] = null;
-        }
-      } else {
-        if (isNoDictColumn[colCount]) {
-          int v = input.readShort();
-          if (isRequired[colCount]) {
-            byte[] b = input.readBytes(v);
-            if (isFilterRequired[colCount]) {
-              filterValues[filterMap[colCount]] = b;
-            }
-            if (isProjectionRequired[colCount]) {
-              outputValues[projectionMap[colCount]] =
-                  DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(b,
-                      storageColumns[colCount].getDataType());
-            }
-          } else {
-            input.skipBytes(v);
-          }
-        } else if (null != directDictionaryGenerators[colCount]) {
-          if (isRequired[colCount]) {
-            if (isFilterRequired[colCount]) {
-              filterValues[filterMap[colCount]] = input.copy(4);
-            }
-            if (isProjectionRequired[colCount]) {
-              outputValues[projectionMap[colCount]] =
-                  directDictionaryGenerators[colCount].getValueFromSurrogate(input.readInt());
-            } else {
-              input.skipBytes(4);
-            }
-          } else {
-            input.skipBytes(4);
-          }
-        } else {
-          if (isRequired[colCount]) {
-            if (isFilterRequired[colCount]) {
-              filterValues[filterMap[colCount]] = input.copy(4);
-            }
-            if (isProjectionRequired[colCount]) {
-              outputValues[projectionMap[colCount]] = input.readInt();
-            } else {
-              input.skipBytes(4);
-            }
-          } else {
-            input.skipBytes(4);
-          }
-        }
-      }
-    }
-    // complex type dimension
-    for (; colCount < dimensionCount; colCount++) {
-      if (nullBitSet.get(colCount)) {
-        if (isFilterRequired[colCount]) {
-          filterValues[filterMap[colCount]] = null;
-        }
-        if (isProjectionRequired[colCount]) {
-          outputValues[projectionMap[colCount]] = null;
-        }
-      } else {
-        short v = input.readShort();
-        if (isRequired[colCount]) {
-          byte[] b = input.readBytes(v);
-          if (isFilterRequired[colCount]) {
-            filterValues[filterMap[colCount]] = b;
-          }
-          if (isProjectionRequired[colCount]) {
-            outputValues[projectionMap[colCount]] = queryTypes[colCount]
-                .getDataBasedOnDataTypeFromSurrogates(ByteBuffer.wrap(b));
-          }
-        } else {
-          input.skipBytes(v);
-        }
-      }
-    }
-    // measure
-    DataType dataType;
-    for (int msrCount = 0; msrCount < measureCount; msrCount++, colCount++) {
-      if (nullBitSet.get(colCount)) {
-        if (isFilterRequired[colCount]) {
-          filterValues[filterMap[colCount]] = null;
-        }
-        if (isProjectionRequired[colCount]) {
-          outputValues[projectionMap[colCount]] = null;
-        }
-      } else {
-        dataType = measureDataTypes[msrCount];
-        if (dataType == DataTypes.BOOLEAN) {
-          if (isRequired[colCount]) {
-            boolean v = input.readBoolean();
-            if (isFilterRequired[colCount]) {
-              filterValues[filterMap[colCount]] = v;
-            }
-            if (isProjectionRequired[colCount]) {
-              outputValues[projectionMap[colCount]] = v;
-            }
-          } else {
-            input.skipBytes(1);
-          }
-        } else if (dataType == DataTypes.SHORT) {
-          if (isRequired[colCount]) {
-            short v = input.readShort();
-            if (isFilterRequired[colCount]) {
-              filterValues[filterMap[colCount]] = v;
-            }
-            if (isProjectionRequired[colCount]) {
-              outputValues[projectionMap[colCount]] = v;
-            }
-          } else {
-            input.skipBytes(2);
-          }
-        } else if (dataType == DataTypes.INT) {
-          if (isRequired[colCount]) {
-            int v = input.readInt();
-            if (isFilterRequired[colCount]) {
-              filterValues[filterMap[colCount]] = v;
-            }
-            if (isProjectionRequired[colCount]) {
-              outputValues[projectionMap[colCount]] = v;
-            }
-          } else {
-            input.skipBytes(4);
-          }
-        } else if (dataType == DataTypes.LONG) {
-          if (isRequired[colCount]) {
-            long v = input.readLong();
-            if (isFilterRequired[colCount]) {
-              filterValues[filterMap[colCount]] = v;
-            }
-            if (isProjectionRequired[colCount]) {
-              outputValues[projectionMap[colCount]] = v;
-            }
-          } else {
-            input.skipBytes(8);
-          }
-        } else if (dataType == DataTypes.DOUBLE) {
-          if (isRequired[colCount]) {
-            double v = input.readDouble();
-            if (isFilterRequired[colCount]) {
-              filterValues[filterMap[colCount]] = v;
-            }
-            if (isProjectionRequired[colCount]) {
-              outputValues[projectionMap[colCount]] = v;
-            }
-          } else {
-            input.skipBytes(8);
-          }
-        } else if (DataTypes.isDecimal(dataType)) {
-          int len = input.readShort();
-          if (isRequired[colCount]) {
-            BigDecimal v = DataTypeUtil.byteToBigDecimal(input.readBytes(len));
-            if (isFilterRequired[colCount]) {
-              filterValues[filterMap[colCount]] = v;
-            }
-            if (isProjectionRequired[colCount]) {
-              outputValues[projectionMap[colCount]] = Decimal.apply(v);
-            }
-          } else {
-            input.skipBytes(len);
-          }
-        }
-      }
-    }
-  }
-
-  private void readRawRowFromStream() {
-    input.nextRow();
-    short nullLen = input.readShort();
-    BitSet nullBitSet = allNonNull;
-    if (nullLen > 0) {
-      nullBitSet = BitSet.valueOf(input.readBytes(nullLen));
-    }
-    int colCount = 0;
-    // primitive type dimension
-    for (; colCount < isNoDictColumn.length; colCount++) {
-      if (nullBitSet.get(colCount)) {
-        outputValues[colCount] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
-      } else {
-        if (isNoDictColumn[colCount]) {
-          int v = input.readShort();
-          outputValues[colCount] = input.readBytes(v);
-        } else {
-          outputValues[colCount] = input.readInt();
-        }
-      }
-    }
-    // complex type dimension
-    for (; colCount < dimensionCount; colCount++) {
-      if (nullBitSet.get(colCount)) {
-        outputValues[colCount] = null;
-      } else {
-        short v = input.readShort();
-        outputValues[colCount] = input.readBytes(v);
-      }
-    }
-    // measure
-    DataType dataType;
-    for (int msrCount = 0; msrCount < measureCount; msrCount++, colCount++) {
-      if (nullBitSet.get(colCount)) {
-        outputValues[colCount] = null;
-      } else {
-        dataType = measureDataTypes[msrCount];
-        if (dataType == DataTypes.BOOLEAN) {
-          outputValues[colCount] = input.readBoolean();
-        } else if (dataType == DataTypes.SHORT) {
-          outputValues[colCount] = input.readShort();
-        } else if (dataType == DataTypes.INT) {
-          outputValues[colCount] = input.readInt();
-        } else if (dataType == DataTypes.LONG) {
-          outputValues[colCount] = input.readLong();
-        } else if (dataType == DataTypes.DOUBLE) {
-          outputValues[colCount] = input.readDouble();
-        } else if (DataTypes.isDecimal(dataType)) {
-          int len = input.readShort();
-          outputValues[colCount] = DataTypeUtil.byteToBigDecimal(input.readBytes(len));
-        }
-      }
-    }
-  }
-
-  private void putRowToColumnBatch(int rowId) {
-    for (int i = 0; i < projection.length; i++) {
-      Object value = outputValues[i];
-      ColumnVector col = columnarBatch.column(i);
-      org.apache.spark.sql.types.DataType t = col.dataType();
-      if (null == value) {
-        col.putNull(rowId);
-      } else {
-        if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
-          col.putBoolean(rowId, (boolean)value);
-        } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
-          col.putByte(rowId, (byte) value);
-        } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
-          col.putShort(rowId, (short) value);
-        } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
-          col.putInt(rowId, (int) value);
-        } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
-          col.putLong(rowId, (long) value);
-        } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
-          col.putFloat(rowId, (float) value);
-        } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
-          col.putDouble(rowId, (double) value);
-        } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
-          UTF8String v = (UTF8String) value;
-          col.putByteArray(rowId, v.getBytes());
-        } else if (t instanceof org.apache.spark.sql.types.DecimalType) {
-          DecimalType dt = (DecimalType)t;
-          Decimal d = Decimal.fromDecimal(value);
-          if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
-            col.putInt(rowId, (int)d.toUnscaledLong());
-          } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
-            col.putLong(rowId, d.toUnscaledLong());
-          } else {
-            final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
-            byte[] bytes = integer.toByteArray();
-            col.putByteArray(rowId, bytes, 0, bytes.length);
-          }
-        } else if (t instanceof CalendarIntervalType) {
-          CalendarInterval c = (CalendarInterval) value;
-          col.getChildColumn(0).putInt(rowId, c.months);
-          col.getChildColumn(1).putLong(rowId, c.microseconds);
-        } else if (t instanceof org.apache.spark.sql.types.DateType) {
-          col.putInt(rowId, (int) value);
-        } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
-          col.putLong(rowId, (long) value);
-        }
-      }
-    }
-  }
-
-  @Override public float getProgress() throws IOException, InterruptedException {
-    return 0;
-  }
-
-  public void setVectorReader(boolean isVectorReader) {
-    this.isVectorReader = isVectorReader;
-  }
-
-  public void setInputMetricsStats(InputMetricsStats inputMetricsStats) {
-    this.inputMetricsStats = inputMetricsStats;
-  }
-
-  @Override public void close() throws IOException {
-    if (null != input) {
-      input.close();
-    }
-    if (null != columnarBatch) {
-      columnarBatch.close();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
deleted file mode 100644
index a4b3be8..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.hadoop.streaming;
-
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.util.CarbonMetadataUtil;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.format.FileHeader;
-import org.apache.carbondata.processing.loading.BadRecordsLogger;
-import org.apache.carbondata.processing.loading.BadRecordsLoggerProvider;
-import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
-import org.apache.carbondata.processing.loading.DataField;
-import org.apache.carbondata.processing.loading.DataLoadProcessBuilder;
-import org.apache.carbondata.processing.loading.converter.RowConverter;
-import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
-import org.apache.carbondata.processing.loading.parser.RowParser;
-import org.apache.carbondata.processing.loading.parser.impl.RowParserImpl;
-import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskID;
-
-/**
- * Stream record writer
- */
-public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(CarbonStreamRecordWriter.class.getName());
-
-  // basic info
-  private Configuration hadoopConf;
-  private CarbonLoadModel carbonLoadModel;
-  private CarbonDataLoadConfiguration configuration;
-  private CarbonTable carbonTable;
-  private int maxRowNums;
-  private int maxCacheSize;
-
-  // parser and converter
-  private RowParser rowParser;
-  private BadRecordsLogger badRecordLogger;
-  private RowConverter converter;
-  private CarbonRow currentRow = new CarbonRow(null);
-
-  // encoder
-  private DataField[] dataFields;
-  private BitSet nullBitSet;
-  private boolean[] isNoDictionaryDimensionColumn;
-  private int dimensionWithComplexCount;
-  private int measureCount;
-  private DataType[] measureDataTypes;
-  private StreamBlockletWriter output = null;
-
-  // data write
-  private String segmentDir;
-  private String fileName;
-  private DataOutputStream outputStream;
-  private boolean isFirstRow = true;
-  private boolean hasException = false;
-
-  CarbonStreamRecordWriter(TaskAttemptContext job) throws IOException {
-    initialize(job);
-  }
-
-  public CarbonStreamRecordWriter(TaskAttemptContext job, CarbonLoadModel carbonLoadModel)
-      throws IOException {
-    this.carbonLoadModel = carbonLoadModel;
-    initialize(job);
-  }
-
-  private void initialize(TaskAttemptContext job) throws IOException {
-    // set basic information
-    hadoopConf = job.getConfiguration();
-    if (carbonLoadModel == null) {
-      carbonLoadModel = CarbonStreamOutputFormat.getCarbonLoadModel(hadoopConf);
-      if (carbonLoadModel == null) {
-        throw new IOException(
-            "CarbonStreamRecordWriter require configuration: mapreduce.output.carbon.load.model");
-      }
-    }
-    String segmentId = CarbonStreamOutputFormat.getSegmentId(hadoopConf);
-    carbonLoadModel.setSegmentId(segmentId);
-    carbonTable = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable();
-    long taskNo = TaskID.forName(hadoopConf.get("mapred.tip.id")).getId();
-    carbonLoadModel.setTaskNo("" + taskNo);
-    configuration = DataLoadProcessBuilder.createConfiguration(carbonLoadModel);
-    maxRowNums = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_STREAM_BLOCKLET_ROW_NUMS,
-        CarbonStreamOutputFormat.CARBON_STREAM_BLOCKLET_ROW_NUMS_DEFAULT) - 1;
-    maxCacheSize = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_STREAM_CACHE_SIZE,
-        CarbonStreamOutputFormat.CARBON_STREAM_CACHE_SIZE_DEFAULT);
-
-    segmentDir = CarbonTablePath.getSegmentPath(
-        carbonTable.getAbsoluteTableIdentifier().getTablePath(), segmentId);
-    fileName = CarbonTablePath.getCarbonDataFileName(0, taskNo, 0, 0, "0");
-  }
-
-  private void initializeAtFirstRow() throws IOException, InterruptedException {
-
-    // initialize metadata
-    isNoDictionaryDimensionColumn =
-        CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields());
-    dimensionWithComplexCount = configuration.getDimensionCount();
-    measureCount = configuration.getMeasureCount();
-    dataFields = configuration.getDataFields();
-    measureDataTypes = new DataType[measureCount];
-    for (int i = 0; i < measureCount; i++) {
-      measureDataTypes[i] =
-          dataFields[dimensionWithComplexCount + i].getColumn().getDataType();
-    }
-
-    // initialize parser and converter
-    rowParser = new RowParserImpl(dataFields, configuration);
-    badRecordLogger = BadRecordsLoggerProvider.createBadRecordLogger(configuration);
-    converter = new RowConverterImpl(configuration.getDataFields(), configuration, badRecordLogger);
-    configuration.setCardinalityFinder(converter);
-    converter.initialize();
-
-    // initialize encoder
-    nullBitSet = new BitSet(dataFields.length);
-    int rowBufferSize = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_ENCODER_ROW_BUFFER_SIZE,
-        CarbonStreamOutputFormat.CARBON_ENCODER_ROW_BUFFER_SIZE_DEFAULT);
-    output = new StreamBlockletWriter(maxCacheSize, maxRowNums, rowBufferSize);
-
-    // initialize data writer
-    String filePath = segmentDir + File.separator + fileName;
-    FileFactory.FileType fileType = FileFactory.getFileType(filePath);
-    CarbonFile carbonFile = FileFactory.getCarbonFile(filePath, fileType);
-    if (carbonFile.exists()) {
-      // if the file exists, use the append API
-      outputStream = FileFactory.getDataOutputStreamUsingAppend(filePath, fileType);
-    } else {
-      // if the file does not exist, use the create API
-      outputStream = FileFactory.getDataOutputStream(filePath, fileType);
-      writeFileHeader();
-    }
-
-    isFirstRow = false;
-  }
-
-  @Override public void write(Void key, Object value) throws IOException, InterruptedException {
-    if (isFirstRow) {
-      initializeAtFirstRow();
-    }
-
-    // parse and convert row
-    currentRow.setData(rowParser.parseRow((Object[]) value));
-    converter.convert(currentRow);
-
-    // null bit set
-    nullBitSet.clear();
-    for (int i = 0; i < dataFields.length; i++) {
-      if (null == currentRow.getObject(i)) {
-        nullBitSet.set(i);
-      }
-    }
-    output.nextRow();
-    byte[] b = nullBitSet.toByteArray();
-    output.writeShort(b.length);
-    if (b.length > 0) {
-      output.writeBytes(b);
-    }
-    int dimCount = 0;
-    Object columnValue;
-
-    // primitive type dimension
-    for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) {
-      columnValue = currentRow.getObject(dimCount);
-      if (null != columnValue) {
-        if (isNoDictionaryDimensionColumn[dimCount]) {
-          byte[] col = (byte[]) columnValue;
-          output.writeShort(col.length);
-          output.writeBytes(col);
-        } else {
-          output.writeInt((int) columnValue);
-        }
-      }
-    }
-    // complex type dimension
-    for (; dimCount < dimensionWithComplexCount; dimCount++) {
-      columnValue = currentRow.getObject(dimCount);
-      if (null != columnValue) {
-        byte[] col = (byte[]) columnValue;
-        output.writeShort(col.length);
-        output.writeBytes(col);
-      }
-    }
-    // measure
-    DataType dataType;
-    for (int msrCount = 0; msrCount < measureCount; msrCount++) {
-      columnValue = currentRow.getObject(dimCount + msrCount);
-      if (null != columnValue) {
-        dataType = measureDataTypes[msrCount];
-        if (dataType == DataTypes.BOOLEAN) {
-          output.writeBoolean((boolean) columnValue);
-        } else if (dataType == DataTypes.SHORT) {
-          output.writeShort((short) columnValue);
-        } else if (dataType == DataTypes.INT) {
-          output.writeInt((int) columnValue);
-        } else if (dataType == DataTypes.LONG) {
-          output.writeLong((long) columnValue);
-        } else if (dataType == DataTypes.DOUBLE) {
-          output.writeDouble((double) columnValue);
-        } else if (DataTypes.isDecimal(dataType)) {
-          BigDecimal val = (BigDecimal) columnValue;
-          byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
-          output.writeShort(bigDecimalInBytes.length);
-          output.writeBytes(bigDecimalInBytes);
-        } else {
-          String msg =
-              "unsupported data type:" + dataFields[dimCount + msrCount].getColumn().getDataType()
-                  .getName();
-          LOGGER.error(msg);
-          throw new IOException(msg);
-        }
-      }
-    }
-
-    if (output.isFull()) {
-      appendBlockletToDataFile();
-    }
-  }
-
-  private void writeFileHeader() throws IOException {
-    List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil
-        .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()),
-            carbonTable.getMeasureByTableName(carbonTable.getTableName()));
-    int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()];
-    for (int i = 0; i < dimLensWithComplex.length; i++) {
-      dimLensWithComplex[i] = Integer.MAX_VALUE;
-    }
-    int[] dictionaryColumnCardinality =
-        CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList);
-    List<Integer> cardinality = new ArrayList<>();
-    List<org.apache.carbondata.format.ColumnSchema> columnSchemaList = AbstractFactDataWriter
-        .getColumnSchemaListAndCardinality(cardinality, dictionaryColumnCardinality,
-            wrapperColumnSchemaList);
-    FileHeader fileHeader =
-        CarbonMetadataUtil.getFileHeader(true, columnSchemaList, System.currentTimeMillis());
-    fileHeader.setIs_footer_present(false);
-    fileHeader.setIs_splitable(true);
-    fileHeader.setSync_marker(CarbonStreamOutputFormat.CARBON_SYNC_MARKER);
-    outputStream.write(CarbonUtil.getByteArray(fileHeader));
-  }
-
-  /**
-   * write a blocklet to file
-   */
-  private void appendBlockletToDataFile() throws IOException {
-    if (output.getRowIndex() == -1) {
-      return;
-    }
-    output.apppendBlocklet(outputStream);
-    outputStream.flush();
-    // reset data
-    output.reset();
-  }
-
-  @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException {
-    try {
-      // append remaining buffered data
-      if (!hasException && !isFirstRow) {
-        appendBlockletToDataFile();
-        converter.finish();
-      }
-    } finally {
-      // close resource
-      CarbonUtil.closeStreams(outputStream);
-      if (output != null) {
-        output.close();
-      }
-      if (badRecordLogger != null) {
-        badRecordLogger.closeStreams();
-      }
-    }
-  }
-
-  public String getSegmentDir() {
-    return segmentDir;
-  }
-
-  public String getFileName() {
-    return fileName;
-  }
-
-  public void setHasException(boolean hasException) {
-    this.hasException = hasException;
-  }
-}
\ No newline at end of file
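
The write() method above encodes each row as a short-prefixed null bit set, short-prefixed bytes for no-dictionary and complex dimensions, an int surrogate key for dictionary dimensions, and type-specific primitives for measures (decimals as short-prefixed bytes). A minimal, self-contained sketch of a few of those cases, assuming a plain DataOutputStream in place of StreamBlockletWriter; the class name and sample values below are illustrative only:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.BitSet;

    public class RowLayoutSketch {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);

        // 1. null bit set: length as a short, then the bit set bytes (empty when no column is null)
        BitSet nullBitSet = new BitSet(3);
        byte[] bits = nullBitSet.toByteArray();
        out.writeShort(bits.length);
        out.write(bits);

        // 2. no-dictionary dimension: UTF-8 bytes prefixed with a short length
        byte[] name = "carbon".getBytes(StandardCharsets.UTF_8);
        out.writeShort(name.length);
        out.write(name);

        // 3. dictionary dimension: the surrogate key as an int
        out.writeInt(42);

        // 4. measure: written according to its data type (here a long)
        out.writeLong(100L);

        System.out.println("encoded row size = " + bos.size() + " bytes");
      }
    }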

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java
deleted file mode 100644
index 1989198..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.hadoop.streaming;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.carbondata.core.datastore.compression.Compressor;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.format.BlockletHeader;
-
-/**
- * stream blocklet reader
- */
-public class StreamBlockletReader {
-
-  private byte[] buffer;
-  private int offset;
-  private final byte[] syncMarker;
-  private final byte[] syncBuffer;
-  private final int syncLen;
-  private long pos = 0;
-  private final InputStream in;
-  private final long limitStart;
-  private final long limitEnd;
-  private boolean isAlreadySync = false;
-  private Compressor compressor = CompressorFactory.getInstance().getCompressor();
-  private int rowNums = 0;
-  private int rowIndex = 0;
-  private boolean isHeaderPresent;
-
-  StreamBlockletReader(byte[] syncMarker, InputStream in, long limit, boolean isHeaderPresent) {
-    this.syncMarker = syncMarker;
-    syncLen = syncMarker.length;
-    syncBuffer = new byte[syncLen];
-    this.in = in;
-    limitStart = limit;
-    limitEnd = limitStart + syncLen;
-    this.isHeaderPresent = isHeaderPresent;
-  }
-
-  private void ensureCapacity(int capacity) {
-    if (buffer == null || capacity > buffer.length) {
-      buffer = new byte[capacity];
-    }
-  }
-
-  /**
-   * find the first position of the sync_marker in the input stream
-   */
-  private boolean sync() throws IOException {
-    if (!readBytesFromStream(syncBuffer, 0, syncLen)) {
-      return false;
-    }
-    boolean skipHeader = false;
-    for (int i = 0; i < limitStart; i++) {
-      int j = 0;
-      for (; j < syncLen; j++) {
-        if (syncMarker[j] != syncBuffer[(i + j) % syncLen]) break;
-      }
-      if (syncLen == j) {
-        if (isHeaderPresent) {
-          if (skipHeader) {
-            return true;
-          } else {
-            skipHeader = true;
-          }
-        } else {
-          return true;
-        }
-      }
-      int value = in.read();
-      if (-1 == value) {
-        return false;
-      }
-      syncBuffer[i % syncLen] = (byte) value;
-      pos++;
-    }
-    return false;
-  }
-
-  BlockletHeader readBlockletHeader() throws IOException {
-    int len = readIntFromStream();
-    byte[] b = new byte[len];
-    if (!readBytesFromStream(b, 0, len)) {
-      throw new EOFException("Failed to read blocklet header");
-    }
-    BlockletHeader header = CarbonUtil.readBlockletHeader(b);
-    rowNums = header.getBlocklet_info().getNum_rows();
-    rowIndex = 0;
-    return header;
-  }
-
-  void readBlockletData(BlockletHeader header) throws IOException {
-    ensureCapacity(header.getBlocklet_length());
-    offset = 0;
-    int len = readIntFromStream();
-    byte[] b = new byte[len];
-    if (!readBytesFromStream(b, 0, len)) {
-      throw new EOFException("Failed to read blocklet data");
-    }
-    compressor.rawUncompress(b, buffer);
-  }
-
-  void skipBlockletData(boolean reset) throws IOException {
-    int len = readIntFromStream();
-    skip(len);
-    pos += len;
-    if (reset) {
-      this.rowNums = 0;
-      this.rowIndex = 0;
-    }
-  }
-
-  private void skip(int len) throws IOException {
-    long remaining = len;
-    do {
-      long skipLen = in.skip(remaining);
-      remaining -= skipLen;
-    } while (remaining > 0);
-  }
-
-  /**
-   * find the next blocklet
-   */
-  boolean nextBlocklet() throws IOException {
-    if (pos >= limitStart) {
-      return false;
-    }
-    if (isAlreadySync) {
-      if (!readBytesFromStream(syncBuffer, 0, syncLen)) {
-        return false;
-      }
-    } else {
-      isAlreadySync = true;
-      if (!sync()) {
-        return false;
-      }
-    }
-
-    return pos < limitEnd;
-  }
-
-  boolean hasNext() throws IOException {
-    return rowIndex < rowNums;
-  }
-
-  void nextRow() {
-    rowIndex++;
-  }
-
-  int readIntFromStream() throws IOException {
-    int ch1 = in.read();
-    int ch2 = in.read();
-    int ch3 = in.read();
-    int ch4 = in.read();
-    if ((ch1 | ch2 | ch3 | ch4) < 0) throw new EOFException();
-    pos += 4;
-    return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
-  }
-
-  /**
-   * Reads <code>len</code> bytes of data from the input stream into
-   * an array of bytes.
-   * @return <code>true</code> if reading data successfully, or
-   * <code>false</code> if there is no more data because the end of the stream has been reached.
-   */
-  boolean readBytesFromStream(byte[] b, int offset, int len) throws IOException {
-    int readLen = in.read(b, offset, len);
-    if (readLen < 0) {
-      return false;
-    }
-    pos += readLen;
-    if (readLen < len) {
-      return readBytesFromStream(b, offset + readLen, len - readLen);
-    } else {
-      return true;
-    }
-  }
-
-  boolean readBoolean() {
-    return (buffer[offset++]) != 0;
-  }
-
-  short readShort() {
-    short v =  (short) ((buffer[offset + 1] & 255) +
-        ((buffer[offset]) << 8));
-    offset += 2;
-    return v;
-  }
-
-  byte[] copy(int len) {
-    byte[] b = new byte[len];
-    System.arraycopy(buffer, offset, b, 0, len);
-    return b;
-  }
-
-  int readInt() {
-    int v = ((buffer[offset + 3] & 255) +
-        ((buffer[offset + 2] & 255) << 8) +
-        ((buffer[offset + 1] & 255) << 16) +
-        ((buffer[offset]) << 24));
-    offset += 4;
-    return v;
-  }
-
-  long readLong() {
-    long v = ((long)(buffer[offset + 7] & 255)) +
-        ((long) (buffer[offset + 6] & 255) << 8) +
-        ((long) (buffer[offset + 5] & 255) << 16) +
-        ((long) (buffer[offset + 4] & 255) << 24) +
-        ((long) (buffer[offset + 3] & 255) << 32) +
-        ((long) (buffer[offset + 2] & 255) << 40) +
-        ((long) (buffer[offset + 1] & 255) << 48) +
-        ((long) (buffer[offset]) << 56);
-    offset += 8;
-    return v;
-  }
-
-  double readDouble() {
-    return Double.longBitsToDouble(readLong());
-  }
-
-  byte[] readBytes(int len) {
-    byte[] b = new byte[len];
-    System.arraycopy(buffer, offset, b, 0, len);
-    offset += len;
-    return b;
-  }
-
-  void skipBytes(int len) {
-    offset += len;
-  }
-
-  int getRowNums() {
-    return rowNums;
-  }
-
-  void close() {
-    CarbonUtil.closeStreams(in);
-  }
-}
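
readBlockletHeader() and readBlockletData() above consume each blocklet as two length-prefixed sections: a 4-byte big-endian length followed by the serialized BlockletHeader, then a 4-byte length followed by the compressed row data. A small sketch of that read pattern against plain java.io types; readLengthPrefixed and the dummy payload are illustrative, not CarbonData APIs:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class LengthPrefixedReadSketch {
      static byte[] readLengthPrefixed(DataInputStream in) throws IOException {
        int len = in.readInt();   // same big-endian layout as readIntFromStream()
        byte[] payload = new byte[len];
        in.readFully(payload);    // loops until len bytes are read, like readBytesFromStream()
        return payload;
      }

      public static void main(String[] args) throws IOException {
        // write a dummy length-prefixed section
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        byte[] header = new byte[] {1, 2, 3};
        out.writeInt(header.length);
        out.write(header);

        // read it back
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
        System.out.println(readLengthPrefixed(in).length); // prints 3
      }
    }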

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletWriter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletWriter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletWriter.java
deleted file mode 100644
index a0328b3..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletWriter.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.hadoop.streaming;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.carbondata.core.datastore.compression.Compressor;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.format.BlockletHeader;
-import org.apache.carbondata.format.BlockletInfo;
-import org.apache.carbondata.format.MutationType;
-
-/**
- * stream blocklet writer
- */
-public class StreamBlockletWriter {
-  private byte[] buffer;
-  private int maxSize;
-  private int maxRowNum;
-  private int rowSize;
-  private int count = 0;
-  private int rowIndex = -1;
-  private Compressor compressor = CompressorFactory.getInstance().getCompressor();
-
-  StreamBlockletWriter(int maxSize, int maxRowNum, int rowSize) {
-    buffer = new byte[maxSize];
-    this.maxSize = maxSize;
-    this.maxRowNum = maxRowNum;
-    this.rowSize = rowSize;
-  }
-
-  private void ensureCapacity(int space) {
-    int newcount = space + count;
-    if (newcount > buffer.length) {
-      byte[] newbuf = new byte[Math.max(newcount, buffer.length + rowSize)];
-      System.arraycopy(buffer, 0, newbuf, 0, count);
-      buffer = newbuf;
-    }
-  }
-
-  void reset() {
-    count = 0;
-    rowIndex = -1;
-  }
-
-  byte[] getBytes() {
-    return buffer;
-  }
-
-  int getCount() {
-    return count;
-  }
-
-  int getRowIndex() {
-    return rowIndex;
-  }
-
-  void nextRow() {
-    rowIndex++;
-  }
-
-  boolean isFull() {
-    return rowIndex == maxRowNum || count >= maxSize;
-  }
-
-  void writeBoolean(boolean val) {
-    ensureCapacity(1);
-    buffer[count] = (byte) (val ? 1 : 0);
-    count += 1;
-  }
-
-  void writeShort(int val) {
-    ensureCapacity(2);
-    buffer[count + 1] = (byte) (val);
-    buffer[count] = (byte) (val >>> 8);
-    count += 2;
-  }
-
-  void writeInt(int val) {
-    ensureCapacity(4);
-    buffer[count + 3] = (byte) (val);
-    buffer[count + 2] = (byte) (val >>> 8);
-    buffer[count + 1] = (byte) (val >>> 16);
-    buffer[count] = (byte) (val >>> 24);
-    count += 4;
-  }
-
-  void writeLong(long val) {
-    ensureCapacity(8);
-    buffer[count + 7] = (byte) (val);
-    buffer[count + 6] = (byte) (val >>> 8);
-    buffer[count + 5] = (byte) (val >>> 16);
-    buffer[count + 4] = (byte) (val >>> 24);
-    buffer[count + 3] = (byte) (val >>> 32);
-    buffer[count + 2] = (byte) (val >>> 40);
-    buffer[count + 1] = (byte) (val >>> 48);
-    buffer[count] = (byte) (val >>> 56);
-    count += 8;
-  }
-
-  void writeDouble(double val) {
-    writeLong(Double.doubleToLongBits(val));
-  }
-
-  void writeBytes(byte[] b) {
-    writeBytes(b, 0, b.length);
-  }
-
-  void writeBytes(byte[] b, int off, int len) {
-    ensureCapacity(len);
-    System.arraycopy(b, off, buffer, count, len);
-    count += len;
-  }
-
-  void apppendBlocklet(DataOutputStream outputStream) throws IOException {
-    outputStream.write(CarbonStreamOutputFormat.CARBON_SYNC_MARKER);
-
-    BlockletInfo blockletInfo = new BlockletInfo();
-    blockletInfo.setNum_rows(getRowIndex() + 1);
-    BlockletHeader blockletHeader = new BlockletHeader();
-    blockletHeader.setBlocklet_length(getCount());
-    blockletHeader.setMutation(MutationType.INSERT);
-    blockletHeader.setBlocklet_info(blockletInfo);
-    byte[] headerBytes = CarbonUtil.getByteArray(blockletHeader);
-    outputStream.writeInt(headerBytes.length);
-    outputStream.write(headerBytes);
-
-    byte[] compressed = compressor.compressByte(getBytes(), getCount());
-    outputStream.writeInt(compressed.length);
-    outputStream.write(compressed);
-  }
-
-  void close() {
-  }
-}
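
apppendBlocklet() above lays a blocklet out on disk as: the sync marker, a 4-byte length plus the Thrift-serialized BlockletHeader, then a 4-byte length plus the compressed row buffer. A minimal sketch of that layout with dummy bytes standing in for the real marker, header and compressed data (the real values come from CARBON_SYNC_MARKER, CarbonUtil.getByteArray() and the configured Compressor):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class BlockletLayoutSketch {
      public static void main(String[] args) throws IOException {
        byte[] syncMarker = new byte[16];          // stands in for CARBON_SYNC_MARKER
        byte[] headerBytes = new byte[] {1, 2};    // stands in for the serialized BlockletHeader
        byte[] compressed = new byte[] {3, 4, 5};  // stands in for the compressed row buffer

        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.write(syncMarker);
        out.writeInt(headerBytes.length);
        out.write(headerBytes);
        out.writeInt(compressed.length);
        out.write(compressed);

        System.out.println("blocklet bytes on disk = " + bos.size());
      }
    }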