Posted to commits@carbondata.apache.org by ku...@apache.org on 2019/05/30 10:17:09 UTC

[carbondata] branch master updated: [CARBONDATA-3336] Support configurable decoding for loading binary data (Base64 and Hex).

This is an automated email from the ASF dual-hosted git repository.

kumarvishal09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 3dda02d  [CARBONDATA-3336] Support configurable decoding for loading binary data (Base64 and Hex).
3dda02d is described below

commit 3dda02d44c4dca12c99e16df4f29dd3e8f2e6dc1
Author: xubo245 <xu...@huawei.com>
AuthorDate: Tue Apr 23 15:45:25 2019 +0800

    [CARBONDATA-3336] Support configurable decoding for loading binary data (Base64 and Hex).
    
    Support a configurable decoder when loading binary data, covering Base64 and Hex.
    1. Support a configurable decoder during data load.
    2. Test datamaps: mv, preaggregate, timeseries, bloomfilter, lucene.
    3. Test the datamaps together with the configurable decoder.
    
    By default no decoder is applied when loading binary data; this PR adds Base64 and Hex decoders.
    
    This closes #3188
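    
    For illustration, a minimal usage sketch of the new load option (a sketch
    only: the table and path names are placeholders, and a CarbonData-enabled
    SparkSession named `spark` is assumed; the option value mirrors the tests
    in this patch):
    
        // Decode a Base64-encoded binary column while loading a CSV.
        spark.sql(
          s"""
             | LOAD DATA LOCAL INPATH '/path/to/binaryDataBase64.csv'
             | INTO TABLE binaryTable
             | OPTIONS('header'='false', 'DELIMITER'=',', 'binary_decoder'='base64')
           """.stripMargin)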
---
 .../core/constants/CarbonLoadOptionConstants.java  |  13 ++
 .../carbondata/mv/rewrite/MVCreateTestCase.scala   |  59 +++++
 .../src/test/resources/binaryDataBase64.csv        |   3 +
 .../{binarydata.csv => binaryDataHex.csv}          |   0
 .../testsuite/binary/TestBinaryDataType.scala      | 247 ++++++++++++++++++--
 .../preaggregate/TestPreAggStreaming.scala         |  11 +
 .../testsuite/dataload/TestLoadDataFrame.scala     |  42 ++++
 .../testsuite/datamap/TestDataMapCommand.scala     | 257 +++++++++++++++++++--
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala    |   1 +
 .../datasources/CarbonSparkDataSourceUtil.scala    |   4 +
 .../SparkCarbonDataSourceBinaryTest.scala          |  37 ++-
 .../datasource/SparkCarbonDataSourceTest.scala     |  69 +++++-
 .../apache/spark/sql/CarbonDataFrameWriter.scala   |   1 +
 .../processing/loading/DataLoadProcessBuilder.java |   2 +
 .../converter/impl/BinaryFieldConverterImpl.java   |  26 +--
 .../converter/impl/FieldEncoderFactory.java        |  54 ++++-
 .../loading/converter/impl/RowConverterImpl.java   |   9 +-
 .../converter/impl/binary/Base64BinaryDecoder.java |  42 ++++
 .../converter/impl/binary/BinaryDecoder.java       |  29 +++
 .../impl/binary/DefaultBinaryDecoder.java          |  32 +++
 .../converter/impl/binary/HexBinaryDecoder.java    |  34 +++
 .../processing/loading/model/CarbonLoadModel.java  |  15 ++
 .../loading/model/CarbonLoadModelBuilder.java      |  17 ++
 .../processing/util/CarbonLoaderUtil.java          |   9 +
 .../carbondata/sdk/file/CarbonWriterBuilder.java   |  10 +-
 .../org/apache/carbondata/sdk/file/ImageTest.java  | 108 ++++++++-
 26 files changed, 1068 insertions(+), 63 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index 225a8aa..3bcb06f 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -172,4 +172,17 @@ public final class CarbonLoadOptionConstants {
 
   public static final String CARBON_LOAD_SORT_MEMORY_SPILL_PERCENTAGE_DEFAULT = "0";
 
+
+  /**
+   * carbon binary decoder when writing string data to binary, like decode base64, Hex
+   */
+  @CarbonProperty
+  public static final String CARBON_OPTIONS_BINARY_DECODER = "carbon.binary.decoder";
+
+  public static final String CARBON_OPTIONS_BINARY_DECODER_DEFAULT = "";
+
+  public static final String CARBON_OPTIONS_BINARY_DECODER_BASE64 = "base64";
+
+  public static final String CARBON_OPTIONS_BINARY_DECODER_HEX = "hex";
+
 }
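
The decoder implementations this patch adds (Base64BinaryDecoder, HexBinaryDecoder, DefaultBinaryDecoder under converter/impl/binary/) appear in the diffstat but not in this excerpt; below is a minimal Scala sketch of the contract they plausibly follow (names suffixed "Sketch" are illustrative, not the patch's Java classes):

    import java.nio.charset.StandardCharsets
    import org.apache.commons.codec.binary.{Base64, Hex}

    // Contract: turn the string read from the load input into raw bytes.
    trait BinaryDecoderSketch {
      def decode(input: String): Array[Byte]
    }

    object Base64DecoderSketch extends BinaryDecoderSketch {
      override def decode(input: String): Array[Byte] = Base64.decodeBase64(input)
    }

    object HexDecoderSketch extends BinaryDecoderSketch {
      override def decode(input: String): Array[Byte] = Hex.decodeHex(input.toCharArray)
    }

    // Default (binary_decoder unset): no decoding, keep the string's raw bytes.
    object DefaultDecoderSketch extends BinaryDecoderSketch {
      override def decode(input: String): Array[Byte] = input.getBytes(StandardCharsets.UTF_8)
    }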
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
index 62e320e..5e12ad3 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
@@ -970,6 +970,65 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     }
   }
 
+  test("test binary on mv") {
+    val querySQL = "select x19,x20,sum(x18) from all_table group by x19, x20"
+    val querySQL2 = "select x19,x20,sum(x18) from all_table where x20=cast('binary2' as binary) group by x19, x20"
+
+    sql("drop datamap if exists all_table_mv")
+    sql("drop table if exists all_table")
+
+    sql(
+      """
+        | create table all_table(x1 bigint,x2 bigint,
+        | x3 string,x4 bigint,x5 bigint,x6 int,x7 string,x8 int, x9 int,x10 bigint,
+        | x11 bigint, x12 bigint,x13 bigint,x14 bigint,x15 bigint,x16 bigint,
+        | x17 bigint,x18 bigint,x19 bigint,x20 binary) stored by 'carbondata'""".stripMargin)
+    sql("insert into all_table select 1,1,null,1,1,1,null,1,1,1,1,1,1,1,1,1,1,1,1,'binary1'")
+    sql("insert into all_table select 1,1,null,1,1,1,null,1,1,1,1,1,1,1,1,1,1,12,2,'binary2'")
+    sql("insert into all_table select 1,1,null,1,1,1,null,1,1,1,1,1,1,1,1,1,1,1,2,'binary2'")
+
+    sql("create datamap all_table_mv on table all_table using 'mv' as " + querySQL)
+    sql("rebuild datamap all_table_mv")
+
+    var frame = sql(querySQL)
+    var analyzed = frame.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "all_table_mv"))
+    assert(2 == frame.collect().size)
+    frame.collect().foreach { each =>
+      if (1 == each.get(0)) {
+        assert("binary1".equals(new String(each.getAs[Array[Byte]](1))))
+        assert(1 == each.get(2))
+      } else if (2 == each.get(0)) {
+        assert("binary2".equals(new String(each.getAs[Array[Byte]](1))))
+        assert(13 == each.get(2))
+      } else {
+        assert(false)
+      }
+    }
+
+    frame = sql(querySQL2)
+    analyzed = frame.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "all_table_mv"))
+    assert(1 == frame.collect().size)
+    frame.collect().foreach { each =>
+      if (2 == each.get(0)) {
+        assert("binary2".equals(new String(each.getAs[Array[Byte]](1))))
+        assert(13 == each.get(2))
+      } else {
+        assert(false)
+      }
+    }
+
+    sql("drop table if exists all_table")
+  }
+
+  def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean = {
+    val tables = logicalPlan collect {
+      case l: LogicalRelation => l.catalogTable.get
+    }
+    tables.exists(_.identifier.table.equalsIgnoreCase(dataMapName + "_table"))
+  }
+
   def drop(): Unit = {
     sql("drop table IF EXISTS fact_table1")
     sql("drop table IF EXISTS fact_table2")
diff --git a/integration/spark-common-test/src/test/resources/binaryDataBase64.csv b/integration/spark-common-test/src/test/resources/binaryDataBase64.csv
new file mode 100644
index 0000000..602d1f9
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/binaryDataBase64.csv
@@ -0,0 +1,3 @@
+2,false,2.png,iVBORw0KGgoAAAANSUhEUgAAAvUAAADPCAYAAAB1PC5vAAAABHNCSVQICAgIfAhkiAAAABl0RVh0U29mdHdhcmUAZ25vbWUtc2NyZWVuc2hvdO8Dvz4AACAASURBVHic7L35kyS5sef3ASIi78y6+5rp6ZkeckiK0tOuzGQr0/9vMpk9rdmT9i2P4ZDd01fdWZV3nIB+ABCBiIyqrm7O43Ck9LbozIqMwOFwuH8BOBxCa63Z0Y52tKMd7WhHO9rRjnb0iyX5cxdgRzva0Y52tKMd7WhHO9rR30c7UL+jHe1oRzva0Y52tKMd/cJpB+p3tKMd7WhHO9rRjna0o1847UD9jna0ox3taEc72tGOdvQLpx2o39GOdrSjHe1oRzva0Y5+4bQD9Tva0Y52tKMd7WhHO9rRL5x2oH5HO9rRjna0ox3taEc7+oXTDtTvaEc72tGOdrSjHe1oR79w2oH6He1oRzva0Y [...]
+3,false,3.png,iVBORw0KGgoAAAANSUhEUgAAATYAAAChCAYAAABNqJPmAAAABHNCSVQICAgIfAhkiAAAABl0RVh0U29mdHdhcmUAZ25vbWUtc2NyZWVuc2hvdO8Dvz4AACAASURBVHic7L13dB3Xfe/72XtOxUEHSAAEwCqSokRSIiVS3abkqDq2ZdmyLcV2LDuxb5J7c3NTXsrKvWu9dW+Sl7fWvUkcOy9ObBWrWVYx1TspUZSowt47CaJ34ACnzsze748pOADRCZKIdb5ey6DOmdmz95zZ3/n1n9Baa/LII488fo0gL/YE8sgjjzxmGnliyyOP/4jQGpTt/FsptLJBq5zv1MWb2yxA4GJPII888pgitAYhsOO9qO5mrI4z6FQcgmECc+ox5sxHFlcgRBANCCEu9owvOPLElkce/5GgNdpMYx7dTnLrswTaT6AGe1FmBiEDqFgJumYJgdW3ELn6DkS4wCfCTxNE3nmQRx [...]
+1,true,1.png,iVBORw0KGgoAAAANSUhEUgAAAUoAAABQCAYAAAB/EzxMAAAABHNCSVQICAgIfAhkiAAAABl0RVh0U29mdHdhcmUAZ25vbWUtc2NyZWVuc2hvdO8Dvz4AACAASURBVHic7b1ZbBz3ne/7raqu3tjdZC/c91WiREmmLFOLJdmJLY9sxXHsZG5yMIgnmTOYezO4GGBwX+ZhXi/OeUtw5gBncHAGB2ceBkkuZuJxYiWSbMsytdNaSHGXxH3pfe+uveo+dFepm11NsinKknPqAxgWu5auqq761f//W74/Ym1tTYGBgYGBQVkIlmUNQ2lgYGCwCeTzPgADAwODFx3DUBoYGBhsgWEoDQwMDLbAMJQGBgYGW2AYSgMDA4MtMAylgYGBwRYYhtLAwMBgC0zP+wAMDL7JKIIAhWEhcxwIigLpdIKgK3+s5HQGMscBslyyjCBJUF7Pzo+R4yEzDBRBAEHTIB1VIEw7f/Q [...]
\ No newline at end of file
diff --git a/integration/spark-common-test/src/test/resources/binarydata.csv b/integration/spark-common-test/src/test/resources/binaryDataHex.csv
similarity index 100%
rename from integration/spark-common-test/src/test/resources/binarydata.csv
rename to integration/spark-common-test/src/test/resources/binaryDataHex.csv
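
The renamed binaryDataHex.csv stores the binary column as hex strings (inferred from the hex-decoder test below; the file contents are not shown in this excerpt). A quick commons-codec round-trip sketch of that encoding:

    import org.apache.commons.codec.binary.Hex

    // Bytes -> "89504e47..." text in the CSV; binary_decoder='hex' reverses this on load.
    val pngMagic = Array[Byte](0x89.toByte, 0x50, 0x4e, 0x47)
    val encoded = Hex.encodeHexString(pngMagic)           // "89504e47"
    val decoded = Hex.decodeHex(encoded.toCharArray)      // back to the original bytes
    assert(java.util.Arrays.equals(pngMagic, decoded))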
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
index b2bda24..89c89dc 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
@@ -17,17 +17,14 @@
 package org.apache.carbondata.integration.spark.testsuite.binary
 
 import java.util.Arrays
-
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonProperties
-
-import org.apache.commons.codec.binary.Hex
+import org.apache.commons.codec.binary.{Base64, Hex}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
-import org.apache.spark.util.SparkUtil
 import org.scalatest.BeforeAndAfterAll
 
 /**
@@ -52,7 +49,7 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
              """.stripMargin)
         sql(
             s"""
-               | LOAD DATA LOCAL INPATH '$resourcesPath/binarydata.csv'
+               | LOAD DATA LOCAL INPATH '$resourcesPath/binaryDataHex.csv'
                | INTO TABLE binaryTable
                | OPTIONS('header'='false')
              """.stripMargin)
@@ -80,7 +77,7 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
                 val bytes40 = each.getAs[Array[Byte]](3).slice(0, 40)
                 val binaryName = each(2).toString
                 val expectedBytes = Hex.encodeHex(firstBytes20.get(binaryName).get)
-                assert(Arrays.equals(String.valueOf(expectedBytes).getBytes(), bytes40), "incorrect numeric value for flattened binaryField")
+                assert(Arrays.equals(String.valueOf(expectedBytes).getBytes(), bytes40), "incorrect value for binary data")
 
                 assert(each(4).toString.equalsIgnoreCase("false") || (each(4).toString.equalsIgnoreCase("true")))
 
@@ -91,7 +88,7 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
                     val binaryName = each(0).toString
                     val bytes40 = each.getAs[Array[Byte]](1).slice(0, 40)
                     val expectedBytes = Hex.encodeHex(firstBytes20.get(binaryName).get)
-                    assert(Arrays.equals(String.valueOf(expectedBytes).getBytes(), bytes40), "incorrect numeric value for flattened binaryField")
+                    assert(Arrays.equals(String.valueOf(expectedBytes).getBytes(), bytes40), "incorrect value for binary data")
                 }
             }
         } catch {
@@ -248,7 +245,7 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
              """.stripMargin)
         sql(
             s"""
-               | LOAD DATA LOCAL INPATH '$resourcesPath/binarydata.csv'
+               | LOAD DATA LOCAL INPATH '$resourcesPath/binaryDataHex.csv'
                | INTO TABLE binaryTable
                | OPTIONS('header'='false')
              """.stripMargin)
@@ -277,7 +274,7 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
              """.stripMargin)
         sql(
             s"""
-               | LOAD DATA LOCAL INPATH '$resourcesPath/binarydata.csv'
+               | LOAD DATA LOCAL INPATH '$resourcesPath/binaryDataHex.csv'
                | INTO TABLE binaryTable
                | OPTIONS('header'='false')
              """.stripMargin)
@@ -306,7 +303,7 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
              """.stripMargin)
         sql(
             s"""
-               | LOAD DATA LOCAL INPATH '$resourcesPath/binarydata.csv'
+               | LOAD DATA LOCAL INPATH '$resourcesPath/binaryDataHex.csv'
                | INTO TABLE binaryTable
                | OPTIONS('header'='false')
              """.stripMargin)
@@ -335,7 +332,7 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
              """.stripMargin)
         sql(
             s"""
-               | LOAD DATA LOCAL INPATH '$resourcesPath/binarydata.csv'
+               | LOAD DATA LOCAL INPATH '$resourcesPath/binaryDataHex.csv'
                | INTO TABLE binaryTable
                | OPTIONS('header'='false')
              """.stripMargin)
@@ -362,7 +359,7 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
              """.stripMargin)
         sql(
             s"""
-               | LOAD DATA LOCAL INPATH '$resourcesPath/binarydata.csv'
+               | LOAD DATA LOCAL INPATH '$resourcesPath/binaryDataHex.csv'
                | INTO TABLE binaryTable
                | OPTIONS('header'='false')
              """.stripMargin)
@@ -1068,7 +1065,7 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
         assert(e.getMessage.contains("operation failed for default.binarytable: Alter table data type change operation failed: Given column binaryfield with data type BINARY cannot be modified. Only Int and Decimal data types are allowed for modification"))
     }
 
-    ignore("Create table and load data with binary column for hive: test encode without \u0001") {
+    test("Create table and load data with binary column for hive: test encode with base64") {
         sql("DROP TABLE IF EXISTS hivetable")
         sql("DROP TABLE IF EXISTS carbontable")
         sql(
@@ -1079,11 +1076,11 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
                |    name string,
                |    binaryField binary,
                |    autoLabel boolean)
-               | row format delimited fields terminated by '|'
+               | row format delimited fields terminated by ','
              """.stripMargin)
         sql(
             s"""
-               | LOAD DATA LOCAL INPATH '$resourcesPath/binarystringdata2.csv'
+               | LOAD DATA LOCAL INPATH '$resourcesPath/binaryDataBase64.csv'
                | INTO TABLE hivetable
              """.stripMargin)
 
@@ -1099,14 +1096,13 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
              """.stripMargin)
         sql(
             s"""
-               | LOAD DATA LOCAL INPATH '$resourcesPath/binarystringdata2.csv'
+               | LOAD DATA LOCAL INPATH '$resourcesPath/binaryDataBase64.csv'
                | INTO TABLE carbontable
-               | OPTIONS('header'='false','DELIMITER'='|')
+               | OPTIONS('header'='false','DELIMITER'=',','binary_decoder'='baSe64')
              """.stripMargin)
 
         val hiveResult = sql("SELECT * FROM hivetable")
         val carbonResult = sql("SELECT * FROM carbontable")
-        // TODO
         checkAnswer(hiveResult, carbonResult)
 
         checkAnswer(sql("SELECT COUNT(*) FROM hivetable"), Seq(Row(3)))
@@ -1120,8 +1116,8 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
                 assert(each(1).toString.equalsIgnoreCase("false") || (each(1).toString.equalsIgnoreCase("true")))
                 assert(each(2).toString.contains(".png"))
 
-                val value = new String(each.getAs[Array[Byte]](3))
-                // assert("\u0001history\u0002".equals(value) || "\u0001biology\u0002".equals(value) || "\u0001education\u0002".equals(value))
+                val value = each.getAs[Array[Byte]](3).slice(0, 10)
+                assert(new String(Base64.encodeBase64(value)).equals("iVBORw0KGgoAAA=="))
                 assert(each(4).toString.equalsIgnoreCase("false") || (each(4).toString.equalsIgnoreCase("true")))
             }
 
@@ -1135,9 +1131,191 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
                 assert(each(2).toString.contains(".png"))
 
 
-                val value = new String(each.getAs[Array[Byte]](3))
-                // assert("\u0001history\u0002".equals(value) || "\u0001biology\u0002".equals(value) || "\u0001education\u0002".equals(value))
+                val value = each.getAs[Array[Byte]](3).slice(0, 10)
+                assert(new String(Base64.encodeBase64(value)).equals("iVBORw0KGgoAAA=="))
+                assert(each(4).toString.equalsIgnoreCase("false") || (each(4).toString.equalsIgnoreCase("true")))
+            }
+        } catch {
+            case e: Exception =>
+                e.printStackTrace()
+                assert(false)
+        }
+    }
+
+    test("Create table and load data with binary column for hive: test encode with base64 and streaming = true") {
+        sql("DROP TABLE IF EXISTS hivetable")
+        sql("DROP TABLE IF EXISTS carbontable")
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS hivetable (
+               |    id int,
+               |    label boolean,
+               |    name string,
+               |    binaryField binary,
+               |    autoLabel boolean)
+               | row format delimited fields terminated by ','
+             """.stripMargin)
+        sql(
+            s"""
+               | LOAD DATA LOCAL INPATH '$resourcesPath/binaryDataBase64.csv'
+               | INTO TABLE hivetable
+             """.stripMargin)
+
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS carbontable (
+               |    id int,
+               |    label boolean,
+               |    name string,
+               |    binaryField binary,
+               |    autoLabel boolean)
+               | STORED BY 'carbondata'
+               | tblproperties('streaming'='true')
+             """.stripMargin)
+        sql(
+            s"""
+               | LOAD DATA LOCAL INPATH '$resourcesPath/binaryDataBase64.csv'
+               | INTO TABLE carbontable
+               | OPTIONS('header'='false','DELIMITER'=',','binary_decoder'='baSe64')
+             """.stripMargin)
+
+        val hiveResult = sql("SELECT * FROM hivetable")
+        val carbonResult = sql("SELECT * FROM carbontable")
+        checkAnswer(hiveResult, carbonResult)
+
+        checkAnswer(sql("SELECT COUNT(*) FROM hivetable"), Seq(Row(3)))
+        try {
+            val carbonDF = carbonResult.collect()
+            assert(3 == carbonDF.length)
+            carbonDF.foreach { each =>
+                assert(5 == each.length)
+
+                assert(Integer.valueOf(each(0).toString) > 0)
+                assert(each(1).toString.equalsIgnoreCase("false") || (each(1).toString.equalsIgnoreCase("true")))
+                assert(each(2).toString.contains(".png"))
+
+                val value = each.getAs[Array[Byte]](3).slice(0, 10)
+                assert(new String(Base64.encodeBase64(value)).equals("iVBORw0KGgoAAA=="))
+                assert(each(4).toString.equalsIgnoreCase("false") || (each(4).toString.equalsIgnoreCase("true")))
+            }
+
+            val df = hiveResult.collect()
+            assert(3 == df.length)
+            df.foreach { each =>
+                assert(5 == each.length)
+
+                assert(Integer.valueOf(each(0).toString) > 0)
+                assert(each(1).toString.equalsIgnoreCase("false") || (each(1).toString.equalsIgnoreCase("true")))
+                assert(each(2).toString.contains(".png"))
+
+
+                val value = each.getAs[Array[Byte]](3).slice(0, 10)
+                assert(new String(Base64.encodeBase64(value)).equals("iVBORw0KGgoAAA=="))
+                assert(each(4).toString.equalsIgnoreCase("false") || (each(4).toString.equalsIgnoreCase("true")))
+            }
+        } catch {
+            case e: Exception =>
+                e.printStackTrace()
+                assert(false)
+        }
+    }
+
+    test("Create table and load data with binary column for hive: test encode without \u0001 and not base64") {
+        // Carbon throws an exception when binary_decoder is base64 but the data is not Base64;
+        // Hive stores the original data as-is when it is not Base64
+        sql("DROP TABLE IF EXISTS hivetable")
+        sql("DROP TABLE IF EXISTS carbontable")
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS hivetable (
+               |    id int,
+               |    label boolean,
+               |    name string,
+               |    binaryField binary,
+               |    autoLabel boolean)
+               | row format delimited fields terminated by '|'
+             """.stripMargin)
+        sql(
+            s"""
+               | LOAD DATA LOCAL INPATH '$resourcesPath/binarystringdata2.csv'
+               | INTO TABLE hivetable
+             """.stripMargin)
+
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS carbontable (
+               |    id int,
+               |    label boolean,
+               |    name string,
+               |    binaryField binary,
+               |    autoLabel boolean)
+               | STORED BY 'carbondata'
+             """.stripMargin)
+        val e = intercept[Exception] {
+            sql(
+                s"""
+                   | LOAD DATA LOCAL INPATH '$resourcesPath/binarystringdata2.csv'
+                   | INTO TABLE carbontable
+                   | OPTIONS('header'='false','DELIMITER'='|','binary_decoder'='baSe64')
+             """.stripMargin)
+        }
+        assert(e.getMessage.contains("Binary decoder is base64, but data is not base64"))
+    }
+
+    test("Create table and load data with binary column with Hex decode") {
+        sql("DROP TABLE IF EXISTS binaryTable")
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS binaryTable (
+               |    id int,
+               |    label boolean,
+               |    name string,
+               |    binaryField binary,
+               |    autoLabel boolean)
+               | STORED BY 'carbondata'
+               | TBLPROPERTIES('SORT_COLUMNS'='')
+             """.stripMargin)
+        sql(
+            s"""
+               | LOAD DATA LOCAL INPATH '$resourcesPath/binaryDataHex.csv'
+               | INTO TABLE binaryTable
+               | OPTIONS('header'='false','binary_decoder'='hex')
+             """.stripMargin)
+
+        val result = sql("desc formatted binaryTable").collect()
+        var flag = false
+        result.foreach { each =>
+            if ("binary".equals(each.get(1))) {
+                flag = true
+            }
+        }
+        assert(flag)
+
+        checkAnswer(sql("SELECT COUNT(*) FROM binaryTable"), Seq(Row(3)))
+        try {
+            val df = sql("SELECT * FROM binaryTable").collect()
+            assert(3 == df.length)
+            df.foreach { each =>
+                assert(5 == each.length)
+
+                assert(Integer.valueOf(each(0).toString) > 0)
+                assert(each(1).toString.equalsIgnoreCase("false") || (each(1).toString.equalsIgnoreCase("true")))
+                assert(each(2).toString.contains(".png"))
+
+                val bytes20 = each.getAs[Array[Byte]](3).slice(0, 20)
+                val binaryName = each(2).toString
+                assert(Arrays.equals(firstBytes20.get(binaryName).get, bytes20), "incorrect value for binary data")
+
                 assert(each(4).toString.equalsIgnoreCase("false") || (each(4).toString.equalsIgnoreCase("true")))
+
+                val df = sql("SELECT name,binaryField FROM binaryTable").collect()
+                assert(3 == df.length)
+                df.foreach { each =>
+                    assert(2 == each.length)
+                    val binaryName = each(0).toString
+                    val bytes20 = each.getAs[Array[Byte]](1).slice(0, 20)
+                    assert(Arrays.equals(firstBytes20.get(binaryName).get, bytes20), "incorrect value for binary data")
+                }
             }
         } catch {
             case e: Exception =>
@@ -1146,6 +1324,31 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
         }
     }
 
+    test("Create table and load data with binary column with invalid value") {
+        sql("DROP TABLE IF EXISTS binaryTable")
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS binaryTable (
+               |    id int,
+               |    label boolean,
+               |    name string,
+               |    binaryField binary,
+               |    autoLabel boolean)
+               | STORED BY 'carbondata'
+               | TBLPROPERTIES('SORT_COLUMNS'='')
+             """.stripMargin)
+        val e = intercept[Exception] {
+            sql(
+                s"""
+                   | LOAD DATA LOCAL INPATH '$resourcesPath/binaryDataHex.csv'
+                   | INTO TABLE binaryTable
+                   | OPTIONS('header'='false','binary_decoder'='he')
+             """.stripMargin)
+        }
+        assert(e.getMessage().contains(
+            "Binary decoder only support Base64, Hex or no decode for string, don't support he"))
+    }
+
     override def afterAll: Unit = {
         sql("DROP TABLE IF EXISTS binaryTable")
         sql("DROP TABLE IF EXISTS hiveTable")
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggStreaming.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggStreaming.scala
index 262c8b8..28b7546 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggStreaming.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggStreaming.scala
@@ -39,6 +39,9 @@ class TestPreAggStreaming extends QueryTest with BeforeAndAfterAll {
     sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTableStreamingOne")
     sql("CREATE TABLE origin(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format' tblproperties('streaming'='true')")
     sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table origin")
+    sql("CREATE TABLE binary_stream(id int, label boolean, name string,image binary,autoLabel boolean) STORED BY 'org.apache.carbondata.format' tblproperties('streaming'='true')")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/binaryDataBase64.csv' into table binary_stream OPTIONS('header'='false','DELIMITER'=',','binary_decoder'='baSe64')")
+    sql("create datamap agg0 on table binary_stream using 'preaggregate' as select name from binary_stream group by name")
   }
 
   test("Test Pre Agg Streaming with project column and group by") {
@@ -47,6 +50,13 @@ class TestPreAggStreaming extends QueryTest with BeforeAndAfterAll {
     checkAnswer(df, sql("select name from origin group by name"))
   }
 
+  test("Test binary with stream and preaggregate") {
+    val df = sql("select name from binary_stream group by name")
+    df.collect()
+    assert(validateStreamingTablePlan(df.queryExecution.analyzed))
+    checkAnswer(df, sql("select name from binary_stream group by name"))
+  }
+
   test("Test Pre Agg Streaming table Agg Sum Aggregation") {
     val df = sql("select name, sum(age) from maintable group by name")
     assert(validateStreamingTablePlan(df.queryExecution.analyzed))
@@ -124,6 +134,7 @@ class TestPreAggStreaming extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists mainTable")
     sql("drop table if exists mainTableStreamingOne")
     sql("drop table if exists origin")
+    sql("drop table if exists binary_stream")
   }
 
   override def afterAll: Unit = {
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
index e7494d6..867fdef 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
@@ -392,6 +392,48 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
 
   }
 
+  test("test load data with binary_decoder in df") {
+    val spark = sqlContext.sparkSession
+    try {
+      sql("DROP TABLE IF EXISTS carbon_table")
+      val rdd = spark.sparkContext.parallelize(1 to 3)
+              .map(x => Row("a" + x % 10, "b", x, "YWJj".getBytes()))
+      val customSchema = StructType(Array(
+        StructField("c1", StringType),
+        StructField("c2", StringType),
+        StructField("number", IntegerType),
+        StructField("c4", BinaryType)))
+
+      val df = spark.createDataFrame(rdd, customSchema)
+      // Saves dataFrame to carbondata file
+      df.write.format("carbondata")
+              .option("binary_decoder", "base64")
+              .option("tableName", "carbon_table")
+              .save()
+
+      val carbonDF = spark.read
+              .format("carbondata")
+              .option("tableName", "carbon_table")
+              .schema(customSchema)
+              .load()
+
+      assert(carbonDF.schema.map(_.name) === Seq("c1", "c2", "number", "c4"))
+      // "YWJj" is base64 decode data of "abc"
+      checkAnswer(carbonDF, Seq(Row("a1", "b", 1, "abc".getBytes()),
+        Row("a2", "b", 2, "abc".getBytes()),
+        Row("a3", "b", 3, "abc".getBytes())))
+
+      val carbonDF2 = carbonDF.drop("c1")
+      assert(carbonDF2.schema.map(_.name) === Seq("c2", "number", "c4"))
+    } catch {
+      case e: Exception =>
+        e.printStackTrace()
+        assert(false)
+    } finally {
+      sql("DROP TABLE IF EXISTS carbon_table")
+    }
+  }
+
   override def afterAll {
     dropTable
   }
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
index b163ee9..d9461b2 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.common.exceptions.MetadataProcessException
-import org.apache.carbondata.common.exceptions.sql.{MalformedDataMapCommandException, NoSuchDataMapException}
+import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException, NoSuchDataMapException}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -266,18 +266,249 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
     sql(s"drop table if exists $tableName")
   }
 
-  test("test if preaggregate load is successfull for hivemetastore") {
-    try {
-      CarbonProperties.getInstance()
-        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, "true")
-      sql("DROP TABLE IF EXISTS maintable")
-      sql(
-        """
-          | CREATE TABLE maintable(id int, name string, city string, age int)
-          | STORED BY 'org.apache.carbondata.format'
-        """.stripMargin)
-      sql(
-        s"""create datamap preagg_sum on table maintable using 'preaggregate' as select id,sum(age) from maintable group by id"""
+    test("test support bloomFilter on binary data type") {
+        val tableName = "datamapshowtest"
+        val datamapName = "bloomdatamap"
+        val datamapName2 = "bloomdatamap2"
+        sql(s"drop table if exists $tableName")
+        // for index datamap
+        sql(s"create table $tableName (a string, b string, c string, d binary, e binary) stored by 'carbondata'")
+
+        sql(s"insert into $tableName  values('a1','b1','c1','d1','e1')")
+        sql(s"insert into $tableName  values('a1','b2','c2','d1','e2')")
+        sql(s"insert into $tableName  values('a3','b3','c1','d2','e2')")
+        sql(
+            s"""
+               | create datamap $datamapName on table $tableName using 'bloomfilter'
+               | DMPROPERTIES ('index_columns'='d', 'bloom_size'='32000', 'bloom_fpp'='0.001')
+       """.stripMargin)
+        sql(
+            s"""
+               | create datamap $datamapName2 on table $tableName using 'bloomfilter'
+               | DMPROPERTIES ('index_columns'='e')
+       """.stripMargin)
+        var bloom1 = sql(s"select * from $tableName where d=cast('d1' as binary)")
+        assert(2 == bloom1.collect().length)
+        bloom1.collect().foreach { each =>
+            assert(5 == each.length)
+            assert("a1".equals(each.get(0)))
+            assert("d1".equals(new String(each.getAs[Array[Byte]](3))))
+            if ("b1".equals(each.get(1))) {
+                assert("c1".equals(each.get(2)))
+                assert("e1".equals(new String(each.getAs[Array[Byte]](4))))
+            } else if ("b2".equals(each.get(1))) {
+                assert("c2".equals(each.get(2)))
+                assert("e2".equals(new String(each.getAs[Array[Byte]](4))))
+            } else {
+                assert(false)
+            }
+        }
+
+        bloom1 = sql(s"select * from $tableName where d=cast('d1' as binary) and e=cast('e1' as binary)")
+        assert(1 == bloom1.collect().length)
+        bloom1.collect().foreach { each =>
+            assert(5 == each.length)
+            assert("a1".equals(each.get(0)))
+            assert("d1".equals(new String(each.getAs[Array[Byte]](3))))
+            if ("b1".equals(each.get(1))) {
+                assert("c1".equals(each.get(2)))
+                assert("e1".equals(new String(each.getAs[Array[Byte]](4))))
+            } else {
+                assert(false)
+            }
+        }
+
+        val result = sql(s"show datamap on table $tableName").cache()
+
+        checkAnswer(sql(s"show datamap on table $tableName"),
+            Seq(Row(datamapName, "bloomfilter", s"default.$tableName", "'bloom_fpp'='0.001', 'bloom_size'='32000', 'index_columns'='d'", "ENABLED", "NA"),
+                Row(datamapName2, "bloomfilter", s"default.$tableName", "'index_columns'='e'", "ENABLED", "NA")))
+        result.unpersist()
+        sql(s"drop table if exists $tableName")
+    }
+
+    test("test don't support timeseries on binary data type") {
+        val tableName = "datamapshowtest"
+        sql(s"drop table if exists $tableName")
+
+        // for timeseries datamap
+        sql(s"CREATE TABLE $tableName(mytime timestamp, name string, age int, image binary) STORED BY 'org.apache.carbondata.format'")
+        val e = intercept[MalformedCarbonCommandException] {
+            sql(
+                s"""
+                   | CREATE DATAMAP agg0_hour ON TABLE $tableName
+                   | USING 'timeSeries'
+                   | DMPROPERTIES (
+                   | 'EVENT_TIME'='image',
+                   | 'HOUR_GRANULARITY'='1')
+                   | AS SELECT image, SUM(age) FROM $tableName
+                   | GROUP BY image
+                """.stripMargin)
+        }
+        assert(e.getMessage.contains("Timeseries event time is only supported on Timestamp column"))
+    }
+
+    test("test support preaggregate on binary data type") {
+        val tableName = "datamapshowtest"
+        sql(s"drop table if exists $tableName")
+
+        // for the preaggregate datamap, the properties column is empty
+        sql(s"CREATE TABLE $tableName(id int, name string, city string, age string, image binary)" +
+                s" STORED BY 'org.apache.carbondata.format'")
+
+        sql(s"insert into $tableName  values(1,'a3','b3','c1','image2')")
+        sql(s"insert into $tableName  values(2,'a3','b2','c2','image2')")
+        sql(s"insert into $tableName  values(3,'a1','b2','c1','image3')")
+        sql(
+            s"""
+               | CREATE DATAMAP agg0 ON TABLE $tableName USING 'preaggregate' AS
+               | SELECT name, image,
+               | count(age)
+               | FROM $tableName GROUP BY name,image
+               | """.stripMargin)
+        checkAnswer(sql(s"show datamap on table $tableName"),
+            Seq(Row("agg0", "preaggregate", s"default.${tableName}_agg0", "", "NA", "NA")))
+        val pre = sql(
+            s"""
+               | select name, image, count(age)
+               | from $tableName
+               | where name = 'a3' and image=cast('image2' as binary)
+               | GROUP BY name,image
+             """.stripMargin)
+        assert(1 == pre.collect().length)
+        pre.collect().foreach { each =>
+            assert(3 == each.length)
+            assert("a3".equals(each.get(0)))
+            assert("image2".equals(new String(each.getAs[Array[Byte]](1))))
+            assert(2 == each.get(2))
+        }
+
+        val preExplain = sql(
+            s"""
+               | explain extended select name, image, count(age)
+               | from $tableName
+               | where name = 'a3' and image=cast('image2' as binary)
+               | GROUP BY name,image
+             """.stripMargin)
+        assert(preExplain.collect()(0).getString(0).contains("datamapshowtest_agg0"))
+        sql(s"drop table if exists $tableName")
+    }
+
+    test("Create table and preaggregate and load data with binary column for hive: test encode with base64") {
+        val tableName = "carbontable"
+        sql(s"drop table if exists $tableName")
+
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS $tableName (
+               |    id int,
+               |    label boolean,
+               |    name string,
+               |    binaryField binary,
+               |    image boolean)
+               | STORED BY 'carbondata'
+             """.stripMargin)
+        sql(
+            s"""
+               | LOAD DATA LOCAL INPATH '$resourcesPath/binaryDataBase64.csv'
+               | INTO TABLE $tableName
+               | OPTIONS('header'='false','DELIMITER'=',','binary_decoder'='baSe64')
+             """.stripMargin)
+
+        sql(
+            s"""
+               | CREATE DATAMAP agg0 ON TABLE $tableName USING 'preaggregate' AS
+               | SELECT name, binaryField,
+               | count(id)
+               | FROM $tableName GROUP BY name,binaryField
+               | """.stripMargin)
+        checkAnswer(sql(s"show datamap on table $tableName"),
+            Seq(Row("agg0", "preaggregate", s"default.${tableName}_agg0", "", "NA", "NA")))
+        val pre = sql(
+            s"""
+               | select name, binaryField, count(id)
+               | from $tableName
+               | where name = '2.png'
+               | GROUP BY name,binaryField
+             """.stripMargin)
+        assert(1 == pre.collect().length)
+        pre.collect().foreach { each =>
+            assert(3 == each.length)
+            assert("2.png".equals(each.get(0)))
+            assert((new String(each.getAs[Array[Byte]](1))).startsWith("\uFFFDPNG"))
+            assert(1 == each.get(2))
+        }
+
+        val preExplain = sql(
+            s"""
+               | explain extended select name, binaryField, count(id)
+               | from $tableName
+               | where name = '2.png'
+               | GROUP BY name,binaryField
+             """.stripMargin)
+        assert(preExplain.collect()(0).getString(0).contains("carbontable_agg0"))
+    }
+
+    test("test don't support lucene on binary data type") {
+        val tableName = "datamapshowtest20"
+        sql(s"drop table if exists $tableName")
+
+        sql(s"CREATE TABLE $tableName(id int, name string, city string, age string, image binary)" +
+                s" STORED BY 'org.apache.carbondata.format'")
+
+        sql(s"insert into $tableName  values(1,'a3','b3','c1','image2')")
+        sql(s"insert into $tableName  values(2,'a3','b2','c2','image2')")
+        sql(s"insert into $tableName  values(3,'a1','b2','c1','image3')")
+        sql(
+            s"""
+               | CREATE DATAMAP agg10 ON TABLE $tableName USING 'lucene'
+               | DMProperties('INDEX_COLUMNS'='name')
+               | """.stripMargin)
+
+        checkAnswer(sql(s"show datamap on table $tableName"),
+            Seq(Row("agg10", "lucene", s"default.${tableName}", "'index_columns'='name'", "ENABLED", "NA")))
+
+        val e = intercept[MalformedDataMapCommandException] {
+            sql(
+                s"""
+                   | CREATE DATAMAP agg1 ON TABLE $tableName USING 'lucene'
+                   | DMProperties('INDEX_COLUMNS'='image')
+                   | """.stripMargin)
+        }
+        assert(e.getMessage.contains("Only String column is supported, column 'image' is BINARY type."))
+        checkAnswer(sql(s"show datamap on table $tableName"),
+            Seq(Row("agg10", "lucene", s"default.${tableName}", "'index_columns'='name'", "ENABLED", "NA")))
+
+        val pre = sql(
+            s"""
+               | select name,image, id
+               | from $tableName
+               | where name = 'a3'
+             """.stripMargin)
+
+        assert(2 == pre.collect().length)
+        pre.collect().foreach { each =>
+            assert(3 == each.length)
+            assert("a3".equals(each.get(0)))
+            assert("image2".equals(new String(each.getAs[Array[Byte]](1))))
+            assert(2 == each.get(2) || 1 == each.get(2))
+        }
+
+        sql(s"drop table if exists $tableName")
+    }
+
+    test("test if preaggregate load is successfull for hivemetastore") {
+        try {
+            CarbonProperties.getInstance()
+                    .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, "true")
+            sql("DROP TABLE IF EXISTS maintable")
+            sql(
+                """
+                  | CREATE TABLE maintable(id int, name string, city string, age int)
+                  | STORED BY 'org.apache.carbondata.format'
+                """.stripMargin)
+            sql(
+                s"""create datamap preagg_sum on table maintable using 'preaggregate' as select id,sum(age) from maintable group by id"""
 
           .stripMargin)
       sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 2b0b2cb..d0e4ba7 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -1130,6 +1130,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       "SORT_COLUMN_BOUNDS",
       "LOAD_MIN_SIZE_INMB",
       "SCALE_FACTOR",
+      "BINARY_DECODER",
       "SORT_SCOPE"
     )
     var isSupported = true
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
index 8bdb512..65c82fb 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
@@ -274,6 +274,10 @@ object CarbonSparkDataSourceUtil {
     }
     builder.uniqueIdentifier(System.currentTimeMillis())
     val model = builder.buildLoadModel(schema)
+    val binary_decoder = options.get("binary_decoder").map(_.toString.toLowerCase())
+    if (binary_decoder.isDefined) {
+      model.setBinaryDecoder(String.valueOf(binary_decoder.get))
+    }
     val tableInfo = model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo
     val properties =
       tableInfo.getFactTable.getTableProperties
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceBinaryTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceBinaryTest.scala
index 064efc2..bdfc9dd 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceBinaryTest.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceBinaryTest.scala
@@ -105,7 +105,7 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
     test("Don't support sort_columns") {
         import spark._
         sql("DROP TABLE IF EXISTS binaryTable")
-        val exception = intercept[Exception] {
+        var exception = intercept[Exception] {
             sql(
                 s"""
                    | CREATE TABLE binaryTable (
@@ -116,7 +116,40 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
                    |    autoLabel BOOLEAN)
                    | using carbon
                    | options('SORT_COLUMNS'='image')
-       """.stripMargin)
+            """.stripMargin)
+            // TODO: it should throw the exception at table creation time
+            sql("SELECT COUNT(*) FROM binaryTable").show()
+        }
+        assert(exception.getCause.getMessage.contains("sort columns not supported for array, struct, map, double, float, decimal, varchar, binary"))
+
+        sql("DROP TABLE IF EXISTS binaryTable")
+        exception = intercept[Exception] {
+            sql(
+                s"""
+                   | CREATE TABLE binaryTable
+                   | using carbon
+                   | options('SORT_COLUMNS'='image')
+                   | LOCATION '$writerPath'
+                """.stripMargin)
+            sql("SELECT COUNT(*) FROM binaryTable").show()
+        }
+        assert(exception.getMessage.contains("Cannot use sort columns during infer schema"))
+
+
+        sql("DROP TABLE IF EXISTS binaryTable")
+        exception = intercept[Exception] {
+            sql(
+                s"""
+                   | CREATE TABLE binaryTable (
+                   |    id DOUBLE,
+                   |    label BOOLEAN,
+                   |    name STRING,
+                   |    image BINARY,
+                   |    autoLabel BOOLEAN)
+                   | using carbon
+                   | options('SORT_COLUMNS'='image')
+                   | LOCATION '$writerPath'
+                 """.stripMargin)
             sql("SELECT COUNT(*) FROM binaryTable").show()
         }
         assert(exception.getCause.getMessage.contains("sort columns not supported for array, struct, map, double, float, decimal, varchar, binary"))
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
index 19cf99f..56a15bd 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable
 import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.carbondata.datasource.TestUtil._
-import org.apache.spark.sql.types.{IntegerType, StringType, StructField => SparkStructField, StructType}
+import org.apache.spark.sql.types.{BinaryType, IntegerType, StringType, StructField => SparkStructField, StructType}
 import org.apache.spark.util.SparkUtil
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
 
@@ -1805,6 +1805,73 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS binaryCarbon")
   }
 
+    test("test load data with binary_decoder in df") {
+        import spark._
+        try {
+            sql("DROP TABLE IF EXISTS carbon_table")
+            val rdd = spark.sparkContext.parallelize(1 to 3)
+                    .map(x => Row("a" + x % 10, "b", x, "YWJj".getBytes()))
+            val customSchema = StructType(Array(
+                SparkStructField("c1", StringType),
+                SparkStructField("c2", StringType),
+                SparkStructField("number", IntegerType),
+                SparkStructField("c4", BinaryType)))
+
+            val df = spark.createDataFrame(rdd, customSchema)
+            // Saves dataFrame to carbon file
+            df.write.format("carbon")
+                    .option("binary_decoder", "base64")
+                    .saveAsTable("carbon_table")
+            val path = warehouse1 + "/carbon_table"
+
+            val carbonDF = spark.read
+                    .format("carbon")
+                    .option("tablename", "carbon_table")
+                    .schema(customSchema)
+                    .load(path)  // TODO: check why it cannot be read without an explicit path
+            assert(carbonDF.schema.map(_.name) === Seq("c1", "c2", "number", "c4"))
+            // "YWJj" is base64 decode data of "abc" string,
+            // but spark doesn't support string for binary, so we use byte[] and
+            // carbon will not decode for byte
+            checkAnswer(carbonDF, Seq(Row("a1", "b", 1, "YWJj".getBytes()),
+                Row("a2", "b", 2, "YWJj".getBytes()),
+                Row("a3", "b", 3, "YWJj".getBytes())))
+
+            val carbonDF2 = carbonDF.drop("c1")
+            assert(carbonDF2.schema.map(_.name) === Seq("c2", "number", "c4"))
+            checkAnswer(sql(s"select * from carbon.`$path`"),
+                Seq(Row("a1", "b", 1, "YWJj".getBytes()),
+                    Row("a2", "b", 2, "YWJj".getBytes()),
+                    Row("a3", "b", 3, "YWJj".getBytes())))
+        } catch {
+            case e: Exception =>
+                e.printStackTrace()
+                assert(false)
+        } finally {
+            sql("DROP TABLE IF EXISTS carbon_table")
+        }
+    }
+
+    test("test spark doesn't support input string value for binary data type") {
+        try {
+            val rdd = spark.sparkContext.parallelize(1 to 3)
+                    .map(x => Row("a" + x % 10, "b", x, "YWJj".getBytes()))
+            val customSchema = StructType(Array(
+                SparkStructField("c1", StringType),
+                SparkStructField("c2", StringType),
+                SparkStructField("number", IntegerType),
+                SparkStructField("c4", BinaryType)))
+
+            try {
+                spark.createDataFrame(rdd, customSchema)
+            } catch {
+                case e: RuntimeException => assert(e.getMessage.contains(
+                    "java.lang.String is not a valid external type for schema of binary"))
+            }
+
+        }
+    }
+
   override protected def beforeAll(): Unit = {
     drop
     createParquetTable
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index 8885f4a..6e7daaa 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -74,6 +74,7 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
       case DateType => CarbonType.DATE.getName
       case decimal: DecimalType => s"decimal(${decimal.precision}, ${decimal.scale})"
       case BooleanType => CarbonType.BOOLEAN.getName
+      case BinaryType => CarbonType.BINARY.getName
       case other => CarbonException.analysisException(s"unsupported type: $other")
     }
   }
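
With BinaryType now mapped to CarbonType.BINARY, a DataFrame containing binary columns can be written straight to a Carbon table; a minimal sketch (table and app names are illustrative, and a CarbonData-enabled Spark build is assumed):

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types._

    val spark = SparkSession.builder().appName("binary-write-sketch").getOrCreate()
    val schema = StructType(Seq(
      StructField("name", StringType),
      StructField("payload", BinaryType)))
    val rows = spark.sparkContext.parallelize(Seq(Row("a", "abc".getBytes())))
    // The BinaryType field maps to Carbon's BINARY type when the table is created.
    spark.createDataFrame(rows, schema)
      .write.format("carbondata")
      .option("tableName", "binary_sketch")
      .save()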
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 6fe89a2..2cb3895 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -248,6 +248,8 @@ public final class DataLoadProcessBuilder {
         loadModel.getGlobalSortPartitions());
     configuration.setDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
         loadModel.getBadRecordsLocation());
+    configuration.setDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER,
+        loadModel.getBinaryDecoder());
 
     List<CarbonDimension> dimensions =
         carbonTable.getDimensionByTableName(carbonTable.getTableName());
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/BinaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/BinaryFieldConverterImpl.java
index 766cfeb..b3a216b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/BinaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/BinaryFieldConverterImpl.java
@@ -16,16 +16,14 @@
  */
 package org.apache.carbondata.processing.loading.converter.impl;
 
-import java.nio.charset.Charset;
-
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.processing.loading.DataField;
 import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
 import org.apache.carbondata.processing.loading.converter.FieldConverter;
+import org.apache.carbondata.processing.loading.converter.impl.binary.BinaryDecoder;
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
 
 import org.apache.log4j.Logger;
@@ -43,33 +41,35 @@ public class BinaryFieldConverterImpl implements FieldConverter {
   private String nullformat;
   private boolean isEmptyBadRecord;
   private DataField dataField;
+  private BinaryDecoder binaryDecoder;
   public BinaryFieldConverterImpl(DataField dataField, String nullformat, int index,
-      boolean isEmptyBadRecord) {
+      boolean isEmptyBadRecord, BinaryDecoder binaryDecoder) {
     this.dataType = dataField.getColumn().getDataType();
     this.dimension = (CarbonDimension) dataField.getColumn();
     this.nullformat = nullformat;
     this.index = index;
     this.isEmptyBadRecord = isEmptyBadRecord;
     this.dataField = dataField;
+    this.binaryDecoder = binaryDecoder;
   }
 
   @Override
   public void convert(CarbonRow row, BadRecordLogHolder logHolder)
       throws CarbonDataLoadingException {
-    if (row.getObject(index) instanceof String) {
-      row.update((((String) row.getObject(index)))
-          .getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)), index);
-    } else if (row.getObject(index) instanceof byte[]) {
-      row.update(row.getObject(index), index);
-    } else {
-      throw new CarbonDataLoadingException("Binary only support String and byte[] data type");
-    }
+    row.update(convert(row.getObject(index), logHolder), index);
   }
 
   @Override
   public Object convert(Object value, BadRecordLogHolder logHolder)
       throws RuntimeException {
-    return null;
+    if (value instanceof String) {
+      return binaryDecoder.decode((String) value);
+    } else if (value instanceof byte[]) {
+      return value;
+    } else {
+      throw new CarbonDataLoadingException("Binary only supports String and byte[] data types;" +
+          " the binary decoder only supports Base64, Hex or no decoding for string");
+    }
   }
 
   @Override
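
In effect, the converter now delegates String input to the configured decoder and
passes byte[] input through untouched. A minimal sketch of that dispatch (the
constructor arguments and the logHolder are illustrative placeholders, not taken
from a real load):

    FieldConverter conv = new BinaryFieldConverterImpl(
        dataField, "\\N", 0, false, new Base64BinaryDecoder());
    byte[] decoded = (byte[]) conv.convert("YWJj", logHolder);        // bytes of "abc"
    byte[] asIs = (byte[]) conv.convert(new byte[]{1, 2}, logHolder); // unchanged
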
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
index a6c61b4..6fa32a2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.dictionary.client.DictionaryClient;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
@@ -39,6 +40,13 @@ import org.apache.carbondata.processing.datatypes.PrimitiveDataType;
 import org.apache.carbondata.processing.datatypes.StructDataType;
 import org.apache.carbondata.processing.loading.DataField;
 import org.apache.carbondata.processing.loading.converter.FieldConverter;
+import org.apache.carbondata.processing.loading.converter.impl.binary.Base64BinaryDecoder;
+import org.apache.carbondata.processing.loading.converter.impl.binary.BinaryDecoder;
+import org.apache.carbondata.processing.loading.converter.impl.binary.DefaultBinaryDecoder;
+import org.apache.carbondata.processing.loading.converter.impl.binary.HexBinaryDecoder;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+
+import org.apache.commons.lang3.StringUtils;
 
 public class FieldEncoderFactory {
 
@@ -76,6 +84,35 @@ public class FieldEncoderFactory {
       DictionaryClient client, Boolean useOnePass, Map<Object, Integer> localCache,
       boolean isEmptyBadRecord, String parentTablePath, boolean isConvertToBinary)
       throws IOException {
+    return createFieldEncoder(dataField, absoluteTableIdentifier, index, nullFormat, client,
+        useOnePass, localCache, isEmptyBadRecord, parentTablePath, isConvertToBinary,
+        CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER_DEFAULT);
+  }
+
+
+  /**
+   * Creates the FieldConverter for all dimensions; for measures it returns null.
+   *
+   * @param dataField               column schema
+   * @param absoluteTableIdentifier table identifier
+   * @param index                   index of column in the row
+   * @param nullFormat              null format of the field
+   * @param client                  dictionary client
+   * @param useOnePass              whether single-pass (one-pass) loading is used
+   * @param localCache              local cache
+   * @param isEmptyBadRecord        whether an empty value is treated as a bad record
+   * @param parentTablePath         parent table path
+   * @param isConvertToBinary       whether the no-dictionary field should be converted to binary
+   * @param binaryDecoder           carbon binary decoder for loading data
+   * @return the FieldConverter for the given field, or null for measures
+   * @throws IOException
+   */
+  public FieldConverter createFieldEncoder(DataField dataField,
+      AbsoluteTableIdentifier absoluteTableIdentifier, int index, String nullFormat,
+      DictionaryClient client, Boolean useOnePass, Map<Object, Integer> localCache,
+      boolean isEmptyBadRecord, String parentTablePath,
+      boolean isConvertToBinary, String binaryDecoder)
+      throws IOException {
     // Converters are only needed for dimensions; for measures it returns null.
     if (dataField.getColumn().isDimension()) {
       if (dataField.getColumn().hasEncoding(Encoding.DIRECT_DICTIONARY) &&
@@ -120,7 +157,22 @@ public class FieldEncoderFactory {
             createComplexDataType(dataField, absoluteTableIdentifier,
                 client, useOnePass, localCache, index, nullFormat, isEmptyBadRecord), index);
       } else if (dataField.getColumn().getDataType() == DataTypes.BINARY) {
-        return new BinaryFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord);
+        BinaryDecoder binaryDecoderObject = null;
+        if (binaryDecoder.equalsIgnoreCase(
+            CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER_BASE64)) {
+          binaryDecoderObject = new Base64BinaryDecoder();
+        } else if (binaryDecoder.equalsIgnoreCase(
+            CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER_HEX)) {
+          binaryDecoderObject = new HexBinaryDecoder();
+        } else if (!StringUtils.isBlank(binaryDecoder)) {
+          throw new CarbonDataLoadingException("Binary decoder only supports Base64, " +
+              "Hex or no decoding for string, it doesn't support " + binaryDecoder);
+        } else {
+          binaryDecoderObject = new DefaultBinaryDecoder();
+        }
+
+        return new BinaryFieldConverterImpl(dataField, nullFormat,
+            index, isEmptyBadRecord, binaryDecoderObject);
       } else {
         // if the no-dictionary column is numeric and need not be converted to binary,
         // then treat it as a measure column
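
The selection rule above, restated in isolation (a hypothetical helper, assuming the
new constants resolve to the literal strings "base64" and "hex", which is what the
tests in this patch pass):

    static BinaryDecoder selectDecoder(String option) {
      if ("base64".equalsIgnoreCase(option)) {
        return new Base64BinaryDecoder();
      } else if ("hex".equalsIgnoreCase(option)) {
        return new HexBinaryDecoder();
      } else if (StringUtils.isBlank(option)) {
        return new DefaultBinaryDecoder();  // blank or unset: keep raw string bytes
      }
      throw new CarbonDataLoadingException("Binary decoder only supports Base64, " +
          "Hex or no decoding for string, it doesn't support " + option);
    }
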
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
index ac9413c..0d2868e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
@@ -28,6 +28,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.dictionary.client.DictionaryClient;
 import org.apache.carbondata.core.dictionary.service.DictionaryOnePassService;
@@ -106,7 +107,9 @@ public class RowConverterImpl implements RowConverter {
       FieldConverter fieldConverter = FieldEncoderFactory.getInstance()
           .createFieldEncoder(fields[i], configuration.getTableIdentifier(), i, nullFormat, client,
               configuration.getUseOnePass(), localCaches[i], isEmptyBadRecord,
-              configuration.getParentTablePath(), isConvertToBinary);
+              configuration.getParentTablePath(), isConvertToBinary,
+              (String) configuration.getDataLoadProperty(
+                  CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER));
       fieldConverterList.add(fieldConverter);
     }
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
@@ -227,7 +230,9 @@ public class RowConverterImpl implements RowConverter {
         fieldConverter = FieldEncoderFactory.getInstance()
             .createFieldEncoder(fields[i], configuration.getTableIdentifier(), i, nullFormat,
                 client, configuration.getUseOnePass(), localCaches[i], isEmptyBadRecord,
-                configuration.getParentTablePath(), isConvertToBinary);
+                configuration.getParentTablePath(), isConvertToBinary,
+                (String) configuration.getDataLoadProperty(
+                    CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER));
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/binary/Base64BinaryDecoder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/binary/Base64BinaryDecoder.java
new file mode 100644
index 0000000..382a76e
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/binary/Base64BinaryDecoder.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.converter.impl.binary;
+
+import java.nio.charset.Charset;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+
+import org.apache.commons.codec.binary.Base64;
+
+public class Base64BinaryDecoder implements BinaryDecoder {
+  @Override
+  public byte[] decode(String input) {
+    byte[] parsedValue = input
+        .getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+    if (Base64.isArrayByteBase64(parsedValue)) {
+      parsedValue = Base64.decodeBase64(parsedValue);
+    } else {
+      throw new CarbonDataLoadingException("Binary decoder is " +
+          CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER_BASE64
+          + ", but data is not base64");
+    }
+    return parsedValue;
+  }
+}
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/binary/BinaryDecoder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/binary/BinaryDecoder.java
new file mode 100644
index 0000000..8237ac2
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/binary/BinaryDecoder.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.converter.impl.binary;
+
+public interface BinaryDecoder {
+  /**
+   * Decodes a string into byte[] using the configured decoder, e.g. Hex or Base64.
+   * The default decoder simply converts the string to its raw bytes.
+   *
+   * @param input String value
+   * @return byte[] of binary
+   */
+  byte[] decode(String input);
+}
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/binary/DefaultBinaryDecoder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/binary/DefaultBinaryDecoder.java
new file mode 100644
index 0000000..44b3b2b
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/binary/DefaultBinaryDecoder.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.converter.impl.binary;
+
+import java.nio.charset.Charset;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+
+/**
+ * The default decoder converts the string to its raw bytes.
+ */
+public class DefaultBinaryDecoder implements BinaryDecoder {
+  @Override
+  public byte[] decode(String input) {
+    return input.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+  }
+}
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/binary/HexBinaryDecoder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/binary/HexBinaryDecoder.java
new file mode 100644
index 0000000..b383d67
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/binary/HexBinaryDecoder.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.converter.impl.binary;
+
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Hex;
+
+public class HexBinaryDecoder implements BinaryDecoder {
+  @Override
+  public byte[] decode(String input) {
+    try {
+      return Hex.decodeHex(input.toCharArray());
+    } catch (DecoderException e) {
+      throw new CarbonDataLoadingException("Binary decoder failed to decode hex string", e);
+    }
+  }
+}
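
Taken together, the three decoders map differently encoded load input to the same
stored bytes. For example, each call below yields the bytes of "abc":

    byte[] a = new Base64BinaryDecoder().decode("YWJj");   // base64 of "abc"
    byte[] b = new HexBinaryDecoder().decode("616263");    // hex of "abc"
    byte[] c = new DefaultBinaryDecoder().decode("abc");   // raw string bytes
    // Base64BinaryDecoder rejects non-base64 input such as "^YWJj" with a
    // CarbonDataLoadingException, as the SDK test further below verifies.
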
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index 1ce8aae..139fd2f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -235,6 +235,11 @@ public class CarbonLoadModel implements Serializable {
   private String columnCompressor;
 
   /**
+   * carbon binary decoder for loading data
+   */
+  private String binaryDecoder;
+
+  /**
    * the total size of loading data
    */
   private long totalSize;
@@ -488,6 +493,7 @@ public class CarbonLoadModel implements Serializable {
     copy.parentTablePath = parentTablePath;
     copy.sdkWriterCores = sdkWriterCores;
     copy.columnCompressor = columnCompressor;
+    copy.binaryDecoder = binaryDecoder;
     copy.rangePartitionColumn = rangePartitionColumn;
     copy.scaleFactor = scaleFactor;
     copy.totalSize = totalSize;
@@ -547,6 +553,7 @@ public class CarbonLoadModel implements Serializable {
     copyObj.parentTablePath = parentTablePath;
     copyObj.sdkWriterCores = sdkWriterCores;
     copyObj.columnCompressor = columnCompressor;
+    copyObj.binaryDecoder = binaryDecoder;
     copyObj.rangePartitionColumn = rangePartitionColumn;
     copyObj.scaleFactor = scaleFactor;
     copyObj.totalSize = totalSize;
@@ -960,6 +967,14 @@ public class CarbonLoadModel implements Serializable {
     this.columnCompressor = columnCompressor;
   }
 
+  public String getBinaryDecoder() {
+    return binaryDecoder;
+  }
+
+  public void setBinaryDecoder(String binaryDecoder) {
+    this.binaryDecoder = binaryDecoder;
+  }
+
   public CarbonColumn getRangePartitionColumn() {
     return rangePartitionColumn;
   }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index 7abd573..907f3ec 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -31,6 +31,7 @@ import org.apache.carbondata.common.constants.LoggerAction;
 import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.constants.SortScopeOptions;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -109,6 +110,7 @@ public class CarbonLoadModelBuilder {
       throw new InvalidLoadOptionException(e.getMessage());
     }
     validateAndSetColumnCompressor(model);
+    validateAndSetBinaryDecoder(model);
     return model;
   }
 
@@ -224,6 +226,8 @@ public class CarbonLoadModelBuilder {
       }
     }
 
+    String binaryDecoder = options.get("binary_decoder");
+    carbonLoadModel.setBinaryDecoder(binaryDecoder);
     carbonLoadModel.setTimestampformat(timestampformat);
     carbonLoadModel.setDateFormat(dateFormat);
     carbonLoadModel.setDefaultTimestampFormat(
@@ -300,6 +304,7 @@ public class CarbonLoadModelBuilder {
     validateAndSetLoadMinSize(carbonLoadModel);
 
     validateAndSetColumnCompressor(carbonLoadModel);
+    validateAndSetBinaryDecoder(carbonLoadModel);
 
     validateRangeColumn(optionsFinal, carbonLoadModel);
   }
@@ -426,6 +431,18 @@ public class CarbonLoadModelBuilder {
     }
   }
 
+
+  private void validateAndSetBinaryDecoder(CarbonLoadModel carbonLoadModel) {
+    String binaryDecoder = carbonLoadModel.getBinaryDecoder();
+    if (StringUtils.isBlank(binaryDecoder)) {
+      binaryDecoder = CarbonProperties.getInstance().getProperty(
+          CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER,
+          CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER_DEFAULT);
+    }
+    // fall back to the system property default when no load option was given
+    carbonLoadModel.setBinaryDecoder(binaryDecoder);
+  }
+
   /**
   * check whether the default value is used or not
    */
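
The resulting precedence for the decoder, sketched with the names introduced above
(construction of the load model is elided):

    String fromOption = model.getBinaryDecoder();  // e.g. "hex" from the load options
    String effective = StringUtils.isBlank(fromOption)
        ? CarbonProperties.getInstance().getProperty(
            CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER,
            CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER_DEFAULT)
        : fromOption;
    model.setBinaryDecoder(effective);
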
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 29bd0c6..1588005 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.cache.CacheType;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.block.Distributable;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
@@ -424,6 +425,14 @@ public final class CarbonLoaderUtil {
         escapeChar.equalsIgnoreCase(BACKSPACE.getName());
   }
 
+  public static boolean isValidBinaryDecoder(String binaryDecoderChar) {
+    return binaryDecoderChar.equalsIgnoreCase(
+        CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER_BASE64) ||
+        binaryDecoderChar.equalsIgnoreCase(
+            CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER_HEX) ||
+        StringUtils.isBlank(binaryDecoderChar);
+  }
+
   public static String getEscapeChar(String escapeCharacter) {
     if (escapeCharacter.equalsIgnoreCase(NEW_LINE.getName())) {
       return NEW_LINE.getEscapeChar();
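
Assuming the new constants are the literal strings "base64" and "hex" (consistent
with the tests in this patch), the validator accepts exactly these values:

    CarbonLoaderUtil.isValidBinaryDecoder("base64");  // true
    CarbonLoaderUtil.isValidBinaryDecoder("HEX");     // true, case-insensitive
    CarbonLoaderUtil.isValidBinaryDecoder("");        // true, blank means no decoding
    CarbonLoaderUtil.isValidBinaryDecoder("base");    // false
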
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 7569926..d29ccf7 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -196,7 +196,8 @@ public class CarbonWriterBuilder {
           !option.equalsIgnoreCase("complex_delimiter_level_2") &&
           !option.equalsIgnoreCase("complex_delimiter_level_3") &&
           !option.equalsIgnoreCase("quotechar") &&
-          !option.equalsIgnoreCase("escapechar")) {
+          !option.equalsIgnoreCase("escapechar") &&
+          !option.equalsIgnoreCase("binary_decoder")) {
         throw new IllegalArgumentException("Unsupported option:" + option
             + ". Refer method header or documentation");
       }
@@ -228,6 +229,13 @@ public class CarbonWriterBuilder {
         if (escapeChar.length() > 1 && !CarbonLoaderUtil.isValidEscapeSequence(escapeChar)) {
           throw new IllegalArgumentException("ESCAPECHAR cannot be more than one character.");
         }
+      } else if (entry.getKey().equalsIgnoreCase("binary_decoder")) {
+        String binaryDecoderChar = entry.getValue();
+        // validate the value itself; do not gate the check on its length
+        if (!CarbonLoaderUtil.isValidBinaryDecoder(binaryDecoderChar)) {
+          throw new IllegalArgumentException("Binary decoder only supports Base64, " +
+              "Hex or no decoding for string, it doesn't support " + binaryDecoderChar);
+        }
       }
     }
 
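
From the SDK side the new option is passed like any other load option; a sketch
mirroring the tests below, but with the hex decoder (schema and output path are
placeholders):

    CarbonWriter writer = CarbonWriter.builder()
        .outputPath("./target/binary")
        .withCsvInput(new Schema(fields))
        .writtenBy("example")
        .withLoadOption("binary_decoder", "hex")  // "base64", "hex" or unset
        .build();
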
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java
index 3bfea26..30a881b 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java
@@ -64,13 +64,16 @@ public class ImageTest extends TestCase {
     } catch (IOException e) {
       e.printStackTrace();
     }
-    Field[] fields = new Field[5];
+    Field[] fields = new Field[7];
     fields[0] = new Field("name", DataTypes.STRING);
     fields[1] = new Field("age", DataTypes.INT);
     fields[2] = new Field("image1", DataTypes.BINARY);
     fields[3] = new Field("image2", DataTypes.BINARY);
     fields[4] = new Field("image3", DataTypes.BINARY);
-
+    fields[5] = new Field("decodeString", DataTypes.BINARY);
+    fields[6] = new Field("decodeByte", DataTypes.BINARY);
+    String[] projection = new String[]{"name", "age", "image1",
+        "image2", "image3", "decodeString", "decodeByte"};
     byte[] originBinary = null;
 
     // read and write image data
@@ -81,6 +84,7 @@ public class ImageTest extends TestCase {
           .withCsvInput(new Schema(fields))
           .writtenBy("SDKS3Example")
           .withPageSizeInMb(1)
+          .withLoadOption("binary_decoder", "base64")
           .build();
 
       for (int i = 0; i < rows; i++) {
@@ -90,7 +94,8 @@ public class ImageTest extends TestCase {
         while ((bis.read(originBinary)) != -1) {
         }
         // write data
-        writer.write(new Object[]{"robot" + (i % 10), i, originBinary, originBinary, originBinary});
+        writer.write(new Object[]{"robot" + (i % 10), i, originBinary,
+            originBinary, originBinary, "YWJj", "YWJj".getBytes()});
         bis.close();
       }
       writer.close();
@@ -98,6 +103,7 @@ public class ImageTest extends TestCase {
 
     CarbonReader reader = CarbonReader
         .builder(path, "_temp")
+        .projection(projection)
         .build();
 
     System.out.println("\nData:");
@@ -105,19 +111,27 @@ public class ImageTest extends TestCase {
     while (i < 20 && reader.hasNext()) {
       Object[] row = (Object[]) reader.readNextRow();
 
-      byte[] outputBinary = (byte[]) row[1];
-      byte[] outputBinary2 = (byte[]) row[2];
-      byte[] outputBinary3 = (byte[]) row[3];
+      byte[] outputBinary = (byte[]) row[2];
+      byte[] outputBinary2 = (byte[]) row[3];
+      byte[] outputBinary3 = (byte[]) row[4];
+      String stringValue = new String((byte[]) row[5]);
+      String byteValue = new String((byte[]) row[6]);
+      // when the input is a String, it is decoded by base64
+      Assert.assertTrue("abc".equals(stringValue));
+      // when the input is byte[], it is not decoded by base64
+      Assert.assertTrue("YWJj".equals(byteValue));
       System.out.println(row[0] + " " + row[1] + " image1 size:" + outputBinary.length
-          + " image2 size:" + outputBinary2.length + " image3 size:" + outputBinary3.length);
+          + " image2 size:" + outputBinary2.length + " image3 size:" + outputBinary3.length
+          + "\t" + stringValue + "\t" + byteValue);
 
       for (int k = 0; k < 3; k++) {
 
-        byte[] originBinaryTemp = (byte[]) row[1 + k];
+        byte[] originBinaryTemp = (byte[]) row[2 + k];
         // validate output binary data and origin binary data
         assert (originBinaryTemp.length == outputBinary.length);
         for (int j = 0; j < originBinaryTemp.length; j++) {
           assert (originBinaryTemp[j] == outputBinary[j]);
+          assert (originBinary[j] == outputBinary[j]);
         }
 
         // save image, user can compare the save image and original image
@@ -657,6 +671,84 @@ public class ImageTest extends TestCase {
     assertEquals(3, result.size());
   }
 
+  @Test
+  public void testWriteNonBase64WithBase64Decoder() throws IOException, InvalidLoadOptionException, InterruptedException {
+    String imagePath = "./src/test/resources/image/carbondatalogo.jpg";
+    int num = 1;
+    int rows = 10;
+    String path = "./target/binary";
+    try {
+      FileUtils.deleteDirectory(new File(path));
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+    Field[] fields = new Field[7];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+    fields[2] = new Field("image1", DataTypes.BINARY);
+    fields[3] = new Field("image2", DataTypes.BINARY);
+    fields[4] = new Field("image3", DataTypes.BINARY);
+    fields[5] = new Field("decodeString", DataTypes.BINARY);
+    fields[6] = new Field("decodeByte", DataTypes.BINARY);
+    byte[] originBinary = null;
+
+    // read and write image data
+    for (int j = 0; j < num; j++) {
+      CarbonWriter writer = CarbonWriter
+          .builder()
+          .outputPath(path)
+          .withCsvInput(new Schema(fields))
+          .writtenBy("SDKS3Example")
+          .withPageSizeInMb(1)
+          .withLoadOption("binary_decoder", "base64")
+          .build();
+
+      for (int i = 0; i < rows; i++) {
+        // read the image as raw bytes
+        BufferedInputStream bis = new BufferedInputStream(new FileInputStream(imagePath));
+        originBinary = new byte[bis.available()];
+        while ((bis.read(originBinary)) != -1) {
+        }
+        // write data
+        writer.write(new Object[]{"robot" + (i % 10), i, originBinary,
+            originBinary, originBinary, "^YWJj", "^YWJj".getBytes()});
+        bis.close();
+      }
+      try {
+        writer.close();
+        Assert.fail();
+      } catch (Exception e) {
+        Assert.assertTrue(e.getMessage().contains("Binary decoder is base64, but data is not base64"));
+      }
+    }
+  }
+
+  public void testInvalidValueForBinaryDecoder() throws IOException, InvalidLoadOptionException {
+    String path = "./target/binary";
+    Field[] fields = new Field[7];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+    fields[2] = new Field("image1", DataTypes.BINARY);
+    fields[3] = new Field("image2", DataTypes.BINARY);
+    fields[4] = new Field("image3", DataTypes.BINARY);
+    fields[5] = new Field("decodeString", DataTypes.BINARY);
+    fields[6] = new Field("decodeByte", DataTypes.BINARY);
+    try {
+      CarbonWriter
+          .builder()
+          .outputPath(path)
+          .withCsvInput(new Schema(fields))
+          .writtenBy("SDKS3Example")
+          .withPageSizeInMb(1)
+          .withLoadOption("binary_decoder", "base")
+          .build();
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains(
+          "Binary decoder only supports Base64, Hex or no decoding for string, it doesn't support base"));
+    }
+  }
+
   public void binaryToCarbonWithHWD(String sourceImageFolder, String outputPath, String preDestPath,
                                     String sufAnnotation, final String sufImage, int numToWrite)
       throws Exception {