You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/04/24 09:27:03 UTC

[carbondata] branch master updated: [CARBONDATA-3351] Support Binary Data Type

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 72a5973  [CARBONDATA-3351] Support Binary Data Type
72a5973 is described below

commit 72a5973eab38549fa16b5497cafa588e86fd6bb5
Author: xubo245 <xu...@huawei.com>
AuthorDate: Tue Apr 16 18:28:42 2019 +0800

    [CARBONDATA-3351] Support Binary Data Type
    
    CarbonData supports binary data type
    
    Background :
    Binary is basic data type and widely used in various scenarios. So it’s better to support binary data type in CarbonData. Download data from S3 will be slow when dataset has lots of small binary data. The majority of application scenarios are related to storage small binary data type into CarbonData, which can avoid small binary files problem and speed up S3 access performance, also can decrease cost of accessing OBS by decreasing the number of calling S3 API. It also will easier to m [...]
    
    Goals:
    
    Supporting write binary data type by Carbon Java SDK.
    Supporting read binary data type by Spark Carbon file format(carbon datasource) and CarbonSession.
    Supporting read binary data type by Carbon SDK
    Supporting write binary by spark
---
 .../core/constants/CarbonCommonConstants.java      |    1 +
 .../safe/AbstractNonDictionaryVectorFiller.java    |    2 +-
 .../SafeVariableLengthDimensionDataChunkStore.java |    2 +-
 .../carbondata/core/datastore/page/ColumnPage.java |   11 +-
 .../core/datastore/page/LazyColumnPage.java        |    2 +
 .../datastore/page/UnsafeVarLengthColumnPage.java  |    7 +-
 .../datastore/page/VarLengthColumnPageBase.java    |    1 +
 .../page/encoding/ColumnPageEncoderMeta.java       |    4 +-
 .../page/encoding/DefaultEncodingFactory.java      |   14 +-
 .../carbondata/core/datastore/row/CarbonRow.java   |    6 +-
 .../ThriftWrapperSchemaConverterImpl.java          |    4 +
 .../core/metadata/datatype/BinaryType.java         |   29 +
 .../core/metadata/datatype/DataType.java           |    2 +-
 .../core/metadata/datatype/DataTypes.java          |    4 +
 .../metadata/schema/table/TableSchemaBuilder.java  |    1 +
 .../result/vector/impl/CarbonColumnVectorImpl.java |    6 +-
 .../apache/carbondata/core/util/CarbonUtil.java    |    9 +-
 .../apache/carbondata/core/util/DataTypeUtil.java  |   11 +-
 docs/sdk-guide.md                                  |    1 +
 docs/supported-data-types-in-carbondata.md         |    2 +-
 format/src/main/thrift/schema.thrift               |    1 +
 .../hadoop/util/CarbonVectorizedRecordReader.java  |    3 +-
 integration/spark-common-test/pom.xml              |    1 -
 .../org/apache/carbondata/sdk/util/BinaryUtil.java |   88 ++
 .../src/test/resources/binaryStringNullData.csv    |    4 +
 .../src/test/resources/binarydata.csv              |    3 +
 .../src/test/resources/binarystringdata.csv        |    3 +
 .../src/test/resources/binarystringdata2.csv       |    3 +
 .../resources/jsonFiles/data/allPrimitiveType.json |    3 +-
 .../testsuite/binary/TestBinaryDataType.scala      | 1153 ++++++++++++++++++++
 ...ryWithColumnMetCacheAndCacheLevelProperty.scala |    2 -
 .../TestNonTransactionalCarbonTableForBinary.scala |  162 +++
 ...TestNonTransactionalCarbonTableJsonWriter.scala |   37 +-
 .../StandardPartitionBadRecordLoggerTest.scala     |    2 -
 .../carbondata/spark/util/CarbonScalaUtil.scala    |   21 +-
 .../spark/util/DataTypeConverterUtil.scala         |    3 +
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala    |   39 +-
 integration/spark-datasource/pom.xml               |    1 -
 .../converter/SparkDataTypeConverterImpl.java      |    5 +-
 .../vectorreader/VectorizedCarbonRecordReader.java |    2 +-
 .../datasources/CarbonSparkDataSourceUtil.scala    |   10 +-
 .../datasources/SparkCarbonFileFormat.scala        |    2 +
 .../apache/spark/sql/util/SparkTypeConverter.scala |    1 +
 .../org/apache/carbondata/sdk/util/BinaryUtil.java |   89 ++
 .../SparkCarbonDataSourceBinaryTest.scala          |  544 +++++++++
 .../datasource/SparkCarbonDataSourceTest.scala     |   26 +
 .../apache/spark/sql/optimizer/CarbonFilters.scala |    8 +-
 .../converter/impl/BinaryFieldConverterImpl.java   |   78 ++
 .../converter/impl/FieldEncoderFactory.java        |    2 +
 .../loading/sort/SortStepRowHandler.java           |    9 +
 .../store/CarbonFactDataHandlerColumnar.java       |    3 +-
 .../carbondata/processing/store/TablePage.java     |   10 +-
 store/sdk/pom.xml                                  |    3 +-
 .../carbondata/sdk/file/CSVCarbonWriter.java       |    2 +-
 .../apache/carbondata/sdk/file/CarbonReader.java   |    1 -
 .../carbondata/sdk/file/CarbonWriterBuilder.java   |   68 +-
 .../java/org/apache/carbondata/sdk/file/Field.java |    4 +
 .../carbondata/sdk/file/JsonCarbonWriter.java      |    3 +-
 .../org/apache/carbondata/sdk/file/RowUtil.java    |   11 +
 .../apache/carbondata/sdk/file/utils/SDKUtil.java  |   79 ++
 .../carbondata/sdk/file/CSVCarbonWriterTest.java   |   16 +-
 .../carbondata/sdk/file/CarbonReaderTest.java      |  186 +++-
 .../org/apache/carbondata/sdk/file/ImageTest.java  |  818 ++++++++++++++
 .../org/apache/carbondata/util/BinaryUtil.java     |  126 +++
 .../src/test/resources/image/carbondatalogo.jpg    |  Bin 0 -> 59099 bytes
 .../image/flowers/10686568196_b1915544a8.jpg       |  Bin 0 -> 97920 bytes
 .../image/flowers/10686568196_b1915544a8.txt       |    1 +
 .../image/flowers/10712722853_5632165b04.jpg       |  Bin 0 -> 63389 bytes
 .../image/flowers/10712722853_5632165b04.txt       |    1 +
 .../flowers/subfolder/10841136265_af473efc60.jpg   |  Bin 0 -> 62144 bytes
 .../flowers/subfolder/10841136265_af473efc60.txt   |    1 +
 .../src/test/resources/image/voc/2007_000027.jpg   |  Bin 0 -> 145493 bytes
 .../src/test/resources/image/voc/2007_000027.xml   |   63 ++
 .../src/test/resources/image/voc/2007_000032.jpg   |  Bin 0 -> 54757 bytes
 .../src/test/resources/image/voc/2007_000032.xml   |   63 ++
 .../src/test/resources/image/voc/2007_000033.jpg   |  Bin 0 -> 71205 bytes
 .../src/test/resources/image/voc/2007_000033.xml   |   51 +
 .../src/test/resources/image/voc/2007_000039.jpg   |  Bin 0 -> 64668 bytes
 .../src/test/resources/image/voc/2007_000039.xml   |   27 +
 .../src/test/resources/image/voc/2009_001444.jpg   |  Bin 0 -> 677151 bytes
 .../src/test/resources/image/voc/2009_001444.xml   |   28 +
 .../image/vocForSegmentationClass/2007_000032.jpg  |  Bin 0 -> 54757 bytes
 .../image/vocForSegmentationClass/2007_000032.png  |  Bin 0 -> 2334 bytes
 .../image/vocForSegmentationClass/2007_000033.jpg  |  Bin 0 -> 71205 bytes
 .../image/vocForSegmentationClass/2007_000033.png  |  Bin 0 -> 2814 bytes
 .../image/vocForSegmentationClass/2007_000042.jpg  |  Bin 0 -> 82847 bytes
 .../image/vocForSegmentationClass/2007_000042.png  |  Bin 0 -> 3620 bytes
 .../org/apache/carbondata/tool/CarbonCliTest.java  |   61 +-
 88 files changed, 3900 insertions(+), 150 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index e02241e..c9efc34 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1749,6 +1749,7 @@ public final class CarbonCommonConstants {
   public static final String ARRAY_SEPARATOR = "\001";
   public static final String STRING = "String";
   public static final String SHORT = "Short";
+  public static final String BINARY = "Binary";
   public static final String TIMESTAMP = "Timestamp";
   public static final String ARRAY = "array";
   public static final String STRUCT = "struct";
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java
index 38e28ae..298d165 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java
@@ -52,7 +52,7 @@ class NonDictionaryVectorFillerFactory {
       } else {
         return new StringVectorFiller(numberOfRows, actualDataLength);
       }
-    } else if (type == DataTypes.VARCHAR) {
+    } else if (type == DataTypes.VARCHAR || type == DataTypes.BINARY) {
       return new LongStringVectorFiller(numberOfRows, actualDataLength);
     } else if (type == DataTypes.TIMESTAMP) {
       return new TimeStampVectorFiller(numberOfRows);
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
index 219d8c9..b740b28 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
@@ -177,7 +177,7 @@ public abstract class SafeVariableLengthDimensionDataChunkStore
             length)) {
       vector.putNull(vectorRow);
     } else {
-      if (dt == DataTypes.STRING || dt == DataTypes.VARCHAR) {
+      if (dt == DataTypes.STRING || dt == DataTypes.VARCHAR || dt == DataTypes.BINARY) {
         vector.putByteArray(vectorRow, currentDataOffset, length, data);
       } else if (dt == DataTypes.BOOLEAN) {
         vector.putBoolean(vectorRow, ByteUtil.toBoolean(data[currentDataOffset]));
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
index 22c5536..41d93ef 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
@@ -204,7 +204,8 @@ public abstract class ColumnPage {
             new ColumnPageEncoderMeta(columnSpec, dataType, compressorName), pageSize);
       } else if (dataType == DataTypes.STRING
           || dataType == DataTypes.BYTE_ARRAY
-          || dataType == DataTypes.VARCHAR) {
+          || dataType == DataTypes.VARCHAR
+          || dataType == DataTypes.BINARY) {
         instance = new UnsafeVarLengthColumnPage(
             new ColumnPageEncoderMeta(columnSpec, dataType, compressorName), pageSize);
       } else {
@@ -231,7 +232,8 @@ public abstract class ColumnPage {
         instance = newDecimalPage(columnPageEncoderMeta, new byte[pageSize][]);
       } else if (dataType == DataTypes.STRING
           || dataType == DataTypes.BYTE_ARRAY
-          || dataType == DataTypes.VARCHAR) {
+          || dataType == DataTypes.VARCHAR
+          || dataType == DataTypes.BINARY) {
         instance = new SafeVarLengthColumnPage(columnPageEncoderMeta, pageSize);
       } else {
         throw new RuntimeException("Unsupported data dataType: " + dataType);
@@ -426,6 +428,9 @@ public abstract class ColumnPage {
     } else if (dataType == DataTypes.FLOAT) {
       putFloat(rowId, (float) value);
       statsCollector.update((float) value);
+    } else if (dataType == DataTypes.BINARY) {
+      putBytes(rowId, (byte[]) value);
+      statsCollector.update((byte[]) value);
     } else {
       throw new RuntimeException("unsupported data type: " + dataType);
     }
@@ -782,6 +787,8 @@ public abstract class ColumnPage {
         || columnPageEncoderMeta.getColumnSpec().getColumnType() == ColumnType.PLAIN_LONG_VALUE
         || columnPageEncoderMeta.getColumnSpec().getColumnType() == ColumnType.PLAIN_VALUE)) {
       return compressor.compressByte(getComplexParentFlattenedBytePage());
+    } else if (dataType == DataTypes.BINARY) {
+      return getLVFlattenedBytePage();
     } else if (dataType == DataTypes.BYTE_ARRAY) {
       return compressor.compressByte(getLVFlattenedBytePage());
     } else {
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
index 772916d..d0389d3 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
@@ -86,6 +86,8 @@ public class LazyColumnPage extends ColumnPage {
       return converter.decodeDouble(columnPage.getFloat(rowId));
     } else if (dataType == DataTypes.DOUBLE) {
       return columnPage.getDouble(rowId);
+    } else if (dataType == DataTypes.BINARY) {
+      return converter.decodeDouble(columnPage.getByte(rowId));
     } else {
       throw new RuntimeException("internal error: " + this.toString());
     }
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
index 4693dba..c23c147 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
@@ -23,6 +23,7 @@ import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.memory.CarbonUnsafe;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 
 /**
  * This extension uses unsafe memory to store page data, for variable length data type (string)
@@ -35,7 +36,11 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
   UnsafeVarLengthColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize)
       throws MemoryException {
     super(columnPageEncoderMeta, pageSize);
-    capacity = (int) (pageSize * DEFAULT_ROW_SIZE * FACTOR);
+    if (columnPageEncoderMeta.getStoreDataType() == DataTypes.BINARY) {
+      capacity = (int) (pageSize * DEFAULT_BINARY_SIZE * FACTOR);
+    } else {
+      capacity = (int) (pageSize * DEFAULT_ROW_SIZE * FACTOR);
+    }
     memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, (long) (capacity));
     baseAddress = memoryBlock.getBaseObject();
     baseOffset = memoryBlock.getBaseOffset();
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
index 0f409f6..a941880 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
@@ -42,6 +42,7 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
   static final int longBits = DataTypes.LONG.getSizeBits();
   // default size for each row, grows as needed
   static final int DEFAULT_ROW_SIZE = 8;
+  static final int DEFAULT_BINARY_SIZE = 512;
 
   static final double FACTOR = 1.25;
 
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
index 03a43f8..f04d38a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
@@ -158,7 +158,7 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
         out.writeInt(-1);
         out.writeInt(-1);
       }
-    } else if (dataType == DataTypes.BYTE_ARRAY) {
+    } else if (dataType == DataTypes.BYTE_ARRAY || dataType == DataTypes.BINARY) {
       // for complex type, it will come here, ignoring stats for complex type
       // TODO: support stats for complex type
     } else {
@@ -206,7 +206,7 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
       in.readInt();
       // precision field is obsoleted. It is stored in the schema data type in columnSpec
       in.readInt();
-    } else if (dataType == DataTypes.BYTE_ARRAY) {
+    } else if (dataType == DataTypes.BYTE_ARRAY || dataType == DataTypes.BINARY) {
       // for complex type, it will come here, ignoring stats for complex type
       // TODO: support stats for complex type
     } else {
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
index 506e1c7..f2eb92f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
@@ -101,11 +101,10 @@ public class DefaultEncodingFactory extends EncodingFactory {
             dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex())
             .createEncoder(null);
       case PLAIN_VALUE:
-        return new HighCardDictDimensionIndexCodec(
-            dimensionSpec.isInSortColumns(),
+        return new HighCardDictDimensionIndexCodec(dimensionSpec.isInSortColumns(),
             dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex(),
-            dimensionSpec.getSchemaDataType() == DataTypes.VARCHAR)
-            .createEncoder(null);
+            dimensionSpec.getSchemaDataType() == DataTypes.VARCHAR
+                || dimensionSpec.getSchemaDataType() == DataTypes.BINARY).createEncoder(null);
       default:
         throw new RuntimeException("unsupported dimension type: " +
             dimensionSpec.getColumnType());
@@ -114,9 +113,12 @@ public class DefaultEncodingFactory extends EncodingFactory {
 
   private ColumnPageEncoder createEncoderForMeasureOrNoDictionaryPrimitive(ColumnPage columnPage,
       TableSpec.ColumnSpec columnSpec) {
+
     SimpleStatsResult stats = columnPage.getStatistics();
     DataType dataType = stats.getDataType();
-    if (dataType == DataTypes.BOOLEAN) {
+    if (dataType == DataTypes.BOOLEAN
+        || dataType == DataTypes.BYTE_ARRAY
+        || columnPage.getDataType() == DataTypes.BINARY) {
       return new DirectCompressCodec(columnPage.getDataType()).createEncoder(null);
     } else if (dataType == DataTypes.BYTE ||
         dataType == DataTypes.SHORT ||
@@ -128,8 +130,6 @@ public class DefaultEncodingFactory extends EncodingFactory {
       return createEncoderForDecimalDataTypeMeasure(columnPage, columnSpec);
     } else if (dataType == DataTypes.FLOAT || dataType == DataTypes.DOUBLE) {
       return selectCodecByAlgorithmForFloating(stats, false, columnSpec).createEncoder(null);
-    } else if (dataType == DataTypes.BYTE_ARRAY) {
-      return new DirectCompressCodec(columnPage.getDataType()).createEncoder(null);
     } else {
       throw new RuntimeException("unsupported data type: " + stats.getDataType());
     }
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java b/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
index 1141707..c140017 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
@@ -54,7 +54,11 @@ public class CarbonRow implements Serializable {
   }
 
   public String getString(int ordinal) {
-    return (String) data[ordinal];
+    if (null == data[ordinal]) {
+      return null;
+    } else {
+      return String.valueOf(data[ordinal]);
+    }
   }
 
   public Object getObject(int ordinal) {
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index dca7fa2..d9fa936 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -152,6 +152,8 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
       return org.apache.carbondata.format.DataType.DATE;
     } else if (dataType.getId() == DataTypes.TIMESTAMP.getId()) {
       return org.apache.carbondata.format.DataType.TIMESTAMP;
+    } else if (dataType.getId() == DataTypes.BINARY.getId()) {
+      return org.apache.carbondata.format.DataType.BINARY;
     } else if (DataTypes.isArrayType(dataType)) {
       return org.apache.carbondata.format.DataType.ARRAY;
     } else if (DataTypes.isStructType(dataType)) {
@@ -498,6 +500,8 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
         return DataTypes.TIMESTAMP;
       case DATE:
         return DataTypes.DATE;
+      case BINARY:
+        return DataTypes.BINARY;
       case ARRAY:
         return DataTypes.createDefaultArrayType();
       case STRUCT:
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/BinaryType.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/BinaryType.java
new file mode 100644
index 0000000..6ecd9db
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/BinaryType.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.metadata.datatype;
+
+public class BinaryType extends DataType {
+  static final DataType BINARY =
+      new BinaryType(DataTypes.BINARY_TYPE_ID, 26, "BINARY", -1);
+  private BinaryType(int id, int precedenceOrder, String name, int sizeInBytes) {
+    super(id, precedenceOrder, name, sizeInBytes);
+  }
+  // this function is needed to ensure singleton pattern while supporting java serialization
+  private Object readResolve() {
+    return DataTypes.BINARY;
+  }
+}
\ No newline at end of file
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
index 8514ccb..4f282e3 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
@@ -97,7 +97,7 @@ public class DataType implements Serializable {
       return TIMESTAMP_CHAR;
     } else if (dataType == DataTypes.DATE) {
       return DATE_CHAR;
-    } else if (dataType == DataTypes.BYTE_ARRAY) {
+    } else if (dataType == DataTypes.BYTE_ARRAY || dataType == DataTypes.BINARY) {
       return BYTE_ARRAY_CHAR;
     } else {
       throw new RuntimeException("Unexpected type: " + dataType);
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataTypes.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataTypes.java
index d71eea4..c073fa0 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataTypes.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataTypes.java
@@ -37,6 +37,7 @@ public class DataTypes {
   public static final DataType DOUBLE = DoubleType.DOUBLE;
   public static final DataType NULL = NullType.NULL;
   public static final DataType BYTE = ByteType.BYTE;
+  public static final DataType BINARY = BinaryType.BINARY;
 
   // internal use only, for variable length data type
   public static final DataType BYTE_ARRAY = ByteArrayType.BYTE_ARRAY;
@@ -69,6 +70,7 @@ public class DataTypes {
   public static final int STRUCT_TYPE_ID = 12;
   public static final int MAP_TYPE_ID = 13;
   public static final int VARCHAR_TYPE_ID = 18;
+  public static final int BINARY_TYPE_ID = 19;
 
   /**
    * create a DataType instance from uniqueId of the DataType
@@ -102,6 +104,8 @@ public class DataTypes {
       return NULL;
     } else if (id == DECIMAL_TYPE_ID) {
       return createDefaultDecimalType();
+    } else if (id == BINARY.getId()) {
+      return BINARY;
     } else if (id == ARRAY_TYPE_ID) {
       return createDefaultArrayType();
     } else if (id == STRUCT_TYPE_ID) {
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
index 53542d5..1d64293 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
@@ -186,6 +186,7 @@ public class TableSchemaBuilder {
         field.getDataType() == DataTypes.VARCHAR ||
         field.getDataType() == DataTypes.DATE ||
         field.getDataType() == DataTypes.TIMESTAMP ||
+        field.getDataType() == DataTypes.BINARY ||
         field.getDataType().isComplexType() ||
         (isComplexChild))  {
       newColumn.setDimensionColumn(true);
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
index 30d2317..18f440a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
@@ -92,7 +92,7 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
     } else if (dataType instanceof DecimalType) {
       decimals = new BigDecimal[batchSize];
     } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY
-        || dataType == DataTypes.VARCHAR) {
+        || dataType == DataTypes.VARCHAR || dataType == DataTypes.BINARY) {
       dictionaryVector = new CarbonColumnVectorImpl(batchSize, DataTypes.INT);
       bytes = new byte[batchSize][];
     } else {
@@ -233,7 +233,7 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
     } else if (dataType instanceof DecimalType) {
       return decimals[rowId];
     } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY
-        || dataType == DataTypes.VARCHAR) {
+        || dataType == DataTypes.VARCHAR || dataType == DataTypes.BINARY) {
       if (null != carbonDictionary) {
         int dictKey = (Integer) dictionaryVector.getData(rowId);
         return carbonDictionary.getDictionaryValue(dictKey);
@@ -295,7 +295,7 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
     } else if (dataType instanceof DecimalType) {
       Arrays.fill(decimals, null);
     } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY
-        || dataType == DataTypes.VARCHAR) {
+        || dataType == DataTypes.VARCHAR || dataType == DataTypes.BINARY) {
       Arrays.fill(bytes, null);
       this.dictionaryVector.reset();
     } else {
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index a4af9cc..d8e4499 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2177,7 +2177,11 @@ public final class CarbonUtil {
         return DataTypes.FLOAT;
       case BYTE:
         return DataTypes.BYTE;
+      case BINARY:
+        return DataTypes.BINARY;
       default:
+        LOGGER.warn(String.format("Cannot match the data type, using default String data type: %s",
+            DataTypes.STRING.getName()));
         return DataTypes.STRING;
     }
   }
@@ -2382,9 +2386,8 @@ public final class CarbonUtil {
       return b.array();
     } else if (DataTypes.isDecimal(dataType)) {
       return DataTypeUtil.bigDecimalToByte((BigDecimal) value);
-    } else if (dataType == DataTypes.BYTE_ARRAY) {
-      return (byte[]) value;
-    } else if (dataType == DataTypes.STRING
+    } else if (dataType == DataTypes.BYTE_ARRAY || dataType == DataTypes.BINARY
+        || dataType == DataTypes.STRING
         || dataType == DataTypes.DATE
         || dataType == DataTypes.VARCHAR) {
       return (byte[]) value;
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index 303cc80..7129f34 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -476,7 +476,9 @@ public final class DataTypeUtil {
     } else if (actualDataType == DataTypes.LONG) {
       return ByteUtil.toXorBytes((Long) dimensionValue);
     } else if (actualDataType == DataTypes.TIMESTAMP) {
-      return ByteUtil.toXorBytes((Long)dimensionValue);
+      return ByteUtil.toXorBytes((Long) dimensionValue);
+    } else if (actualDataType == DataTypes.BINARY) {
+      return (byte[]) dimensionValue;
     } else {
       // Default action for String/Varchar
       return ByteUtil.toBytes(dimensionValue.toString());
@@ -603,6 +605,11 @@ public final class DataTypeUtil {
           return null;
         }
         return getDataTypeConverter().convertFromBigDecimalToDecimal(byteToBigDecimal(dataInBytes));
+      } else if (actualDataType == DataTypes.BINARY) {
+        if (isEmptyByteArray(dataInBytes)) {
+          return null;
+        }
+        return dataInBytes;
       } else {
         // Default action for String/Varchar
         return getDataTypeConverter().convertFromByteToUTF8String(dataInBytes);
@@ -1057,6 +1064,8 @@ public final class DataTypeUtil {
       return DataTypes.BYTE_ARRAY;
     } else if (DataTypes.BYTE_ARRAY.getName().equalsIgnoreCase(dataType.getName())) {
       return DataTypes.BYTE_ARRAY;
+    } else if (DataTypes.BINARY.getName().equalsIgnoreCase(dataType.getName())) {
+      return DataTypes.BINARY;
     } else if (dataType.getName().equalsIgnoreCase("decimal")) {
       return DataTypes.createDecimalType(precision, scale);
     } else if (dataType.getName().equalsIgnoreCase("array")) {
diff --git a/docs/sdk-guide.md b/docs/sdk-guide.md
index e040e64..002a06b 100644
--- a/docs/sdk-guide.md
+++ b/docs/sdk-guide.md
@@ -195,6 +195,7 @@ Each of SQL data types and Avro Data Types are mapped into data types of SDK. Fo
 | BIGINT | LONG | DataTypes.LONG |
 | DOUBLE | DOUBLE | DataTypes.DOUBLE |
 | VARCHAR |  -  | DataTypes.STRING |
+| BINARY |  -  | DataTypes.BINARY |
 | FLOAT | FLOAT | DataTypes.FLOAT |
 | BYTE |  -  | DataTypes.BYTE |
 | DATE | DATE | DataTypes.DATE |
diff --git a/docs/supported-data-types-in-carbondata.md b/docs/supported-data-types-in-carbondata.md
index daf1acf..4960453 100644
--- a/docs/supported-data-types-in-carbondata.md
+++ b/docs/supported-data-types-in-carbondata.md
@@ -51,4 +51,4 @@
 
   * Other Types
     * BOOLEAN
-
+    * BINARY
diff --git a/format/src/main/thrift/schema.thrift b/format/src/main/thrift/schema.thrift
index d39e548..5daf767 100644
--- a/format/src/main/thrift/schema.thrift
+++ b/format/src/main/thrift/schema.thrift
@@ -39,6 +39,7 @@ enum DataType {
 	MAP = 23,
 	FLOAT = 24,
 	BYTE = 25
+	BINARY = 26,
 }
 
 /**
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java
index 1c11275..99db9d3 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java
@@ -147,7 +147,8 @@ public class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
         DataType dataType = msr.getMeasure().getDataType();
         if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.SHORT
             || dataType == DataTypes.INT || dataType == DataTypes.LONG
-            || dataType == DataTypes.FLOAT || dataType == DataTypes.BYTE) {
+            || dataType == DataTypes.FLOAT || dataType == DataTypes.BYTE
+            || dataType == DataTypes.BINARY) {
           fields[msr.getOrdinal()] =
               new StructField(msr.getColumnName(), msr.getMeasure().getDataType());
         } else if (DataTypes.isDecimal(dataType)) {
diff --git a/integration/spark-common-test/pom.xml b/integration/spark-common-test/pom.xml
index fd96f88..3fd2b03 100644
--- a/integration/spark-common-test/pom.xml
+++ b/integration/spark-common-test/pom.xml
@@ -163,7 +163,6 @@
   </dependencies>
 
   <build>
-    <testSourceDirectory>src/test/scala</testSourceDirectory>
     <resources>
       <resource>
         <directory>src/resources</directory>
diff --git a/integration/spark-common-test/src/test/java/org/apache/carbondata/sdk/util/BinaryUtil.java b/integration/spark-common-test/src/test/java/org/apache/carbondata/sdk/util/BinaryUtil.java
new file mode 100644
index 0000000..9144e4c
--- /dev/null
+++ b/integration/spark-common-test/src/test/java/org/apache/carbondata/sdk/util/BinaryUtil.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.util;
+
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.sdk.file.CarbonWriter;
+import org.apache.carbondata.sdk.file.Field;
+import org.apache.carbondata.sdk.file.Schema;
+
+import java.io.*;
+
+import static org.apache.carbondata.sdk.file.utils.SDKUtil.listFiles;
+
+public class BinaryUtil {
+  public static void binaryToCarbon(String sourceImageFolder, String outputPath,
+                                    String sufAnnotation, final String sufImage) throws Exception {
+    Field[] fields = new Field[5];
+    fields[0] = new Field("binaryId", DataTypes.INT);
+    fields[1] = new Field("binaryName", DataTypes.STRING);
+    fields[2] = new Field("binary", DataTypes.BINARY);
+    fields[3] = new Field("labelName", DataTypes.STRING);
+    fields[4] = new Field("labelContent", DataTypes.STRING);
+    CarbonWriter writer = CarbonWriter
+        .builder()
+        .outputPath(outputPath)
+        .withCsvInput(new Schema(fields))
+        .withBlockSize(256)
+        .writtenBy("binaryExample")
+        .withPageSizeInMb(1)
+        .build();
+    binaryToCarbon(sourceImageFolder, writer, sufAnnotation, sufImage);
+  }
+
+  public static boolean binaryToCarbon(String sourceImageFolder, CarbonWriter writer,
+      String sufAnnotation, final String sufImage) throws Exception {
+    int num = 1;
+
+    byte[] originBinary = null;
+
+    // read and write image data
+    for (int j = 0; j < num; j++) {
+
+      Object[] files = listFiles(sourceImageFolder, sufImage).toArray();
+
+      if (null != files) {
+        for (int i = 0; i < files.length; i++) {
+          // read image and encode to Hex
+          BufferedInputStream bis = new BufferedInputStream(
+              new FileInputStream(new File((String) files[i])));
+          originBinary = new byte[bis.available()];
+          while ((bis.read(originBinary)) != -1) {
+          }
+
+          String labelFileName = ((String) files[i]).split(sufImage)[0] + sufAnnotation;
+          BufferedInputStream txtBis = new BufferedInputStream(new FileInputStream(labelFileName));
+          String labelValue = null;
+          byte[] labelBinary = null;
+          labelBinary = new byte[txtBis.available()];
+          while ((txtBis.read(labelBinary)) != -1) {
+            labelValue = new String(labelBinary, "UTF-8");
+          }
+          // write data
+          writer.write(new Object[]{i, (String) files[i], originBinary,
+              labelFileName, labelValue});
+          bis.close();
+          txtBis.close();
+        }
+      }
+      writer.close();
+    }
+    return true;
+  }
+}
diff --git a/integration/spark-common-test/src/test/resources/binaryStringNullData.csv b/integration/spark-common-test/src/test/resources/binaryStringNullData.csv
new file mode 100644
index 0000000..8a7b595
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/binaryStringNullData.csv
@@ -0,0 +1,4 @@
+2|false|2.png|history|true
+3|false|3.png|biology|false
+3|false|3.png||false
+1|true|1.png|education|true
\ No newline at end of file
diff --git a/integration/spark-common-test/src/test/resources/binarydata.csv b/integration/spark-common-test/src/test/resources/binarydata.csv
new file mode 100644
index 0000000..ed642c2
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/binarydata.csv
@@ -0,0 +1,3 @@
+2,false,2.png,89504e470d0a1a0a0000000d49484452000002f5000000cf0806000000753c2e6f0000000473424954080808087c0864880000001974455874536f66747761726500676e6f6d652d73637265656e73686f74ef03bf3e0000200049444154789cecbdf99324b9b1e7f7012222efccbafb9ae9e9991e72488ad2d3aecc642bd3ff6f32993dadd993f62d8fe190ddd357dd5995779c807e001081888caaae6ecee370a4f4b6e8cc8a8cc0e170b87f01381c426badd9d18e76b4a31ded68473bdad18e76f48b25f9731760473bdad18e76b4a31ded68473bdad1df473b50bfa31ded68473bdad18e76b4a31dfdc26907ea [...]
+3,false,3.png,89504e470d0a1a0a0000000d4948445200000136000000a108060000004da893e60000000473424954080808087c0864880000001974455874536f66747761726500676e6f6d652d73637265656e73686f74ef03bf3e0000200049444154789cecbd77741dd77deffbd97b4ec54107480004c02a92a24452222552dda6e4a83ab665d9b22dc5762c3bb16f927b7373535ecacabd6bbd756f9297b7d6bd491c3b2f4e6c15ab595631d53b295194a8c2de3b09a277e000a7ceccdeef8f293800d109928875be5ecba0ce99d9b3f79cd9dff9f59fd05a6bf2c8238f3c7e8d202ff604f2c8238f3c661a7962cb238fff88 [...]
+1,true,1.png,89504e470d0a1a0a0000000d494844520000014a0000005008060000007f133c4c0000000473424954080808087c0864880000001974455874536f66747761726500676e6f6d652d73637265656e73686f74ef03bf3e0000200049444154789cedbd596c1cf79deffbadaaaeded8dd642fdcf755a24449a62c538b25d9892d8f6cc571ec646e723088279933987b33b81860705fe6615e2fce794b70e6006770700607671e06492e66e2716225926ccb32b5d35a487197c47de97defaebdea3e7457a99b5d4db229ca9273ea031816bb96aeaaaefad5ffff5bbe3f626d6d4d818181818141590896650d4369606060b [...]
\ No newline at end of file
diff --git a/integration/spark-common-test/src/test/resources/binarystringdata.csv b/integration/spark-common-test/src/test/resources/binarystringdata.csv
new file mode 100644
index 0000000..02121ca
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/binarystringdata.csv
@@ -0,0 +1,3 @@
+2|false|2.png|history|true
+3|false|3.png|biology|false
+1|true|1.png|education|true
\ No newline at end of file
diff --git a/integration/spark-common-test/src/test/resources/binarystringdata2.csv b/integration/spark-common-test/src/test/resources/binarystringdata2.csv
new file mode 100644
index 0000000..f3ea934
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/binarystringdata2.csv
@@ -0,0 +1,3 @@
+2|false|2.png|abc|true
+3|false|3.png|binary|false
+1|true|1.png|^Ayard duty^B|true
\ No newline at end of file
diff --git a/integration/spark-common-test/src/test/resources/jsonFiles/data/allPrimitiveType.json b/integration/spark-common-test/src/test/resources/jsonFiles/data/allPrimitiveType.json
index 6d81ec7..d61ff14 100644
--- a/integration/spark-common-test/src/test/resources/jsonFiles/data/allPrimitiveType.json
+++ b/integration/spark-common-test/src/test/resources/jsonFiles/data/allPrimitiveType.json
@@ -7,5 +7,6 @@
 	"boolField": false,
 	"dateField": "2019-03-02",
 	"timeField": "2019-02-12 03:03:34",
-	"decimalField" : 55.35
+	"decimalField" : 55.35,
+	"binaryField" : "abc"
 }
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
new file mode 100644
index 0000000..b2bda24
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
@@ -0,0 +1,1153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.integration.spark.testsuite.binary
+
+import java.util.Arrays
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
+
+import org.apache.commons.codec.binary.Hex
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.util.SparkUtil
+import org.scalatest.BeforeAndAfterAll
+
+/**
+  * Test cases for testing binary
+  */
+class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
+    override def beforeAll {
+    }
+
+    test("Create table and load data with binary column") {
+        sql("DROP TABLE IF EXISTS binaryTable")
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS binaryTable (
+               |    id int,
+               |    label boolean,
+               |    name string,
+               |    binaryField binary,
+               |    autoLabel boolean)
+               | STORED BY 'carbondata'
+               | TBLPROPERTIES('SORT_COLUMNS'='')
+             """.stripMargin)
+        sql(
+            s"""
+               | LOAD DATA LOCAL INPATH '$resourcesPath/binarydata.csv'
+               | INTO TABLE binaryTable
+               | OPTIONS('header'='false')
+             """.stripMargin)
+
+        val result = sql("desc formatted binaryTable").collect()
+        var flag = false
+        result.foreach { each =>
+            if ("binary".equals(each.get(1))) {
+                flag = true
+            }
+        }
+        assert(flag)
+
+        checkAnswer(sql("SELECT COUNT(*) FROM binaryTable"), Seq(Row(3)))
+        try {
+            val df = sql("SELECT * FROM binaryTable").collect()
+            assert(3 == df.length)
+            df.foreach { each =>
+                assert(5 == each.length)
+
+                assert(Integer.valueOf(each(0).toString) > 0)
+                assert(each(1).toString.equalsIgnoreCase("false") || (each(1).toString.equalsIgnoreCase("true")))
+                assert(each(2).toString.contains(".png"))
+
+                val bytes40 = each.getAs[Array[Byte]](3).slice(0, 40)
+                val binaryName = each(2).toString
+                val expectedBytes = Hex.encodeHex(firstBytes20.get(binaryName).get)
+                assert(Arrays.equals(String.valueOf(expectedBytes).getBytes(), bytes40), "incorrect numeric value for flattened binaryField")
+
+                assert(each(4).toString.equalsIgnoreCase("false") || (each(4).toString.equalsIgnoreCase("true")))
+
+                val df = sql("SELECT name,binaryField FROM binaryTable").collect()
+                assert(3 == df.length)
+                df.foreach { each =>
+                    assert(2 == each.length)
+                    val binaryName = each(0).toString
+                    val bytes40 = each.getAs[Array[Byte]](1).slice(0, 40)
+                    val expectedBytes = Hex.encodeHex(firstBytes20.get(binaryName).get)
+                    assert(Arrays.equals(String.valueOf(expectedBytes).getBytes(), bytes40), "incorrect numeric value for flattened binaryField")
+                }
+            }
+        } catch {
+            case e: Exception =>
+                e.printStackTrace()
+                assert(false)
+        }
+    }
+
+    private val firstBytes20 = Map("1.png" -> Array[Byte](-119, 80, 78, 71, 13, 10, 26, 10, 0, 0, 0, 13, 73, 72, 68, 82, 0, 0, 1, 74),
+        "2.png" -> Array[Byte](-119, 80, 78, 71, 13, 10, 26, 10, 0, 0, 0, 13, 73, 72, 68, 82, 0, 0, 2, -11),
+        "3.png" -> Array[Byte](-119, 80, 78, 71, 13, 10, 26, 10, 0, 0, 0, 13, 73, 72, 68, 82, 0, 0, 1, 54)
+    )
+
+    test("Don't support sort_columns") {
+        sql("DROP TABLE IF EXISTS binaryTable")
+        val exception = intercept[Exception] {
+            sql(
+                s"""
+                   | CREATE TABLE IF NOT EXISTS binaryTable (
+                   |    id double,
+                   |    label boolean,
+                   |    name STRING,
+                   |    binaryField BINARY,
+                   |    autoLabel boolean)
+                   | STORED BY 'carbondata'
+                   | TBLPROPERTIES('SORT_COLUMNS'='binaryField')
+             """.stripMargin)
+        }
+        assert(exception.getMessage.contains("sort_columns is unsupported for binary datatype column"))
+    }
+
+    test("Unsupport LOCAL_DICTIONARY_INCLUDE for binary") {
+
+        sql("DROP TABLE IF EXISTS binaryTable")
+        val exception = intercept[MalformedCarbonCommandException] {
+            sql(
+                """
+                  | CREATE TABLE binaryTable(
+                  |     id int,
+                  |     name string,
+                  |     city string,
+                  |     age int,
+                  |     binaryField binary)
+                  | STORED BY 'org.apache.carbondata.format'
+                  | tblproperties('local_dictionary_enable'='true','local_dictionary_include'='binaryField')
+                """.stripMargin)
+        }
+        assert(exception.getMessage.contains(
+            "LOCAL_DICTIONARY_INCLUDE/LOCAL_DICTIONARY_EXCLUDE column: binaryfield is not a string/complex/varchar datatype column. " +
+                    "LOCAL_DICTIONARY_COLUMN should be no dictionary string/complex/varchar datatype column"))
+    }
+
+    test("Supports LOCAL_DICTIONARY_EXCLUDE for binary") {
+        sql("DROP TABLE IF EXISTS binaryTable")
+        sql(
+            """
+              | CREATE TABLE binaryTable(
+              |     id int,
+              |     name string,
+              |     city string,
+              |     age int,
+              |     binaryField binary)
+              | STORED BY 'org.apache.carbondata.format'
+              | tblproperties('local_dictionary_enable'='true','LOCAL_DICTIONARY_EXCLUDE'='binaryField')
+            """.stripMargin)
+        assert(true)
+    }
+
+    test("Unsupport inverted_index for binary") {
+        sql("DROP TABLE IF EXISTS binaryTable")
+        val exception = intercept[MalformedCarbonCommandException] {
+            sql(
+                """
+                  | CREATE TABLE binaryTable(
+                  |     id int,
+                  |     name string,
+                  |     city string,
+                  |     age int,
+                  |     binaryField binary)
+                  | STORED BY 'org.apache.carbondata.format'
+                  | tblproperties('inverted_index'='binaryField')
+                """.stripMargin)
+        }
+        assert(exception.getMessage.contains("INVERTED_INDEX column: binaryfield should be present in SORT_COLUMNS"))
+    }
+
+    test("Unsupport inverted_index and sort_columns for binary") {
+        sql("DROP TABLE IF EXISTS binaryTable")
+        val exception = intercept[MalformedCarbonCommandException] {
+            sql(
+                """
+                  | CREATE TABLE binaryTable(
+                  |     id int,
+                  |     name string,
+                  |     city string,
+                  |     age int,
+                  |     binaryField binary)
+                  | STORED BY 'org.apache.carbondata.format'
+                  | tblproperties('inverted_index'='binaryField','SORT_COLUMNS'='binaryField')
+                """.stripMargin)
+        }
+        assert(exception.getMessage.contains("sort_columns is unsupported for binary datatype column: binaryfield"))
+    }
+
+    test("COLUMN_META_CACHE doesn't support binary") {
+        sql("DROP TABLE IF EXISTS binaryTable")
+        val exception = intercept[Exception] {
+            sql(
+                s"""
+                   | CREATE TABLE IF NOT EXISTS binaryTable (
+                   |    id INT,
+                   |    label boolean,
+                   |    name STRING,
+                   |    binaryField BINARY,
+                   |    autoLabel boolean)
+                   | STORED BY 'carbondata'
+                   | TBLPROPERTIES('COLUMN_META_CACHE'='binaryField')
+             """.stripMargin)
+        }
+        assert(exception.getMessage.contains("binaryfield is a binary data type column and binary data type is not allowed for the option"))
+    }
+
+    test("RANGE_COLUMN doesn't support binary") {
+        sql("DROP TABLE IF EXISTS binaryTable")
+        val exception = intercept[Exception] {
+            sql(
+                s"""
+                   | CREATE TABLE IF NOT EXISTS binaryTable (
+                   |    id INT,
+                   |    label boolean,
+                   |    name STRING,
+                   |    binaryField BINARY,
+                   |    autoLabel boolean)
+                   | STORED BY 'carbondata'
+                   | TBLPROPERTIES('RANGE_COLUMN'='binaryField')
+             """.stripMargin)
+        }
+        assert(exception.getMessage.contains("RANGE_COLUMN doesn't support binary data type"))
+    }
+
+    test("Test carbon.column.compressor=zstd") {
+        sql("DROP TABLE IF EXISTS binaryTable")
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS binaryTable (
+               |    id INT,
+               |    label boolean,
+               |    name STRING,
+               |    binaryField BINARY,
+               |    autoLabel boolean)
+               | STORED BY 'carbondata'
+               | TBLPROPERTIES('carbon.column.compressor'='zstd')
+             """.stripMargin)
+        sql(
+            s"""
+               | LOAD DATA LOCAL INPATH '$resourcesPath/binarydata.csv'
+               | INTO TABLE binaryTable
+               | OPTIONS('header'='false')
+             """.stripMargin)
+        checkAnswer(sql("SELECT COUNT(*) FROM binaryTable"), Seq(Row(3)))
+        val value = sql("SELECT * FROM binaryTable").collect()
+        value.foreach { each =>
+            assert(5 == each.length)
+            assert(1 == each.getAs[Int](0) || 2 == each.getAs[Int](0) || 3 == each.getAs[Int](0))
+            assert(".png".equals(each.getAs(2).toString.substring(1, 5)))
+            assert("89504e470d0a1a0a0000000d4948445200000".equals(new String(each.getAs[Array[Byte]](3).slice(0, 37))))
+        }
+    }
+
+    test("Test carbon.column.compressor=gzip") {
+        sql("DROP TABLE IF EXISTS binaryTable")
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS binaryTable (
+               |    id INT,
+               |    label boolean,
+               |    name STRING,
+               |    binaryField BINARY,
+               |    autoLabel boolean)
+               | STORED BY 'carbondata'
+               | TBLPROPERTIES('carbon.column.compressor'='gzip')
+             """.stripMargin)
+        sql(
+            s"""
+               | LOAD DATA LOCAL INPATH '$resourcesPath/binarydata.csv'
+               | INTO TABLE binaryTable
+               | OPTIONS('header'='false')
+             """.stripMargin)
+        checkAnswer(sql("SELECT COUNT(*) FROM binaryTable"), Seq(Row(3)))
+        val value = sql("SELECT * FROM binaryTable").collect()
+        value.foreach { each =>
+            assert(5 == each.length)
+            assert(1 == each.getAs[Int](0) || 2 == each.getAs[Int](0) || 3 == each.getAs[Int](0))
+            assert(".png".equals(each.getAs(2).toString.substring(1, 5)))
+            assert("89504e470d0a1a0a0000000d4948445200000".equals(new String(each.getAs[Array[Byte]](3).slice(0, 37))))
+        }
+    }
+
+    test("Test carbon.column.compressor=snappy") {
+        sql("DROP TABLE IF EXISTS binaryTable")
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS binaryTable (
+               |    id INT,
+               |    label boolean,
+               |    name STRING,
+               |    binaryField BINARY,
+               |    autoLabel boolean)
+               | STORED BY 'carbondata'
+               | TBLPROPERTIES('carbon.column.compressor'='snappy')
+             """.stripMargin)
+        sql(
+            s"""
+               | LOAD DATA LOCAL INPATH '$resourcesPath/binarydata.csv'
+               | INTO TABLE binaryTable
+               | OPTIONS('header'='false')
+             """.stripMargin)
+        checkAnswer(sql("SELECT COUNT(*) FROM binaryTable"), Seq(Row(3)))
+        val value = sql("SELECT * FROM binaryTable").collect()
+        value.foreach { each =>
+            assert(5 == each.length)
+            assert(1 == each.getAs[Int](0) || 2 == each.getAs[Int](0) || 3 == each.getAs[Int](0))
+            assert(".png".equals(each.getAs(2).toString.substring(1, 5)))
+            assert("89504e470d0a1a0a0000000d4948445200000".equals(new String(each.getAs[Array[Byte]](3).slice(0, 37))))
+        }
+    }
+
+    test("Support filter other column in binary table") {
+        sql("DROP TABLE IF EXISTS binaryTable")
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS binaryTable (
+               |    id INT,
+               |    label boolean,
+               |    name STRING,
+               |    binaryField BINARY,
+               |    autoLabel boolean)
+               | STORED BY 'carbondata'
+               | TBLPROPERTIES('carbon.column.compressor'='zstd')
+             """.stripMargin)
+        sql(
+            s"""
+               | LOAD DATA LOCAL INPATH '$resourcesPath/binarydata.csv'
+               | INTO TABLE binaryTable
+               | OPTIONS('header'='false')
+             """.stripMargin)
+        checkAnswer(sql("SELECT COUNT(*) FROM binaryTable where id =1"), Seq(Row(1)))
+
+
+        sql("insert into binaryTable values(1,true,'Bob','hello',false)")
+        checkAnswer(sql("SELECT COUNT(*) FROM binaryTable where binaryField =cast('hello' as binary)"), Seq(Row(1)))
+    }
+
+    test("Test create table with buckets unsafe") {
+        CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "true")
+        sql("DROP TABLE IF EXISTS binaryTable")
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS binaryTable (
+               |    id INT,
+               |    label boolean,
+               |    name STRING,
+               |    binaryField BINARY,
+               |    autoLabel boolean)
+               | STORED BY 'carbondata'
+               | TBLPROPERTIES('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='binaryField')
+             """.stripMargin)
+        sql(
+            s"""
+               | LOAD DATA LOCAL INPATH '$resourcesPath/binarydata.csv'
+               | INTO TABLE binaryTable
+               | OPTIONS('header'='false')
+             """.stripMargin)
+
+        CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "false")
+        val table: CarbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "binaryTable")
+        if (table != null && table.getBucketingInfo("binarytable") != null) {
+            assert(true)
+        } else {
+            assert(false, "Bucketing info does not exist")
+        }
+    }
+
+    test("insert into for hive and carbon") {
+        sql("DROP TABLE IF EXISTS hiveTable")
+        sql("DROP TABLE IF EXISTS carbontable")
+        sql("DROP TABLE IF EXISTS hiveTable2")
+        sql("DROP TABLE IF EXISTS carbontable2")
+
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS hivetable (
+               |    id int,
+               |    label boolean,
+               |    name string,
+               |    binaryField binary,
+               |    autoLabel boolean)
+               | row format delimited fields terminated by ','
+             """.stripMargin)
+        sql("insert into hivetable values(1,true,'Bob','binary',false)")
+        sql("insert into hivetable values(2,false,'Xu','test',true)")
+        sql("insert into hivetable select 2,false,'Xu',cast('carbon' as binary),true")
+        val hiveResult = sql("SELECT * FROM hivetable")
+
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS carbontable (
+               |    id int,
+               |    label boolean,
+               |    name string,
+               |    binaryField binary,
+               |    autoLabel boolean)
+               | STORED BY 'carbondata'
+             """.stripMargin)
+        sql("insert into carbontable values(1,true,'Bob','binary',false)")
+        sql("insert into carbontable values(2,false,'Xu','test',true)")
+        sql("insert into carbontable select 2,false,'Xu',cast('carbon' as binary),true")
+        val carbonResult = sql("SELECT * FROM carbontable")
+        checkAnswer(hiveResult, carbonResult)
+
+        sql("CREATE TABLE hivetable2 AS SELECT * FROM carbontable")
+        sql("CREATE TABLE carbontable2 AS SELECT * FROM hivetable")
+        val carbonResult2 = sql("SELECT * FROM carbontable2")
+        val hiveResult2 = sql("SELECT * FROM hivetable2")
+        checkAnswer(hiveResult2, carbonResult2)
+        checkAnswer(carbonResult, carbonResult2)
+        checkAnswer(hiveResult, hiveResult2)
+        assert(3 == carbonResult2.collect().length)
+        assert(3 == hiveResult2.collect().length)
+
+        sql("INSERT INTO hivetable2 SELECT * FROM carbontable")
+        sql("INSERT INTO carbontable2 SELECT * FROM hivetable")
+        val carbonResult3 = sql("SELECT * FROM carbontable2")
+        val hiveResult3 = sql("SELECT * FROM hivetable2")
+        checkAnswer(carbonResult3, hiveResult3)
+        assert(6 == carbonResult3.collect().length)
+        assert(6 == hiveResult3.collect().length)
+    }
+
+    test("Support filter for hive and carbon") {
+        sql("DROP TABLE IF EXISTS hiveTable")
+        sql("DROP TABLE IF EXISTS carbontable")
+
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS hivetable (
+               |    id int,
+               |    label boolean,
+               |    name string,
+               |    binaryField binary,
+               |    autoLabel boolean)
+               | row format delimited fields terminated by ','
+             """.stripMargin)
+        sql("insert into hivetable values(1,true,'Bob','binary',false)")
+        sql("insert into hivetable values(2,false,'Xu','test',true)")
+        val hiveResult = sql("SELECT * FROM hivetable where binaryField=cast('binary' as binary)")
+
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS carbontable (
+               |    id int,
+               |    label boolean,
+               |    name string,
+               |    binaryField binary,
+               |    autoLabel boolean)
+               | STORED BY 'carbondata'
+             """.stripMargin)
+        sql("insert into carbontable values(1,true,'Bob','binary',false)")
+        sql("insert into carbontable values(2,false,'Xu','test',true)")
+        val carbonResult = sql("SELECT * FROM carbontable where binaryField=cast('binary' as binary)")
+        checkAnswer(hiveResult, carbonResult)
+        assert(1 == carbonResult.collect().length)
+        carbonResult.collect().foreach { each =>
+            if (1 == each.get(0)) {
+                assert("binary".equals(new String(each.getAs[Array[Byte]](3))))
+            } else if (2 == each.get(0)) {
+                assert("test".equals(new String(each.getAs[Array[Byte]](3))))
+            } else {
+                assert(false)
+            }
+        }
+
+        // filter with non string
+        val exception = intercept[Exception] {
+            sql("SELECT * FROM carbontable where binaryField=binary").collect()
+        }
+        assert(exception.getMessage.contains("cannot resolve '`binary`' given input columns"))
+
+        // filter with not equal
+        val hiveResult3 = sql("SELECT * FROM hivetable where binaryField!=cast('binary' as binary)")
+        val carbonResult3 = sql("SELECT * FROM carbontable where binaryField!=cast('binary' as binary)")
+        checkAnswer(hiveResult3, carbonResult3)
+        assert(1 == carbonResult3.collect().length)
+        carbonResult3.collect().foreach { each =>
+            assert(2 == each.get(0))
+            assert("test".equals(new String(each.getAs[Array[Byte]](3))))
+        }
+
+        // filter with in
+        val hiveResult4 = sql("SELECT * FROM hivetable where binaryField in (cast('binary' as binary))")
+        val carbonResult4 = sql("SELECT * FROM carbontable where binaryField in (cast('binary' as binary))")
+        checkAnswer(hiveResult4, carbonResult4)
+        assert(1 == carbonResult4.collect().length)
+        carbonResult4.collect().foreach { each =>
+            assert(1 == each.get(0))
+            assert("binary".equals(new String(each.getAs[Array[Byte]](3))))
+        }
+
+        // filter with not in
+        val hiveResult5 = sql("SELECT * FROM hivetable where binaryField not in (cast('binary' as binary))")
+        val carbonResult5 = sql("SELECT * FROM carbontable where binaryField not in (cast('binary' as binary))")
+        checkAnswer(hiveResult5, carbonResult5)
+        assert(1 == carbonResult5.collect().length)
+        carbonResult5.collect().foreach { each =>
+            assert(2 == each.get(0))
+            assert("test".equals(new String(each.getAs[Array[Byte]](3))))
+        }
+    }
+
+    test("Support update and delete ") {
+        sql("DROP TABLE IF EXISTS carbontable")
+
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS carbontable (
+               |    id int,
+               |    label boolean,
+               |    name string,
+               |    binaryField binary,
+               |    autoLabel boolean)
+               | STORED BY 'carbondata'
+             """.stripMargin)
+        sql("insert into carbontable values(1,true,'Bob','binary',false)")
+        sql("insert into carbontable values(2,false,'Xu','test',true)")
+        var carbonResult = sql("SELECT * FROM carbontable where binaryField=cast('binary' as binary)")
+        assert(1 == carbonResult.collect().length)
+        carbonResult.collect().foreach { each =>
+            if (1 == each.get(0)) {
+                assert("binary".equals(new String(each.getAs[Array[Byte]](3))))
+            } else if (2 == each.get(0)) {
+                assert("test".equals(new String(each.getAs[Array[Byte]](3))))
+            } else {
+                assert(false)
+            }
+        }
+
+        // Update for binary in carbon
+        sql("UPDATE carbontable SET (name) = ('David') WHERE id = 1").show()
+        sql("UPDATE carbontable SET (binaryField) = ('carbon2') WHERE id = 1").show()
+
+        carbonResult = sql("SELECT * FROM carbontable where binaryField=cast('binary' as binary)")
+        carbonResult.collect().foreach { each =>
+            if (1 == each.get(0)) {
+                assert("carbon2".equals(new String(each.getAs[Array[Byte]](3))))
+            } else if (2 == each.get(0)) {
+                assert("test".equals(new String(each.getAs[Array[Byte]](3))))
+            } else {
+                assert(false)
+            }
+        }
+
+        // test cast string to binary, binary to string
+        val stringValue = sql("SELECT cast(binaryField as string) FROM carbontable WHERE id = 1").collect()
+        stringValue.foreach { each =>
+            assert("carbon2".equals(each.getAs(0)))
+        }
+        val binaryValue = sql("SELECT cast(name as binary) FROM carbontable WHERE id = 1").collect()
+        binaryValue.foreach { each =>
+            assert("David".equals(new String(each.getAs[Array[Byte]](0))))
+        }
+
+        // Test delete
+        sql("DELETE FROM carbontable WHERE id = 2").show()
+
+        carbonResult = sql("SELECT * FROM carbontable where binaryField=cast('binary' as binary)")
+        carbonResult.collect().foreach { each =>
+            if (1 == each.get(0)) {
+                assert("carbon2".equals(new String(each.getAs[Array[Byte]](3))))
+            } else {
+                assert(false)
+            }
+        }
+    }
+
+    test("Create table and load data with binary column for hive and carbon, CTAS and insert int hive table select from carbon table") {
+        sql("DROP TABLE IF EXISTS hivetable")
+        sql("DROP TABLE IF EXISTS hivetable2")
+        sql("DROP TABLE IF EXISTS hivetable3")
+        sql("DROP TABLE IF EXISTS carbontable")
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS hivetable (
+               |    id int,
+               |    label boolean,
+               |    name string,
+               |    binaryField binary,
+               |    autoLabel boolean)
+               | row format delimited fields terminated by '|'
+             """.stripMargin)
+        sql(
+            s"""
+               | LOAD DATA LOCAL INPATH '$resourcesPath/binarystringdata.csv'
+               | INTO TABLE hivetable
+             """.stripMargin)
+
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS carbontable (
+               |    id int,
+               |    label boolean,
+               |    name string,
+               |    binaryField binary,
+               |    autoLabel boolean)
+               | STORED BY 'carbondata'
+             """.stripMargin)
+        sql(
+            s"""
+               | LOAD DATA LOCAL INPATH '$resourcesPath/binarystringdata.csv'
+               | INTO TABLE carbontable
+               | OPTIONS('header'='false','DELIMITER'='|','bad_records_action'='fail')
+             """.stripMargin)
+
+        val hiveResult = sql("SELECT * FROM hivetable")
+        val carbonResult = sql("SELECT * FROM carbontable")
+        checkAnswer(hiveResult, carbonResult)
+        checkAnswer(sql("SELECT COUNT(*) FROM hivetable"), Seq(Row(3)))
+        try {
+            val carbonDF = carbonResult.collect()
+            assert(3 == carbonDF.length)
+            carbonDF.foreach { each =>
+                assert(5 == each.length)
+
+                assert(Integer.valueOf(each(0).toString) > 0)
+                assert(each(1).toString.equalsIgnoreCase("false") || (each(1).toString.equalsIgnoreCase("true")))
+                assert(each(2).toString.contains(".png"))
+
+                val value = new String(each.getAs[Array[Byte]](3))
+                assert("\u0001history\u0002".equals(value) || "\u0001biology\u0002".equals(value)
+                        || "\u0001education\u0002".equals(value) || "".equals(value))
+                assert(each(4).toString.equalsIgnoreCase("false") || (each(4).toString.equalsIgnoreCase("true")))
+            }
+
+            val df = hiveResult.collect()
+            assert(3 == df.length)
+            df.foreach { each =>
+                assert(5 == each.length)
+
+                assert(Integer.valueOf(each(0).toString) > 0)
+                assert(each(1).toString.equalsIgnoreCase("false") || (each(1).toString.equalsIgnoreCase("true")))
+                assert(each(2).toString.contains(".png"))
+
+
+                val value = new String(each.getAs[Array[Byte]](3))
+                assert("\u0001history\u0002".equals(value) || "\u0001biology\u0002".equals(value)
+                        || "\u0001education\u0002".equals(value) || "".equals(value))
+                assert(each(4).toString.equalsIgnoreCase("false") || (each(4).toString.equalsIgnoreCase("true")))
+            }
+
+            sql(
+                s"""
+                   | CREATE TABLE IF NOT EXISTS hivetable2 (
+                   |    id int,
+                   |    label boolean,
+                   |    name string,
+                   |    binaryField binary,
+                   |    autoLabel boolean)
+                   | row format delimited fields terminated by '|'
+             """.stripMargin)
+            sql("insert into hivetable2 select * from carbontable")
+            sql("create table hivetable3 as select * from carbontable")
+            val hiveResult2 = sql("SELECT * FROM hivetable2")
+            val hiveResult3 = sql("SELECT * FROM hivetable3")
+            checkAnswer(hiveResult, hiveResult2)
+            checkAnswer(hiveResult2, hiveResult3)
+        } catch {
+            case e: Exception =>
+                e.printStackTrace()
+                assert(false)
+        }
+    }
+
+    // TODO
+    ignore("Create table and load data with binary column for hive and carbon, CTAS and insert int hive table select from carbon table, for null value") {
+        sql("DROP TABLE IF EXISTS hivetable")
+        sql("DROP TABLE IF EXISTS hivetable2")
+        sql("DROP TABLE IF EXISTS hivetable3")
+        sql("DROP TABLE IF EXISTS carbontable")
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS hivetable (
+               |    id int,
+               |    label boolean,
+               |    name string,
+               |    binaryField binary,
+               |    autoLabel boolean)
+               | row format delimited fields terminated by '|'
+             """.stripMargin)
+        sql(
+            s"""
+               | LOAD DATA LOCAL INPATH '$resourcesPath/binaryStringNullData.csv'
+               | INTO TABLE hivetable
+             """.stripMargin)
+
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS carbontable (
+               |    id int,
+               |    label boolean,
+               |    name string,
+               |    binaryField binary,
+               |    autoLabel boolean)
+               | STORED BY 'carbondata'
+             """.stripMargin)
+        sql(
+            s"""
+               | LOAD DATA LOCAL INPATH '$resourcesPath/binaryStringNullData.csv'
+               | INTO TABLE carbontable
+               | OPTIONS('header'='false','DELIMITER'='|','bad_records_action'='fail')
+             """.stripMargin)
+
+        val hiveResult = sql("SELECT * FROM hivetable")
+        val carbonResult = sql("SELECT * FROM carbontable")
+        checkAnswer(hiveResult, carbonResult)
+        checkAnswer(sql("SELECT COUNT(*) FROM hivetable"), Seq(Row(4)))
+        try {
+            val carbonDF = carbonResult.collect()
+            assert(4 == carbonDF.length)
+            carbonDF.foreach { each =>
+                assert(5 == each.length)
+
+                assert(Integer.valueOf(each(0).toString) > 0)
+                assert(each(1).toString.equalsIgnoreCase("false") || (each(1).toString.equalsIgnoreCase("true")))
+                assert(each(2).toString.contains(".png"))
+
+                val value = new String(each.getAs[Array[Byte]](3))
+                assert("\u0001history\u0002".equals(value) || "\u0001biology\u0002".equals(value)
+                        || "\u0001education\u0002".equals(value) || "".equals(value))
+                assert(each(4).toString.equalsIgnoreCase("false") || (each(4).toString.equalsIgnoreCase("true")))
+            }
+
+            val df = hiveResult.collect()
+            assert(4 == df.length)
+            df.foreach { each =>
+                assert(5 == each.length)
+
+                assert(Integer.valueOf(each(0).toString) > 0)
+                assert(each(1).toString.equalsIgnoreCase("false") || (each(1).toString.equalsIgnoreCase("true")))
+                assert(each(2).toString.contains(".png"))
+
+
+                val value = new String(each.getAs[Array[Byte]](3))
+                assert("\u0001history\u0002".equals(value) || "\u0001biology\u0002".equals(value)
+                        || "\u0001education\u0002".equals(value) || "".equals(value))
+                assert(each(4).toString.equalsIgnoreCase("false") || (each(4).toString.equalsIgnoreCase("true")))
+            }
+
+            sql(
+                s"""
+                   | CREATE TABLE IF NOT EXISTS hivetable2 (
+                   |    id int,
+                   |    label boolean,
+                   |    name string,
+                   |    binaryField binary,
+                   |    autoLabel boolean)
+                   | row format delimited fields terminated by '|'
+             """.stripMargin)
+            sql("insert into hivetable2 select * from carbontable")
+            sql("create table hivetable3 as select * from carbontable")
+            val hiveResult2 = sql("SELECT * FROM hivetable2")
+            val hiveResult3 = sql("SELECT * FROM hivetable3")
+            checkAnswer(hiveResult, hiveResult2)
+            checkAnswer(hiveResult2, hiveResult3)
+        } catch {
+            case e: Exception =>
+                e.printStackTrace()
+                assert(false)
+        }
+    }
+
+    test("insert into carbon as select from hive after hive load data") {
+        sql("DROP TABLE IF EXISTS hiveTable")
+        sql("DROP TABLE IF EXISTS carbontable")
+        sql("DROP TABLE IF EXISTS hiveTable2")
+        sql("DROP TABLE IF EXISTS carbontable2")
+
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS hivetable (
+               |    id int,
+               |    label boolean,
+               |    name string,
+               |    binaryField binary,
+               |    autoLabel boolean)
+               | row format delimited fields terminated by '|'
+             """.stripMargin)
+        sql(
+            s"""
+               | LOAD DATA LOCAL INPATH '$resourcesPath/binarystringdata.csv'
+               | INTO TABLE hivetable
+             """.stripMargin)
+
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS carbontable (
+               |    id int,
+               |    label boolean,
+               |    name string,
+               |    binaryField binary,
+               |    autoLabel boolean)
+               | STORED BY 'carbondata'
+             """.stripMargin)
+        sql("insert into carbontable select * from hivetable")
+        val carbonResult = sql("SELECT * FROM carbontable")
+        val hiveResult = sql("SELECT * FROM hivetable")
+
+        assert(3 == carbonResult.collect().length)
+        assert(3 == hiveResult.collect().length)
+        checkAnswer(hiveResult, carbonResult)
+        carbonResult.collect().foreach { each =>
+            if (2 == each.get(0)) {
+                assert("\u0001history\u0002".equals(new String(each.getAs[Array[Byte]](3))))
+            } else if (1 == each.get(0)) {
+                assert("\u0001education\u0002".equals(new String(each.getAs[Array[Byte]](3))))
+            } else if (3 == each.get(0)) {
+                assert("\u0001biology\u0002".equals(new String(each.getAs[Array[Byte]](3)))
+                        || "".equals(new String(each.getAs[Array[Byte]](3))))
+            } else {
+                assert(false)
+            }
+        }
+
+        sql("CREATE TABLE hivetable2 AS SELECT * FROM carbontable")
+        sql("CREATE TABLE carbontable2 STORED BY 'carbondata' AS SELECT * FROM hivetable")
+        val carbonResult2 = sql("SELECT * FROM carbontable2")
+        val hiveResult2 = sql("SELECT * FROM hivetable2")
+        checkAnswer(hiveResult2, carbonResult2)
+        checkAnswer(carbonResult, carbonResult2)
+        checkAnswer(hiveResult, hiveResult2)
+        assert(3 == carbonResult2.collect().length)
+        assert(3 == hiveResult2.collect().length)
+
+        sql("INSERT INTO hivetable2 SELECT * FROM carbontable")
+        sql("INSERT INTO carbontable2 SELECT * FROM hivetable")
+        val carbonResult3 = sql("SELECT * FROM carbontable2")
+        val hiveResult3 = sql("SELECT * FROM hivetable2")
+        checkAnswer(carbonResult3, hiveResult3)
+        assert(6 == carbonResult3.collect().length)
+        assert(6 == hiveResult3.collect().length)
+    }
+
+    test("compaction for binary") {
+        CarbonProperties.getInstance()
+                .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
+                .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
+                    CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD)
+        sql("DROP TABLE IF EXISTS carbontable")
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS carbontable (
+               |    id int,
+               |    label boolean,
+               |    name string,
+               |    binaryField binary,
+               |    autoLabel boolean)
+               | STORED BY 'carbondata'
+             """.stripMargin)
+        for (i <- 0 until (3)) {
+            sql(
+                s"""
+                   | LOAD DATA LOCAL INPATH '$resourcesPath/binarystringdata.csv'
+                   | INTO TABLE carbontable
+                   | OPTIONS('header'='false','DELIMITER'='|')
+             """.stripMargin)
+        }
+        // 3 segments, no compaction
+        var segments = sql("SHOW SEGMENTS FOR TABLE carbontable")
+        var SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+        assert(!SegmentSequenceIds.contains("0.1"))
+        assert(SegmentSequenceIds.length == 3)
+        for (i <- 0 until (3)) {
+            sql("insert into carbontable values(1,true,'Bob','binary',false)")
+        }
+
+        // without auto compaction will not compact
+        segments = sql("SHOW SEGMENTS FOR TABLE carbontable")
+        SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+        assert(!SegmentSequenceIds.contains("0.1"))
+        assert(SegmentSequenceIds.length == 6)
+
+        // minor compaction
+        sql("alter table carbontable compact 'MINOR'")
+        segments = sql("SHOW SEGMENTS FOR TABLE carbontable")
+        SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+        assert(SegmentSequenceIds.contains("0.1"))
+        assert(!SegmentSequenceIds.contains("0.2"))
+        assert(SegmentSequenceIds.length == 7)
+
+        // major compaction
+        sql("alter table carbontable compact 'major'")
+        segments = sql("SHOW SEGMENTS FOR TABLE carbontable")
+        SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+        assert(SegmentSequenceIds.contains("0.2"))
+        assert(SegmentSequenceIds.contains("0.1"))
+        assert(SegmentSequenceIds.length == 8)
+
+        // clean files
+        segments = sql("CLEAN FILES FOR TABLE  carbontable")
+        segments = sql("SHOW SEGMENTS FOR TABLE carbontable")
+        SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+        assert(SegmentSequenceIds.contains("0.2"))
+        assert(!SegmentSequenceIds.contains("0.1"))
+        assert(SegmentSequenceIds.length == 1)
+
+        CarbonProperties.getInstance()
+                .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
+        for (i <- 0 until (4)) {
+            sql("insert into carbontable values(1,true,'Bob','binary',false)")
+        }
+        // auto compaction
+        segments = sql("SHOW SEGMENTS FOR TABLE carbontable")
+        SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+        assert(SegmentSequenceIds.contains("6.1"))
+        assert(!SegmentSequenceIds.contains("0.1"))
+        assert(SegmentSequenceIds.contains("0.2"))
+        assert(SegmentSequenceIds.length == 6)
+
+        // check the data
+        val carbonResult = sql("SELECT * FROM carbontable")
+        carbonResult.collect().foreach { each =>
+            if (2 == each.get(0)) {
+                assert("\u0001history\u0002".equals(new String(each.getAs[Array[Byte]](3))))
+            } else if (1 == each.get(0)) {
+                assert("\u0001education\u0002".equals(new String(each.getAs[Array[Byte]](3)))
+                        || "binary".equals(new String(each.getAs[Array[Byte]](3))))
+            } else if (3 == each.get(0)) {
+                assert("\u0001biology\u0002".equals(new String(each.getAs[Array[Byte]](3)))
+                        || "".equals(new String(each.getAs[Array[Byte]](3))))
+            } else {
+                assert(false)
+            }
+        }
+
+        CarbonProperties.getInstance()
+                .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
+                    CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
+    }
+
+    test("alter table for binary") {
+        sql("DROP TABLE IF EXISTS carbontable")
+        sql("DROP TABLE IF EXISTS binarytable")
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS carbontable (
+               |    id int,
+               |    label boolean,
+               |    name string)
+               | STORED BY 'carbondata'
+             """.stripMargin)
+
+
+        sql("insert into carbontable values(1,true,'Bob')")
+
+        sql(
+            s"""
+               | alter table carbontable add columns (
+               |    binaryField binary,
+               |    autoLabel boolean)
+               | TBLPROPERTIES('DEFAULT.VALUE.binaryField'='binary','DEFAULT.VALUE.autoLabel'='true')
+            """.stripMargin)
+
+        var carbonResult = sql("SELECT * FROM carbontable")
+        carbonResult.collect().foreach { each =>
+            if (1 == each.get(0)) {
+                assert("binary".equals(new String(each.getAs[Array[Byte]](3))))
+            } else {
+                assert(false)
+            }
+        }
+
+        sql(
+            s"""
+               | LOAD DATA LOCAL INPATH '$resourcesPath/binarystringdata.csv'
+               | INTO TABLE carbontable
+               | OPTIONS('header'='false','DELIMITER'='|')
+             """.stripMargin)
+
+
+        sql("insert into carbontable values(1,true,'Bob','binary',false)")
+
+        carbonResult = sql("SELECT * FROM carbontable")
+        carbonResult.collect().foreach { each =>
+            if (2 == each.get(0)) {
+                assert("\u0001history\u0002".equals(new String(each.getAs[Array[Byte]](3))))
+            } else if (1 == each.get(0)) {
+                assert("\u0001education\u0002".equals(new String(each.getAs[Array[Byte]](3)))
+                        || "binary".equals(new String(each.getAs[Array[Byte]](3))))
+            } else if (3 == each.get(0)) {
+                assert("\u0001biology\u0002".equals(new String(each.getAs[Array[Byte]](3)))
+                        || "".equals(new String(each.getAs[Array[Byte]](3))))
+            } else {
+                assert(false)
+            }
+        }
+
+        var result = sql("show tables")
+        result.collect().foreach { each =>
+            assert(!"binarytable".equalsIgnoreCase(each.getAs[String](1)))
+        }
+
+        // rename
+        sql(
+            s"""
+               | alter table carbontable RENAME TO binarytable
+            """.stripMargin)
+        result = sql("show tables")
+        assert(result.collect().exists { each =>
+            "binarytable".equalsIgnoreCase(each.getAs[String](1))
+        })
+
+        // add columns after rename
+        sql(
+            s"""
+               | alter table binarytable add columns (
+               |    binaryField2 binary,
+               |    autoLabel2 boolean)
+               | TBLPROPERTIES('DEFAULT.VALUE.binaryField2'='binary','DEFAULT.VALUE.autoLabel2'='true')
+            """.stripMargin)
+        sql("insert into binarytable values(1,true,'Bob','binary',false,'binary',false)")
+
+        carbonResult = sql("SELECT * FROM binarytable")
+        carbonResult.collect().foreach { each =>
+            if (2 == each.get(0)) {
+                assert("\u0001history\u0002".equals(new String(each.getAs[Array[Byte]](3))))
+            } else if (1 == each.get(0)) {
+                assert("\u0001education\u0002".equals(new String(each.getAs[Array[Byte]](3)))
+                        || "binary".equals(new String(each.getAs[Array[Byte]](3))))
+            } else if (3 == each.get(0)) {
+                assert(null == each.getAs[Array[Byte]](3)
+                        || "\u0001biology\u0002".equals(new String(each.getAs[Array[Byte]](3))))
+            } else {
+                assert(false)
+            }
+        }
+
+        // drop columns after rename
+        sql(
+            s"""
+               | alter table binarytable drop columns (
+               |    binaryField2,
+               |    autoLabel2)
+            """.stripMargin)
+        sql("insert into binarytable values(1,true,'Bob','binary',false)")
+
+        carbonResult = sql("SELECT * FROM binarytable")
+        carbonResult.collect().foreach { each =>
+            if (2 == each.get(0)) {
+                assert("\u0001history\u0002".equals(new String(each.getAs[Array[Byte]](3))))
+            } else if (1 == each.get(0)) {
+                assert("\u0001education\u0002".equals(new String(each.getAs[Array[Byte]](3)))
+                        || "binary".equals(new String(each.getAs[Array[Byte]](3))))
+            } else if (3 == each.get(0)) {
+                assert("\u0001biology\u0002".equals(new String(each.getAs[Array[Byte]](3)))
+                        || "".equals(new String(each.getAs[Array[Byte]](3))))
+            } else {
+                assert(false)
+            }
+        }
+
+        // change data type
+        val e = intercept[Exception] {
+            sql(s"alter table binarytable CHANGE binaryField binaryField3 STRING ")
+        }
+        assert(e.getMessage.contains("operation failed for default.binarytable: Alter table data type change operation failed: Given column binaryfield with data type BINARY cannot be modified. Only Int and Decimal data types are allowed for modification"))
+    }
+
+    ignore("Create table and load data with binary column for hive: test encode without \u0001") {
+        sql("DROP TABLE IF EXISTS hivetable")
+        sql("DROP TABLE IF EXISTS carbontable")
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS hivetable (
+               |    id int,
+               |    label boolean,
+               |    name string,
+               |    binaryField binary,
+               |    autoLabel boolean)
+               | row format delimited fields terminated by '|'
+             """.stripMargin)
+        sql(
+            s"""
+               | LOAD DATA LOCAL INPATH '$resourcesPath/binarystringdata2.csv'
+               | INTO TABLE hivetable
+             """.stripMargin)
+
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS carbontable (
+               |    id int,
+               |    label boolean,
+               |    name string,
+               |    binaryField binary,
+               |    autoLabel boolean)
+               | STORED BY 'carbondata'
+             """.stripMargin)
+        sql(
+            s"""
+               | LOAD DATA LOCAL INPATH '$resourcesPath/binarystringdata2.csv'
+               | INTO TABLE carbontable
+               | OPTIONS('header'='false','DELIMITER'='|')
+             """.stripMargin)
+
+        val hiveResult = sql("SELECT * FROM hivetable")
+        val carbonResult = sql("SELECT * FROM carbontable")
+        // TODO
+        checkAnswer(hiveResult, carbonResult)
+
+        checkAnswer(sql("SELECT COUNT(*) FROM hivetable"), Seq(Row(3)))
+        try {
+            val carbonDF = carbonResult.collect()
+            assert(3 == carbonDF.length)
+            carbonDF.foreach { each =>
+                assert(5 == each.length)
+
+                assert(Integer.valueOf(each(0).toString) > 0)
+                assert(each(1).toString.equalsIgnoreCase("false") || (each(1).toString.equalsIgnoreCase("true")))
+                assert(each(2).toString.contains(".png"))
+
+                val value = new String(each.getAs[Array[Byte]](3))
+                // assert("\u0001history\u0002".equals(value) || "\u0001biology\u0002".equals(value) || "\u0001education\u0002".equals(value))
+                assert(each(4).toString.equalsIgnoreCase("false") || (each(4).toString.equalsIgnoreCase("true")))
+            }
+
+            val df = hiveResult.collect()
+            assert(3 == df.length)
+            df.foreach { each =>
+                assert(5 == each.length)
+
+                assert(Integer.valueOf(each(0).toString) > 0)
+                assert(each(1).toString.equalsIgnoreCase("false") || (each(1).toString.equalsIgnoreCase("true")))
+                assert(each(2).toString.contains(".png"))
+
+
+                val value = new String(each.getAs[Array[Byte]](3))
+                // assert("\u0001history\u0002".equals(value) || "\u0001biology\u0002".equals(value) || "\u0001education\u0002".equals(value))
+                assert(each(4).toString.equalsIgnoreCase("false") || (each(4).toString.equalsIgnoreCase("true")))
+            }
+        } catch {
+            case e: Exception =>
+                e.printStackTrace()
+                assert(false)
+        }
+    }
+
+    override def afterAll: Unit = {
+        sql("DROP TABLE IF EXISTS binaryTable")
+        sql("DROP TABLE IF EXISTS hiveTable")
+    }
+}
\ No newline at end of file
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
index 001964a..92a49dd 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
@@ -17,8 +17,6 @@
 package org.apache.carbondata.spark.testsuite.allqueries
 
 
-import java.util
-
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableForBinary.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableForBinary.scala
new file mode 100644
index 0000000..28e3f32
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableForBinary.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.createTable
+
+import java.io._
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.sdk.util.BinaryUtil
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.util.SparkUtil
+import org.scalatest.BeforeAndAfterAll
+
+
+class TestNonTransactionalCarbonTableForBinary extends QueryTest with BeforeAndAfterAll {
+
+    var writerPath = new File(this.getClass.getResource("/").getPath
+            + "../../target/SparkCarbonFileFormat/WriterOutput/")
+            .getCanonicalPath
+    var outputPath = writerPath + 2
+    //getCanonicalPath gives path with \, but the code expects /.
+    writerPath = writerPath.replace("\\", "/")
+
+    var sdkPath = new File(this.getClass.getResource("/").getPath + "../../../../store/sdk/")
+            .getCanonicalPath
+
+    def buildTestBinaryData(): Any = {
+        FileUtils.deleteDirectory(new File(writerPath))
+
+        val sourceImageFolder = sdkPath + "/src/test/resources/image/flowers"
+        val sufAnnotation = ".txt"
+        BinaryUtil.binaryToCarbon(sourceImageFolder, writerPath, sufAnnotation, ".jpg")
+    }
+
+    def cleanTestData() = {
+        FileUtils.deleteDirectory(new File(writerPath))
+        FileUtils.deleteDirectory(new File(outputPath))
+    }
+
+    override def beforeAll(): Unit = {
+        CarbonProperties.getInstance()
+                .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+                    CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+        buildTestBinaryData()
+
+        FileUtils.deleteDirectory(new File(outputPath))
+        sql("DROP TABLE IF EXISTS sdkOutputTable")
+    }
+
+    override def afterAll(): Unit = {
+        cleanTestData()
+        sql("DROP TABLE IF EXISTS sdkOutputTable")
+    }
+
+    test("test read image carbon with external table, generate by sdk, CTAS") {
+        sql("DROP TABLE IF EXISTS binaryCarbon")
+        sql("DROP TABLE IF EXISTS binaryCarbon3")
+        if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+            sql(s"CREATE EXTERNAL TABLE binaryCarbon STORED BY 'carbondata' LOCATION '$writerPath'")
+            sql(s"CREATE TABLE binaryCarbon3 STORED BY 'carbondata' LOCATION '$outputPath'" + " AS SELECT * FROM binaryCarbon")
+
+            checkAnswer(sql("SELECT COUNT(*) FROM binaryCarbon"),
+                Seq(Row(3)))
+            checkAnswer(sql("SELECT COUNT(*) FROM binaryCarbon3"),
+                Seq(Row(3)))
+
+            val result = sql("desc formatted binaryCarbon").collect()
+            var flag = false
+            result.foreach { each =>
+                if ("binary".equals(each.get(1))) {
+                    flag = true
+                }
+            }
+            assert(flag)
+            val value = sql("SELECT * FROM binaryCarbon").collect()
+            assert(3 == value.length)
+            value.foreach { each =>
+                val byteArray = each.getAs[Array[Byte]](2)
+                assert(new String(byteArray).startsWith("����\u0000\u0010JFIF"))
+            }
+
+            val value3 = sql("SELECT * FROM binaryCarbon3").collect()
+            assert(3 == value3.length)
+            value3.foreach { each =>
+                val byteArray = each.getAs[Array[Byte]](2)
+                assert(new String(byteArray).startsWith("����\u0000\u0010JFIF"))
+            }
+            sql("DROP TABLE IF EXISTS binaryCarbon")
+            sql("DROP TABLE IF EXISTS binaryCarbon3")
+        }
+    }
+
+    test("Don't support insert into partition table") {
+        sql("DROP TABLE IF EXISTS binaryCarbon")
+        sql("DROP TABLE IF EXISTS binaryCarbon2")
+        sql("DROP TABLE IF EXISTS binaryCarbon3")
+        sql("DROP TABLE IF EXISTS binaryCarbon4")
+        if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+            sql(s"CREATE TABLE binaryCarbon USING CARBON LOCATION '$writerPath'")
+            sql(
+                s"""
+                   | CREATE TABLE binaryCarbon2(
+                   |    binaryId INT,
+                   |    binaryName STRING,
+                   |    binary BINARY,
+                   |    labelName STRING,
+                   |    labelContent STRING
+                   |) STORED BY 'carbondata'""".stripMargin)
+            sql(
+                s"""
+                   | CREATE TABLE binaryCarbon3(
+                   |    binaryId INT,
+                   |    binaryName STRING,
+                   |    labelName STRING,
+                   |    labelContent STRING
+                   |)  partitioned by ( binary BINARY) STORED BY 'carbondata'""".stripMargin)
+
+            sql("insert into binaryCarbon2 select binaryId,binaryName,binary,labelName,labelContent from binaryCarbon where binaryId=0 ")
+            val carbonResult2 = sql("SELECT * FROM binaryCarbon2")
+
+            sql("create table binaryCarbon4 STORED BY 'carbondata' select binaryId,binaryName,binary,labelName,labelContent from binaryCarbon where binaryId=0 ")
+            val carbonResult4 = sql("SELECT * FROM binaryCarbon4")
+            val carbonResult = sql("SELECT * FROM binaryCarbon")
+
+            assert(3 == carbonResult.collect().length)
+            assert(1 == carbonResult4.collect().length)
+            assert(1 == carbonResult2.collect().length)
+            checkAnswer(carbonResult4, carbonResult2)
+
+            try {
+                sql("insert into binaryCarbon3 select binaryId,binaryName,binary,labelName,labelContent from binaryCarbon where binaryId=0 ")
+                assert(false)
+            } catch {
+                case e: Exception =>
+                    e.printStackTrace()
+                    assert(true)
+            }
+            sql("DROP TABLE IF EXISTS binaryCarbon")
+            sql("DROP TABLE IF EXISTS binaryCarbon2")
+            sql("DROP TABLE IF EXISTS binaryCarbon3")
+            sql("DROP TABLE IF EXISTS binaryCarbon4")
+        }
+    }
+}
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala
index 862c72a..d485235 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala
@@ -93,12 +93,14 @@ class TestNonTransactionalCarbonTableJsonWriter extends QueryTest with BeforeAnd
   private def writeCarbonFileFromJsonRowInput(jsonRow: String,
       carbonSchema: Schema) = {
     try {
-      var options: util.Map[String, String] = Map("bAd_RECords_action" -> "FAIL", "quotechar" -> "\"").asJava
+      val options: util.Map[String, String] = Map("bAd_RECords_action" -> "FAIL", "quotechar" -> "\"").asJava
       val writer = CarbonWriter.builder
-        .outputPath(writerPath)
-        .uniqueIdentifier(System.currentTimeMillis())
-        .withLoadOptions(options)
-        .withJsonInput(carbonSchema).writtenBy("TestNonTransactionalCarbonTableJsonWriter").build()
+              .outputPath(writerPath)
+              .uniqueIdentifier(System.currentTimeMillis())
+              .withLoadOptions(options)
+              .withJsonInput(carbonSchema)
+              .writtenBy("TestNonTransactionalCarbonTableJsonWriter")
+              .build()
       writer.write(jsonRow)
       writer.close()
     }
@@ -347,4 +349,29 @@ class TestNonTransactionalCarbonTableJsonWriter extends QueryTest with BeforeAnd
     assert(new File(writerPath).listFiles().length > 0)
     FileUtils.deleteDirectory(new File(writerPath))
   }
+
+  // test : Schema length is lesser than array length
+  test("Read Json for binary") {
+    FileUtils.deleteDirectory(new File(writerPath))
+    var dataPath: String = null
+    dataPath = resourcesPath + "/jsonFiles/data/allPrimitiveType.json"
+    val fields = new Array[Field](3)
+    fields(0) = new Field("stringField", DataTypes.STRING)
+    fields(1) = new Field("intField", DataTypes.INT)
+    fields(2) = new Field("binaryField", DataTypes.BINARY)
+    val jsonRow = readFromFile(dataPath)
+    writeCarbonFileFromJsonRowInput(jsonRow, new Schema(fields))
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    sql("select * from sdkOutputTable").show()
+    checkAnswer(sql("select * from sdkOutputTable"),
+      Seq(Row("ajantha\"bhat\"", 26, "abc".getBytes())))
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(writerPath).listFiles().length > 0)
+    FileUtils.deleteDirectory(new File(writerPath))
+  }
 }
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
index 9689f3d..60952e4 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
@@ -17,8 +17,6 @@
 
 package org.apache.carbondata.spark.testsuite.standardpartition
 
-import java.io.File
-
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.test.util.QueryTest
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 4ec66a7..4b29e77 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -648,12 +648,21 @@ object CarbonScalaUtil {
                      !x.dataType.get.equalsIgnoreCase("STRUCT") &&
                      !x.dataType.get.equalsIgnoreCase("MAP") &&
                      !x.dataType.get.equalsIgnoreCase("ARRAY"))) {
-        val errorMsg = "LOCAL_DICTIONARY_INCLUDE/LOCAL_DICTIONARY_EXCLUDE column: " +
-                       dictColm.trim +
-                       " is not a string/complex/varchar datatype column. LOCAL_DICTIONARY_COLUMN" +
-                       " should be no dictionary string/complex/varchar datatype column." +
-                       "Please check the DDL."
-        throw new MalformedCarbonCommandException(errorMsg)
+        if (fields.exists(x => x.column.equalsIgnoreCase(dictColm)
+                && x.dataType.get.equalsIgnoreCase("BINARY"))
+                && tableProperties.get("local_dictionary_exclude").nonEmpty
+                && tableProperties.get("local_dictionary_exclude").get.contains(dictColm)
+                && (tableProperties.get("local_dictionary_include").isEmpty
+                || (!tableProperties.get("local_dictionary_include").get.contains(dictColm)))) {
+          LOGGER.info("Local_dictionary_exclude supports binary")
+        } else {
+          val errorMsg = "LOCAL_DICTIONARY_INCLUDE/LOCAL_DICTIONARY_EXCLUDE column: " +
+                  dictColm.trim +
+                  " is not a string/complex/varchar datatype column. LOCAL_DICTIONARY_COLUMN" +
+                  " should be no dictionary string/complex/varchar datatype column." +
+                  "Please check the DDL."
+          throw new MalformedCarbonCommandException(errorMsg)
+        }
       }
     }
 
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
index b0a236f..8050e5f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
@@ -44,6 +44,7 @@ object DataTypeConverterUtil {
       case FIXED_DECIMAL(_, _) => DataTypes.createDefaultDecimalType
       case "timestamp" => DataTypes.TIMESTAMP
       case "date" => DataTypes.DATE
+      case "binary" => DataTypes.BINARY
       case "array" => DataTypes.createDefaultArrayType
       case "struct" => DataTypes.createDefaultStructType
       case "map" => DataTypes.createDefaultMapType
@@ -68,6 +69,7 @@ object DataTypeConverterUtil {
       case FIXED_DECIMALTYPE(_, _) => DataTypes.createDefaultDecimalType
       case "timestamptype" => DataTypes.TIMESTAMP
       case "datetype" => DataTypes.DATE
+      case "binarytype" => DataTypes.BINARY
       case others =>
         if (others != null && others.startsWith("arraytype")) {
           DataTypes.createDefaultArrayType()
@@ -105,6 +107,7 @@ object DataTypeConverterUtil {
       case "decimal" => ThriftDataType.DECIMAL
       case "date" => ThriftDataType.DATE
       case "timestamp" => ThriftDataType.TIMESTAMP
+      case "binary" => ThriftDataType.BINARY
       case "array" => ThriftDataType.ARRAY
       case "struct" => ThriftDataType.STRUCT
       case "map" => ThriftDataType.MAP
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 3cb068f..3e80ea6 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -172,6 +172,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   protected val BOOLEAN = carbonKeyWord("BOOLEAN")
   protected val LONG = carbonKeyWord("LONG")
   protected val BIGINT = carbonKeyWord("BIGINT")
+  protected val BINARY = carbonKeyWord("BINARY")
   protected val ARRAY = carbonKeyWord("ARRAY")
   protected val STRUCT = carbonKeyWord("STRUCT")
   protected val MAP = carbonKeyWord("MAP")
@@ -421,6 +422,12 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
               s"$column is a complex type column and complex type is not allowed for " +
               s"the option(s): ${ CarbonCommonConstants.COLUMN_META_CACHE }"
             throw new MalformedCarbonCommandException(errorMessage)
+          } else if (dimFieldToBeCached.nonEmpty && DataTypes.BINARY.getName
+                  .equalsIgnoreCase(dimFieldToBeCached(0).dataType.get)) {
+            val errorMessage =
+              s"$column is a binary data type column and binary data type is not allowed for " +
+                      s"the option(s): ${CarbonCommonConstants.COLUMN_META_CACHE}"
+            throw new MalformedCarbonCommandException(errorMessage)
           }
         }
       }
@@ -802,6 +809,9 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
         val errorMsg = "range_column: " + rangeColumn +
                        " does not exist in table. Please check the create table statement."
         throw new MalformedCarbonCommandException(errorMsg)
+      } else if (DataTypes.BINARY.getName.equalsIgnoreCase(rangeField.get.dataType.get)) {
+        throw new MalformedCarbonCommandException(
+          "RANGE_COLUMN doesn't support binary data type:" + rangeColumn)
       } else {
         tableProperties.put(CarbonCommonConstants.RANGE_COLUMN, rangeField.get.column)
       }
@@ -877,8 +887,9 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
         dimFields += field
       } else if (isDetectAsDimentionDataType(field.dataType.get)) {
         dimFields += field
-        // consider all String cols as noDicitonaryDims by default
-        if (DataTypes.STRING.getName.equalsIgnoreCase(field.dataType.get)) {
+        // consider all String and binary cols as noDicitonaryDims by default
+        if ((DataTypes.STRING.getName.equalsIgnoreCase(field.dataType.get)) ||
+            (DataTypes.BINARY.getName.equalsIgnoreCase(field.dataType.get))) {
           noDictionaryDims :+= field.column
         }
       } else if (sortKeyDimsTmp.exists(x => x.equalsIgnoreCase(field.column)) &&
@@ -943,7 +954,14 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
    * @param dimensionDatatype
    */
   def isDetectAsDimentionDataType(dimensionDatatype: String): Boolean = {
-    val dimensionType = Array("string", "array", "struct", "map", "timestamp", "date", "char")
+    val dimensionType = Array("string",
+      "array",
+      "struct",
+      "map",
+      "timestamp",
+      "date",
+      "char",
+      "binary")
     dimensionType.exists(x => dimensionDatatype.toLowerCase.contains(x))
   }
 
@@ -959,7 +977,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
    * detects whether datatype is part of sort_column
    */
   private def isDataTypeSupportedForSortColumn(columnDataType: String): Boolean = {
-    val dataTypes = Array("array", "struct", "map", "double", "float", "decimal")
+    val dataTypes = Array("array", "struct", "map", "double", "float", "decimal", "binary")
     dataTypes.exists(x => x.equalsIgnoreCase(columnDataType))
   }
 
@@ -967,7 +985,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
    * detects whether datatype is part of dictionary_exclude
    */
   def isDataTypeSupportedForDictionary_Exclude(columnDataType: String): Boolean = {
-    val dataTypes = Array("string", "timestamp", "int", "long", "bigint", "struct", "array", "map")
+    val dataTypes =
+      Array("string", "timestamp", "int", "long", "bigint", "struct", "array", "map", "binary")
     dataTypes.exists(x => x.equalsIgnoreCase(columnDataType))
   }
 
@@ -1296,6 +1315,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
     INT ^^^ "int" | DOUBLE ^^^ "double" | FLOAT ^^^ "double" | decimalType |
     DATE ^^^ "date" | charType
 
+  protected lazy val miscType = BINARY ^^^ "binary"
+
   /**
    * Matching the char data type and returning the same.
    */
@@ -1318,7 +1339,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   }
 
   protected lazy val nestedType: Parser[Field] = structFieldType | arrayFieldType | mapFieldType |
-                                                 primitiveFieldType
+                                                 primitiveFieldType | miscFieldType
 
   lazy val anyFieldDef: Parser[Field] =
     (ident | stringLit) ~ (":".? ~> nestedType) ~ (IN ~> (ident | stringLit)).? ^^ {
@@ -1344,6 +1365,12 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
         Field("unknown", Some(e1), Some("unknown"), Some(null))
     }
 
+  protected lazy val miscFieldType: Parser[Field] =
+    miscType ^^ {
+      case e1 =>
+        Field("unknown", Some(e1), Some("unknown"), Some(null))
+    }
+
   protected lazy val arrayFieldType: Parser[Field] =
     ((ARRAY ^^^ "array") ~> "<" ~> nestedType <~ ">") ^^ {
       case e1 =>
diff --git a/integration/spark-datasource/pom.xml b/integration/spark-datasource/pom.xml
index c39aad8..3052f9b 100644
--- a/integration/spark-datasource/pom.xml
+++ b/integration/spark-datasource/pom.xml
@@ -85,7 +85,6 @@
   </dependencies>
 
   <build>
-    <testSourceDirectory>src/test/scala</testSourceDirectory>
     <resources>
       <resource>
         <directory>src/resources</directory>
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/converter/SparkDataTypeConverterImpl.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/converter/SparkDataTypeConverterImpl.java
index 99fac45..41b378d 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/converter/SparkDataTypeConverterImpl.java
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/converter/SparkDataTypeConverterImpl.java
@@ -132,6 +132,8 @@ public final class SparkDataTypeConverterImpl implements DataTypeConverter, Seri
       return DataTypes.TimestampType;
     } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DATE) {
       return DataTypes.DateType;
+    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.BINARY) {
+      return DataTypes.BinaryType;
     } else {
       return null;
     }
@@ -167,7 +169,8 @@ public final class SparkDataTypeConverterImpl implements DataTypeConverter, Seri
         if (dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.BOOLEAN
             || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT
             || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.INT
-            || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.LONG) {
+            || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.LONG
+            || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.BINARY) {
           fields[i] = new StructField(carbonColumn.getColName(),
               convertCarbonToSparkDataType(dataType), true, null);
         } else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(dataType)) {
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index 34e7c23..0fb8b4b 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -265,7 +265,7 @@ public class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
       DataType dataType = msr.getMeasure().getDataType();
       if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.SHORT || dataType == DataTypes.INT
           || dataType == DataTypes.LONG || dataType == DataTypes.FLOAT
-          || dataType == DataTypes.BYTE) {
+          || dataType == DataTypes.BYTE || dataType == DataTypes.BINARY) {
         fields[msr.getOrdinal()] = new StructField(msr.getColumnName(),
             CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(msr.getMeasure().getDataType()), true,
             null);
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
index 71dba3d..8bdb512 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
@@ -63,6 +63,7 @@ object CarbonSparkDataSourceUtil {
         case CarbonDataTypes.FLOAT => FloatType
         case CarbonDataTypes.BOOLEAN => BooleanType
         case CarbonDataTypes.TIMESTAMP => TimestampType
+        case CarbonDataTypes.BINARY => BinaryType
         case CarbonDataTypes.DATE => DateType
         case CarbonDataTypes.VARCHAR => StringType
       }
@@ -84,6 +85,7 @@ object CarbonSparkDataSourceUtil {
       case DateType => CarbonDataTypes.DATE
       case BooleanType => CarbonDataTypes.BOOLEAN
       case TimestampType => CarbonDataTypes.TIMESTAMP
+      case BinaryType => CarbonDataTypes.BINARY
       case ArrayType(elementType, _) =>
         CarbonDataTypes.createArrayType(convertSparkToCarbonDataType(elementType))
       case StructType(fields) =>
@@ -195,7 +197,13 @@ object CarbonSparkDataSourceUtil {
       } else {
         dataTypeOfAttribute
       }
-      new CarbonLiteralExpression(value, dataType)
+      val dataValue = if (dataTypeOfAttribute.equals(CarbonDataTypes.BINARY)
+              && Option(value).isDefined) {
+        new String(value.asInstanceOf[Array[Byte]])
+      } else {
+        value
+      }
+      new CarbonLiteralExpression(dataValue, dataType)
     }
 
     createFilter(predicate)
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
index 6819a4c..5f62362 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
@@ -219,6 +219,8 @@ class SparkCarbonFileFormat extends FileFormat
           fieldTypes(i).dataType match {
             case StringType =>
               data(i) = row.getString(i)
+            case BinaryType =>
+              data(i) = row.getBinary(i)
             case d: DecimalType =>
               data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
             case s: StructType =>
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/util/SparkTypeConverter.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/util/SparkTypeConverter.scala
index cb07e04..2ea3d43 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/util/SparkTypeConverter.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/util/SparkTypeConverter.scala
@@ -82,6 +82,7 @@ private[spark] object SparkTypeConverter {
         case CarbonDataTypes.DOUBLE => DoubleType
         case CarbonDataTypes.FLOAT => FloatType
         case CarbonDataTypes.BYTE => ByteType
+        case CarbonDataTypes.BINARY => BinaryType
         case CarbonDataTypes.BOOLEAN => BooleanType
         case CarbonDataTypes.TIMESTAMP => TimestampType
         case CarbonDataTypes.DATE => DateType
diff --git a/integration/spark-datasource/src/test/java/org/apache/carbondata/sdk/util/BinaryUtil.java b/integration/spark-datasource/src/test/java/org/apache/carbondata/sdk/util/BinaryUtil.java
new file mode 100644
index 0000000..2de5df3
--- /dev/null
+++ b/integration/spark-datasource/src/test/java/org/apache/carbondata/sdk/util/BinaryUtil.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.util;
+
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.sdk.file.CarbonWriter;
+import org.apache.carbondata.sdk.file.Field;
+import org.apache.carbondata.sdk.file.Schema;
+
+import java.io.*;
+
+import static org.apache.carbondata.sdk.file.utils.SDKUtil.listFiles;
+
+public class BinaryUtil {
+  public static void binaryToCarbon(String sourceImageFolder, String outputPath,
+                                    String sufAnnotation, final String sufImage) throws Exception {
+    Field[] fields = new Field[5];
+    fields[0] = new Field("binaryId", DataTypes.INT);
+    fields[1] = new Field("binaryName", DataTypes.STRING);
+    fields[2] = new Field("binary", DataTypes.BINARY);
+    fields[3] = new Field("labelName", DataTypes.STRING);
+    fields[4] = new Field("labelContent", DataTypes.STRING);
+    CarbonWriter writer = CarbonWriter
+        .builder()
+        .outputPath(outputPath)
+        .withCsvInput(new Schema(fields))
+        .withBlockSize(256)
+        .writtenBy("binaryExample")
+        .withPageSizeInMb(1)
+        .build();
+    binaryToCarbon(sourceImageFolder, writer, sufAnnotation, sufImage);
+  }
+
+  public static boolean binaryToCarbon(String sourceImageFolder, CarbonWriter writer,
+      String sufAnnotation, final String sufImage) throws Exception {
+    int num = 1;
+
+    byte[] originBinary = null;
+
+    // read and write image data
+    for (int j = 0; j < num; j++) {
+
+      Object[] files = listFiles(sourceImageFolder, sufImage).toArray();
+
+      if (null != files) {
+        for (int i = 0; i < files.length; i++) {
+          // read image and encode to Hex
+          BufferedInputStream bis = new BufferedInputStream(
+              new FileInputStream(new File((String) files[i])));
+          originBinary = new byte[bis.available()];
+          while ((bis.read(originBinary)) != -1) {
+          }
+
+          String labelFileName = ((String) files[i]).split(sufImage)[0] + sufAnnotation;
+          BufferedInputStream txtBis = new BufferedInputStream(new FileInputStream(labelFileName));
+          String labelValue = null;
+          byte[] labelBinary = null;
+          labelBinary = new byte[txtBis.available()];
+          while ((txtBis.read(labelBinary)) != -1) {
+            labelValue = new String(labelBinary, "UTF-8");
+          }
+          // write data
+          writer.write(new Object[]{i, (String) files[i], originBinary,
+              labelFileName, labelValue});
+          bis.close();
+          txtBis.close();
+        }
+      }
+      writer.close();
+    }
+    return true;
+  }
+
+}
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceBinaryTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceBinaryTest.scala
new file mode 100644
index 0000000..064efc2
--- /dev/null
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceBinaryTest.scala
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.carbondata.datasource
+
+import java.io.File
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.sdk.util.BinaryUtil
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.carbondata.datasource.TestUtil._
+import org.apache.spark.util.SparkUtil
+
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
+
+    var writerPath = new File(this.getClass.getResource("/").getPath
+            + "../../target/SparkCarbonFileFormat/WriterOutput/")
+            .getCanonicalPath
+    var resourcesPath = new File(this.getClass.getResource("/").getPath
+            + "../../../spark-common-test/src/test/resources/")
+            .getCanonicalPath
+    var outputPath = writerPath + 2
+    //getCanonicalPath gives path with \, but the code expects /.
+    writerPath = writerPath.replace("\\", "/")
+
+    var sdkPath = new File(this.getClass.getResource("/").getPath + "../../../../store/sdk/")
+            .getCanonicalPath
+
+    def buildTestBinaryData(): Any = {
+        FileUtils.deleteDirectory(new File(writerPath))
+        FileUtils.deleteDirectory(new File(outputPath))
+
+        val sourceImageFolder = sdkPath + "/src/test/resources/image/flowers"
+        val sufAnnotation = ".txt"
+        BinaryUtil.binaryToCarbon(sourceImageFolder, writerPath, sufAnnotation, ".jpg")
+    }
+
+    def cleanTestData() = {
+        FileUtils.deleteDirectory(new File(writerPath))
+        FileUtils.deleteDirectory(new File(outputPath))
+    }
+
+    import spark._
+
+    override def beforeAll(): Unit = {
+        CarbonProperties.getInstance()
+                .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+                    CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+        buildTestBinaryData()
+
+        FileUtils.deleteDirectory(new File(outputPath))
+        sql("DROP TABLE IF EXISTS sdkOutputTable")
+    }
+
+    override def afterAll(): Unit = {
+        cleanTestData()
+        sql("DROP TABLE IF EXISTS sdkOutputTable")
+    }
+
+    test("Test direct sql read carbon") {
+        assert(new File(writerPath).exists())
+        checkAnswer(
+            sql(s"SELECT COUNT(*) FROM carbon.`$writerPath`"),
+            Seq(Row(3)))
+    }
+
+    test("Test read image carbon with spark carbon file format, generate by sdk, CTAS") {
+        sql("DROP TABLE IF EXISTS binaryCarbon")
+        sql("DROP TABLE IF EXISTS binaryCarbon3")
+        FileUtils.deleteDirectory(new File(outputPath))
+        if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+            sql(s"CREATE TABLE binaryCarbon USING CARBON OPTIONS(PATH '$writerPath')")
+            sql(s"CREATE TABLE binaryCarbon3 USING CARBON OPTIONS(PATH '$outputPath')" + " AS SELECT * FROM binaryCarbon")
+        } else {
+            sql(s"CREATE TABLE binaryCarbon USING CARBON LOCATION '$writerPath'")
+            sql(s"CREATE TABLE binaryCarbon3 USING CARBON LOCATION '$outputPath'" + " AS SELECT * FROM binaryCarbon")
+        }
+        checkAnswer(sql("SELECT COUNT(*) FROM binaryCarbon"),
+            Seq(Row(3)))
+        checkAnswer(sql("SELECT COUNT(*) FROM binaryCarbon3"),
+            Seq(Row(3)))
+        sql("DROP TABLE IF EXISTS binaryCarbon")
+        sql("DROP TABLE IF EXISTS binaryCarbon3")
+        FileUtils.deleteDirectory(new File(outputPath))
+    }
+
+    test("Don't support sort_columns") {
+        import spark._
+        sql("DROP TABLE IF EXISTS binaryTable")
+        val exception = intercept[Exception] {
+            sql(
+                s"""
+                   | CREATE TABLE binaryTable (
+                   |    id DOUBLE,
+                   |    label BOOLEAN,
+                   |    name STRING,
+                   |    image BINARY,
+                   |    autoLabel BOOLEAN)
+                   | using carbon
+                   | options('SORT_COLUMNS'='image')
+       """.stripMargin)
+            sql("SELECT COUNT(*) FROM binaryTable").show()
+        }
+        assert(exception.getCause.getMessage.contains("sort columns not supported for array, struct, map, double, float, decimal, varchar, binary"))
+    }
+
+    test("Don't support long_string_columns for binary") {
+        import spark._
+        sql("DROP TABLE IF EXISTS binaryTable")
+        val exception = intercept[Exception] {
+            sql(
+                s"""
+                   | CREATE TABLE binaryTable (
+                   |    id DOUBLE,
+                   |    label BOOLEAN,
+                   |    name STRING,
+                   |    image BINARY,
+                   |    autoLabel BOOLEAN)
+                   | using carbon
+                   | options('long_string_columns'='image')
+       """.stripMargin)
+            sql("SELECT COUNT(*) FROM binaryTable").show()
+        }
+        assert(exception.getCause.getMessage.contains("long string column : image is not supported for data type: BINARY"))
+    }
+
+    test("Don't support insert into partition table") {
+        if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+            sql("DROP TABLE IF EXISTS binaryCarbon")
+            sql("DROP TABLE IF EXISTS binaryCarbon2")
+            sql("DROP TABLE IF EXISTS binaryCarbon3")
+            sql("DROP TABLE IF EXISTS binaryCarbon4")
+            sql(s"CREATE TABLE binaryCarbon USING CARBON LOCATION '$writerPath'")
+            sql(
+                s"""
+                   | CREATE TABLE binaryCarbon2(
+                   |    binaryId INT,
+                   |    binaryName STRING,
+                   |    binary BINARY,
+                   |    labelName STRING,
+                   |    labelContent STRING
+                   |) USING CARBON""".stripMargin)
+            sql(
+                s"""
+                   | CREATE TABLE binaryCarbon3(
+                   |    binaryId INT,
+                   |    binaryName STRING,
+                   |    binary BINARY,
+                   |    labelName STRING,
+                   |    labelContent STRING
+                   |) USING CARBON partitioned by (binary) """.stripMargin)
+            sql("select binaryId,binaryName,binary,labelName,labelContent from binaryCarbon where binaryId=0").show()
+
+            sql("insert into binaryCarbon2 select binaryId,binaryName,binary,labelName,labelContent from binaryCarbon where binaryId=0 ")
+            val carbonResult2 = sql("SELECT * FROM binaryCarbon2")
+
+            sql("create table binaryCarbon4 using carbon select binaryId,binaryName,binary,labelName,labelContent from binaryCarbon where binaryId=0 ")
+            val carbonResult4 = sql("SELECT * FROM binaryCarbon4")
+            val carbonResult = sql("SELECT * FROM binaryCarbon")
+
+            assert(3 == carbonResult.collect().length)
+            assert(1 == carbonResult4.collect().length)
+            assert(1 == carbonResult2.collect().length)
+            checkAnswer(carbonResult4, carbonResult2)
+
+            try {
+                sql("insert into binaryCarbon3 select binaryId,binaryName,binary,labelName,labelContent from binaryCarbon where binaryId=0 ")
+                assert(false)
+            } catch {
+                case e: Exception =>
+                    e.printStackTrace()
+                    assert(true)
+            }
+            sql("DROP TABLE IF EXISTS binaryCarbon")
+            sql("DROP TABLE IF EXISTS binaryCarbon2")
+            sql("DROP TABLE IF EXISTS binaryCarbon3")
+            sql("DROP TABLE IF EXISTS binaryCarbon4")
+        }
+    }
+
+    test("Test unsafe as false") {
+        CarbonProperties.getInstance()
+                .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "false")
+        FileUtils.deleteDirectory(new File(outputPath))
+        sql("DROP TABLE IF EXISTS binaryCarbon")
+        sql("DROP TABLE IF EXISTS binaryCarbon3")
+        if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+            sql(s"CREATE TABLE binaryCarbon USING CARBON OPTIONS(PATH '$writerPath')")
+            sql(s"CREATE TABLE binaryCarbon3 USING CARBON OPTIONS(PATH '$outputPath')" + " AS SELECT * FROM binaryCarbon")
+        } else {
+            sql(s"CREATE TABLE binaryCarbon USING CARBON LOCATION '$writerPath'")
+            sql(s"CREATE TABLE binaryCarbon3 USING CARBON LOCATION '$outputPath'" + " AS SELECT * FROM binaryCarbon")
+        }
+        checkAnswer(sql("SELECT COUNT(*) FROM binaryCarbon"),
+            Seq(Row(3)))
+        checkAnswer(sql("SELECT COUNT(*) FROM binaryCarbon3"),
+            Seq(Row(3)))
+        sql("DROP TABLE IF EXISTS binaryCarbon")
+        sql("DROP TABLE IF EXISTS binaryCarbon3")
+
+        FileUtils.deleteDirectory(new File(outputPath))
+        CarbonProperties.getInstance()
+                .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE,
+                    CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_DEFAULT)
+    }
+
+    test("insert into for hive and carbon, CTAS") {
+        sql("DROP TABLE IF EXISTS hiveTable")
+        sql("DROP TABLE IF EXISTS carbontable")
+        sql("DROP TABLE IF EXISTS hiveTable2")
+        sql("DROP TABLE IF EXISTS carbontable2")
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS hivetable (
+               |    id int,
+               |    label boolean,
+               |    name string,
+               |    image binary,
+               |    autoLabel boolean)
+               | row format delimited fields terminated by ','
+             """.stripMargin)
+        sql("insert into hivetable values(1,true,'Bob','binary',false)")
+        sql("insert into hivetable values(2,false,'Xu','test',true)")
+
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS carbontable (
+               |    id int,
+               |    label boolean,
+               |    name string,
+               |    image binary,
+               |    autoLabel boolean)
+               | using carbon
+             """.stripMargin)
+        sql("insert into carbontable values(1,true,'Bob','binary',false)")
+        sql("insert into carbontable values(2,false,'Xu','test',true)")
+        val carbonResult = sql("SELECT * FROM carbontable")
+        val hiveResult = sql("SELECT * FROM hivetable")
+
+        assert(2 == carbonResult.collect().length)
+        assert(2 == hiveResult.collect().length)
+        checkAnswer(hiveResult, carbonResult)
+        carbonResult.collect().foreach { each =>
+            if (1 == each.get(0)) {
+                assert("binary".equals(new String(each.getAs[Array[Byte]](3))))
+            } else if (2 == each.get(0)) {
+                assert("test".equals(new String(each.getAs[Array[Byte]](3))))
+            } else {
+                assert(false)
+            }
+        }
+
+        sql("CREATE TABLE hivetable2 AS SELECT * FROM carbontable")
+        sql("CREATE TABLE carbontable2  USING CARBON AS SELECT * FROM hivetable")
+        val carbonResult2 = sql("SELECT * FROM carbontable2")
+        val hiveResult2 = sql("SELECT * FROM hivetable2")
+        checkAnswer(hiveResult2, carbonResult2)
+        checkAnswer(carbonResult, carbonResult2)
+        checkAnswer(hiveResult, hiveResult2)
+        assert(2 == carbonResult2.collect().length)
+        assert(2 == hiveResult2.collect().length)
+
+        sql("INSERT INTO hivetable2 SELECT * FROM carbontable")
+        sql("INSERT INTO carbontable2 SELECT * FROM hivetable")
+        val carbonResult3 = sql("SELECT * FROM carbontable2")
+        val hiveResult3 = sql("SELECT * FROM hivetable2")
+        checkAnswer(carbonResult3, hiveResult3)
+        assert(4 == carbonResult3.collect().length)
+        assert(4 == hiveResult3.collect().length)
+    }
+
+    test("insert into for parquet and carbon, CTAS") {
+        sql("DROP TABLE IF EXISTS parquetTable")
+        sql("DROP TABLE IF EXISTS carbontable")
+        sql("DROP TABLE IF EXISTS parquetTable2")
+        sql("DROP TABLE IF EXISTS carbontable2")
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS parquettable (
+               |    id int,
+               |    label boolean,
+               |    name string,
+               |    image binary,
+               |    autoLabel boolean)
+               | using parquet
+             """.stripMargin)
+        sql("insert into parquettable values(1,true,'Bob','binary',false)")
+        sql("insert into parquettable values(2,false,'Xu','test',true)")
+
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS carbontable (
+               |    id int,
+               |    label boolean,
+               |    name string,
+               |    image binary,
+               |    autoLabel boolean)
+               | using carbon
+             """.stripMargin)
+        sql("insert into carbontable values(1,true,'Bob','binary',false)")
+        sql("insert into carbontable values(2,false,'Xu','test',true)")
+        val carbonResult = sql("SELECT * FROM carbontable")
+        val parquetResult = sql("SELECT * FROM parquettable")
+
+        assert(2 == carbonResult.collect().length)
+        assert(2 == parquetResult.collect().length)
+        checkAnswer(parquetResult, carbonResult)
+        carbonResult.collect().foreach { each =>
+            if (1 == each.get(0)) {
+                assert("binary".equals(new String(each.getAs[Array[Byte]](3))))
+            } else if (2 == each.get(0)) {
+                assert("test".equals(new String(each.getAs[Array[Byte]](3))))
+            } else {
+                assert(false)
+            }
+        }
+
+        sql("CREATE TABLE parquettable2 AS SELECT * FROM carbontable")
+        sql("CREATE TABLE carbontable2  USING CARBON AS SELECT * FROM parquettable")
+        val carbonResult2 = sql("SELECT * FROM carbontable2")
+        val parquetResult2 = sql("SELECT * FROM parquettable2")
+        checkAnswer(parquetResult2, carbonResult2)
+        checkAnswer(carbonResult, carbonResult2)
+        checkAnswer(parquetResult, parquetResult2)
+        assert(2 == carbonResult2.collect().length)
+        assert(2 == parquetResult2.collect().length)
+
+        sql("INSERT INTO parquettable2 SELECT * FROM carbontable")
+        sql("INSERT INTO carbontable2 SELECT * FROM parquettable")
+        val carbonResult3 = sql("SELECT * FROM carbontable2")
+        val parquetResult3 = sql("SELECT * FROM parquettable2")
+        checkAnswer(carbonResult3, parquetResult3)
+        assert(4 == carbonResult3.collect().length)
+        assert(4 == parquetResult3.collect().length)
+    }
+
+    test("insert into carbon as select from hive after hive load data") {
+        sql("DROP TABLE IF EXISTS hiveTable")
+        sql("DROP TABLE IF EXISTS carbontable")
+        sql("DROP TABLE IF EXISTS hiveTable2")
+        sql("DROP TABLE IF EXISTS carbontable2")
+
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS hivetable (
+               |    id int,
+               |    label boolean,
+               |    name string,
+               |    image binary,
+               |    autoLabel boolean)
+               | row format delimited fields terminated by '|'
+             """.stripMargin)
+        sql(
+            s"""
+               | LOAD DATA LOCAL INPATH '$resourcesPath/binarystringdata.csv'
+               | INTO TABLE hivetable
+             """.stripMargin)
+
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS carbontable (
+               |    id int,
+               |    label boolean,
+               |    name string,
+               |    image binary,
+               |    autoLabel boolean)
+               | using carbon
+             """.stripMargin)
+        sql("insert into carbontable select * from hivetable")
+        val carbonResult = sql("SELECT * FROM carbontable")
+        val hiveResult = sql("SELECT * FROM hivetable")
+
+        assert(3 == carbonResult.collect().length)
+        assert(3 == hiveResult.collect().length)
+        checkAnswer(hiveResult, carbonResult)
+        carbonResult.collect().foreach { each =>
+            if (2 == each.get(0)) {
+                assert("\u0001history\u0002".equals(new String(each.getAs[Array[Byte]](3))))
+            } else if (1 == each.get(0)) {
+                assert("\u0001education\u0002".equals(new String(each.getAs[Array[Byte]](3))))
+            } else if (3 == each.get(0)) {
+                assert("".equals(new String(each.getAs[Array[Byte]](3)))
+                        || "\u0001biology\u0002".equals(new String(each.getAs[Array[Byte]](3))))
+            } else {
+                assert(false)
+            }
+        }
+
+        sql("CREATE TABLE hivetable2 AS SELECT * FROM carbontable")
+        sql("CREATE TABLE carbontable2  USING CARBON AS SELECT * FROM hivetable")
+        val carbonResult2 = sql("SELECT * FROM carbontable2")
+        val hiveResult2 = sql("SELECT * FROM hivetable2")
+        checkAnswer(hiveResult2, carbonResult2)
+        checkAnswer(carbonResult, carbonResult2)
+        checkAnswer(hiveResult, hiveResult2)
+        assert(3 == carbonResult2.collect().length)
+        assert(3 == hiveResult2.collect().length)
+
+        sql("INSERT INTO hivetable2 SELECT * FROM carbontable")
+        sql("INSERT INTO carbontable2 SELECT * FROM hivetable")
+        val carbonResult3 = sql("SELECT * FROM carbontable2")
+        val hiveResult3 = sql("SELECT * FROM hivetable2")
+        checkAnswer(carbonResult3, hiveResult3)
+        assert(6 == carbonResult3.collect().length)
+        assert(6 == hiveResult3.collect().length)
+    }
+
+    test("filter for hive and carbon") {
+        sql("DROP TABLE IF EXISTS hiveTable")
+        sql("DROP TABLE IF EXISTS carbontable")
+
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS hivetable (
+               |    id int,
+               |    label boolean,
+               |    name string,
+               |    image binary,
+               |    autoLabel boolean)
+               | row format delimited fields terminated by ','
+             """.stripMargin)
+        sql("insert into hivetable values(1,true,'Bob','binary',false)")
+        sql("insert into hivetable values(2,false,'Xu','test',true)")
+
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS carbontable (
+               |    id int,
+               |    label boolean,
+               |    name string,
+               |    image binary,
+               |    autoLabel boolean)
+               | using carbon
+             """.stripMargin)
+        sql("insert into carbontable values(1,true,'Bob','binary',false)")
+        sql("insert into carbontable values(2,false,'Xu','test',true)")
+
+        // filter with equal
+        val hiveResult = sql("SELECT * FROM hivetable where image=cast('binary' as binary)")
+        val carbonResult = sql("SELECT * FROM carbontable where image=cast('binary' as binary)")
+
+        checkAnswer(hiveResult, carbonResult)
+        assert(1 == carbonResult.collect().length)
+        carbonResult.collect().foreach { each =>
+            assert(1 == each.get(0))
+            assert("binary".equals(new String(each.getAs[Array[Byte]](3))))
+        }
+
+        // filter with non string
+        val exception = intercept[Exception] {
+            sql("SELECT * FROM carbontable where image=binary").collect()
+        }
+        assert(exception.getMessage.contains("cannot resolve '`binary`' given input columns"))
+
+        // filter with not equal
+        val hiveResult3 = sql("SELECT * FROM hivetable where image!=cast('binary' as binary)")
+        val carbonResult3 = sql("SELECT * FROM carbontable where image!=cast('binary' as binary)")
+        checkAnswer(hiveResult3, carbonResult3)
+        assert(1 == carbonResult3.collect().length)
+        carbonResult3.collect().foreach { each =>
+            assert(2 == each.get(0))
+            assert("test".equals(new String(each.getAs[Array[Byte]](3))))
+        }
+
+        // filter with in
+        val hiveResult4 = sql("SELECT * FROM hivetable where image in (cast('binary' as binary))")
+        val carbonResult4 = sql("SELECT * FROM carbontable where image in (cast('binary' as binary))")
+        checkAnswer(hiveResult4, carbonResult4)
+        assert(1 == carbonResult4.collect().length)
+        carbonResult4.collect().foreach { each =>
+            assert(1 == each.get(0))
+            assert("binary".equals(new String(each.getAs[Array[Byte]](3))))
+        }
+
+        // filter with not in
+        val hiveResult5 = sql("SELECT * FROM hivetable where image not in (cast('binary' as binary))")
+        val carbonResult5 = sql("SELECT * FROM carbontable where image not in (cast('binary' as binary))")
+        checkAnswer(hiveResult5, carbonResult5)
+        assert(1 == carbonResult5.collect().length)
+        carbonResult5.collect().foreach { each =>
+            assert(2 == each.get(0))
+            assert("test".equals(new String(each.getAs[Array[Byte]](3))))
+        }
+    }
+
+    test("Spark DataSource don't support update, delete") {
+        sql("DROP TABLE IF EXISTS carbontable")
+        sql("DROP TABLE IF EXISTS carbontable2")
+
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS carbontable (
+               |    id int,
+               |    label boolean,
+               |    name string,
+               |    binaryField binary,
+               |    autoLabel boolean)
+               | using carbon
+             """.stripMargin)
+        sql("insert into carbontable values(1,true,'Bob','binary',false)")
+        sql("insert into carbontable values(2,false,'Xu','test',true)")
+
+        val carbonResult = sql("SELECT * FROM carbontable")
+
+        carbonResult.collect().foreach { each =>
+            if (1 == each.get(0)) {
+                assert("binary".equals(new String(each.getAs[Array[Byte]](3))))
+            } else if (2 == each.get(0)) {
+                assert("test".equals(new String(each.getAs[Array[Byte]](3))))
+            } else {
+                assert(false)
+            }
+        }
+
+        var exception = intercept[Exception] {
+            sql("UPDATE carbontable SET binaryField = 'binary2' WHERE id = 1").show()
+        }
+        assert(exception.getMessage.contains("mismatched input 'UPDATE' expecting"))
+
+        exception = intercept[Exception] {
+            sql("DELETE FROM carbontable WHERE id = 1").show()
+        }
+        assert(exception.getMessage.contains("Operation not allowed: DELETE FROM"))
+    }
+
+}
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
index d25e675..19cf99f 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
@@ -1779,6 +1779,32 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
     assert(ex.getMessage.contains("column: abc specified in inverted index columns does not exist in schema"))
   }
 
