You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/10/31 07:00:09 UTC

[11/22] carbondata git commit: [CARBONDATA-1444] Support Boolean data type

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesLoadTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesLoadTest.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesLoadTest.scala
new file mode 100644
index 0000000..c19b0cd
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesLoadTest.scala
@@ -0,0 +1,744 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.booleantype
+
+import java.io.File
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+class BooleanDataTypesLoadTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
+  val rootPath = new File(this.getClass.getResource("/").getPath
+    + "../../../..").getCanonicalPath
+
+  override def beforeEach(): Unit = {
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists boolean_table")
+    sql("drop table if exists boolean_table2")
+    sql("drop table if exists boolean_table3")
+    sql("drop table if exists boolean_table4")
+    sql("drop table if exists badRecords")
+    sql("CREATE TABLE if not exists carbon_table(booleanField BOOLEAN) STORED BY 'carbondata'")
+    sql(
+      s"""
+         | CREATE TABLE badRecords(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | decimalField DECIMAL(18,2),
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+       """.stripMargin)
+
+    sql(
+      s"""
+         | CREATE TABLE boolean_table(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+  }
+
+  override def afterAll(): Unit = {
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists boolean_table")
+    sql("drop table if exists boolean_table2")
+    sql("drop table if exists boolean_table3")
+    sql("drop table if exists boolean_table4")
+    sql("drop table if exists badRecords")
+  }
+
+  test("Loading table: support boolean data type format") {
+    val fileLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanOnlyBoolean.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$fileLocation'
+         | INTO TABLE carbon_table
+         | OPTIONS('FILEHEADER' = 'booleanField')
+       """.stripMargin)
+
+    checkAnswer(sql("select * from carbon_table where booleanField = true"),
+      Seq(Row(true), Row(true), Row(true), Row(true)))
+
+    checkAnswer(sql("select * from carbon_table"),
+      Seq(Row(true), Row(true), Row(true), Row(true), Row(false), Row(false), Row(false), Row(false), Row(null), Row(null), Row(null)))
+  }
+
+  test("Loading table: support boolean data type format, different format") {
+    val fileLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanDifferentFormat.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$fileLocation'
+         | INTO TABLE carbon_table
+         | OPTIONS('FILEHEADER' = 'booleanField')
+       """.stripMargin)
+
+    checkAnswer(sql("select * from carbon_table where booleanField = true"),
+      Seq(Row(true), Row(true), Row(true), Row(true)))
+
+    checkAnswer(sql("select * from carbon_table"),
+      Seq(Row(true), Row(true), Row(true), Row(true), Row(false), Row(false), Row(false), Row(false)
+        , Row(null), Row(null), Row(null), Row(null), Row(null), Row(null), Row(null), Row(null), Row(null), Row(null), Row(null), Row(null)))
+  }
+
+  test("Loading table: support boolean and other data type") {
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBoolean.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE boolean_table
+         | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+
+    checkAnswer(
+      sql("select booleanField,intField from boolean_table"),
+      Seq(Row(true, 10), Row(false, 17), Row(false, 11),
+        Row(true, 10), Row(true, 10), Row(true, 14),
+        Row(false, 10), Row(false, 10), Row(false, 16), Row(false, 10))
+    )
+  }
+
+  test("Loading table: data columns is less than table defined columns") {
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBoolean.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE boolean_table
+         | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from boolean_table"),
+      Seq(Row(true, 10, null), Row(false, 17, null), Row(false, 11, null),
+        Row(true, 10, null), Row(true, 10, null), Row(true, 14, null),
+        Row(false, 10, null), Row(false, 10, null), Row(false, 16, null), Row(false, 10, null))
+    )
+  }
+
+  test("Loading table: support boolean and other data type, data columns bigger than table defined columns") {
+    sql("drop table if exists boolean_table")
+    sql(
+      s"""
+         | CREATE TABLE boolean_table(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanTwoBooleanColumns.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE boolean_table
+         | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+
+    checkAnswer(
+      sql("select booleanField,intField from boolean_table"),
+      Seq(Row(true, 10), Row(false, 17), Row(false, 11),
+        Row(true, 10), Row(true, 10), Row(true, 14),
+        Row(false, 10), Row(false, 10), Row(false, 16), Row(false, 10))
+    )
+  }
+
+  test("Loading table: support boolean and other data type, with file header") {
+    sql("drop table if exists boolean_table")
+    sql(
+      s"""
+         | CREATE TABLE boolean_table(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanWithFileHeader.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE boolean_table
+           """.stripMargin)
+
+    checkAnswer(
+      sql("select booleanField,intField from boolean_table"),
+      Seq(Row(true, 10), Row(false, 17), Row(false, 11),
+        Row(true, 10), Row(true, 10), Row(true, 14),
+        Row(false, 10), Row(false, 10), Row(false, 16), Row(false, 10))
+    )
+  }
+
+  test("Loading table: create with DICTIONARY_EXCLUDE, TABLE_BLOCKSIZE, NO_INVERTED_INDEX, SORT_SCOPE") {
+    sql("drop table if exists boolean_table")
+    sql(
+      s"""
+         | CREATE TABLE boolean_table(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='','DICTIONARY_EXCLUDE'='charField','TABLE_BLOCKSIZE'='512','NO_INVERTED_INDEX'='charField', 'SORT_SCOPE'='GLOBAL_SORT')
+       """.stripMargin)
+
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBoolean.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE boolean_table
+         | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+
+    checkAnswer(
+      sql("select booleanField,intField from boolean_table"),
+      Seq(Row(true, 10), Row(false, 17), Row(false, 11),
+        Row(true, 10), Row(true, 10), Row(true, 14),
+        Row(false, 10), Row(false, 10), Row(false, 16), Row(false, 10))
+    )
+
+    checkAnswer(sql("select booleanField from boolean_table where booleanField = true"),
+      Seq(Row(true), Row(true), Row(true), Row(true)))
+
+    checkAnswer(sql("select count(*) from boolean_table where booleanField = true"),
+      Row(4))
+
+    checkAnswer(sql("select count(*) from boolean_table where booleanField = 'true'"),
+      Row(0))
+
+    checkAnswer(sql(
+      s"""
+         |select count(*)
+         |from boolean_table where booleanField = \"true\"
+         |""".stripMargin),
+      Row(0))
+
+    checkAnswer(sql("select booleanField from boolean_table where booleanField = false"),
+      Seq(Row(false), Row(false), Row(false), Row(false), Row(false), Row(false)))
+
+    checkAnswer(sql("select count(*) from boolean_table where booleanField = false"),
+      Row(6))
+
+    checkAnswer(sql("select count(*) from boolean_table where booleanField = 'false'"),
+      Row(0))
+
+    checkAnswer(sql("select count(*) from boolean_table where booleanField = null"),
+      Row(0))
+
+    checkAnswer(sql("select count(*) from boolean_table where booleanField = false or booleanField = true"),
+      Row(10))
+  }
+
+  test("Loading table: load with DELIMITER, QUOTECHAR, COMMENTCHAR, MULTILINE, ESCAPECHAR, COMPLEX_DELIMITER_LEVEL_1, SINGLE_PASS") {
+    sql("drop table if exists boolean_table")
+    sql(
+      s"""
+         | CREATE TABLE boolean_table(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='','DICTIONARY_EXCLUDE'='charField','TABLE_BLOCKSIZE'='512','NO_INVERTED_INDEX'='charField', 'SORT_SCOPE'='GLOBAL_SORT')
+       """.stripMargin)
+
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanWithFileHeader.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE boolean_table
+         | options('DELIMITER'=',','QUOTECHAR'='"','COMMENTCHAR'='#','MULTILINE'='true','ESCAPECHAR'='\','COMPLEX_DELIMITER_LEVEL_1'='#','COMPLEX_DELIMITER_LEVEL_2'=':','SINGLE_PASS'='TRUE')
+           """.stripMargin)
+
+    checkAnswer(
+      sql("select booleanField,intField from boolean_table"),
+      Seq(Row(true, 10), Row(false, 17), Row(false, 11),
+        Row(true, 10), Row(true, 10), Row(true, 14),
+        Row(false, 10), Row(false, 10), Row(false, 16), Row(false, 10))
+    )
+
+    checkAnswer(sql("select booleanField from boolean_table where booleanField = true"),
+      Seq(Row(true), Row(true), Row(true), Row(true)))
+
+    checkAnswer(sql("select count(*) from boolean_table where booleanField = true"),
+      Row(4))
+
+    checkAnswer(sql("select count(*) from boolean_table where booleanField = 'true'"),
+      Row(0))
+
+    checkAnswer(sql(
+      s"""
+         |select count(*)
+         |from boolean_table where booleanField = \"true\"
+         |""".stripMargin),
+      Row(0))
+
+    checkAnswer(sql("select booleanField from boolean_table where booleanField = false"),
+      Seq(Row(false), Row(false), Row(false), Row(false), Row(false), Row(false)))
+
+    checkAnswer(sql("select count(*) from boolean_table where booleanField = false"),
+      Row(6))
+
+    checkAnswer(sql("select count(*) from boolean_table where booleanField = 'false'"),
+      Row(0))
+
+    checkAnswer(sql("select count(*) from boolean_table where booleanField = null"),
+      Row(0))
+
+    checkAnswer(sql("select count(*) from boolean_table where booleanField = false or booleanField = true"),
+      Row(10))
+  }
+
+  test("Loading table: bad_records_action is FORCE") {
+    val fileLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanDifferentFormat.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$fileLocation'
+         | INTO TABLE carbon_table
+         | OPTIONS('FILEHEADER' = 'booleanField','bad_records_logger_enable'='true','bad_records_action'='FORCE')
+       """.stripMargin)
+
+    checkAnswer(sql("select * from carbon_table where booleanField = true"),
+      Seq(Row(true), Row(true), Row(true), Row(true)))
+
+    checkAnswer(sql("select * from carbon_table"),
+      Seq(Row(true), Row(true), Row(true), Row(true),
+        Row(false), Row(false), Row(false), Row(false),
+        Row(null), Row(null), Row(null), Row(null), Row(null), Row(null),
+        Row(null), Row(null), Row(null), Row(null), Row(null), Row(null)))
+  }
+
+  test("Loading table: bad_records_action is FORCE, support boolean and other data type") {
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanBadRecords.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE badRecords
+         | options('bad_records_logger_enable'='true','bad_records_action'='FORCE','FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from badRecords"),
+      Seq(Row(true, 10, true), Row(null, 17, true), Row(null, 11, true),
+        Row(null, 10, true), Row(null, 10, true), Row(true, 14, null),
+        Row(false, 10, null), Row(false, 10, null), Row(false, 16, null), Row(false, 10, false))
+    )
+  }
+
+  test("Loading table: bad_records_action is IGNORE, support boolean and other data type") {
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanBadRecords.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE badRecords
+         | options('bad_records_logger_enable'='true','bad_records_action'='IGNORE','FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from badRecords"),
+      Seq(Row(true, 10, true), Row(false, 10, false))
+    )
+  }
+
+  test("Loading table: bad_records_action is REDIRECT, support boolean and other data type") {
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanBadRecords.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE badRecords
+         | options('bad_records_logger_enable'='true','bad_records_action'='REDIRECT','FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from badRecords"),
+      Seq(Row(true, 10, true), Row(false, 10, false))
+    )
+  }
+
+  test("Loading table: bad_records_action is FAIL") {
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanBadRecords.csv"
+    val exception_insert: Exception = intercept[Exception] {
+      sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '${storeLocation}'
+           | INTO TABLE badRecords
+           | options('bad_records_logger_enable'='true','bad_records_action'='FAIL','FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+    }
+    assert(exception_insert.getMessage.contains("The value with column name booleanfield and column data type BOOLEAN is not a valid BOOLEAN type"))
+  }
+
+  test("Loading overwrite: into and then overwrite table with another table: support boolean data type and other format") {
+    sql(
+      s"""
+         | CREATE TABLE boolean_table2(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    sql(
+      s"""
+         | CREATE TABLE boolean_table3(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | stringField STRING,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='')
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE TABLE boolean_table4(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | stringField STRING,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='')
+       """.stripMargin)
+
+    val rootPath = new File(this.getClass.getResource("/").getPath
+      + "../../../..").getCanonicalPath
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanTwoBooleanColumns.csv"
+
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE boolean_table
+         | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | overwrite INTO TABLE boolean_table
+         | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+
+    sql("insert overwrite table boolean_table2 select * from boolean_table")
+    sql("insert overwrite table boolean_table3 select shortField,booleanField,intField,stringField,booleanField2 from boolean_table")
+    sql("insert overwrite table boolean_table4 select shortField,booleanField,intField,stringField,booleanField2 from boolean_table where shortField > 3")
+
+    checkAnswer(
+      sql("select booleanField,intField from boolean_table2"),
+      Seq(Row(true, 10), Row(false, 17), Row(false, 11),
+        Row(true, 10), Row(true, 10), Row(true, 14),
+        Row(false, 10), Row(false, 10), Row(false, 16), Row(false, 10))
+    )
+
+    checkAnswer(
+      sql("select booleanField,intField from boolean_table3"),
+      Seq(Row(true, 10), Row(false, 17), Row(false, 11),
+        Row(true, 10), Row(true, 10), Row(true, 14),
+        Row(false, 10), Row(false, 10), Row(false, 16), Row(false, 10))
+    )
+
+    checkAnswer(
+      sql("select booleanField,intField from boolean_table4"),
+      Seq(Row(false, 17), Row(false, 16))
+    )
+
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from boolean_table2"),
+      Seq(Row(true, 10, true), Row(false, 17, true), Row(false, 11, true),
+        Row(true, 10, true), Row(true, 10, true), Row(true, 14, false),
+        Row(false, 10, false), Row(false, 10, false), Row(false, 16, false), Row(false, 10, false))
+    )
+
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from boolean_table3"),
+      Seq(Row(true, 10, true), Row(false, 17, true), Row(false, 11, true),
+        Row(true, 10, true), Row(true, 10, true), Row(true, 14, false),
+        Row(false, 10, false), Row(false, 10, false), Row(false, 16, false), Row(false, 10, false))
+    )
+  }
+
+  test("Loading overwrite: support boolean data type format, different format") {
+    val fileLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanDifferentFormat.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$fileLocation'
+         | INTO TABLE carbon_table
+         | OPTIONS('FILEHEADER' = 'booleanField')
+       """.stripMargin)
+
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$fileLocation'
+         | OVERWRITE INTO TABLE carbon_table
+         | OPTIONS('FILEHEADER' = 'booleanField')
+       """.stripMargin)
+
+
+    checkAnswer(sql("select * from carbon_table where booleanField = true"),
+      Seq(Row(true), Row(true), Row(true), Row(true)))
+
+    checkAnswer(sql("select * from carbon_table"),
+      Seq(Row(true), Row(true), Row(true), Row(true), Row(false), Row(false), Row(false), Row(false)
+        , Row(null), Row(null), Row(null), Row(null), Row(null), Row(null), Row(null), Row(null), Row(null), Row(null), Row(null), Row(null)))
+  }
+
+  test("Loading overwrite: support boolean and other data type") {
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBoolean.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE boolean_table
+         | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | OVERWRITE INTO TABLE boolean_table
+         | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+    checkAnswer(
+      sql("select booleanField,intField from boolean_table"),
+      Seq(Row(true, 10), Row(false, 17), Row(false, 11),
+        Row(true, 10), Row(true, 10), Row(true, 14),
+        Row(false, 10), Row(false, 10), Row(false, 16), Row(false, 10))
+    )
+  }
+
+  test("Comparing table: support boolean and other data type") {
+    sql(
+      s"""
+         | CREATE TABLE boolean_table2(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanTwoBooleanColumns.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE boolean_table
+         | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+
+    sql("insert into boolean_table2 select * from boolean_table where shortField = 1 and booleanField = true")
+
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from boolean_table"),
+      Seq(Row(true, 10, true), Row(false, 17, true), Row(false, 11, true),
+        Row(true, 10, true), Row(true, 10, true), Row(true, 14, false),
+        Row(false, 10, false), Row(false, 10, false), Row(false, 16, false), Row(false, 10, false))
+    )
+
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from boolean_table"),
+      Seq(Row(true, 10, true), Row(false, 17, true), Row(false, 11, true),
+        Row(true, 10, true), Row(true, 10, true), Row(true, 14, false),
+        Row(false, 10, false), Row(false, 10, false), Row(false, 16, false), Row(false, 10, false))
+    )
+
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from boolean_table2"),
+      Seq(Row(true, 10, true), Row(true, 10, true), Row(true, 10, true))
+    )
+
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from boolean_table where exists (select booleanField,intField,booleanField2 " +
+        "from boolean_table2 where boolean_table.intField=boolean_table2.intField)"),
+      Seq(Row(true, 10, true), Row(true, 10, true), Row(true, 10, true), Row(false, 10, false), Row(false, 10, false), Row(false, 10, false))
+    )
+  }
+
+  test("Loading table: unsafe, support boolean and other data type") {
+    initConf
+    sql("drop table if exists boolean_table")
+    sql(
+      s"""
+         | CREATE TABLE boolean_table(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBoolean.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE boolean_table
+         | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+
+    checkAnswer(
+      sql("select booleanField,intField from boolean_table"),
+      Seq(Row(true, 10), Row(false, 17), Row(false, 11),
+        Row(true, 10), Row(true, 10), Row(true, 14),
+        Row(false, 10), Row(false, 10), Row(false, 16), Row(false, 10))
+    )
+    sql("drop table if exists boolean_table")
+    defaultConf
+  }
+
+  test("Loading table: unsafe, bad_records_action is IGNORE, support boolean and other data type") {
+    initConf
+    sql("drop table if exists badRecords")
+    sql(
+      s"""
+         | CREATE TABLE badRecords(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | decimalField DECIMAL(18,2),
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+       """.stripMargin)
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanBadRecords.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE badRecords
+         | options('bad_records_logger_enable'='true','bad_records_action'='IGNORE','FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from badRecords"),
+      Seq(Row(true, 10, true), Row(false, 10, false))
+    )
+    sql("drop table if exists badRecords")
+    defaultConf()
+  }
+
+  def initConf(): Unit ={
+    CarbonProperties.getInstance().
+      addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_LOADING,
+        "true")
+  }
+
+  def defaultConf(): Unit ={
+    CarbonProperties.getInstance().
+      addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_LOADING,
+        CarbonCommonConstants.ENABLE_DATA_LOADING_STATISTICS_DEFAULT)
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesParameterTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesParameterTest.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesParameterTest.scala
new file mode 100644
index 0000000..07c37b6
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesParameterTest.scala
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.booleantype
+
+import java.io.File
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+class BooleanDataTypesParameterTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
+  val filePath: String = s"$resourcesPath/globalsort"
+  val file1: String = resourcesPath + "/globalsort/sample1.csv"
+  val file2: String = resourcesPath + "/globalsort/sample2.csv"
+  val file3: String = resourcesPath + "/globalsort/sample3.csv"
+
+  override def beforeEach(): Unit = {
+    sql("drop table if exists boolean_one_column")
+    sql("drop table if exists boolean_table")
+    sql(
+      s"""CREATE TABLE if not exists boolean_one_column(
+         |booleanField BOOLEAN)
+         |STORED BY 'carbondata'
+         |""".stripMargin)
+  }
+
+  val rootPath = new File(this.getClass.getResource("/").getPath
+    + "../../../..").getCanonicalPath
+
+  override def beforeAll(): Unit = {
+    CarbonProperties.getInstance().
+      addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
+        CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD)
+  }
+
+  override def afterAll(): Unit = {
+    sql("drop table if exists boolean_one_column")
+    sql("drop table if exists boolean_table")
+  }
+
+  test("ENABLE_AUTO_LOAD_MERGE: false, and Inserting and selecting table: one column boolean and many rows, should support") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
+    sql("insert into boolean_one_column values(true)")
+    sql("insert into boolean_one_column values(True)")
+    sql("insert into boolean_one_column values(TRUE)")
+    sql("insert into boolean_one_column values('true')")
+    sql("insert into boolean_one_column values(False)")
+    sql("insert into boolean_one_column values(false)")
+    sql("insert into boolean_one_column values(FALSE)")
+    sql("insert into boolean_one_column values('false')")
+    sql("insert into boolean_one_column values('tr')")
+    sql("insert into boolean_one_column values(null)")
+    sql("insert into boolean_one_column values('truEe')")
+    sql("insert into boolean_one_column values('falsEe')")
+    sql("insert into boolean_one_column values('t')")
+    sql("insert into boolean_one_column values('f')")
+
+    checkAnswer(
+      sql("select * from boolean_one_column"),
+      Seq(Row(true), Row(true), Row(true), Row(true),
+        Row(false), Row(false), Row(false), Row(false),
+        Row(null), Row(null), Row(null), Row(null), Row(null), Row(null))
+    )
+
+    val segments = sql("SHOW SEGMENTS FOR TABLE boolean_one_column")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(!SegmentSequenceIds.contains("0.1"))
+    assert(SegmentSequenceIds.length == 14)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
+      CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
+  }
+
+  test("ENABLE_AUTO_LOAD_MERGE: true, and Inserting and selecting table: one column boolean and many rows, should support") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
+    sql("insert into boolean_one_column values(true)")
+    sql("insert into boolean_one_column values(True)")
+    sql("insert into boolean_one_column values(TRUE)")
+    sql("insert into boolean_one_column values('true')")
+    sql("insert into boolean_one_column values(False)")
+    sql("insert into boolean_one_column values(false)")
+    sql("insert into boolean_one_column values(FALSE)")
+    sql("insert into boolean_one_column values('false')")
+    sql("insert into boolean_one_column values('tr')")
+    sql("insert into boolean_one_column values(null)")
+    sql("insert into boolean_one_column values('truEe')")
+    sql("insert into boolean_one_column values('falsEe')")
+    sql("insert into boolean_one_column values('t')")
+    sql("insert into boolean_one_column values('f')")
+
+    checkAnswer(
+      sql("select * from boolean_one_column"),
+      Seq(Row(true), Row(true), Row(true), Row(true),
+        Row(false), Row(false), Row(false), Row(false),
+        Row(null), Row(null), Row(null), Row(null), Row(null), Row(null))
+    )
+
+    val segments = sql("SHOW SEGMENTS FOR TABLE boolean_one_column")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(SegmentSequenceIds.contains("0.1"))
+    assert(SegmentSequenceIds.length == 18)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
+      CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
+  }
+
+  test("ENABLE_AUTO_LOAD_MERGE: false, and Loading table: support boolean and other data type") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
+    sql(
+      s"""
+         | CREATE TABLE boolean_table(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBoolean.csv"
+    for (i <- 0 until 4) {
+      sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '${storeLocation}'
+           | INTO TABLE boolean_table
+           | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+    }
+
+    checkAnswer(
+      sql("select count(*) from boolean_table"),
+      Seq(Row(40))
+    )
+    val segments = sql("SHOW SEGMENTS FOR TABLE boolean_table")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(!SegmentSequenceIds.contains("0.1"))
+    assert(SegmentSequenceIds.length == 4)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
+      CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
+  }
+
+  test("ENABLE_AUTO_LOAD_MERGE: true, and Loading table: support boolean and other data type") {
+    //unfinish
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
+    sql(
+      s"""
+         | CREATE TABLE boolean_table(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanTwoBooleanColumns.csv"
+    for (i <- 0 until 4) {
+      sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '${storeLocation}'
+           | INTO TABLE boolean_table
+           | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+    }
+
+    checkAnswer(
+      sql("select count(*) from boolean_table"),
+      Seq(Row(40))
+    )
+    val segments = sql("SHOW SEGMENTS FOR TABLE boolean_table")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(SegmentSequenceIds.contains("0.1"))
+    assert(SegmentSequenceIds.length == 5)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
+      CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
+  }
+
+  test("ENABLE_AUTO_LOAD_MERGE: false, and sort_columns is boolean") {
+    sql("drop table if exists boolean_one_column")
+    sql(
+      s"""CREATE TABLE if not exists boolean_one_column(
+         |booleanField BOOLEAN)
+         |STORED BY 'carbondata'
+         |TBLPROPERTIES('sort_columns'='booleanField','SORT_SCOPE'='GLOBAL_SORT')
+         |""".stripMargin)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
+    sql("insert into boolean_one_column values(true)")
+    sql("insert into boolean_one_column values(True)")
+    sql("insert into boolean_one_column values(TRUE)")
+    sql("insert into boolean_one_column values('true')")
+    sql("insert into boolean_one_column values(False)")
+    sql("insert into boolean_one_column values(false)")
+    sql("insert into boolean_one_column values(FALSE)")
+    sql("insert into boolean_one_column values('false')")
+    sql("insert into boolean_one_column values('tr')")
+    sql("insert into boolean_one_column values(null)")
+    sql("insert into boolean_one_column values('truEe')")
+    sql("insert into boolean_one_column values('falsEe')")
+    sql("insert into boolean_one_column values('t')")
+    sql("insert into boolean_one_column values('f')")
+
+    checkAnswer(
+      sql("select * from boolean_one_column"),
+      Seq(Row(true), Row(true), Row(true), Row(true),
+        Row(false), Row(false), Row(false), Row(false),
+        Row(null), Row(null), Row(null), Row(null), Row(null), Row(null))
+    )
+
+    val segments = sql("SHOW SEGMENTS FOR TABLE boolean_one_column")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(!SegmentSequenceIds.contains("0.1"))
+    assert(SegmentSequenceIds.length == 14)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
+      CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
+  }
+
+  test("ENABLE_AUTO_LOAD_MERGE: true, and sort_columns is boolean") {
+    sql("drop table if exists boolean_one_column")
+    sql(
+      s"""CREATE TABLE if not exists boolean_one_column(
+         |booleanField BOOLEAN)
+         |STORED BY 'carbondata'
+         |TBLPROPERTIES('sort_columns'='booleanField','SORT_SCOPE'='GLOBAL_SORT')
+         |""".stripMargin)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
+    sql("insert into boolean_one_column values(true)")
+    sql("insert into boolean_one_column values(True)")
+    sql("insert into boolean_one_column values(TRUE)")
+    sql("insert into boolean_one_column values('true')")
+    sql("insert into boolean_one_column values(False)")
+    sql("insert into boolean_one_column values(false)")
+    sql("insert into boolean_one_column values(FALSE)")
+    sql("insert into boolean_one_column values('false')")
+    sql("insert into boolean_one_column values('tr')")
+    sql("insert into boolean_one_column values(null)")
+    sql("insert into boolean_one_column values('truEe')")
+    sql("insert into boolean_one_column values('falsEe')")
+    sql("insert into boolean_one_column values('t')")
+    sql("insert into boolean_one_column values('f')")
+
+    checkAnswer(
+      sql("select * from boolean_one_column"),
+      Seq(Row(true), Row(true), Row(true), Row(true),
+        Row(false), Row(false), Row(false), Row(false),
+        Row(null), Row(null), Row(null), Row(null), Row(null), Row(null))
+    )
+
+    val segments = sql("SHOW SEGMENTS FOR TABLE boolean_one_column")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(SegmentSequenceIds.contains("0.1"))
+    assert(SegmentSequenceIds.length == 18)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
+      CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesSortTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesSortTest.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesSortTest.scala
new file mode 100644
index 0000000..71c6c18
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesSortTest.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.booleantype
+
+import java.io.File
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+class BooleanDataTypesSortTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
+  val rootPath = new File(this.getClass.getResource("/").getPath
+    + "../../../..").getCanonicalPath
+
+  override def beforeEach(): Unit = {
+    sql("drop table if exists boolean_table")
+    sql("drop table if exists boolean_table2")
+  }
+
+  override def afterAll(): Unit = {
+    sql("drop table if exists boolean_table")
+    sql("drop table if exists boolean_table2")
+  }
+
+  test("Sort_columns: sort one boolean column") {
+    sql(
+      s"""
+         | CREATE TABLE boolean_table(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='booleanField','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    sql(
+      s"""
+         | CREATE TABLE boolean_table2(
+         | shortField SHORT,
+         | booleanField BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='booleanField')
+       """.stripMargin)
+
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanTwoBooleanColumns.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE boolean_table
+         | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData')
+           """.stripMargin)
+
+    sql("insert into boolean_table2 select shortField,booleanField from boolean_table where shortField = 1 and booleanField = true")
+
+    checkAnswer(
+      sql("select shortField,booleanField from boolean_table"),
+      Seq(Row(5, false), Row(1, false), Row(2, false), Row(1, false), Row(4, false), Row(1, false), Row(1, true), Row(1, true), Row(1, true), Row(3, true))
+    )
+
+    checkAnswer(
+      sql("select shortField,booleanField from boolean_table2"),
+      Seq(Row(1, true), Row(1, true), Row(1, true))
+    )
+  }
+
+  test("Sort_columns: sort two boolean columns and other data type column") {
+    sql(
+      s"""
+         | CREATE TABLE boolean_table(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='shortField,booleanField,booleanField2','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    sql(
+      s"""
+         | CREATE TABLE boolean_table2(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='shortField,booleanField,booleanField2')
+       """.stripMargin)
+
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanTwoBooleanColumns.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE boolean_table
+         | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+
+    sql("insert into boolean_table2 select shortField,booleanField,booleanField2 from boolean_table where shortField = 1 and booleanField = true")
+
+    checkAnswer(
+      sql("select shortField,booleanField,booleanField2 from boolean_table"),
+      Seq(Row(5, false, true), Row(1, false, false), Row(2, false, false), Row(1, false, false), Row(4, false, false),
+        Row(1, false, true), Row(1, true, true), Row(1, true, true), Row(1, true, true), Row(3, true, false))
+    )
+
+    checkAnswer(
+      sql("select shortField,booleanField,booleanField2 from boolean_table2"),
+      Seq(Row(1, true, true), Row(1, true, true), Row(1, true, true))
+    )
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
index ff42e2d..1d8f941 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
@@ -127,7 +127,11 @@ public class UnsafeCarbonRowPage {
       Object value = row[mesCount + dimensionSize];
       if (null != value) {
         DataType dataType = measureDataType[mesCount];
-        if (dataType == DataTypes.SHORT) {
+        if (dataType == DataTypes.BOOLEAN) {
+          Boolean bval = (Boolean) value;
+          CarbonUnsafe.getUnsafe().putBoolean(baseObject, address + size, bval);
+          size += 1;
+        } else if (dataType == DataTypes.SHORT) {
           Short sval = (Short) value;
           CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, sval);
           size += 2;
@@ -209,7 +213,11 @@ public class UnsafeCarbonRowPage {
     for (int mesCount = 0; mesCount < measureSize; mesCount++) {
       if (isSet(nullSetWords, mesCount)) {
         DataType dataType = measureDataType[mesCount];
-        if (dataType == DataTypes.SHORT) {
+        if (dataType == DataTypes.BOOLEAN) {
+          Boolean bval = CarbonUnsafe.getUnsafe().getBoolean(baseObject, address + size);
+          size += 1;
+          rowToFill[dimensionSize + mesCount] = bval;
+        } else if (dataType == DataTypes.SHORT) {
           Short sval = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
           size += 2;
           rowToFill[dimensionSize + mesCount] = sval;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
index 3671316..fa50f1c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
@@ -347,7 +347,9 @@ public class IntermediateFileMerger implements Callable<Void> {
         if (null != NonDictionaryUtil.getMeasure(fieldIndex, row)) {
           stream.write((byte) 1);
           DataType dataType = aggType[counter];
-          if (dataType == DataTypes.SHORT) {
+          if (dataType == DataTypes.BOOLEAN) {
+            stream.writeBoolean((boolean)NonDictionaryUtil.getMeasure(fieldIndex, row));
+          } else if (dataType == DataTypes.SHORT) {
             stream.writeShort((short) NonDictionaryUtil.getMeasure(fieldIndex, row));
           } else if (dataType == DataTypes.INT) {
             stream.writeInt((int) NonDictionaryUtil.getMeasure(fieldIndex, row));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
index 5cc96c5..8a60657 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
@@ -295,7 +295,9 @@ public class SortDataRows {
           if (null != value) {
             stream.write((byte) 1);
             DataType dataType = type[mesCount];
-            if (dataType == DataTypes.SHORT) {
+            if (dataType == DataTypes.BOOLEAN) {
+              stream.writeBoolean((boolean) value);
+            } else if (dataType == DataTypes.SHORT) {
               stream.writeShort((Short) value);
             } else if (dataType == DataTypes.INT) {
               stream.writeInt((Integer) value);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
index 5d339c7..afa4cd7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
@@ -345,7 +345,9 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
       for (int i = 0; i < this.measureCount; i++) {
         if (stream.readByte() == 1) {
           DataType dataType = aggType[i];
-          if (dataType == DataTypes.SHORT) {
+          if (dataType == DataTypes.BOOLEAN) {
+            measures[index++] = stream.readBoolean();
+          } else if (dataType == DataTypes.SHORT) {
             measures[index++] = stream.readShort();
           } else if (dataType == DataTypes.INT) {
             measures[index++] = stream.readInt();