+  var writerPath = new File(this.getClass.getResource("/").getPath
+          + "../../target/SparkCarbonFileFormat/WriterOutput/")
+          .getCanonicalPath
+
+  test("Don't support load for datasource") {
+    import spark._
+    sql("DROP TABLE IF EXISTS binaryCarbon")
+    if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+      sql(
+        s"""
+           | CREATE TABLE binaryCarbon(
+           |    binaryId INT,
+           |    binaryName STRING,
+           |    binary BINARY,
+           |    labelName STRING,
+           |    labelContent STRING
+           |) USING CARBON  """.stripMargin)
+
+      val exception = intercept[Exception] {
+        sql(s"load data local inpath '$writerPath' into table binaryCarbon")
+      }
+      assert(exception.getMessage.contains("LOAD DATA is not supported for datasource tables"))
+    }
+    sql("DROP TABLE IF EXISTS binaryCarbon")
+  }
+
   override protected def beforeAll(): Unit = {
     drop
     createParquetTable
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index 9835938..b84a7b0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -167,7 +167,13 @@ object CarbonFilters {
       } else {
         dataTypeOfAttribute
       }
-      new CarbonLiteralExpression(value, dataType)
+      val dataValue = if (dataTypeOfAttribute.equals(CarbonDataTypes.BINARY)
+              && Option(value).isDefined) {
+        new String(value.asInstanceOf[Array[Byte]])
+      } else {
+        value
+      }
+      new CarbonLiteralExpression(dataValue, dataType)
     }
 
     createFilter(predicate)
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/BinaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/BinaryFieldConverterImpl.java
new file mode 100644
index 0000000..766cfeb
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/BinaryFieldConverterImpl.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.converter.impl;
+
+import java.nio.charset.Charset;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
+import org.apache.carbondata.processing.loading.converter.FieldConverter;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Converter for binary
+ */
+public class BinaryFieldConverterImpl implements FieldConverter {
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(BinaryFieldConverterImpl.class.getName());
+
+  private int index;
+  private DataType dataType;
+  private CarbonDimension dimension;
+  private String nullformat;
+  private boolean isEmptyBadRecord;
+  private DataField dataField;
+  public BinaryFieldConverterImpl(DataField dataField, String nullformat, int index,
+      boolean isEmptyBadRecord) {
+    this.dataType = dataField.getColumn().getDataType();
+    this.dimension = (CarbonDimension) dataField.getColumn();
+    this.nullformat = nullformat;
+    this.index = index;
+    this.isEmptyBadRecord = isEmptyBadRecord;
+    this.dataField = dataField;
+  }
+
+  @Override
+  public void convert(CarbonRow row, BadRecordLogHolder logHolder)
+      throws CarbonDataLoadingException {
+    if (row.getObject(index) instanceof String) {
+      row.update((((String) row.getObject(index)))
+          .getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)), index);
+    } else if (row.getObject(index) instanceof byte[]) {
+      row.update(row.getObject(index), index);
+    } else {
+      throw new CarbonDataLoadingException("Binary only support String and byte[] data type");
+    }
+  }
+
+  @Override
+  public Object convert(Object value, BadRecordLogHolder logHolder)
+      throws RuntimeException {
+    return null;
+  }
+
+  @Override
+  public void clear() {
+  }
+}
\ No newline at end of file
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
index 3b4df75..a6c61b4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
@@ -119,6 +119,8 @@ public class FieldEncoderFactory {
         return new ComplexFieldConverterImpl(
             createComplexDataType(dataField, absoluteTableIdentifier,
                 client, useOnePass, localCache, index, nullFormat, isEmptyBadRecord), index);
+      } else if (dataField.getColumn().getDataType() == DataTypes.BINARY) {
+        return new BinaryFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord);
       } else {
         // if the no dictionary column is a numeric column and no need to convert to binary
         // then treat it is as measure col
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
index 8a0f8ea..2157c60 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
@@ -400,6 +400,11 @@ public class SortStepRowHandler implements Serializable {
       byte[] decimalBytes = new byte[len];
       rowBuffer.get(decimalBytes);
       tmpContent = DataTypeUtil.byteToBigDecimal(decimalBytes);
+    } else if (DataTypes.BINARY == tmpDataType) {
+      int len = rowBuffer.getInt();
+      byte[] bytes = new byte[len];
+      rowBuffer.get(bytes);
+      tmpContent = bytes;
     } else {
       throw new IllegalArgumentException("Unsupported data type: " + tmpDataType);
     }
@@ -847,6 +852,10 @@ public class SortStepRowHandler implements Serializable {
       byte[] decimalBytes = DataTypeUtil.bigDecimalToByte((BigDecimal) tmpValue);
       reUsableByteArrayDataOutputStream.writeShort((short) decimalBytes.length);
       reUsableByteArrayDataOutputStream.write(decimalBytes);
+    } else if (DataTypes.BINARY == tmpDataType) {
+      byte[] bytes = (byte[]) tmpValue;
+      reUsableByteArrayDataOutputStream.writeInt(bytes.length);
+      reUsableByteArrayDataOutputStream.write(bytes);
     } else {
       throw new IllegalArgumentException("Unsupported data type: " + tmpDataType);
     }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 76c5613..a6e4b34 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -292,7 +292,8 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       Object[] nonDictArray = WriteStepRowUtil.getNoDictAndComplexDimension(row);
       for (int i = 0; i < noDictDataTypesList.size(); i++) {
         DataType columnType = noDictDataTypesList.get(i);
-        if ((columnType == DataTypes.STRING) || (columnType == DataTypes.VARCHAR)) {
+        if ((columnType == DataTypes.STRING) || (columnType == DataTypes.VARCHAR) || (columnType
+            == DataTypes.BINARY)) {
           currentElementLength = ((byte[]) nonDictArray[i]).length;
           noDictColumnPageSize[bucketCounter] += currentElementLength;
           canSnappyHandleThisRow(noDictColumnPageSize[bucketCounter]);
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index a201679..ebd21e8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -121,6 +121,8 @@ public class TablePage {
         DataType dataType = DataTypes.STRING;
         if (DataTypes.VARCHAR == spec.getSchemaDataType()) {
           dataType = DataTypes.VARCHAR;
+        } else if (DataTypes.BINARY == spec.getSchemaDataType()) {
+          dataType = DataTypes.BINARY;
         }
         ColumnPageEncoderMeta columnPageEncoderMeta =
             new ColumnPageEncoderMeta(spec, dataType, columnCompressor);
@@ -147,7 +149,7 @@ public class TablePage {
           }
         }
         // set the stats collector according to the data type of the columns
-        if (DataTypes.VARCHAR == dataType) {
+        if (DataTypes.VARCHAR == dataType || DataTypes.BINARY == dataType) {
           page.setStatsCollector(LVLongStringStatsCollector.newInstance());
         } else if (DataTypeUtil.isPrimitiveColumn(spec.getSchemaDataType())) {
           if (spec.getSchemaDataType() == DataTypes.TIMESTAMP) {
@@ -216,7 +218,7 @@ public class TablePage {
       dictDimensionPages[i].putData(rowId, keys[i]);
     }
 
-    // 2. convert noDictionary columns and complex columns and varchar columns.
+    // 2. convert noDictionary columns and complex columns and varchar, binary columns.
     int noDictionaryCount = noDictDimensionPages.length;
     int complexColumnCount = complexDimensionPages.length;
     if (noDictionaryCount > 0 || complexColumnCount > 0) {
@@ -225,8 +227,8 @@ public class TablePage {
           tableSpec.getNoDictionaryDimensionSpec();
       Object[] noDictAndComplex = WriteStepRowUtil.getNoDictAndComplexDimension(row);
       for (int i = 0; i < noDictAndComplex.length; i++) {
-        if (noDictionaryDimensionSpec.get(i).getSchemaDataType()
-            == DataTypes.VARCHAR) {
+        if (noDictionaryDimensionSpec.get(i).getSchemaDataType() == DataTypes.VARCHAR
+            || noDictionaryDimensionSpec.get(i).getSchemaDataType() == DataTypes.BINARY) {
           byte[] valueWithLength = addIntLengthToByteArray((byte[]) noDictAndComplex[i]);
           noDictDimensionPages[i].putData(rowId, valueWithLength);
         } else if (i < noDictionaryCount) {
diff --git a/store/sdk/pom.xml b/store/sdk/pom.xml
index 272332d..90348d2d 100644
--- a/store/sdk/pom.xml
+++ b/store/sdk/pom.xml
@@ -16,6 +16,7 @@
 
   <properties>
     <dev.path>${basedir}/../../dev</dev.path>
+    <dep.jackson.version>2.6.5</dep.jackson.version>
   </properties>
 
   <dependencies>
@@ -76,7 +77,7 @@
                 <exclude>META-INF/*.DSA</exclude>
                 <exclude>META-INF/*.RSA</exclude>
                 <exclude>META-INF/vfs-providers.xml</exclude>
-                <exclude>io/netty/**</exclude>
+                <!--<exclude>io/netty/**</exclude>-->
               </excludes>
             </filter>
           </filters>
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
index a8899a7..aa5c671 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
@@ -65,7 +65,7 @@ class CSVCarbonWriter extends CarbonWriter {
   @Override
   public void write(Object object) throws IOException {
     try {
-      writable.set((String[]) object);
+      writable.set((Object[]) object);
       recordWriter.write(NullWritable.get(), writable);
     } catch (Exception e) {
       throw new IOException(e);
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
index 9666cfa..e5c0680 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
@@ -144,7 +144,6 @@ public class CarbonReader<T> {
 
   /**
    * Return a new {@link CarbonReaderBuilder} instance
-   * Default value of table name is table + tablePath + time
    *
    * @param tablePath table path
    * @return CarbonReaderBuilder object
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index cfae2ae..7569926 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -113,6 +113,7 @@ public class CarbonWriterBuilder {
 
   /**
    * sets the list of columns for which inverted index needs to generated
+   *
    * @param invertedIndexColumns is a string array of columns for which inverted index needs to
    * generated.
    * If it is null or an empty array, inverted index will be generated for none of the columns
@@ -156,30 +157,30 @@ public class CarbonWriterBuilder {
 
   /**
    * To support the load options for sdk writer
-   * @param options key,value pair of load options.
-   * supported keys values are
-   * a. bad_records_logger_enable -- true (write into separate logs), false
-   * b. bad_records_action -- FAIL, FORCE, IGNORE, REDIRECT
-   * c. bad_record_path -- path
-   * d. dateformat -- same as JAVA SimpleDateFormat
-   * e. timestampformat -- same as JAVA SimpleDateFormat
-   * f. complex_delimiter_level_1 -- value to Split the complexTypeData
-   * g. complex_delimiter_level_2 -- value to Split the nested complexTypeData
-   * h. quotechar
-   * i. escapechar
-   *
-   * Default values are as follows.
-   *
-   * a. bad_records_logger_enable -- "false"
-   * b. bad_records_action -- "FAIL"
-   * c. bad_record_path -- ""
-   * d. dateformat -- "" , uses from carbon.properties file
-   * e. timestampformat -- "", uses from carbon.properties file
-   * f. complex_delimiter_level_1 -- "\001"
-   * g. complex_delimiter_level_2 -- "\002"
-   * h. quotechar -- "\""
-   * i. escapechar -- "\\"
    *
+   * @param options key,value pair of load options.
+   *                supported keys values are
+   *                a. bad_records_logger_enable -- true (write into separate logs), false
+   *                b. bad_records_action -- FAIL, FORCE, IGNORE, REDIRECT
+   *                c. bad_record_path -- path
+   *                d. dateformat -- same as JAVA SimpleDateFormat
+   *                e. timestampformat -- same as JAVA SimpleDateFormat
+   *                f. complex_delimiter_level_1 -- value to Split the complexTypeData
+   *                g. complex_delimiter_level_2 -- value to Split the nested complexTypeData
+   *                h. quotechar
+   *                i. escapechar
+   *                <p>
+   *                Default values are as follows.
+   *                <p>
+   *                a. bad_records_logger_enable -- "false"
+   *                b. bad_records_action -- "FAIL"
+   *                c. bad_record_path -- ""
+   *                d. dateformat -- "" , uses from carbon.properties file
+   *                e. timestampformat -- "", uses from carbon.properties file
+   *                f. complex_delimiter_level_1 -- "\001"
+   *                g. complex_delimiter_level_2 -- "\002"
+   *                h. quotechar -- "\""
+   *                i. escapechar -- "\\"
    * @return updated CarbonWriterBuilder
    */
   public CarbonWriterBuilder withLoadOptions(Map<String, String> options) {
@@ -279,7 +280,7 @@ public class CarbonWriterBuilder {
     Set<String> supportedOptions = new HashSet<>(Arrays
         .asList("table_blocksize", "table_blocklet_size", "local_dictionary_threshold",
             "local_dictionary_enable", "sort_columns", "sort_scope", "long_string_columns",
-            "inverted_index","table_page_size_inmb"));
+            "inverted_index", "table_page_size_inmb"));
 
     for (String key : options.keySet()) {
       if (!supportedOptions.contains(key.toLowerCase())) {
@@ -613,11 +614,11 @@ public class CarbonWriterBuilder {
 
   private void validateLongStringColumns(Schema carbonSchema, Set<String> longStringColumns) {
     // long string columns must be string or varchar type
-    for (Field field :carbonSchema.getFields()) {
+    for (Field field : carbonSchema.getFields()) {
       if (longStringColumns.contains(field.getFieldName().toLowerCase()) && (
           (field.getDataType() != DataTypes.STRING) && field.getDataType() != DataTypes.VARCHAR)) {
         throw new RuntimeException(
-            "long string column : " + field.getFieldName() + "is not supported for data type: "
+            "long string column : " + field.getFieldName() + " is not supported for data type: "
                 + field.getDataType());
       }
     }
@@ -662,7 +663,7 @@ public class CarbonWriterBuilder {
       for (Field field : schema.getFields()) {
         if (null != field) {
           if (field.getDataType() == DataTypes.STRING ||
-              field.getDataType() == DataTypes.DATE  ||
+              field.getDataType() == DataTypes.DATE ||
               field.getDataType() == DataTypes.TIMESTAMP) {
             sortColumnsList.add(field.getFieldName());
           }
@@ -704,7 +705,7 @@ public class CarbonWriterBuilder {
     // differentiated to any level
     AtomicInteger valIndex = new AtomicInteger(0);
     // Check if any of the columns specified in sort columns are missing from schema.
-    for (String sortColumn: sortColumnsList) {
+    for (String sortColumn : sortColumnsList) {
       boolean exists = false;
       for (Field field : fields) {
         if (field.getFieldName().equalsIgnoreCase(sortColumn)) {
@@ -718,7 +719,7 @@ public class CarbonWriterBuilder {
       }
     }
     // Check if any of the columns specified in inverted index are missing from schema.
-    for (String invertedIdxColumn: invertedIdxColumnsList) {
+    for (String invertedIdxColumn : invertedIdxColumnsList) {
       boolean exists = false;
       for (Field field : fields) {
         if (field.getFieldName().equalsIgnoreCase(invertedIdxColumn)) {
@@ -744,10 +745,11 @@ public class CarbonWriterBuilder {
           // unsupported types for ("array", "struct", "double", "float", "decimal")
           if (field.getDataType() == DataTypes.DOUBLE || field.getDataType() == DataTypes.FLOAT
               || DataTypes.isDecimal(field.getDataType()) || field.getDataType().isComplexType()
-              || field.getDataType() == DataTypes.VARCHAR) {
+              || field.getDataType() == DataTypes.VARCHAR
+              || field.getDataType() == DataTypes.BINARY) {
             String errorMsg =
-                "sort columns not supported for array, struct, map, double, float, decimal,"
-                    + "varchar";
+                "sort columns not supported for array, struct, map, double, float, decimal, "
+                    + "varchar, binary";
             throw new RuntimeException(errorMsg);
           }
         }
@@ -814,7 +816,7 @@ public class CarbonWriterBuilder {
     if (schema == null) {
       return null;
     }
-    Field[] fields =  schema.getFields();
+    Field[] fields = schema.getFields();
     for (int i = 0; i < fields.length; i++) {
       if (fields[i] != null) {
         if (longStringColumns != null) {
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
index f7fceda..ab375f8 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
@@ -78,6 +78,8 @@ public class Field {
       this.type = DataTypes.FLOAT;
     } else if (type.equalsIgnoreCase("double")) {
       this.type = DataTypes.DOUBLE;
+    } else if (type.equalsIgnoreCase("binary")) {
+      this.type = DataTypes.BINARY;
     } else if (type.equalsIgnoreCase("array")) {
       this.type = DataTypes.createDefaultArrayType();
     } else if (type.equalsIgnoreCase("struct")) {
@@ -114,6 +116,8 @@ public class Field {
       this.type = DataTypes.FLOAT;
     } else if (type.equalsIgnoreCase("double")) {
       this.type = DataTypes.DOUBLE;
+    } else if (type.equalsIgnoreCase("binary")) {
+      this.type = DataTypes.BINARY;
     } else if (type.equalsIgnoreCase("array")) {
       this.type = DataTypes.createArrayType(fields.get(0).getDataType());
     } else if (type.equalsIgnoreCase("struct")) {
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java
index 5f65539..d40e05d 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java
@@ -41,7 +41,8 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
  * Writer Implementation to write Json Record to carbondata file.
  * json writer requires the path of json file and carbon schema.
  */
-@InterfaceAudience.User public class JsonCarbonWriter extends CarbonWriter {
+@InterfaceAudience.User
+public class JsonCarbonWriter extends CarbonWriter {
   private RecordWriter<NullWritable, ObjectArrayWritable> recordWriter;
   private TaskAttemptContext context;
   private ObjectArrayWritable writable;
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/RowUtil.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/RowUtil.java
index fdf3cfc..acbf525 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/RowUtil.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/RowUtil.java
@@ -148,4 +148,15 @@ public class RowUtil implements Serializable {
     return ((BigDecimal) data[ordinal]).toString();
   }
 
+  /**
+   * get binary data type data by ordinal
+   *
+   * @param data carbon row data
+   * @param ordinal the data index of Row
+   * @return byte data type data
+   */
+  public static byte[] getBinary(Object[] data, int ordinal) {
+    return (byte[]) data[ordinal];
+  }
+
 }
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/utils/SDKUtil.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/utils/SDKUtil.java
new file mode 100644
index 0000000..9fec185
--- /dev/null
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/utils/SDKUtil.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+public class SDKUtil {
+  public static ArrayList listFiles(String sourceImageFolder, final String suf) throws Exception {
+    return listFiles(sourceImageFolder, suf, new Configuration(true));
+  }
+
+  public static ArrayList listFiles(String sourceImageFolder,
+                                    final String suf, Configuration conf) throws Exception {
+    final String sufImageFinal = suf;
+    ArrayList result = new ArrayList();
+    CarbonFile[] fileList = FileFactory.getCarbonFile(sourceImageFolder).listFiles();
+    for (int i = 0; i < fileList.length; i++) {
+      if (fileList[i].isDirectory()) {
+        result.addAll(listFiles(fileList[i].getCanonicalPath(), sufImageFinal, conf));
+      } else if (fileList[i].getCanonicalPath().endsWith(sufImageFinal)) {
+        result.add(fileList[i].getCanonicalPath());
+      }
+    }
+    return result;
+  }
+
+
+  public static Object[] getSplitList(String path, String suf,
+                                      int numOfSplit, Configuration conf) throws Exception {
+    List fileList = listFiles(path, suf, conf);
+    List splitList = new ArrayList<List>();
+    if (numOfSplit < fileList.size()) {
+      // If maxSplits is less than the no. of files
+      // Split the reader into maxSplits splits with each
+      // element containing >= 1 CarbonRecordReader objects
+      float filesPerSplit = (float) fileList.size() / numOfSplit;
+      for (int i = 0; i < numOfSplit; ++i) {
+        splitList.add(fileList.subList(
+            (int) Math.ceil(i * filesPerSplit),
+            (int) Math.ceil(((i + 1) * filesPerSplit))));
+      }
+    } else {
+      // If maxSplits is greater than the no. of files
+      // Split the reader into <num_files> splits with each
+      // element contains exactly 1 CarbonRecordReader object
+      for (int i = 0; i < fileList.size(); ++i) {
+        splitList.add((fileList.subList(i, i + 1)));
+      }
+    }
+    return splitList.toArray();
+  }
+
+  public static Object[] getSplitList(String path, String suf,
+                                      int numOfSplit) throws Exception {
+    return getSplitList(path, suf, numOfSplit, new Configuration());
+  }
+
+}
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
index 156ca5f..27b4e3a 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
@@ -139,7 +139,6 @@ public class CSVCarbonWriterTest {
 
   @Test
   public void testAllPrimitiveDataType() throws IOException {
-    // TODO: write all data type and read by CarbonRecordReader to verify the content
     String path = "./testWriteFiles";
     FileUtils.deleteDirectory(new File(path));
 
@@ -159,15 +158,16 @@ public class CSVCarbonWriterTest {
       CarbonWriter writer = builder.withCsvInput(new Schema(fields)).writtenBy("CSVCarbonWriterTest").build();
 
       for (int i = 0; i < 100; i++) {
-        String[] row = new String[]{
+        Object[] row = new Object[]{
             "robot" + (i % 10),
-            String.valueOf(i),
-            String.valueOf(i),
-            String.valueOf(Long.MAX_VALUE - i),
-            String.valueOf((double) i / 2),
-            String.valueOf(true),
+            i,
+            i,
+            (Long.MAX_VALUE - i),
+            ((double) i / 2),
+            true,
             "2019-03-02",
-            "2019-02-12 03:03:34"
+            "2019-02-12 03:03:34",
+            "1.234567"
         };
         writer.write(row);
       }
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
index f09581a..6a3578c 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
@@ -23,8 +23,9 @@ import java.sql.Timestamp;
 import java.util.*;
 
 import org.apache.avro.generic.GenericData;
-import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
 import org.apache.log4j.Logger;
+
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
@@ -553,27 +554,27 @@ public class CarbonReaderTest extends TestCase {
     CarbonWriter carbonWriter = null;
     try {
       carbonWriter = builder.outputPath(path1).uniqueIdentifier(12345)
-  .withCsvInput(schema).writtenBy("CarbonReaderTest").build();
+          .withCsvInput(schema).writtenBy("CarbonReaderTest").build();
     } catch (InvalidLoadOptionException e) {
       e.printStackTrace();
       Assert.fail(e.getMessage());
     }
-    carbonWriter.write(new String[] { "MNO", "100" });
+    carbonWriter.write(new String[]{"MNO", "100"});
     carbonWriter.close();
 
-    Field[] fields1 = new Field[] { new Field("p1", "string"),
-         new Field("p2", "int") };
+    Field[] fields1 = new Field[]{new Field("p1", "string"),
+        new Field("p2", "int")};
     Schema schema1 = new Schema(fields1);
     CarbonWriterBuilder builder1 = CarbonWriter.builder();
     CarbonWriter carbonWriter1 = null;
     try {
       carbonWriter1 = builder1.outputPath(path2).uniqueIdentifier(12345)
-   .withCsvInput(schema1).writtenBy("CarbonReaderTest").build();
+          .withCsvInput(schema1).writtenBy("CarbonReaderTest").build();
     } catch (InvalidLoadOptionException e) {
       e.printStackTrace();
       Assert.fail(e.getMessage());
     }
-    carbonWriter1.write(new String[] { "PQR", "200" });
+    carbonWriter1.write(new String[]{"PQR", "200"});
     carbonWriter1.close();
 
     try {
@@ -592,9 +593,9 @@ public class CarbonReaderTest extends TestCase {
             .build();
 
     while (reader1.hasNext()) {
-       Object[] row1 = (Object[]) reader1.readNextRow();
-       System.out.println(row1[0]);
-       System.out.println(row1[1]);
+      Object[] row1 = (Object[]) reader1.readNextRow();
+      System.out.println(row1[0]);
+      System.out.println(row1[1]);
     }
     reader1.close();
 
@@ -785,7 +786,8 @@ public class CarbonReaderTest extends TestCase {
     TestUtil.writeFilesAndVerify(100, new Schema(fields), path);
 
     File[] dataFiles = new File(path).listFiles(new FilenameFilter() {
-      @Override public boolean accept(File dir, String name) {
+      @Override
+      public boolean accept(File dir, String name) {
         return name.endsWith("carbondata");
       }
     });
@@ -1020,7 +1022,8 @@ public class CarbonReaderTest extends TestCase {
     }
 
     File[] dataFiles2 = new File(path).listFiles(new FilenameFilter() {
-      @Override public boolean accept(File dir, String name) {
+      @Override
+      public boolean accept(File dir, String name) {
         return name.endsWith("carbondata");
       }
     });
@@ -1131,7 +1134,8 @@ public class CarbonReaderTest extends TestCase {
     }
 
     File[] dataFiles1 = new File(path).listFiles(new FilenameFilter() {
-      @Override public boolean accept(File dir, String name) {
+      @Override
+      public boolean accept(File dir, String name) {
         return name.endsWith("carbondata");
       }
     });
@@ -1139,7 +1143,8 @@ public class CarbonReaderTest extends TestCase {
     assertTrue(versionDetails.contains("SDK_1.0.0 in version: "));
 
     File[] dataFiles2 = new File(path).listFiles(new FilenameFilter() {
-      @Override public boolean accept(File dir, String name) {
+      @Override
+      public boolean accept(File dir, String name) {
         return name.endsWith("carbonindex");
       }
     });
@@ -1357,26 +1362,26 @@ public class CarbonReaderTest extends TestCase {
     FileUtils.deleteDirectory(new File(path));
 
     String mySchema =
-        "{ "+
-            "  \"name\": \"address\", "+
-            "  \"type\": \"record\", "+
-            "  \"fields\": [ "+
-            "    { "+
-            "      \"name\": \"name\", "+
-            "      \"type\": \"string\" "+
-            "    }, "+
-            "    { "+
-            "      \"name\": \"age\", "+
-            "      \"type\": \"int\" "+
-            "    }, "+
-            "    { "+
-            "      \"name\": \"mapRecord\", "+
-            "      \"type\": { "+
-            "        \"type\": \"map\", "+
-            "        \"values\": \"string\" "+
-            "      } "+
-            "    } "+
-            "  ] "+
+        "{ " +
+            "  \"name\": \"address\", " +
+            "  \"type\": \"record\", " +
+            "  \"fields\": [ " +
+            "    { " +
+            "      \"name\": \"name\", " +
+            "      \"type\": \"string\" " +
+            "    }, " +
+            "    { " +
+            "      \"name\": \"age\", " +
+            "      \"type\": \"int\" " +
+            "    }, " +
+            "    { " +
+            "      \"name\": \"mapRecord\", " +
+            "      \"type\": { " +
+            "        \"type\": \"map\", " +
+            "        \"values\": \"string\" " +
+            "      } " +
+            "    } " +
+            "  ] " +
             "} ";
 
     String json =
@@ -1400,8 +1405,8 @@ public class CarbonReaderTest extends TestCase {
     String name = "bob";
     int age = 10;
     Object[] mapKeValue = new Object[2];
-    mapKeValue[0] = new Object[] { "city", "street" };
-    mapKeValue[1] = new Object[] { "bangalore", "k-lane" };
+    mapKeValue[0] = new Object[]{"city", "street"};
+    mapKeValue[1] = new Object[]{"bangalore", "k-lane"};
     int i = 0;
     while (reader.hasNext()) {
       Object[] row = (Object[]) reader.readNextRow();
@@ -1416,9 +1421,9 @@ public class CarbonReaderTest extends TestCase {
 
   @Test
   public void testReadWithFilterOfnonTransactionalwithsubfolders() throws IOException, InterruptedException {
-    String path1 = "./testWriteFiles/1/"+System.nanoTime();
-    String path2 = "./testWriteFiles/2/"+System.nanoTime();
-    String path3 = "./testWriteFiles/3/"+System.nanoTime();
+    String path1 = "./testWriteFiles/1/" + System.nanoTime();
+    String path2 = "./testWriteFiles/2/" + System.nanoTime();
+    String path3 = "./testWriteFiles/3/" + System.nanoTime();
     FileUtils.deleteDirectory(new File("./testWriteFiles"));
 
     Field[] fields = new Field[2];
@@ -1543,7 +1548,7 @@ public class CarbonReaderTest extends TestCase {
     }
   }
 
-   @Test
+  @Test
   public void testReadNextRowWithRowUtil() {
     String path = "./carbondata";
     try {
@@ -1661,7 +1666,7 @@ public class CarbonReaderTest extends TestCase {
         Assert.fail(e.getMessage());
       }
     }
-   }
+  }
 
   @Test
   public void testReadNextRowWithProjectionAndRowUtil() {
@@ -1737,7 +1742,7 @@ public class CarbonReaderTest extends TestCase {
         assertEquals(RowUtil.getFloat(data, 11), (float) 1.23);
         i++;
       }
-      assert  (i == 10);
+      assert (i == 10);
       reader.close();
     } catch (Throwable e) {
       e.printStackTrace();
@@ -1821,7 +1826,7 @@ public class CarbonReaderTest extends TestCase {
         assertEquals(RowUtil.getFloat(data, 11), new Float("1.23"));
         i++;
       }
-      assert(i==10);
+      assert (i == 10);
       reader.close();
     } catch (Throwable e) {
       e.printStackTrace();
@@ -2076,7 +2081,7 @@ public class CarbonReaderTest extends TestCase {
         .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FAIL");
 
     String path = "./testSdkWriteWhenArrayOfStringIsEmpty";
-    String[] rec = { "aaa", "bbb", "aaa@cdf.com", "", "", "mmm", "" };
+    String[] rec = {"aaa", "bbb", "aaa@cdf.com", "", "", "mmm", ""};
     Field[] fields = new Field[7];
     fields[0] = new Field("stringField", DataTypes.STRING);
     fields[1] = new Field("varcharField", DataTypes.VARCHAR);
@@ -2320,4 +2325,97 @@ public class CarbonReaderTest extends TestCase {
       FileUtils.deleteDirectory(new File(path));
     }
   }
+
+  @Test
+  public void testWriteWithDifferentDataType() {
+    String path = "./carbondata";
+    try {
+      FileUtils.deleteDirectory(new File(path));
+
+      Field[] fields = new Field[13];
+      fields[0] = new Field("stringField", DataTypes.STRING);
+      fields[1] = new Field("shortField", DataTypes.SHORT);
+      fields[2] = new Field("intField", DataTypes.INT);
+      fields[3] = new Field("longField", DataTypes.LONG);
+      fields[4] = new Field("doubleField", DataTypes.DOUBLE);
+      fields[5] = new Field("boolField", DataTypes.BOOLEAN);
+      fields[6] = new Field("dateField", DataTypes.DATE);
+      fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
+      fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
+      fields[9] = new Field("varcharField", DataTypes.VARCHAR);
+      fields[10] = new Field("arrayField", DataTypes.createArrayType(DataTypes.STRING));
+      fields[11] = new Field("floatField", DataTypes.FLOAT);
+      fields[12] = new Field("binaryField", DataTypes.BINARY);
+      Map<String, String> map = new HashMap<>();
+      map.put("complex_delimiter_level_1", "#");
+      CarbonWriter writer = CarbonWriter.builder()
+          .outputPath(path)
+          .withLoadOptions(map)
+          .withCsvInput(new Schema(fields))
+          .writtenBy("CarbonReaderTest")
+          .build();
+      byte[] value = "Binary".getBytes();
+      for (int i = 0; i < 10; i++) {
+        Object[] row2 = new Object[]{
+            "robot" + (i % 10),
+            i % 10000,
+            i,
+            (Long.MAX_VALUE - i),
+            ((double) i / 2),
+            (true),
+            "2019-03-02",
+            "2019-02-12 03:03:34",
+            12.345,
+            "varchar",
+            "Hello#World#From#Carbon",
+            1.23,
+            value
+        };
+        writer.write(row2);
+      }
+      writer.close();
+
+      // Read data
+      CarbonReader reader = CarbonReader
+          .builder(path, "_temp")
+          .withRowRecordReader()
+          .build();
+
+      int i = 0;
+      while (reader.hasNext()) {
+        Object[] data = (Object[]) reader.readNextRow();
+
+        assert (RowUtil.getString(data, 0).equals("robot" + i));
+        assertEquals(RowUtil.getInt(data, 1), 17957);
+        Assert.assertEquals(new String(value), new String(RowUtil.getBinary(data, 3)));
+        assert (RowUtil.getVarchar(data, 4).equals("varchar"));
+        Object[] arr = RowUtil.getArray(data, 5);
+        assert (arr[0].equals("Hello"));
+        assert (arr[1].equals("World"));
+        assert (arr[2].equals("From"));
+        assert (arr[3].equals("Carbon"));
+        assertEquals(RowUtil.getShort(data, 6), i);
+        assertEquals(RowUtil.getInt(data, 7), i);
+        assertEquals(RowUtil.getLong(data, 8), Long.MAX_VALUE - i);
+        assertEquals(RowUtil.getDouble(data, 9), ((double) i) / 2);
+        assert (RowUtil.getBoolean(data, 10));
+        assert (RowUtil.getDecimal(data, 11).equals("12.35"));
+        assertEquals(RowUtil.getFloat(data, 12), (float) 1.23);
+
+        i++;
+      }
+      assert (i == 10);
+      reader.close();
+    } catch (Throwable e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    } finally {
+      try {
+        FileUtils.deleteDirectory(new File(path));
+      } catch (IOException e) {
+        e.printStackTrace();
+        Assert.fail(e.getMessage());
+      }
+    }
+  }
 }
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java
new file mode 100644
index 0000000..e69a981
--- /dev/null
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java
@@ -0,0 +1,818 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import junit.framework.TestCase;
+
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.expression.ColumnExpression;
+import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.util.BinaryUtil;
+
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.imageio.ImageIO;
+import javax.imageio.ImageReadParam;
+import javax.imageio.ImageReader;
+import javax.imageio.ImageTypeSpecifier;
+import javax.imageio.stream.FileImageInputStream;
+import javax.imageio.stream.ImageInputStream;
+import java.awt.color.ColorSpace;
+import java.awt.image.BufferedImage;
+import java.io.*;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.carbondata.sdk.file.utils.SDKUtil.listFiles;
+
+public class ImageTest extends TestCase {
+
+  @Test
+  public void testWriteWithByteArrayDataType() throws IOException, InvalidLoadOptionException, InterruptedException {
+    String imagePath = "./src/test/resources/image/carbondatalogo.jpg";
+    int num = 1;
+    int rows = 10;
+    String path = "./target/binary";
+    try {
+      FileUtils.deleteDirectory(new File(path));
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+    Field[] fields = new Field[5];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+    fields[2] = new Field("image1", DataTypes.BINARY);
+    fields[3] = new Field("image2", DataTypes.BINARY);
+    fields[4] = new Field("image3", DataTypes.BINARY);
+
+    byte[] originBinary = null;
+
+    // read and write image data
+    for (int j = 0; j < num; j++) {
+      CarbonWriter writer = CarbonWriter
+          .builder()
+          .outputPath(path)
+          .withCsvInput(new Schema(fields))
+          .writtenBy("SDKS3Example")
+          .withPageSizeInMb(1)
+          .build();
+
+      for (int i = 0; i < rows; i++) {
+        // read image and encode to Hex
+        BufferedInputStream bis = new BufferedInputStream(new FileInputStream(imagePath));
+        originBinary = new byte[bis.available()];
+        while ((bis.read(originBinary)) != -1) {
+        }
+        // write data
+        writer.write(new Object[]{"robot" + (i % 10), i, originBinary, originBinary, originBinary});
+        bis.close();
+      }
+      writer.close();
+    }
+
+    CarbonReader reader = CarbonReader
+        .builder(path, "_temp")
+        .build();
+
+    System.out.println("\nData:");
+    int i = 0;
+    while (i < 20 && reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+
+      byte[] outputBinary = (byte[]) row[1];
+      byte[] outputBinary2 = (byte[]) row[2];
+      byte[] outputBinary3 = (byte[]) row[3];
+      System.out.println(row[0] + " " + row[1] + " image1 size:" + outputBinary.length
+          + " image2 size:" + outputBinary2.length + " image3 size:" + outputBinary3.length);
+
+      for (int k = 0; k < 3; k++) {
+
+        byte[] originBinaryTemp = (byte[]) row[1 + k];
+        // validate output binary data and origin binary data
+        assert (originBinaryTemp.length == outputBinary.length);
+        for (int j = 0; j < originBinaryTemp.length; j++) {
+          assert (originBinaryTemp[j] == outputBinary[j]);
+        }
+
+        // save image, user can compare the save image and original image
+        String destString = "./target/binary/image" + k + "_" + i + ".jpg";
+        BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(destString));
+        bos.write(originBinaryTemp);
+        bos.close();
+      }
+      i++;
+    }
+    System.out.println("\nFinished");
+    reader.close();
+  }
+
+  @Test
+  public void testWriteBinaryWithSort() {
+    int num = 1;
+    String path = "./target/binary";
+    try {
+      FileUtils.deleteDirectory(new File(path));
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+    Field[] fields = new Field[5];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+    fields[2] = new Field("image1", DataTypes.BINARY);
+    fields[3] = new Field("image2", DataTypes.BINARY);
+    fields[4] = new Field("image3", DataTypes.BINARY);
+
+    // read and write image data
+    for (int j = 0; j < num; j++) {
+      try {
+        CarbonWriter
+            .builder()
+            .outputPath(path)
+            .withCsvInput(new Schema(fields))
+            .writtenBy("SDKS3Example")
+            .withPageSizeInMb(1)
+            .withTableProperty("sort_columns", "image1")
+            .build();
+        assert (false);
+      } catch (Exception e) {
+        assert (e.getMessage().contains("sort columns not supported for array, struct, map, double, float, decimal, varchar, binary"));
+      }
+    }
+  }
+
+  @Test
+  public void testWriteBinaryWithLong_string_columns() {
+    int num = 1;
+    String path = "./target/binary";
+    try {
+      FileUtils.deleteDirectory(new File(path));
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+    Field[] fields = new Field[5];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+    fields[2] = new Field("image1", DataTypes.BINARY);
+    fields[3] = new Field("image2", DataTypes.BINARY);
+    fields[4] = new Field("image3", DataTypes.BINARY);
+
+    // read and write image data
+    for (int j = 0; j < num; j++) {
+      try {
+        CarbonWriter
+            .builder()
+            .outputPath(path)
+            .withCsvInput(new Schema(fields))
+            .writtenBy("SDKS3Example")
+            .withPageSizeInMb(1)
+            .withTableProperty("long_string_columns", "image1")
+            .build();
+        assert (false);
+      } catch (Exception e) {
+        assert (e.getMessage().contains("long string column : image1 is not supported for data type: BINARY"));
+      }
+    }
+  }
+
+  @Test
+  public void testWriteBinaryWithInverted_index() {
+    int num = 1;
+    String path = "./target/binary";
+    try {
+      FileUtils.deleteDirectory(new File(path));
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+    Field[] fields = new Field[5];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+    fields[2] = new Field("image1", DataTypes.BINARY);
+    fields[3] = new Field("image2", DataTypes.BINARY);
+    fields[4] = new Field("image3", DataTypes.BINARY);
+
+    // read and write image data
+    for (int j = 0; j < num; j++) {
+      try {
+        CarbonWriter
+            .builder()
+            .outputPath(path)
+            .withCsvInput(new Schema(fields))
+            .writtenBy("SDKS3Example")
+            .withPageSizeInMb(1)
+            .withTableProperty("inverted_index", "image1")
+            .build();
+        // TODO: should throw exception
+        //        assert(false);
+      } catch (Exception e) {
+        System.out.println(e.getMessage());
+        assert (e.getMessage().contains("INVERTED_INDEX column: image1 should be present in SORT_COLUMNS"));
+      }
+    }
+  }
+
+  @Test
+  public void testWriteWithNull() throws IOException, InvalidLoadOptionException {
+    String imagePath = "./src/test/resources/image/carbondatalogo.jpg";
+    int num = 1;
+    int rows = 10;
+    String path = "./target/binary";
+    try {
+      FileUtils.deleteDirectory(new File(path));
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+    Field[] fields = new Field[5];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+    fields[2] = new Field("image1", DataTypes.BINARY);
+    fields[3] = new Field("image2", DataTypes.BINARY);
+    fields[4] = new Field("image3", DataTypes.BINARY);
+
+    byte[] originBinary = null;
+
+    // read and write image data
+    for (int j = 0; j < num; j++) {
+      CarbonWriter writer = CarbonWriter
+          .builder()
+          .outputPath(path)
+          .withCsvInput(new Schema(fields))
+          .writtenBy("SDKS3Example")
+          .withPageSizeInMb(1)
+          .withLoadOption("bad_records_action", "force")
+          .build();
+
+      for (int i = 0; i < rows; i++) {
+        // read image and encode to Hex
+        BufferedInputStream bis = new BufferedInputStream(new FileInputStream(imagePath));
+        originBinary = new byte[bis.available()];
+        while ((bis.read(originBinary)) != -1) {
+        }
+        // write data
+        writer.write(new Object[]{"robot" + (i % 10), i, originBinary, originBinary, 1});
+        bis.close();
+      }
+      try {
+        writer.close();
+      } catch (Exception e) {
+        assert (e.getMessage().contains("Binary only support String and byte[] data type"));
+      }
+    }
+
+  }
+
+  @Test
+  public void testBinaryWithOrWithoutFilter() throws IOException, InvalidLoadOptionException, InterruptedException, DecoderException {
+    String imagePath = "./src/test/resources/image/carbondatalogo.jpg";
+    int num = 1;
+    int rows = 1;
+    String path = "./target/binary";
+    try {
+      FileUtils.deleteDirectory(new File(path));
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+    Field[] fields = new Field[3];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+    fields[2] = new Field("image", DataTypes.BINARY);
+
+    byte[] originBinary = null;
+
+    // read and write image data
+    for (int j = 0; j < num; j++) {
+      CarbonWriter writer = CarbonWriter
+          .builder()
+          .outputPath(path)
+          .withCsvInput(new Schema(fields))
+          .writtenBy("SDKS3Example")
+          .withPageSizeInMb(1)
+          .build();
+
+      for (int i = 0; i < rows; i++) {
+        // read image and encode to Hex
+        BufferedInputStream bis = new BufferedInputStream(new FileInputStream(imagePath));
+        char[] hexValue = null;
+        originBinary = new byte[bis.available()];
+        while ((bis.read(originBinary)) != -1) {
+          hexValue = Hex.encodeHex(originBinary);
+        }
+        // write data
+        writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), String.valueOf(hexValue)});
+        bis.close();
+      }
+      writer.close();
+    }
+
+    // Read data with filter
+    EqualToExpression equalToExpression = new EqualToExpression(
+        new ColumnExpression("name", DataTypes.STRING),
+        new LiteralExpression("robot0", DataTypes.STRING));
+
+    CarbonReader reader = CarbonReader
+        .builder(path, "_temp")
+        .filter(equalToExpression)
+        .build();
+
+    System.out.println("\nData:");
+    int i = 0;
+    while (i < 20 && reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+
+      byte[] outputBinary = Hex.decodeHex(new String((byte[]) row[1]).toCharArray());
+      System.out.println(row[0] + " " + row[2] + " image size:" + outputBinary.length);
+
+      // validate output binary data and origin binary data
+      assert (originBinary.length == outputBinary.length);
+      for (int j = 0; j < originBinary.length; j++) {
+        assert (originBinary[j] == outputBinary[j]);
+      }
+      String value = new String(outputBinary);
+      Assert.assertTrue(value.startsWith("�PNG"));
+      // save image, user can compare the save image and original image
+      String destString = "./target/binary/image" + i + ".jpg";
+      BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(destString));
+      bos.write(outputBinary);
+      bos.close();
+      i++;
+    }
+    System.out.println("\nFinished");
+    reader.close();
+
+    CarbonReader reader2 = CarbonReader
+        .builder(path, "_temp")
+        .build();
+
+    System.out.println("\nData:");
+    i = 0;
+    while (i < 20 && reader2.hasNext()) {
+      Object[] row = (Object[]) reader2.readNextRow();
+
+      byte[] outputBinary = Hex.decodeHex(new String((byte[]) row[1]).toCharArray());
+      System.out.println(row[0] + " " + row[2] + " image size:" + outputBinary.length);
+
+      // validate output binary data and origin binary data
+      assert (originBinary.length == outputBinary.length);
+      for (int j = 0; j < originBinary.length; j++) {
+        assert (originBinary[j] == outputBinary[j]);
+      }
+
+      // save image, user can compare the save image and original image
+      String destString = "./target/binary/image" + i + ".jpg";
+      BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(destString));
+      bos.write(outputBinary);
+      bos.close();
+      i++;
+    }
+    reader2.close();
+    try {
+      FileUtils.deleteDirectory(new File(path));
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+    System.out.println("\nFinished");
+  }
+
+  @Test
+  public void testBinaryWithManyImages() throws IOException, InvalidLoadOptionException, InterruptedException {
+    int num = 1;
+    String path = "./target/flowers";
+    Field[] fields = new Field[5];
+    fields[0] = new Field("binaryId", DataTypes.INT);
+    fields[1] = new Field("binaryName", DataTypes.STRING);
+    fields[2] = new Field("binary", "Binary");
+    fields[3] = new Field("labelName", DataTypes.STRING);
+    fields[4] = new Field("labelContent", DataTypes.STRING);
+
+    String imageFolder = "./src/test/resources/image/flowers";
+
+    byte[] originBinary = null;
+
+    // read and write image data
+    for (int j = 0; j < num; j++) {
+      CarbonWriter writer = CarbonWriter
+          .builder()
+          .outputPath(path)
+          .withCsvInput(new Schema(fields))
+          .writtenBy("SDKS3Example")
+          .withPageSizeInMb(1)
+          .build();
+      File file = new File(imageFolder);
+      File[] files = file.listFiles(new FilenameFilter() {
+        @Override
+        public boolean accept(File dir, String name) {
+          if (name == null) {
+            return false;
+          }
+          return name.endsWith(".jpg");
+        }
+      });
+
+      if (null != files) {
+        for (int i = 0; i < files.length; i++) {
+          // read image and encode to Hex
+          BufferedInputStream bis = new BufferedInputStream(new FileInputStream(files[i]));
+          char[] hexValue = null;
+          originBinary = new byte[bis.available()];
+          while ((bis.read(originBinary)) != -1) {
+            hexValue = Hex.encodeHex(originBinary);
+          }
+
+          String txtFileName = files[i].getCanonicalPath().split(".jpg")[0] + ".txt";
+          BufferedInputStream txtBis = new BufferedInputStream(new FileInputStream(txtFileName));
+          String txtValue = null;
+          byte[] txtBinary = null;
+          txtBinary = new byte[txtBis.available()];
+          while ((txtBis.read(txtBinary)) != -1) {
+            txtValue = new String(txtBinary, "UTF-8");
+          }
+          // write data
+          System.out.println(files[i].getCanonicalPath());
+          writer.write(new String[]{String.valueOf(i), files[i].getCanonicalPath(), String.valueOf(hexValue),
+              txtFileName, txtValue});
+          bis.close();
+        }
+      }
+      writer.close();
+    }
+
+    CarbonReader reader = CarbonReader
+        .builder(path)
+        .build();
+
+    System.out.println("\nData:");
+    int i = 0;
+    while (i < 20 && reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+
+      byte[] outputBinary = (byte[]) row[1];
+      System.out.println(row[0] + " " + row[2] + " image size:" + outputBinary.length);
+
+      // save image, user can compare the save image and original image
+      String destString = "./target/flowers/image" + i + ".jpg";
+      BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(destString));
+      bos.write(outputBinary);
+      bos.close();
+      i++;
+    }
+    System.out.println("\nFinished");
+    reader.close();
+  }
+
+  public void testWriteTwoImageColumn() throws Exception {
+    String imagePath = "./src/test/resources/image/vocForSegmentationClass";
+    String path = "./target/vocForSegmentationClass";
+    int num = 1;
+    Field[] fields = new Field[4];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+    fields[2] = new Field("rawImage", DataTypes.BINARY);
+    fields[3] = new Field("segmentationClass", DataTypes.BINARY);
+
+    byte[] originBinary = null;
+    byte[] originBinary2 = null;
+
+    Object[] files = listFiles(imagePath, ".jpg").toArray();
+    // read and write image data
+    for (int j = 0; j < num; j++) {
+      CarbonWriter writer = CarbonWriter
+          .builder()
+          .outputPath(path)
+          .withCsvInput(new Schema(fields))
+          .writtenBy("SDKS3Example")
+          .withPageSizeInMb(1)
+          .build();
+
+      for (int i = 0; i < files.length; i++) {
+        // read image and encode to Hex
+        String filePath = (String) files[i];
+        BufferedInputStream bis = new BufferedInputStream(new FileInputStream(filePath));
+        originBinary = new byte[bis.available()];
+        while ((bis.read(originBinary)) != -1) {
+        }
+
+        BufferedInputStream bis2 = new BufferedInputStream(new FileInputStream(filePath.replace(".jpg", ".png")));
+        originBinary2 = new byte[bis2.available()];
+        while ((bis2.read(originBinary2)) != -1) {
+        }
+
+        // write data
+        writer.write(new Object[]{"robot" + (i % 10), i, originBinary, originBinary2});
+        bis.close();
+        bis2.close();
+      }
+      writer.close();
+    }
+
+    CarbonReader reader = CarbonReader
+        .builder(path, "_temp")
+        .build();
+
+    System.out.println("\nData:");
+    int i = 0;
+    while (i < 20 && reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+
+      byte[] outputBinary = (byte[]) row[1];
+      byte[] outputBinary2 = (byte[]) row[2];
+      System.out.println(row[0] + " " + row[3] + " image1 size:" + outputBinary.length
+          + " image2 size:" + outputBinary2.length);
+
+      for (int k = 0; k < 2; k++) {
+
+        byte[] originBinaryTemp = (byte[]) row[1 + k];
+
+        // save image, user can compare the save image and original image
+        String destString = null;
+        if (k == 0) {
+          destString = "./target/vocForSegmentationClass/image" + k + "_" + i + ".jpg";
+        } else {
+          destString = "./target/vocForSegmentationClass/image" + k + "_" + i + ".png";
+        }
+        BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(destString));
+        bos.write(originBinaryTemp);
+        bos.close();
+      }
+      i++;
+    }
+    System.out.println("\nFinished");
+    reader.close();
+  }
+
+  @Test
+  public void testWriteWithByteArrayDataTypeAndManyImagesTxt()
+      throws Exception {
+    long startWrite = System.nanoTime();
+    String sourceImageFolder = "./src/test/resources/image/flowers";
+    String outputPath = "./target/flowers";
+    String preDestPath = "./target/flowers/image";
+    String sufAnnotation = ".txt";
+    BinaryUtil.binaryToCarbon(sourceImageFolder, outputPath, sufAnnotation, ".jpg");
+    BinaryUtil.carbonToBinary(outputPath, preDestPath);
+    long endWrite = System.nanoTime();
+    System.out.println("write time is " + (endWrite - startWrite) / 1000000000.0 + "s");
+  }
+
+  @Test
+  public void testWriteWithByteArrayDataTypeAndManyImagesXml()
+      throws Exception {
+    long startWrite = System.nanoTime();
+    String sourceImageFolder = "./src/test/resources/image/voc";
+
+    String outputPath = "./target/voc";
+    String preDestPath = "./target/voc/image";
+    String sufAnnotation = ".xml";
+    BinaryUtil.binaryToCarbon(sourceImageFolder, outputPath, sufAnnotation, ".jpg");
+    BinaryUtil.carbonToBinary(outputPath, preDestPath);
+    long endWrite = System.nanoTime();
+    System.out.println("write time is " + (endWrite - startWrite) / 1000000000.0 + "s");
+    ReadPerformance();
+  }
+
+  public void ReadPerformance() throws Exception {
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, "2048");
+
+    long start = System.nanoTime();
+    int i = 0;
+    String path = "./target/voc";
+    CarbonReader reader2 = CarbonReader
+        .builder(path)
+        .withBatch(1000)
+        .build();
+
+    System.out.println("\nData2:");
+    i = 0;
+    while (reader2.hasNext()) {
+      Object[] rows = reader2.readNextBatchRow();
+
+      for (int j = 0; j < rows.length; j++) {
+        Object[] row = (Object[]) rows[j];
+        i++;
+        if (0 == i % 1000) {
+          System.out.println(i);
+        }
+        for (int k = 0; k < row.length; k++) {
+          Object column = row[k];
+        }
+      }
+    }
+
+    System.out.println(i);
+    reader2.close();
+    long end = System.nanoTime();
+    System.out.println("all time is " + (end - start) / 1000000000.0);
+    System.out.println("\nFinished");
+  }
+
+  @Test
+  public void testWriteWithByteArrayDataTypeAndManyImagesTxt3()
+      throws Exception {
+    String sourceImageFolder = "./src/test/resources/image/flowers";
+    String outputPath = "./target/flowers2";
+    String preDestPath = "./target/flowers2/image";
+    String sufAnnotation = ".txt";
+    try {
+      FileUtils.deleteDirectory(new File(outputPath));
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+    binaryToCarbonWithHWD(sourceImageFolder, outputPath, preDestPath, sufAnnotation, ".jpg", 2000);
+    try {
+      FileUtils.deleteDirectory(new File(outputPath));
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Test
+  public void testNumberOfFiles() throws Exception {
+    String sourceImageFolder = "./src/test/resources/image/flowers";
+    List result = listFiles(sourceImageFolder, ".jpg");
+    assertEquals(3, result.size());
+  }
+
+  public void binaryToCarbonWithHWD(String sourceImageFolder, String outputPath, String preDestPath,
+                                    String sufAnnotation, final String sufImage, int numToWrite)
+      throws Exception {
+    int num = 1;
+    Field[] fields = new Field[7];
+    fields[0] = new Field("height", DataTypes.INT);
+    fields[1] = new Field("width", DataTypes.INT);
+    fields[2] = new Field("depth", DataTypes.INT);
+    fields[3] = new Field("binaryName", DataTypes.STRING);
+    fields[4] = new Field("binary", DataTypes.BINARY);
+    fields[5] = new Field("labelName", DataTypes.STRING);
+    fields[6] = new Field("labelContent", DataTypes.STRING);
+
+    byte[] originBinary = null;
+
+    // read and write image data
+    for (int j = 0; j < num; j++) {
+
+      Object[] files = listFiles(sourceImageFolder, sufImage).toArray();
+
+      int index = 0;
+
+      if (null != files) {
+        CarbonWriter writer = CarbonWriter
+            .builder()
+            .outputPath(outputPath)
+            .withCsvInput(new Schema(fields))
+            .withBlockSize(256)
+            .writtenBy("SDKS3Example")
+            .withPageSizeInMb(1)
+            .build();
+
+        for (int i = 0; i < files.length; i++) {
+          if (0 == index % numToWrite) {
+            writer.close();
+            writer = CarbonWriter
+                .builder()
+                .outputPath(outputPath)
+                .withCsvInput(new Schema(fields))
+                .withBlockSize(256)
+                .writtenBy("SDKS3Example")
+                .withPageSizeInMb(1)
+                .build();
+          }
+          index++;
+
+          // read image and encode to Hex
+          File file = new File((String) files[i]);
+          System.out.println(file.getCanonicalPath());
+          BufferedInputStream bis = new BufferedInputStream(new FileInputStream(file));
+          int depth = 0;
+          boolean isGray;
+          boolean hasAlpha;
+          BufferedImage bufferedImage = null;
+          try {
+            bufferedImage = ImageIO.read(file);
+            isGray = bufferedImage.getColorModel().getColorSpace().getType() == ColorSpace.TYPE_GRAY;
+            hasAlpha = bufferedImage.getColorModel().hasAlpha();
+
+            if (isGray) {
+              depth = 1;
+            } else if (hasAlpha) {
+              depth = 4;
+            } else {
+              depth = 3;
+            }
+
+          } catch (Exception e) {
+            e.printStackTrace();
+            System.out.println(i);
+            ImageInputStream stream = new FileImageInputStream(new File(file.getCanonicalPath()));
+            Iterator<ImageReader> iter = ImageIO.getImageReaders(stream);
+
+            Exception lastException = null;
+            while (iter.hasNext()) {
+              ImageReader reader = null;
+              try {
+                reader = (ImageReader) iter.next();
+                ImageReadParam param = reader.getDefaultReadParam();
+                reader.setInput(stream, true, true);
+                Iterator<ImageTypeSpecifier> imageTypes = reader.getImageTypes(0);
+
+                while (imageTypes.hasNext()) {
+                  ImageTypeSpecifier imageTypeSpecifier = imageTypes.next();
+                  System.out.println(imageTypeSpecifier.getColorModel().getColorSpace().getType());
+                  int bufferedImageType = imageTypeSpecifier.getBufferedImageType();
+                  if (bufferedImageType == BufferedImage.TYPE_BYTE_GRAY) {
+                    param.setDestinationType(imageTypeSpecifier);
+                    break;
+                  }
+                }
+                bufferedImage = reader.read(0, param);
+                isGray = bufferedImage.getColorModel().getColorSpace().getType() == ColorSpace.TYPE_GRAY;
+                hasAlpha = bufferedImage.getColorModel().hasAlpha();
+
+                if (isGray) {
+                  depth = 1;
+                } else if (hasAlpha) {
+                  depth = 4;
+                } else {
+                  depth = 3;
+                }
+                if (null != bufferedImage) break;
+              } catch (Exception e2) {
+                lastException = e2;
+              } finally {
+                if (null != reader) reader.dispose();
+              }
+            }
+            // If you don't have an image at the end of all readers
+            if (null == bufferedImage) {
+              if (null != lastException) {
+                throw lastException;
+              }
+            }
+          } finally {
+            originBinary = new byte[bis.available()];
+            while ((bis.read(originBinary)) != -1) {
+            }
+
+            String txtFileName = file.getCanonicalPath().split(sufImage)[0] + sufAnnotation;
+            BufferedInputStream txtBis = new BufferedInputStream(new FileInputStream(txtFileName));
+            String txtValue = null;
+            byte[] txtBinary = null;
+            txtBinary = new byte[txtBis.available()];
+            while ((txtBis.read(txtBinary)) != -1) {
+              txtValue = new String(txtBinary, "UTF-8");
+            }
+            // write data
+            writer.write(new Object[]{bufferedImage.getHeight(), bufferedImage.getWidth(), depth, file.getCanonicalPath(), originBinary,
+                txtFileName, txtValue.replace("\n", "")});
+            bis.close();
+          }
+        }
+        writer.close();
+      }
+    }
+
+    CarbonReader reader = CarbonReader
+        .builder(outputPath)
+        .build();
+
+    System.out.println("\nData:");
+    int i = 0;
+    while (i < 20 && reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+
+      byte[] outputBinary = (byte[]) row[1];
+      System.out.println(row[2] + " " + row[3] + " " + row[4] + " " + row[5] + " image size:" + outputBinary.length + " " + row[0]);
+
+      // save image, user can compare the save image and original image
+      String destString = preDestPath + i + sufImage;
+      BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(destString));
+      bos.write(outputBinary);
+      bos.close();
+      i++;
+    }
+    System.out.println("\nFinished");
+    reader.close();
+  }
+
+}
diff --git a/store/sdk/src/test/java/org/apache/carbondata/util/BinaryUtil.java b/store/sdk/src/test/java/org/apache/carbondata/util/BinaryUtil.java
new file mode 100644
index 0000000..073f704
--- /dev/null
+++ b/store/sdk/src/test/java/org/apache/carbondata/util/BinaryUtil.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.util;
+
+import java.io.*;
+
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.sdk.file.CarbonReader;
+import org.apache.carbondata.sdk.file.CarbonWriter;
+import org.apache.carbondata.sdk.file.Field;
+import org.apache.carbondata.sdk.file.Schema;
+
+import static org.apache.carbondata.sdk.file.utils.SDKUtil.listFiles;
+
+public class BinaryUtil {
+  public static void binaryToCarbon(String sourceImageFolder, String outputPath,
+                                    String sufAnnotation, final String sufImage) throws Exception {
+    Field[] fields = new Field[5];
+    fields[0] = new Field("binaryId", DataTypes.INT);
+    fields[1] = new Field("binaryName", DataTypes.STRING);
+    fields[2] = new Field("binary", DataTypes.BINARY);
+    fields[3] = new Field("labelName", DataTypes.STRING);
+    fields[4] = new Field("labelContent", DataTypes.STRING);
+    CarbonWriter writer = CarbonWriter
+        .builder()
+        .outputPath(outputPath)
+        .withCsvInput(new Schema(fields))
+        .withBlockSize(256)
+        .writtenBy("binaryExample")
+        .withPageSizeInMb(1)
+        .build();
+    binaryToCarbon(sourceImageFolder, writer, sufAnnotation, sufImage);
+  }
+
+  public static boolean binaryToCarbon(String sourceImageFolder, CarbonWriter writer,
+      String sufAnnotation, final String sufImage) throws Exception {
+    int num = 1;
+
+    byte[] originBinary = null;
+
+    // read and write image data
+    for (int j = 0; j < num; j++) {
+
+      Object[] files = listFiles(sourceImageFolder, sufImage).toArray();
+
+      if (null != files) {
+        for (int i = 0; i < files.length; i++) {
+          // read image and encode to Hex
+          BufferedInputStream bis = new BufferedInputStream(
+              new FileInputStream(new File((String) files[i])));
+          originBinary = new byte[bis.available()];
+          while ((bis.read(originBinary)) != -1) {
+          }
+
+          String labelFileName = ((String) files[i]).split(sufImage)[0] + sufAnnotation;
+          BufferedInputStream txtBis = new BufferedInputStream(new FileInputStream(labelFileName));
+          String labelValue = null;
+          byte[] labelBinary = null;
+          labelBinary = new byte[txtBis.available()];
+          while ((txtBis.read(labelBinary)) != -1) {
+            labelValue = new String(labelBinary, "UTF-8");
+          }
+          // write data
+          writer.write(new Object[]{i, (String) files[i], originBinary,
+              labelFileName, labelValue});
+          bis.close();
+          txtBis.close();
+        }
+      }
+      writer.close();
+    }
+    return true;
+  }
+
+  public static boolean carbonToBinary(String carbonPath, String outputPath)
+      throws IOException, InterruptedException {
+    CarbonReader reader = CarbonReader
+        .builder(carbonPath)
+        .build();
+    return carbonToBinary(reader, outputPath);
+  }
+
+  public static boolean carbonToBinary(CarbonReader reader, String outputPath)
+      throws IOException, InterruptedException {
+    System.out.println("\nData:");
+    int i = 0;
+    while (i < 20 && reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+
+      byte[] outputBinary = (byte[]) row[1];
+      System.out.println(row[0] + " " + row[2] + " image size:" + outputBinary.length);
+
+      // save image, user can compare the save image and original image
+      String originalPath = (String) row[0];
+      int index = originalPath.lastIndexOf("/");
+      File file = new File(outputPath);
+      if (!file.exists()) {
+        assert file.mkdir();
+      }
+      String destString = outputPath + originalPath.substring(index, originalPath.length());
+      BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(destString));
+      bos.write(outputBinary);
+      bos.close();
+      i++;
+    }
+    System.out.println("number of reading: " + i);
+    System.out.println("\nFinished");
+    reader.close();
+    return true;
+  }
+}
diff --git a/store/sdk/src/test/resources/image/carbondatalogo.jpg b/store/sdk/src/test/resources/image/carbondatalogo.jpg
new file mode 100644
index 0000000..3469469
Binary files /dev/null and b/store/sdk/src/test/resources/image/carbondatalogo.jpg differ
diff --git a/store/sdk/src/test/resources/image/flowers/10686568196_b1915544a8.jpg b/store/sdk/src/test/resources/image/flowers/10686568196_b1915544a8.jpg
new file mode 100644
index 0000000..12937a0
Binary files /dev/null and b/store/sdk/src/test/resources/image/flowers/10686568196_b1915544a8.jpg differ
diff --git a/store/sdk/src/test/resources/image/flowers/10686568196_b1915544a8.txt b/store/sdk/src/test/resources/image/flowers/10686568196_b1915544a8.txt
new file mode 100644
index 0000000..12f7d78
--- /dev/null
+++ b/store/sdk/src/test/resources/image/flowers/10686568196_b1915544a8.txt
@@ -0,0 +1 @@
+tulips
\ No newline at end of file
diff --git a/store/sdk/src/test/resources/image/flowers/10712722853_5632165b04.jpg b/store/sdk/src/test/resources/image/flowers/10712722853_5632165b04.jpg
new file mode 100644
index 0000000..48591bf
Binary files /dev/null and b/store/sdk/src/test/resources/image/flowers/10712722853_5632165b04.jpg differ
diff --git a/store/sdk/src/test/resources/image/flowers/10712722853_5632165b04.txt b/store/sdk/src/test/resources/image/flowers/10712722853_5632165b04.txt
new file mode 100644
index 0000000..84bd766
--- /dev/null
+++ b/store/sdk/src/test/resources/image/flowers/10712722853_5632165b04.txt
@@ -0,0 +1 @@
+daisy
\ No newline at end of file
diff --git a/store/sdk/src/test/resources/image/flowers/subfolder/10841136265_af473efc60.jpg b/store/sdk/src/test/resources/image/flowers/subfolder/10841136265_af473efc60.jpg
new file mode 100644
index 0000000..0822034
Binary files /dev/null and b/store/sdk/src/test/resources/image/flowers/subfolder/10841136265_af473efc60.jpg differ
diff --git a/store/sdk/src/test/resources/image/flowers/subfolder/10841136265_af473efc60.txt b/store/sdk/src/test/resources/image/flowers/subfolder/10841136265_af473efc60.txt
new file mode 100644
index 0000000..84bd766
--- /dev/null
+++ b/store/sdk/src/test/resources/image/flowers/subfolder/10841136265_af473efc60.txt
@@ -0,0 +1 @@
+daisy
\ No newline at end of file
diff --git a/store/sdk/src/test/resources/image/voc/2007_000027.jpg b/store/sdk/src/test/resources/image/voc/2007_000027.jpg
new file mode 100755
index 0000000..fe9ba8c
Binary files /dev/null and b/store/sdk/src/test/resources/image/voc/2007_000027.jpg differ
diff --git a/store/sdk/src/test/resources/image/voc/2007_000027.xml b/store/sdk/src/test/resources/image/voc/2007_000027.xml
new file mode 100755
index 0000000..576da53
--- /dev/null
+++ b/store/sdk/src/test/resources/image/voc/2007_000027.xml
@@ -0,0 +1,63 @@
+<annotation>
+	<folder>VOC2012</folder>
+	<filename>2007_000027.jpg</filename>
+	<source>
+		<database>The VOC2007 Database</database>
+		<annotation>PASCAL VOC2007</annotation>
+		<image>flickr</image>
+	</source>
+	<size>
+		<width>486</width>
+		<height>500</height>
+		<depth>3</depth>
+	</size>
+	<segmented>0</segmented>
+	<object>
+		<name>person</name>
+		<pose>Unspecified</pose>
+		<truncated>0</truncated>
+		<difficult>0</difficult>
+		<bndbox>
+			<xmin>174</xmin>
+			<ymin>101</ymin>
+			<xmax>349</xmax>
+			<ymax>351</ymax>
+		</bndbox>
+		<part>
+			<name>head</name>
+			<bndbox>
+				<xmin>169</xmin>
+				<ymin>104</ymin>
+				<xmax>209</xmax>
+				<ymax>146</ymax>
+			</bndbox>
+		</part>
+		<part>
+			<name>hand</name>
+			<bndbox>
+				<xmin>278</xmin>
+				<ymin>210</ymin>
+				<xmax>297</xmax>
+				<ymax>233</ymax>
+			</bndbox>
+		</part>
+		<part>
+			<name>foot</name>
+			<bndbox>
+				<xmin>273</xmin>
+				<ymin>333</ymin>
+				<xmax>297</xmax>
+				<ymax>354</ymax>
+			</bndbox>
+		</part>
+		<part>
+			<name>foot</name>
+			<bndbox>
+				<xmin>319</xmin>
+				<ymin>307</ymin>
+				<xmax>340</xmax>
+				<ymax>326</ymax>
+			</bndbox>
+		</part>
+	</object>
+</annotation>
diff --git a/store/sdk/src/test/resources/image/voc/2007_000032.jpg b/store/sdk/src/test/resources/image/voc/2007_000032.jpg
new file mode 100755
index 0000000..b111b5a
Binary files /dev/null and b/store/sdk/src/test/resources/image/voc/2007_000032.jpg differ
diff --git a/store/sdk/src/test/resources/image/voc/2007_000032.xml b/store/sdk/src/test/resources/image/voc/2007_000032.xml
new file mode 100755
index 0000000..779abb6
--- /dev/null
+++ b/store/sdk/src/test/resources/image/voc/2007_000032.xml
@@ -0,0 +1,63 @@
+<annotation>
+	<folder>VOC2012</folder>
+	<filename>2007_000032.jpg</filename>
+	<source>
+		<database>The VOC2007 Database</database>
+		<annotation>PASCAL VOC2007</annotation>
+		<image>flickr</image>
+	</source>
+	<size>
+		<width>500</width>
+		<height>281</height>
+		<depth>3</depth>
+	</size>
+	<segmented>1</segmented>
+	<object>
+		<name>aeroplane</name>
+		<pose>Frontal</pose>
+		<truncated>0</truncated>
+		<difficult>0</difficult>
+		<bndbox>
+			<xmin>104</xmin>
+			<ymin>78</ymin>
+			<xmax>375</xmax>
+			<ymax>183</ymax>
+		</bndbox>
+	</object>
+	<object>
+		<name>aeroplane</name>
+		<pose>Left</pose>
+		<truncated>0</truncated>
+		<difficult>0</difficult>
+		<bndbox>
+			<xmin>133</xmin>
+			<ymin>88</ymin>
+			<xmax>197</xmax>
+			<ymax>123</ymax>
+		</bndbox>
+	</object>
+	<object>
+		<name>person</name>
+		<pose>Rear</pose>
+		<truncated>0</truncated>
+		<difficult>0</difficult>
+		<bndbox>
+			<xmin>195</xmin>
+			<ymin>180</ymin>
+			<xmax>213</xmax>
+			<ymax>229</ymax>
+		</bndbox>
+	</object>
+	<object>
+		<name>person</name>
+		<pose>Rear</pose>
+		<truncated>0</truncated>
+		<difficult>0</difficult>
+		<bndbox>
+			<xmin>26</xmin>
+			<ymin>189</ymin>
+			<xmax>44</xmax>
+			<ymax>238</ymax>
+		</bndbox>
+	</object>
+</annotation>
diff --git a/store/sdk/src/test/resources/image/voc/2007_000033.jpg b/store/sdk/src/test/resources/image/voc/2007_000033.jpg
new file mode 100755
index 0000000..01f478f
Binary files /dev/null and b/store/sdk/src/test/resources/image/voc/2007_000033.jpg differ
diff --git a/store/sdk/src/test/resources/image/voc/2007_000033.xml b/store/sdk/src/test/resources/image/voc/2007_000033.xml
new file mode 100755
index 0000000..61899d6
--- /dev/null
+++ b/store/sdk/src/test/resources/image/voc/2007_000033.xml
@@ -0,0 +1,51 @@
+<annotation>
+	<folder>VOC2012</folder>
+	<filename>2007_000033.jpg</filename>
+	<source>
+		<database>The VOC2007 Database</database>
+		<annotation>PASCAL VOC2007</annotation>
+		<image>flickr</image>
+	</source>
+	<size>
+		<width>500</width>
+		<height>366</height>
+		<depth>3</depth>
+	</size>
+	<segmented>1</segmented>
+	<object>
+		<name>aeroplane</name>
+		<pose>Unspecified</pose>
+		<truncated>0</truncated>
+		<difficult>0</difficult>
+		<bndbox>
+			<xmin>9</xmin>
+			<ymin>107</ymin>
+			<xmax>499</xmax>
+			<ymax>263</ymax>
+		</bndbox>
+	</object>
+	<object>
+		<name>aeroplane</name>
+		<pose>Left</pose>
+		<truncated>0</truncated>
+		<difficult>0</difficult>
+		<bndbox>
+			<xmin>421</xmin>
+			<ymin>200</ymin>
+			<xmax>482</xmax>
+			<ymax>226</ymax>
+		</bndbox>
+	</object>
+	<object>
+		<name>aeroplane</name>
+		<pose>Left</pose>
+		<truncated>1</truncated>
+		<difficult>0</difficult>
+		<bndbox>
+			<xmin>325</xmin>
+			<ymin>188</ymin>
+			<xmax>411</xmax>
+			<ymax>223</ymax>
+		</bndbox>
+	</object>
+</annotation>
diff --git a/store/sdk/src/test/resources/image/voc/2007_000039.jpg b/store/sdk/src/test/resources/image/voc/2007_000039.jpg
new file mode 100755
index 0000000..1a3b717
Binary files /dev/null and b/store/sdk/src/test/resources/image/voc/2007_000039.jpg differ
diff --git a/store/sdk/src/test/resources/image/voc/2007_000039.xml b/store/sdk/src/test/resources/image/voc/2007_000039.xml
new file mode 100755
index 0000000..bc73f4e
--- /dev/null
+++ b/store/sdk/src/test/resources/image/voc/2007_000039.xml
@@ -0,0 +1,27 @@
+<annotation>
+	<folder>VOC2012</folder>
+	<filename>2007_000039.jpg</filename>
+	<source>
+		<database>The VOC2007 Database</database>
+		<annotation>PASCAL VOC2007</annotation>
+		<image>flickr</image>
+	</source>
+	<size>
+		<width>500</width>
+		<height>375</height>
+		<depth>3</depth>
+	</size>
+	<segmented>1</segmented>
+	<object>
+		<name>tvmonitor</name>
+		<pose>Frontal</pose>
+		<truncated>0</truncated>
+		<difficult>0</difficult>
+		<bndbox>
+			<xmin>156</xmin>
+			<ymin>89</ymin>
+			<xmax>344</xmax>
+			<ymax>279</ymax>
+		</bndbox>
+	</object>
+</annotation>
diff --git a/store/sdk/src/test/resources/image/voc/2009_001444.jpg b/store/sdk/src/test/resources/image/voc/2009_001444.jpg
new file mode 100755
index 0000000..f01c62c
Binary files /dev/null and b/store/sdk/src/test/resources/image/voc/2009_001444.jpg differ
diff --git a/store/sdk/src/test/resources/image/voc/2009_001444.xml b/store/sdk/src/test/resources/image/voc/2009_001444.xml
new file mode 100755
index 0000000..9a68cbc
--- /dev/null
+++ b/store/sdk/src/test/resources/image/voc/2009_001444.xml
@@ -0,0 +1,28 @@
+<annotation>
+	<filename>2009_001444.jpg</filename>
+	<folder>VOC2012</folder>
+	<object>
+		<name>cat</name>
+		<bndbox>
+			<xmax>344</xmax>
+			<xmin>1</xmin>
+			<ymax>388</ymax>
+			<ymin>1</ymin>
+		</bndbox>
+		<difficult>0</difficult>
+		<occluded>0</occluded>
+		<pose>Unspecified</pose>
+		<truncated>1</truncated>
+	</object>
+	<segmented>1</segmented>
+	<size>
+		<depth>3</depth>
+		<height>388</height>
+		<width>500</width>
+	</size>
+	<source>
+		<annotation>PASCAL VOC2009</annotation>
+		<database>The VOC2009 Database</database>
+		<image>flickr</image>
+	</source>
+</annotation>
diff --git a/store/sdk/src/test/resources/image/vocForSegmentationClass/2007_000032.jpg b/store/sdk/src/test/resources/image/vocForSegmentationClass/2007_000032.jpg
new file mode 100755
index 0000000..b111b5a
Binary files /dev/null and b/store/sdk/src/test/resources/image/vocForSegmentationClass/2007_000032.jpg differ
diff --git a/store/sdk/src/test/resources/image/vocForSegmentationClass/2007_000032.png b/store/sdk/src/test/resources/image/vocForSegmentationClass/2007_000032.png
new file mode 100755
index 0000000..1f7181c
Binary files /dev/null and b/store/sdk/src/test/resources/image/vocForSegmentationClass/2007_000032.png differ
diff --git a/store/sdk/src/test/resources/image/vocForSegmentationClass/2007_000033.jpg b/store/sdk/src/test/resources/image/vocForSegmentationClass/2007_000033.jpg
new file mode 100755
index 0000000..01f478f
Binary files /dev/null and b/store/sdk/src/test/resources/image/vocForSegmentationClass/2007_000033.jpg differ
diff --git a/store/sdk/src/test/resources/image/vocForSegmentationClass/2007_000033.png b/store/sdk/src/test/resources/image/vocForSegmentationClass/2007_000033.png
new file mode 100755
index 0000000..bbeb3f4
Binary files /dev/null and b/store/sdk/src/test/resources/image/vocForSegmentationClass/2007_000033.png differ
diff --git a/store/sdk/src/test/resources/image/vocForSegmentationClass/2007_000042.jpg b/store/sdk/src/test/resources/image/vocForSegmentationClass/2007_000042.jpg
new file mode 100755
index 0000000..2188d51
Binary files /dev/null and b/store/sdk/src/test/resources/image/vocForSegmentationClass/2007_000042.jpg differ
diff --git a/store/sdk/src/test/resources/image/vocForSegmentationClass/2007_000042.png b/store/sdk/src/test/resources/image/vocForSegmentationClass/2007_000042.png
new file mode 100755
index 0000000..73b6059
Binary files /dev/null and b/store/sdk/src/test/resources/image/vocForSegmentationClass/2007_000042.png differ
diff --git a/tools/cli/src/test/java/org/apache/carbondata/tool/CarbonCliTest.java b/tools/cli/src/test/java/org/apache/carbondata/tool/CarbonCliTest.java
index 708844f..87d2e11 100644
--- a/tools/cli/src/test/java/org/apache/carbondata/tool/CarbonCliTest.java
+++ b/tools/cli/src/test/java/org/apache/carbondata/tool/CarbonCliTest.java
@@ -22,12 +22,11 @@ import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
 
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
 import org.apache.carbondata.core.constants.CarbonVersionConstants;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.sdk.file.Field;
-import org.apache.carbondata.sdk.file.Schema;
-import org.apache.carbondata.sdk.file.TestUtil;
+import org.apache.carbondata.sdk.file.*;
 
 import org.apache.commons.io.FileUtils;
 import org.junit.After;
@@ -38,6 +37,7 @@ import org.junit.Test;
 public class CarbonCliTest {
 
   private String path = "./CarbonCliTest";
+  private String pathBinary = "./CarbonCliTestBinary";
 
   private String buildLines(String... lines) {
     ByteArrayOutputStream expectedOut = null;
@@ -67,6 +67,35 @@ public class CarbonCliTest {
     TestUtil.writeFilesAndVerify(5000000, new Schema(fields), path, new String[]{"name"}, 3, 8);
   }
 
+  public void buildBinaryData(int rows, Schema schema, String path, String[] sortColumns,
+                              int blockletSize, int blockSize)
+      throws IOException, InvalidLoadOptionException {
+
+    CarbonWriterBuilder builder = CarbonWriter.builder()
+        .outputPath(path);
+    if (sortColumns != null) {
+      builder = builder.sortBy(sortColumns);
+    }
+    if (blockletSize != -1) {
+      builder = builder.withBlockletSize(blockletSize);
+    }
+    if (blockSize != -1) {
+      builder = builder.withBlockSize(blockSize);
+    }
+
+    CarbonWriter writer = builder.withCsvInput(schema).writtenBy("TestUtil").build();
+
+    for (int i = 0; i < rows; i++) {
+      writer.write(new String[]{
+          "robot" + (i % 10), String.valueOf(i % 3000000), String.valueOf((double) i / 2)});
+    }
+    for (int i = 0; i < rows; i++) {
+      writer.write(new String[]{
+          "robot" + (i % 10), String.valueOf(i % 3000000), String.valueOf("robot" + i / 2)});
+    }
+    writer.close();
+  }
+
   @Test
   public void testInvalidCmd() {
     String[] args = {"-cmd", "DD", "-p", path};
@@ -231,9 +260,35 @@ public class CarbonCliTest {
     System.out.println(output);
   }
 
+  @Test
+  public void testBinary() throws IOException, InvalidLoadOptionException {
+    FileUtils.deleteDirectory(new File(pathBinary));
+    Field[] fields = new Field[3];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+    fields[2] = new Field("binaryField", DataTypes.BINARY);
+
+    buildBinaryData(5000000, new Schema(fields), pathBinary, new String[]{"name"}, 3, 8);
+    String[] args = {"-cmd", "summary", "-p", pathBinary};
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    PrintStream stream = new PrintStream(out);
+    CarbonCli.run(args, stream);
+
+    String[] args2 = {"-cmd", "summary", "-p", pathBinary, "-s"};
+    out = new ByteArrayOutputStream();
+    stream = new PrintStream(out);
+    CarbonCli.run(args2, stream);
+    String output = new String(out.toByteArray());
+
+    Assert.assertTrue(output.contains("binaryfield") && output.contains("BINARY"));
+    FileUtils.deleteDirectory(new File(pathBinary));
+  }
+
+
   @After
   public void after() throws IOException {
     FileUtils.deleteDirectory(new File(path));
+    FileUtils.deleteDirectory(new File(pathBinary));
   }
 
 }