You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/10/20 00:36:49 UTC

[1/3] carbondata git commit: [CARBONDATA-1444] Support Boolean data type

Repository: carbondata
Updated Branches:
  refs/heads/master 228ab2fd1 -> 6abdd97a4


http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesLoadTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesLoadTest.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesLoadTest.scala
new file mode 100644
index 0000000..c19b0cd
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesLoadTest.scala
@@ -0,0 +1,744 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.booleantype
+
+import java.io.File
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+class BooleanDataTypesLoadTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
+  val rootPath = new File(this.getClass.getResource("/").getPath
+    + "../../../..").getCanonicalPath
+
+  override def beforeEach(): Unit = {
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists boolean_table")
+    sql("drop table if exists boolean_table2")
+    sql("drop table if exists boolean_table3")
+    sql("drop table if exists boolean_table4")
+    sql("drop table if exists badRecords")
+    sql("CREATE TABLE if not exists carbon_table(booleanField BOOLEAN) STORED BY 'carbondata'")
+    sql(
+      s"""
+         | CREATE TABLE badRecords(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | decimalField DECIMAL(18,2),
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+       """.stripMargin)
+
+    sql(
+      s"""
+         | CREATE TABLE boolean_table(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+  }
+
+  override def afterAll(): Unit = {
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists boolean_table")
+    sql("drop table if exists boolean_table2")
+    sql("drop table if exists boolean_table3")
+    sql("drop table if exists boolean_table4")
+    sql("drop table if exists badRecords")
+  }
+
+  test("Loading table: support boolean data type format") {
+    val fileLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanOnlyBoolean.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$fileLocation'
+         | INTO TABLE carbon_table
+         | OPTIONS('FILEHEADER' = 'booleanField')
+       """.stripMargin)
+
+    checkAnswer(sql("select * from carbon_table where booleanField = true"),
+      Seq(Row(true), Row(true), Row(true), Row(true)))
+
+    checkAnswer(sql("select * from carbon_table"),
+      Seq(Row(true), Row(true), Row(true), Row(true), Row(false), Row(false), Row(false), Row(false), Row(null), Row(null), Row(null)))
+  }
+
+  test("Loading table: support boolean data type format, different format") {
+    val fileLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanDifferentFormat.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$fileLocation'
+         | INTO TABLE carbon_table
+         | OPTIONS('FILEHEADER' = 'booleanField')
+       """.stripMargin)
+
+    checkAnswer(sql("select * from carbon_table where booleanField = true"),
+      Seq(Row(true), Row(true), Row(true), Row(true)))
+
+    checkAnswer(sql("select * from carbon_table"),
+      Seq(Row(true), Row(true), Row(true), Row(true), Row(false), Row(false), Row(false), Row(false)
+        , Row(null), Row(null), Row(null), Row(null), Row(null), Row(null), Row(null), Row(null), Row(null), Row(null), Row(null), Row(null)))
+  }
+
+  test("Loading table: support boolean and other data type") {
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBoolean.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE boolean_table
+         | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+
+    checkAnswer(
+      sql("select booleanField,intField from boolean_table"),
+      Seq(Row(true, 10), Row(false, 17), Row(false, 11),
+        Row(true, 10), Row(true, 10), Row(true, 14),
+        Row(false, 10), Row(false, 10), Row(false, 16), Row(false, 10))
+    )
+  }
+
+  test("Loading table: data columns is less than table defined columns") {
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBoolean.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE boolean_table
+         | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from boolean_table"),
+      Seq(Row(true, 10, null), Row(false, 17, null), Row(false, 11, null),
+        Row(true, 10, null), Row(true, 10, null), Row(true, 14, null),
+        Row(false, 10, null), Row(false, 10, null), Row(false, 16, null), Row(false, 10, null))
+    )
+  }
+
+  test("Loading table: support boolean and other data type, data columns bigger than table defined columns") {
+    sql("drop table if exists boolean_table")
+    sql(
+      s"""
+         | CREATE TABLE boolean_table(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanTwoBooleanColumns.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE boolean_table
+         | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+
+    checkAnswer(
+      sql("select booleanField,intField from boolean_table"),
+      Seq(Row(true, 10), Row(false, 17), Row(false, 11),
+        Row(true, 10), Row(true, 10), Row(true, 14),
+        Row(false, 10), Row(false, 10), Row(false, 16), Row(false, 10))
+    )
+  }
+
+  test("Loading table: support boolean and other data type, with file header") {
+    sql("drop table if exists boolean_table")
+    sql(
+      s"""
+         | CREATE TABLE boolean_table(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanWithFileHeader.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE boolean_table
+           """.stripMargin)
+
+    checkAnswer(
+      sql("select booleanField,intField from boolean_table"),
+      Seq(Row(true, 10), Row(false, 17), Row(false, 11),
+        Row(true, 10), Row(true, 10), Row(true, 14),
+        Row(false, 10), Row(false, 10), Row(false, 16), Row(false, 10))
+    )
+  }
+
+  test("Loading table: create with DICTIONARY_EXCLUDE, TABLE_BLOCKSIZE, NO_INVERTED_INDEX, SORT_SCOPE") {
+    sql("drop table if exists boolean_table")
+    sql(
+      s"""
+         | CREATE TABLE boolean_table(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='','DICTIONARY_EXCLUDE'='charField','TABLE_BLOCKSIZE'='512','NO_INVERTED_INDEX'='charField', 'SORT_SCOPE'='GLOBAL_SORT')
+       """.stripMargin)
+
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBoolean.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE boolean_table
+         | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+
+    checkAnswer(
+      sql("select booleanField,intField from boolean_table"),
+      Seq(Row(true, 10), Row(false, 17), Row(false, 11),
+        Row(true, 10), Row(true, 10), Row(true, 14),
+        Row(false, 10), Row(false, 10), Row(false, 16), Row(false, 10))
+    )
+
+    checkAnswer(sql("select booleanField from boolean_table where booleanField = true"),
+      Seq(Row(true), Row(true), Row(true), Row(true)))
+
+    checkAnswer(sql("select count(*) from boolean_table where booleanField = true"),
+      Row(4))
+
+    checkAnswer(sql("select count(*) from boolean_table where booleanField = 'true'"),
+      Row(0))
+
+    checkAnswer(sql(
+      s"""
+         |select count(*)
+         |from boolean_table where booleanField = \"true\"
+         |""".stripMargin),
+      Row(0))
+
+    checkAnswer(sql("select booleanField from boolean_table where booleanField = false"),
+      Seq(Row(false), Row(false), Row(false), Row(false), Row(false), Row(false)))
+
+    checkAnswer(sql("select count(*) from boolean_table where booleanField = false"),
+      Row(6))
+
+    checkAnswer(sql("select count(*) from boolean_table where booleanField = 'false'"),
+      Row(0))
+
+    checkAnswer(sql("select count(*) from boolean_table where booleanField = null"),
+      Row(0))
+
+    checkAnswer(sql("select count(*) from boolean_table where booleanField = false or booleanField = true"),
+      Row(10))
+  }
+
+  test("Loading table: load with DELIMITER, QUOTECHAR, COMMENTCHAR, MULTILINE, ESCAPECHAR, COMPLEX_DELIMITER_LEVEL_1, SINGLE_PASS") {
+    sql("drop table if exists boolean_table")
+    sql(
+      s"""
+         | CREATE TABLE boolean_table(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='','DICTIONARY_EXCLUDE'='charField','TABLE_BLOCKSIZE'='512','NO_INVERTED_INDEX'='charField', 'SORT_SCOPE'='GLOBAL_SORT')
+       """.stripMargin)
+
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanWithFileHeader.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE boolean_table
+         | options('DELIMITER'=',','QUOTECHAR'='"','COMMENTCHAR'='#','MULTILINE'='true','ESCAPECHAR'='\','COMPLEX_DELIMITER_LEVEL_1'='#','COMPLEX_DELIMITER_LEVEL_2'=':','SINGLE_PASS'='TRUE')
+           """.stripMargin)
+
+    checkAnswer(
+      sql("select booleanField,intField from boolean_table"),
+      Seq(Row(true, 10), Row(false, 17), Row(false, 11),
+        Row(true, 10), Row(true, 10), Row(true, 14),
+        Row(false, 10), Row(false, 10), Row(false, 16), Row(false, 10))
+    )
+
+    checkAnswer(sql("select booleanField from boolean_table where booleanField = true"),
+      Seq(Row(true), Row(true), Row(true), Row(true)))
+
+    checkAnswer(sql("select count(*) from boolean_table where booleanField = true"),
+      Row(4))
+
+    checkAnswer(sql("select count(*) from boolean_table where booleanField = 'true'"),
+      Row(0))
+
+    checkAnswer(sql(
+      s"""
+         |select count(*)
+         |from boolean_table where booleanField = \"true\"
+         |""".stripMargin),
+      Row(0))
+
+    checkAnswer(sql("select booleanField from boolean_table where booleanField = false"),
+      Seq(Row(false), Row(false), Row(false), Row(false), Row(false), Row(false)))
+
+    checkAnswer(sql("select count(*) from boolean_table where booleanField = false"),
+      Row(6))
+
+    checkAnswer(sql("select count(*) from boolean_table where booleanField = 'false'"),
+      Row(0))
+
+    checkAnswer(sql("select count(*) from boolean_table where booleanField = null"),
+      Row(0))
+
+    checkAnswer(sql("select count(*) from boolean_table where booleanField = false or booleanField = true"),
+      Row(10))
+  }
+
+  test("Loading table: bad_records_action is FORCE") {
+    val fileLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanDifferentFormat.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$fileLocation'
+         | INTO TABLE carbon_table
+         | OPTIONS('FILEHEADER' = 'booleanField','bad_records_logger_enable'='true','bad_records_action'='FORCE')
+       """.stripMargin)
+
+    checkAnswer(sql("select * from carbon_table where booleanField = true"),
+      Seq(Row(true), Row(true), Row(true), Row(true)))
+
+    checkAnswer(sql("select * from carbon_table"),
+      Seq(Row(true), Row(true), Row(true), Row(true),
+        Row(false), Row(false), Row(false), Row(false),
+        Row(null), Row(null), Row(null), Row(null), Row(null), Row(null),
+        Row(null), Row(null), Row(null), Row(null), Row(null), Row(null)))
+  }
+
+  test("Loading table: bad_records_action is FORCE, support boolean and other data type") {
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanBadRecords.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE badRecords
+         | options('bad_records_logger_enable'='true','bad_records_action'='FORCE','FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from badRecords"),
+      Seq(Row(true, 10, true), Row(null, 17, true), Row(null, 11, true),
+        Row(null, 10, true), Row(null, 10, true), Row(true, 14, null),
+        Row(false, 10, null), Row(false, 10, null), Row(false, 16, null), Row(false, 10, false))
+    )
+  }
+
+  test("Loading table: bad_records_action is IGNORE, support boolean and other data type") {
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanBadRecords.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE badRecords
+         | options('bad_records_logger_enable'='true','bad_records_action'='IGNORE','FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from badRecords"),
+      Seq(Row(true, 10, true), Row(false, 10, false))
+    )
+  }
+
+  test("Loading table: bad_records_action is REDIRECT, support boolean and other data type") {
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanBadRecords.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE badRecords
+         | options('bad_records_logger_enable'='true','bad_records_action'='REDIRECT','FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from badRecords"),
+      Seq(Row(true, 10, true), Row(false, 10, false))
+    )
+  }
+
+  test("Loading table: bad_records_action is FAIL") {
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanBadRecords.csv"
+    val exception_insert: Exception = intercept[Exception] {
+      sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '${storeLocation}'
+           | INTO TABLE badRecords
+           | options('bad_records_logger_enable'='true','bad_records_action'='FAIL','FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+    }
+    assert(exception_insert.getMessage.contains("The value with column name booleanfield and column data type BOOLEAN is not a valid BOOLEAN type"))
+  }
+
+  test("Loading overwrite: into and then overwrite table with another table: support boolean data type and other format") {
+    sql(
+      s"""
+         | CREATE TABLE boolean_table2(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    sql(
+      s"""
+         | CREATE TABLE boolean_table3(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | stringField STRING,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='')
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE TABLE boolean_table4(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | stringField STRING,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='')
+       """.stripMargin)
+
+    val rootPath = new File(this.getClass.getResource("/").getPath
+      + "../../../..").getCanonicalPath
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanTwoBooleanColumns.csv"
+
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE boolean_table
+         | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | overwrite INTO TABLE boolean_table
+         | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+
+    sql("insert overwrite table boolean_table2 select * from boolean_table")
+    sql("insert overwrite table boolean_table3 select shortField,booleanField,intField,stringField,booleanField2 from boolean_table")
+    sql("insert overwrite table boolean_table4 select shortField,booleanField,intField,stringField,booleanField2 from boolean_table where shortField > 3")
+
+    checkAnswer(
+      sql("select booleanField,intField from boolean_table2"),
+      Seq(Row(true, 10), Row(false, 17), Row(false, 11),
+        Row(true, 10), Row(true, 10), Row(true, 14),
+        Row(false, 10), Row(false, 10), Row(false, 16), Row(false, 10))
+    )
+
+    checkAnswer(
+      sql("select booleanField,intField from boolean_table3"),
+      Seq(Row(true, 10), Row(false, 17), Row(false, 11),
+        Row(true, 10), Row(true, 10), Row(true, 14),
+        Row(false, 10), Row(false, 10), Row(false, 16), Row(false, 10))
+    )
+
+    checkAnswer(
+      sql("select booleanField,intField from boolean_table4"),
+      Seq(Row(false, 17), Row(false, 16))
+    )
+
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from boolean_table2"),
+      Seq(Row(true, 10, true), Row(false, 17, true), Row(false, 11, true),
+        Row(true, 10, true), Row(true, 10, true), Row(true, 14, false),
+        Row(false, 10, false), Row(false, 10, false), Row(false, 16, false), Row(false, 10, false))
+    )
+
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from boolean_table3"),
+      Seq(Row(true, 10, true), Row(false, 17, true), Row(false, 11, true),
+        Row(true, 10, true), Row(true, 10, true), Row(true, 14, false),
+        Row(false, 10, false), Row(false, 10, false), Row(false, 16, false), Row(false, 10, false))
+    )
+  }
+
+  test("Loading overwrite: support boolean data type format, different format") {
+    val fileLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanDifferentFormat.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$fileLocation'
+         | INTO TABLE carbon_table
+         | OPTIONS('FILEHEADER' = 'booleanField')
+       """.stripMargin)
+
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$fileLocation'
+         | OVERWRITE INTO TABLE carbon_table
+         | OPTIONS('FILEHEADER' = 'booleanField')
+       """.stripMargin)
+
+
+    checkAnswer(sql("select * from carbon_table where booleanField = true"),
+      Seq(Row(true), Row(true), Row(true), Row(true)))
+
+    checkAnswer(sql("select * from carbon_table"),
+      Seq(Row(true), Row(true), Row(true), Row(true), Row(false), Row(false), Row(false), Row(false)
+        , Row(null), Row(null), Row(null), Row(null), Row(null), Row(null), Row(null), Row(null), Row(null), Row(null), Row(null), Row(null)))
+  }
+
+  test("Loading overwrite: support boolean and other data type") {
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBoolean.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE boolean_table
+         | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | OVERWRITE INTO TABLE boolean_table
+         | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+    checkAnswer(
+      sql("select booleanField,intField from boolean_table"),
+      Seq(Row(true, 10), Row(false, 17), Row(false, 11),
+        Row(true, 10), Row(true, 10), Row(true, 14),
+        Row(false, 10), Row(false, 10), Row(false, 16), Row(false, 10))
+    )
+  }
+
+  test("Comparing table: support boolean and other data type") {
+    sql(
+      s"""
+         | CREATE TABLE boolean_table2(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanTwoBooleanColumns.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE boolean_table
+         | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+
+    sql("insert into boolean_table2 select * from boolean_table where shortField = 1 and booleanField = true")
+
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from boolean_table"),
+      Seq(Row(true, 10, true), Row(false, 17, true), Row(false, 11, true),
+        Row(true, 10, true), Row(true, 10, true), Row(true, 14, false),
+        Row(false, 10, false), Row(false, 10, false), Row(false, 16, false), Row(false, 10, false))
+    )
+
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from boolean_table"),
+      Seq(Row(true, 10, true), Row(false, 17, true), Row(false, 11, true),
+        Row(true, 10, true), Row(true, 10, true), Row(true, 14, false),
+        Row(false, 10, false), Row(false, 10, false), Row(false, 16, false), Row(false, 10, false))
+    )
+
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from boolean_table2"),
+      Seq(Row(true, 10, true), Row(true, 10, true), Row(true, 10, true))
+    )
+
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from boolean_table where exists (select booleanField,intField,booleanField2 " +
+        "from boolean_table2 where boolean_table.intField=boolean_table2.intField)"),
+      Seq(Row(true, 10, true), Row(true, 10, true), Row(true, 10, true), Row(false, 10, false), Row(false, 10, false), Row(false, 10, false))
+    )
+  }
+
+  test("Loading table: unsafe, support boolean and other data type") {
+    initConf
+    sql("drop table if exists boolean_table")
+    sql(
+      s"""
+         | CREATE TABLE boolean_table(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBoolean.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE boolean_table
+         | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+
+    checkAnswer(
+      sql("select booleanField,intField from boolean_table"),
+      Seq(Row(true, 10), Row(false, 17), Row(false, 11),
+        Row(true, 10), Row(true, 10), Row(true, 14),
+        Row(false, 10), Row(false, 10), Row(false, 16), Row(false, 10))
+    )
+    sql("drop table if exists boolean_table")
+    defaultConf
+  }
+
+  test("Loading table: unsafe, bad_records_action is IGNORE, support boolean and other data type") {
+    initConf
+    sql("drop table if exists badRecords")
+    sql(
+      s"""
+         | CREATE TABLE badRecords(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | decimalField DECIMAL(18,2),
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+       """.stripMargin)
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanBadRecords.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE badRecords
+         | options('bad_records_logger_enable'='true','bad_records_action'='IGNORE','FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from badRecords"),
+      Seq(Row(true, 10, true), Row(false, 10, false))
+    )
+    sql("drop table if exists badRecords")
+    defaultConf()
+  }
+
+  def initConf(): Unit ={
+    CarbonProperties.getInstance().
+      addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_LOADING,
+        "true")
+  }
+
+  def defaultConf(): Unit ={
+    CarbonProperties.getInstance().
+      addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_LOADING,
+        CarbonCommonConstants.ENABLE_DATA_LOADING_STATISTICS_DEFAULT)
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesParameterTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesParameterTest.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesParameterTest.scala
new file mode 100644
index 0000000..07c37b6
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesParameterTest.scala
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.booleantype
+
+import java.io.File
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+class BooleanDataTypesParameterTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
+  val filePath: String = s"$resourcesPath/globalsort"
+  val file1: String = resourcesPath + "/globalsort/sample1.csv"
+  val file2: String = resourcesPath + "/globalsort/sample2.csv"
+  val file3: String = resourcesPath + "/globalsort/sample3.csv"
+
+  override def beforeEach(): Unit = {
+    sql("drop table if exists boolean_one_column")
+    sql("drop table if exists boolean_table")
+    sql(
+      s"""CREATE TABLE if not exists boolean_one_column(
+         |booleanField BOOLEAN)
+         |STORED BY 'carbondata'
+         |""".stripMargin)
+  }
+
+  val rootPath = new File(this.getClass.getResource("/").getPath
+    + "../../../..").getCanonicalPath
+
+  override def beforeAll(): Unit = {
+    CarbonProperties.getInstance().
+      addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
+        CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD)
+  }
+
+  override def afterAll(): Unit = {
+    sql("drop table if exists boolean_one_column")
+    sql("drop table if exists boolean_table")
+  }
+
+  test("ENABLE_AUTO_LOAD_MERGE: false, and Inserting and selecting table: one column boolean and many rows, should support") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
+    sql("insert into boolean_one_column values(true)")
+    sql("insert into boolean_one_column values(True)")
+    sql("insert into boolean_one_column values(TRUE)")
+    sql("insert into boolean_one_column values('true')")
+    sql("insert into boolean_one_column values(False)")
+    sql("insert into boolean_one_column values(false)")
+    sql("insert into boolean_one_column values(FALSE)")
+    sql("insert into boolean_one_column values('false')")
+    sql("insert into boolean_one_column values('tr')")
+    sql("insert into boolean_one_column values(null)")
+    sql("insert into boolean_one_column values('truEe')")
+    sql("insert into boolean_one_column values('falsEe')")
+    sql("insert into boolean_one_column values('t')")
+    sql("insert into boolean_one_column values('f')")
+
+    checkAnswer(
+      sql("select * from boolean_one_column"),
+      Seq(Row(true), Row(true), Row(true), Row(true),
+        Row(false), Row(false), Row(false), Row(false),
+        Row(null), Row(null), Row(null), Row(null), Row(null), Row(null))
+    )
+
+    val segments = sql("SHOW SEGMENTS FOR TABLE boolean_one_column")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(!SegmentSequenceIds.contains("0.1"))
+    assert(SegmentSequenceIds.length == 14)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
+      CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
+  }
+
+  test("ENABLE_AUTO_LOAD_MERGE: true, and Inserting and selecting table: one column boolean and many rows, should support") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
+    sql("insert into boolean_one_column values(true)")
+    sql("insert into boolean_one_column values(True)")
+    sql("insert into boolean_one_column values(TRUE)")
+    sql("insert into boolean_one_column values('true')")
+    sql("insert into boolean_one_column values(False)")
+    sql("insert into boolean_one_column values(false)")
+    sql("insert into boolean_one_column values(FALSE)")
+    sql("insert into boolean_one_column values('false')")
+    sql("insert into boolean_one_column values('tr')")
+    sql("insert into boolean_one_column values(null)")
+    sql("insert into boolean_one_column values('truEe')")
+    sql("insert into boolean_one_column values('falsEe')")
+    sql("insert into boolean_one_column values('t')")
+    sql("insert into boolean_one_column values('f')")
+
+    checkAnswer(
+      sql("select * from boolean_one_column"),
+      Seq(Row(true), Row(true), Row(true), Row(true),
+        Row(false), Row(false), Row(false), Row(false),
+        Row(null), Row(null), Row(null), Row(null), Row(null), Row(null))
+    )
+
+    val segments = sql("SHOW SEGMENTS FOR TABLE boolean_one_column")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(SegmentSequenceIds.contains("0.1"))
+    assert(SegmentSequenceIds.length == 18)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
+      CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
+  }
+
+  test("ENABLE_AUTO_LOAD_MERGE: false, and Loading table: support boolean and other data type") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
+    sql(
+      s"""
+         | CREATE TABLE boolean_table(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBoolean.csv"
+    for (i <- 0 until 4) {
+      sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '${storeLocation}'
+           | INTO TABLE boolean_table
+           | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+    }
+
+    checkAnswer(
+      sql("select count(*) from boolean_table"),
+      Seq(Row(40))
+    )
+    val segments = sql("SHOW SEGMENTS FOR TABLE boolean_table")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(!SegmentSequenceIds.contains("0.1"))
+    assert(SegmentSequenceIds.length == 4)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
+      CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
+  }
+
+  test("ENABLE_AUTO_LOAD_MERGE: true, and Loading table: support boolean and other data type") {
+    //unfinish
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
+    sql(
+      s"""
+         | CREATE TABLE boolean_table(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanTwoBooleanColumns.csv"
+    for (i <- 0 until 4) {
+      sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '${storeLocation}'
+           | INTO TABLE boolean_table
+           | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+    }
+
+    checkAnswer(
+      sql("select count(*) from boolean_table"),
+      Seq(Row(40))
+    )
+    val segments = sql("SHOW SEGMENTS FOR TABLE boolean_table")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(SegmentSequenceIds.contains("0.1"))
+    assert(SegmentSequenceIds.length == 5)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
+      CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
+  }
+
+  test("ENABLE_AUTO_LOAD_MERGE: false, and sort_columns is boolean") {
+    sql("drop table if exists boolean_one_column")
+    sql(
+      s"""CREATE TABLE if not exists boolean_one_column(
+         |booleanField BOOLEAN)
+         |STORED BY 'carbondata'
+         |TBLPROPERTIES('sort_columns'='booleanField','SORT_SCOPE'='GLOBAL_SORT')
+         |""".stripMargin)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
+    sql("insert into boolean_one_column values(true)")
+    sql("insert into boolean_one_column values(True)")
+    sql("insert into boolean_one_column values(TRUE)")
+    sql("insert into boolean_one_column values('true')")
+    sql("insert into boolean_one_column values(False)")
+    sql("insert into boolean_one_column values(false)")
+    sql("insert into boolean_one_column values(FALSE)")
+    sql("insert into boolean_one_column values('false')")
+    sql("insert into boolean_one_column values('tr')")
+    sql("insert into boolean_one_column values(null)")
+    sql("insert into boolean_one_column values('truEe')")
+    sql("insert into boolean_one_column values('falsEe')")
+    sql("insert into boolean_one_column values('t')")
+    sql("insert into boolean_one_column values('f')")
+
+    checkAnswer(
+      sql("select * from boolean_one_column"),
+      Seq(Row(true), Row(true), Row(true), Row(true),
+        Row(false), Row(false), Row(false), Row(false),
+        Row(null), Row(null), Row(null), Row(null), Row(null), Row(null))
+    )
+
+    val segments = sql("SHOW SEGMENTS FOR TABLE boolean_one_column")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(!SegmentSequenceIds.contains("0.1"))
+    assert(SegmentSequenceIds.length == 14)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
+      CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
+  }
+
+  test("ENABLE_AUTO_LOAD_MERGE: true, and sort_columns is boolean") {
+    sql("drop table if exists boolean_one_column")
+    sql(
+      s"""CREATE TABLE if not exists boolean_one_column(
+         |booleanField BOOLEAN)
+         |STORED BY 'carbondata'
+         |TBLPROPERTIES('sort_columns'='booleanField','SORT_SCOPE'='GLOBAL_SORT')
+         |""".stripMargin)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
+    sql("insert into boolean_one_column values(true)")
+    sql("insert into boolean_one_column values(True)")
+    sql("insert into boolean_one_column values(TRUE)")
+    sql("insert into boolean_one_column values('true')")
+    sql("insert into boolean_one_column values(False)")
+    sql("insert into boolean_one_column values(false)")
+    sql("insert into boolean_one_column values(FALSE)")
+    sql("insert into boolean_one_column values('false')")
+    sql("insert into boolean_one_column values('tr')")
+    sql("insert into boolean_one_column values(null)")
+    sql("insert into boolean_one_column values('truEe')")
+    sql("insert into boolean_one_column values('falsEe')")
+    sql("insert into boolean_one_column values('t')")
+    sql("insert into boolean_one_column values('f')")
+
+    checkAnswer(
+      sql("select * from boolean_one_column"),
+      Seq(Row(true), Row(true), Row(true), Row(true),
+        Row(false), Row(false), Row(false), Row(false),
+        Row(null), Row(null), Row(null), Row(null), Row(null), Row(null))
+    )
+
+    val segments = sql("SHOW SEGMENTS FOR TABLE boolean_one_column")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(SegmentSequenceIds.contains("0.1"))
+    assert(SegmentSequenceIds.length == 18)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
+      CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesSortTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesSortTest.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesSortTest.scala
new file mode 100644
index 0000000..71c6c18
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesSortTest.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.booleantype
+
+import java.io.File
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+class BooleanDataTypesSortTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
+  val rootPath = new File(this.getClass.getResource("/").getPath
+    + "../../../..").getCanonicalPath
+
+  override def beforeEach(): Unit = {
+    sql("drop table if exists boolean_table")
+    sql("drop table if exists boolean_table2")
+  }
+
+  override def afterAll(): Unit = {
+    sql("drop table if exists boolean_table")
+    sql("drop table if exists boolean_table2")
+  }
+
+  test("Sort_columns: sort one boolean column") {
+    sql(
+      s"""
+         | CREATE TABLE boolean_table(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='booleanField','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    sql(
+      s"""
+         | CREATE TABLE boolean_table2(
+         | shortField SHORT,
+         | booleanField BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='booleanField')
+       """.stripMargin)
+
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanTwoBooleanColumns.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE boolean_table
+         | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData')
+           """.stripMargin)
+
+    sql("insert into boolean_table2 select shortField,booleanField from boolean_table where shortField = 1 and booleanField = true")
+
+    checkAnswer(
+      sql("select shortField,booleanField from boolean_table"),
+      Seq(Row(5, false), Row(1, false), Row(2, false), Row(1, false), Row(4, false), Row(1, false), Row(1, true), Row(1, true), Row(1, true), Row(3, true))
+    )
+
+    checkAnswer(
+      sql("select shortField,booleanField from boolean_table2"),
+      Seq(Row(1, true), Row(1, true), Row(1, true))
+    )
+  }
+
+  test("Sort_columns: sort two boolean columns and other data type column") {
+    sql(
+      s"""
+         | CREATE TABLE boolean_table(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='shortField,booleanField,booleanField2','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    sql(
+      s"""
+         | CREATE TABLE boolean_table2(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='shortField,booleanField,booleanField2')
+       """.stripMargin)
+
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanTwoBooleanColumns.csv"
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE boolean_table
+         | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+
+    sql("insert into boolean_table2 select shortField,booleanField,booleanField2 from boolean_table where shortField = 1 and booleanField = true")
+
+    checkAnswer(
+      sql("select shortField,booleanField,booleanField2 from boolean_table"),
+      Seq(Row(5, false, true), Row(1, false, false), Row(2, false, false), Row(1, false, false), Row(4, false, false),
+        Row(1, false, true), Row(1, true, true), Row(1, true, true), Row(1, true, true), Row(3, true, false))
+    )
+
+    checkAnswer(
+      sql("select shortField,booleanField,booleanField2 from boolean_table2"),
+      Seq(Row(1, true, true), Row(1, true, true), Row(1, true, true))
+    )
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
index ff42e2d..1d8f941 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
@@ -127,7 +127,11 @@ public class UnsafeCarbonRowPage {
       Object value = row[mesCount + dimensionSize];
       if (null != value) {
         DataType dataType = measureDataType[mesCount];
-        if (dataType == DataTypes.SHORT) {
+        if (dataType == DataTypes.BOOLEAN) {
+          Boolean bval = (Boolean) value;
+          CarbonUnsafe.getUnsafe().putBoolean(baseObject, address + size, bval);
+          size += 1;
+        } else if (dataType == DataTypes.SHORT) {
           Short sval = (Short) value;
           CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, sval);
           size += 2;
@@ -209,7 +213,11 @@ public class UnsafeCarbonRowPage {
     for (int mesCount = 0; mesCount < measureSize; mesCount++) {
       if (isSet(nullSetWords, mesCount)) {
         DataType dataType = measureDataType[mesCount];
-        if (dataType == DataTypes.SHORT) {
+        if (dataType == DataTypes.BOOLEAN) {
+          Boolean bval = CarbonUnsafe.getUnsafe().getBoolean(baseObject, address + size);
+          size += 1;
+          rowToFill[dimensionSize + mesCount] = bval;
+        } else if (dataType == DataTypes.SHORT) {
           Short sval = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
           size += 2;
           rowToFill[dimensionSize + mesCount] = sval;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
index 3671316..fa50f1c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
@@ -347,7 +347,9 @@ public class IntermediateFileMerger implements Callable<Void> {
         if (null != NonDictionaryUtil.getMeasure(fieldIndex, row)) {
           stream.write((byte) 1);
           DataType dataType = aggType[counter];
-          if (dataType == DataTypes.SHORT) {
+          if (dataType == DataTypes.BOOLEAN) {
+            stream.writeBoolean((boolean)NonDictionaryUtil.getMeasure(fieldIndex, row));
+          } else if (dataType == DataTypes.SHORT) {
             stream.writeShort((short) NonDictionaryUtil.getMeasure(fieldIndex, row));
           } else if (dataType == DataTypes.INT) {
             stream.writeInt((int) NonDictionaryUtil.getMeasure(fieldIndex, row));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
index 5cc96c5..8a60657 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
@@ -295,7 +295,9 @@ public class SortDataRows {
           if (null != value) {
             stream.write((byte) 1);
             DataType dataType = type[mesCount];
-            if (dataType == DataTypes.SHORT) {
+            if (dataType == DataTypes.BOOLEAN) {
+              stream.writeBoolean((boolean) value);
+            } else if (dataType == DataTypes.SHORT) {
               stream.writeShort((Short) value);
             } else if (dataType == DataTypes.INT) {
               stream.writeInt((Integer) value);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
index 5d339c7..afa4cd7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
@@ -345,7 +345,9 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
       for (int i = 0; i < this.measureCount; i++) {
         if (stream.readByte() == 1) {
           DataType dataType = aggType[i];
-          if (dataType == DataTypes.SHORT) {
+          if (dataType == DataTypes.BOOLEAN) {
+            measures[index++] = stream.readBoolean();
+          } else if (dataType == DataTypes.SHORT) {
             measures[index++] = stream.readShort();
           } else if (dataType == DataTypes.INT) {
             measures[index++] = stream.readInt();


[2/3] carbondata git commit: [CARBONDATA-1444] Support Boolean data type

Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesBaseTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesBaseTest.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesBaseTest.scala
new file mode 100644
index 0000000..c0087a8
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesBaseTest.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.booleantype
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+class BooleanDataTypesBaseTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
+
+  override def beforeEach(): Unit = {
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists boolean_table")
+  }
+
+  override def afterEach(): Unit = {
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists boolean_table")
+  }
+
+  test("Creating table: boolean one column, should support") {
+    try {
+      sql("CREATE TABLE if not exists boolean_table(cc BOOLEAN) STORED BY 'carbondata'")
+      assert(true)
+    } catch {
+      case _: Exception => assert(false)
+    }
+  }
+
+  test("Creating table: boolean and other table, should support") {
+    try {
+      sql(
+        s"""
+           |CREATE TABLE if not exists boolean_table(
+           |aa INT, bb STRING, cc BOOLEAN
+           |) STORED BY 'carbondata'""".stripMargin)
+      assert(true)
+    } catch {
+      case _: Exception => assert(false)
+    }
+  }
+
+  test("Describing table: boolean data type, should support") {
+    sql(
+      s"""
+         |CREATE TABLE if not exists carbon_table(
+         |cc BOOLEAN
+         |) STORED BY 'carbondata'""".stripMargin)
+    checkExistence(sql("describe formatted carbon_table"), true, "boolean")
+  }
+
+  test("Describing table: support boolean data type format and other table ") {
+    sql(
+      s"""
+         | CREATE TABLE carbon_table(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+    checkExistence(sql("describe formatted carbon_table"), true, "boolean")
+  }
+
+  test("Altering table and add column: add boolean type column") {
+    sql(
+      s"""
+         |CREATE TABLE if not exists carbon_table(
+         |aa INT, bb STRING
+         |) STORED BY 'carbondata'""".stripMargin)
+    sql("alter table carbon_table add columns (dd BOOLEAN)")
+    checkExistence(sql("describe formatted carbon_table"), true, "boolean")
+    checkExistence(sql("describe formatted carbon_table"), true, "dd")
+  }
+
+  test("Altering table and add column: exists boolean column, add boolean type column") {
+    sql(
+      s"""
+         |CREATE TABLE if not exists carbon_table(
+         |aa INT, bb STRING, cc BOOLEAN
+         |) STORED BY 'carbondata'""".stripMargin)
+    sql("alter table carbon_table add columns (dd BOOLEAN)")
+    checkExistence(sql("describe formatted carbon_table"), true, "boolean")
+    checkExistence(sql("describe formatted carbon_table"), true, "dd")
+  }
+
+  test("Altering table and add column: exists boolean column, add not boolean type column") {
+    sql(
+      s"""
+         |CREATE TABLE if not exists carbon_table(
+         |aa INT, bb STRING, cc BOOLEAN
+         |) STORED BY 'carbondata'""".stripMargin)
+    sql("alter table carbon_table add columns (dd STRING)")
+    checkExistence(sql("describe formatted carbon_table"), true, "boolean")
+    checkExistence(sql("describe formatted carbon_table"), true, "dd")
+  }
+
+  test("Altering table and add column and insert values: exists boolean column, add boolean type column") {
+    sql(
+      s"""
+         |CREATE TABLE if not exists carbon_table(
+         |aa STRING, bb INT, cc BOOLEAN
+         |) STORED BY 'carbondata'""".stripMargin)
+    sql("alter table carbon_table add columns (dd BOOLEAN)")
+    sql("insert into carbon_table values('adam',11,true,false)")
+    checkAnswer(sql("select * from carbon_table"), Seq(Row("adam", 11, true, false)))
+  }
+
+  test("Altering table and drop column and insert values: exists boolean column, add boolean type column") {
+    sql(
+      s"""
+         |CREATE TABLE if not exists carbon_table(
+         |aa STRING, bb INT, cc BOOLEAN, dd BOOLEAN
+         |) STORED BY 'carbondata'""".stripMargin)
+    sql("alter table carbon_table drop columns (dd)")
+    sql("insert into carbon_table values('adam',11,true)")
+    checkAnswer(sql("select * from carbon_table"), Seq(Row("adam", 11, true)))
+  }
+
+  test("Deleting table and drop column and insert values: exists boolean column, add boolean type column") {
+    sql(
+      s"""
+         |CREATE TABLE if not exists carbon_table(
+         |aa STRING, bb INT, cc BOOLEAN, dd BOOLEAN
+         |) STORED BY 'carbondata'""".stripMargin)
+    sql("alter table carbon_table drop columns (dd)")
+    sql("insert into carbon_table values('adam',11,true)")
+    checkAnswer(sql("select * from carbon_table"), Seq(Row("adam", 11, true)))
+    sql("delete from carbon_table where cc=true")
+    checkAnswer(sql("select COUNT(*) from carbon_table"), Row(0))
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesBigFileTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesBigFileTest.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesBigFileTest.scala
new file mode 100644
index 0000000..53835fb
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesBigFileTest.scala
@@ -0,0 +1,729 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.booleantype
+
+import java.io.{File, PrintWriter}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+
+class BooleanDataTypesBigFileTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
+  val rootPath = new File(this.getClass.getResource("/").getPath
+    + "../../../..").getCanonicalPath
+
+  override def beforeEach(): Unit = {
+    sql("drop table if exists boolean_table")
+    sql("drop table if exists boolean_table2")
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists hive_table")
+  }
+
+  override def afterAll(): Unit = {
+    sql("drop table if exists boolean_table")
+    sql("drop table if exists boolean_table2")
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists hive_table")
+    assert(BooleanFile.deleteFile(pathOfManyDataType))
+    assert(BooleanFile.deleteFile(pathOfOnlyBoolean))
+  }
+
+  val pathOfManyDataType = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanBigFile.csv"
+  val pathOfOnlyBoolean = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanBigFileOnlyBoolean.csv"
+  val trueNum = 10000
+
+  override def beforeAll(): Unit = {
+    assert(BooleanFile.createBooleanFileWithOtherDataType(pathOfManyDataType, trueNum))
+    assert(BooleanFile.createOnlyBooleanFile(pathOfOnlyBoolean, trueNum))
+  }
+
+  test("Loading table: support boolean and other data type, big file") {
+    sql(
+      s"""
+         | CREATE TABLE boolean_table(
+         | intField INT,
+         | booleanField BOOLEAN,
+         | stringField STRING,
+         | doubleField DOUBLE,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+       """.stripMargin)
+
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${pathOfManyDataType}'
+         | INTO TABLE boolean_table
+         | options('FILEHEADER'='intField,booleanField,stringField,doubleField,booleanField2')
+           """.stripMargin)
+
+    checkAnswer(
+      sql("select count(*) from boolean_table"),
+      Row(trueNum + trueNum / 10))
+  }
+
+  test("Inserting table: support boolean and other data type, big file") {
+    sql(
+      s"""
+         | CREATE TABLE boolean_table(
+         | intField INT,
+         | booleanField BOOLEAN,
+         | stringField STRING,
+         | doubleField DOUBLE,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+       """.stripMargin)
+
+    sql(
+      s"""
+         | CREATE TABLE boolean_table2(
+         | intField INT,
+         | booleanField BOOLEAN,
+         | stringField STRING,
+         | doubleField DOUBLE,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+       """.stripMargin)
+
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${pathOfManyDataType}'
+         | INTO TABLE boolean_table
+         | options('FILEHEADER'='intField,booleanField,stringField,doubleField,booleanField2')
+           """.stripMargin)
+
+    sql("insert into boolean_table2 select * from boolean_table")
+
+    checkAnswer(
+      sql("select count(*) from boolean_table2"),
+      Row(trueNum + trueNum / 10))
+  }
+
+  test("Filtering table: support boolean data type, only boolean, big file") {
+    sql(
+      s"""
+         | CREATE TABLE boolean_table(
+         | booleanField BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+       """.stripMargin)
+
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${pathOfOnlyBoolean}'
+         | INTO TABLE boolean_table
+         | options('FILEHEADER'='booleanField')
+           """.stripMargin)
+
+    checkAnswer(
+      sql("select count(*) from boolean_table"),
+      Row(trueNum + trueNum / 10))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField is not null"),
+      Row(trueNum + trueNum / 10))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField is null"),
+      Row(0))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField = true"),
+      Row(trueNum))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField >= true"),
+      Row(trueNum))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField > true"),
+      Row(0))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField < true"),
+      Row(trueNum / 10))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField = false"),
+      Row(trueNum / 10))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField <= false"),
+      Row(trueNum / 10))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField > false"),
+      Row(trueNum))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField < false"),
+      Row(0))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField in (false)"),
+      Row(trueNum / 10))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField not in (false)"),
+      Row(trueNum))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField in (true,false)"),
+      Row(trueNum + trueNum / 10))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField not in (true,false)"),
+      Row(0))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField like 'f%'"),
+      Row(trueNum / 10))
+  }
+
+  test("Filtering table: support boolean and other data type, big file") {
+
+    sql(
+      s"""
+         | CREATE TABLE boolean_table(
+         | intField INT,
+         | booleanField BOOLEAN,
+         | stringField STRING,
+         | doubleField DOUBLE,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+       """.stripMargin)
+
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${pathOfManyDataType}'
+         | INTO TABLE boolean_table
+         | options('FILEHEADER'='intField,booleanField,stringField,doubleField,booleanField2')
+           """.stripMargin)
+
+    checkAnswer(
+      sql("select booleanField from boolean_table where intField >=1 and intField <11"),
+      Seq(Row(true), Row(true), Row(true), Row(true), Row(true), Row(true), Row(true), Row(true), Row(true), Row(true))
+    )
+
+    checkAnswer(
+      sql(s"select booleanField from boolean_table where intField >='${trueNum - 5}' and intField <=${trueNum + 1}"),
+      Seq(Row(true), Row(true), Row(true), Row(true), Row(true), Row(false), Row(false))
+    )
+
+    checkAnswer(
+      sql(s"select count(*) from boolean_table where intField >='${trueNum - 5}' and doubleField <=${trueNum + 1} and booleanField=false"),
+      Seq(Row(2))
+    )
+
+    checkAnswer(
+      sql(s"select * from boolean_table where intField >4 and doubleField < 6.0"),
+      Seq(Row(5, true, "num5", 5.0, false))
+    )
+
+    checkAnswer(
+      sql("select count(*) from boolean_table"),
+      Row(trueNum + trueNum / 10))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField is not null"),
+      Row(trueNum + trueNum / 10))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField is null"),
+      Row(0))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField = true"),
+      Row(trueNum))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField >= true"),
+      Row(trueNum))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField > true"),
+      Row(0))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField < true"),
+      Row(trueNum / 10))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField = false"),
+      Row(trueNum / 10))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField <= false"),
+      Row(trueNum / 10))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField > false"),
+      Row(trueNum))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField < false"),
+      Row(0))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField in (false)"),
+      Row(trueNum / 10))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField not in (false)"),
+      Row(trueNum))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField in (true,false)"),
+      Row(trueNum + trueNum / 10))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField not in (true,false)"),
+      Row(0))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField like 'f%'"),
+      Row(trueNum / 10))
+  }
+
+  test("Filtering table: support boolean and other data type, big file, load twice") {
+
+    sql(
+      s"""
+         | CREATE TABLE boolean_table(
+         | intField INT,
+         | booleanField BOOLEAN,
+         | stringField STRING,
+         | doubleField DOUBLE,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+       """.stripMargin)
+    val repeat: Int = 2
+    for (i <- 0 until repeat) {
+      sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '${pathOfManyDataType}'
+           | INTO TABLE boolean_table
+           | options('FILEHEADER'='intField,booleanField,stringField,doubleField,booleanField2')
+           """.stripMargin
+      )
+    }
+
+    checkAnswer(
+      sql("select booleanField from boolean_table where intField >=1 and intField <11"),
+      Seq(Row(true), Row(true), Row(true), Row(true), Row(true), Row(true), Row(true), Row(true), Row(true), Row(true),
+        Row(true), Row(true), Row(true), Row(true), Row(true), Row(true), Row(true), Row(true), Row(true), Row(true))
+    )
+
+    checkAnswer(
+      sql(s"select booleanField from boolean_table where intField >='${trueNum - 5}' and intField <=${trueNum + 1}"),
+      Seq(Row(true), Row(true), Row(true), Row(true), Row(true), Row(false), Row(false),
+        Row(true), Row(true), Row(true), Row(true), Row(true), Row(false), Row(false))
+    )
+
+    checkAnswer(
+      sql(s"select count(*) from boolean_table where intField >='${trueNum - 5}' and doubleField <=${trueNum + 1} and booleanField=false"),
+      Seq(Row(4))
+    )
+
+    checkAnswer(
+      sql(s"select * from boolean_table where intField >4 and doubleField < 6.0"),
+      Seq(Row(5, true, "num5", 5.0, false), Row(5, true, "num5", 5.0, false))
+    )
+
+    checkAnswer(
+      sql("select count(*) from boolean_table"),
+      Row(repeat * (trueNum + trueNum / 10)))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField is not null"),
+      Row(repeat * (trueNum + trueNum / 10)))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField is null"),
+      Row(0))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField = true"),
+      Row(repeat * (trueNum)))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField >= true"),
+      Row(repeat * (trueNum)))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField > true"),
+      Row(0))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField < true"),
+      Row(repeat * (trueNum / 10)))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField = false"),
+      Row(repeat * (trueNum / 10)))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField <= false"),
+      Row(repeat * (trueNum / 10)))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField > false"),
+      Row(repeat * (trueNum)))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField < false"),
+      Row(0))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField in (false)"),
+      Row(repeat * (trueNum / 10)))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField not in (false)"),
+      Row(repeat * (trueNum)))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField in (true,false)"),
+      Row(repeat * (trueNum + trueNum / 10)))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField not in (true,false)"),
+      Row(0))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField like 'f%'"),
+      Row(repeat * (trueNum / 10)))
+  }
+
+  test("Sort_columns: support boolean and other data type, big file") {
+    sql(
+      s"""
+         | CREATE TABLE boolean_table(
+         | intField INT,
+         | booleanField BOOLEAN,
+         | stringField STRING,
+         | doubleField DOUBLE,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='booleanField')
+       """.stripMargin)
+
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${pathOfManyDataType}'
+         | INTO TABLE boolean_table
+         | options('FILEHEADER'='intField,booleanField,stringField,doubleField,booleanField2')
+           """.stripMargin)
+
+    checkAnswer(
+      sql(s"select booleanField from boolean_table where intField >='${trueNum - 5}' and intField <=${trueNum + 1}"),
+      Seq(Row(true), Row(true), Row(true), Row(true), Row(true), Row(false), Row(false))
+    )
+  }
+
+  test("Inserting into Hive table from carbon table: support boolean data type and other format, big file") {
+    sql(
+      s"""
+         | CREATE TABLE carbon_table(
+         | intField INT,
+         | booleanField BOOLEAN,
+         | stringField STRING,
+         | doubleField DOUBLE,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+       """.stripMargin)
+
+    sql(
+      s"""
+         | CREATE TABLE hive_table(
+         | intField INT,
+         | booleanField BOOLEAN,
+         | stringField STRING,
+         | doubleField DOUBLE,
+         | booleanField2 BOOLEAN
+         | )
+         |  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+       """.stripMargin)
+
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${pathOfManyDataType}'
+         | INTO TABLE carbon_table
+         | options('FILEHEADER'='intField,booleanField,stringField,doubleField,booleanField2')
+           """.stripMargin)
+
+    sql("insert into hive_table select * from carbon_table")
+
+    checkAnswer(
+      sql(s"select booleanField from hive_table where intField >='${trueNum - 5}' and intField <=${trueNum + 1}"),
+      Seq(Row(true), Row(true), Row(true), Row(true), Row(true), Row(false), Row(false))
+    )
+
+    checkAnswer(
+      sql(s"select * from hive_table where intField >4 and doubleField < 6.0"),
+      Seq(Row(5, true, "num5", 5.0, false))
+    )
+
+    checkAnswer(
+      sql("select count(*) from hive_table"),
+      Row(trueNum + trueNum / 10))
+
+    checkAnswer(
+      sql("select count(*) from hive_table where booleanField = true"),
+      Row(trueNum))
+
+    checkAnswer(
+      sql("select count(*) from hive_table where booleanField = false"),
+      Row(trueNum / 10))
+  }
+
+  test("Inserting into carbon table from Hive table: support boolean data type and other format, big file") {
+    sql(
+      s"""
+         | CREATE TABLE hive_table(
+         | intField INT,
+         | booleanField BOOLEAN,
+         | stringField STRING,
+         | doubleField DOUBLE,
+         | booleanField2 BOOLEAN
+         | )
+         | ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+       """.stripMargin)
+
+    sql(
+      s"""
+         | CREATE TABLE carbon_table(
+         | intField INT,
+         | booleanField BOOLEAN,
+         | stringField STRING,
+         | doubleField DOUBLE,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+       """.stripMargin)
+
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${pathOfManyDataType}'
+         | INTO TABLE hive_table
+           """.stripMargin)
+
+    sql("insert into carbon_table select * from hive_table")
+
+    checkAnswer(
+      sql(s"select booleanField from carbon_table where intField >='${trueNum - 5}' and intField <=${trueNum + 1}"),
+      Seq(Row(true), Row(true), Row(true), Row(true), Row(true), Row(false), Row(false))
+    )
+
+    checkAnswer(
+      sql(s"select * from carbon_table where intField >4 and doubleField < 6.0"),
+      Seq(Row(5, true, "num5", 5.0, false))
+    )
+
+    checkAnswer(
+      sql("select count(*) from carbon_table"),
+      Row(trueNum + trueNum / 10))
+
+    checkAnswer(
+      sql("select count(*) from carbon_table where booleanField = true"),
+      Row(trueNum))
+
+    checkAnswer(
+      sql("select count(*) from carbon_table where booleanField = false"),
+      Row(trueNum / 10))
+  }
+
+  test("Filtering table: unsafe, support boolean and other data type, big file, load twice") {
+    initConf()
+    sql(
+      s"""
+         | CREATE TABLE boolean_table(
+         | intField INT,
+         | booleanField BOOLEAN,
+         | stringField STRING,
+         | doubleField DOUBLE,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+       """.stripMargin)
+    val repeat: Int = 2
+    for (i <- 0 until repeat) {
+      sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '${pathOfManyDataType}'
+           | INTO TABLE boolean_table
+           | options('FILEHEADER'='intField,booleanField,stringField,doubleField,booleanField2')
+           """.stripMargin
+      )
+    }
+
+    checkAnswer(
+      sql("select booleanField from boolean_table where intField >=1 and intField <11"),
+      Seq(Row(true), Row(true), Row(true), Row(true), Row(true), Row(true), Row(true), Row(true), Row(true), Row(true),
+        Row(true), Row(true), Row(true), Row(true), Row(true), Row(true), Row(true), Row(true), Row(true), Row(true))
+    )
+
+    checkAnswer(
+      sql(s"select booleanField from boolean_table where intField >='${trueNum - 5}' and intField <=${trueNum + 1}"),
+      Seq(Row(true), Row(true), Row(true), Row(true), Row(true), Row(false), Row(false),
+        Row(true), Row(true), Row(true), Row(true), Row(true), Row(false), Row(false))
+    )
+
+    checkAnswer(
+      sql(s"select count(*) from boolean_table where intField >='${trueNum - 5}' and doubleField <=${trueNum + 1} and booleanField=false"),
+      Seq(Row(4))
+    )
+
+    checkAnswer(
+      sql(s"select * from boolean_table where intField >4 and doubleField < 6.0"),
+      Seq(Row(5, true, "num5", 5.0, false), Row(5, true, "num5", 5.0, false))
+    )
+
+    checkAnswer(
+      sql("select count(*) from boolean_table"),
+      Row(repeat * (trueNum + trueNum / 10)))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField is not null"),
+      Row(repeat * (trueNum + trueNum / 10)))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField is null"),
+      Row(0))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField = true"),
+      Row(repeat * (trueNum)))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField >= true"),
+      Row(repeat * (trueNum)))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField > true"),
+      Row(0))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField < true"),
+      Row(repeat * (trueNum / 10)))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField = false"),
+      Row(repeat * (trueNum / 10)))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField <= false"),
+      Row(repeat * (trueNum / 10)))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField > false"),
+      Row(repeat * (trueNum)))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField < false"),
+      Row(0))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField in (false)"),
+      Row(repeat * (trueNum / 10)))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField not in (false)"),
+      Row(repeat * (trueNum)))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField in (true,false)"),
+      Row(repeat * (trueNum + trueNum / 10)))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField not in (true,false)"),
+      Row(0))
+
+    checkAnswer(
+      sql("select count(*) from boolean_table where booleanField like 'f%'"),
+      Row(repeat * (trueNum / 10)))
+    defaultConf()
+  }
+
+  def initConf(): Unit = {
+    CarbonProperties.getInstance().
+      addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_LOADING,
+        "true")
+  }
+
+  def defaultConf(): Unit = {
+    CarbonProperties.getInstance().
+      addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_LOADING,
+        CarbonCommonConstants.ENABLE_DATA_LOADING_STATISTICS_DEFAULT)
+  }
+}
+
+object BooleanFile {
+  def createBooleanFileWithOtherDataType(path: String, trueLines: Int): Boolean = {
+    try {
+      val write = new PrintWriter(path)
+      var d: Double = 0.0
+      for (i <- 0 until trueLines) {
+        write.println(i + "," + true + ",num" + i + "," + d + "," + false)
+        d = d + 1
+      }
+      for (i <- 0 until trueLines / 10) {
+        write.println((trueLines + i) + "," + false + ",num" + (trueLines + i) + "," + d + "," + true)
+        d = d + 1
+      }
+      write.close()
+    } catch {
+      case _: Exception => assert(false)
+    }
+    return true
+  }
+
+  def deleteFile(path: String): Boolean = {
+    try {
+      val file = new File(path)
+      file.delete()
+    } catch {
+      case _: Exception => assert(false)
+    }
+    return true
+  }
+
+  def createOnlyBooleanFile(path: String, num: Int): Boolean = {
+    try {
+      val write = new PrintWriter(path)
+      for (i <- 0 until num) {
+        write.println(true)
+      }
+      for (i <- 0 until num / 10) {
+        write.println(false)
+      }
+      write.close()
+    } catch {
+      case _: Exception => assert(false)
+    }
+    return true
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesFilterTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesFilterTest.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesFilterTest.scala
new file mode 100644
index 0000000..66d74d5
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesFilterTest.scala
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.booleantype
+
+import java.io.File
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+class BooleanDataTypesFilterTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
+
+  override def beforeAll(): Unit = {
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists boolean_table")
+    val rootPath = new File(this.getClass.getResource("/").getPath
+      + "../../../..").getCanonicalPath
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanOnlyBoolean.csv"
+
+    sql("CREATE TABLE if not exists carbon_table(booleanField BOOLEAN) STORED BY 'carbondata'")
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$storeLocation'
+         | INTO TABLE carbon_table
+         | OPTIONS('FILEHEADER' = 'booleanField')
+       """.stripMargin)
+
+    val booleanLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBoolean.csv"
+    sql(
+      s"""
+         | CREATE TABLE boolean_table(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${booleanLocation}'
+         | INTO TABLE boolean_table
+         | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData')
+       """.stripMargin)
+  }
+
+  override def afterAll(): Unit = {
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists boolean_table")
+  }
+
+  test("Filtering table: support boolean, Expression: EqualToExpression, IsNotNullExpression, Or") {
+    checkAnswer(sql("select * from carbon_table where booleanField = true"),
+      Seq(Row(true), Row(true), Row(true), Row(true)))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField = true"),
+      Row(4))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField = 'true'"),
+      Row(0))
+
+    checkAnswer(sql(
+      s"""
+         |select count(*)
+         |from carbon_table where booleanField = \"true\"
+         |""".stripMargin),
+      Row(0))
+
+    checkAnswer(sql("select * from carbon_table where booleanField = false"),
+      Seq(Row(false), Row(false), Row(false), Row(false)))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField = false"),
+      Row(4))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField = 'false'"),
+      Row(0))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField = null"),
+      Row(0))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField = false or booleanField = true"),
+      Row(8))
+  }
+
+  test("Filtering table: support boolean, Expression: InExpression") {
+    checkAnswer(sql("select * from carbon_table where booleanField in (true)"),
+      Seq(Row(true), Row(true), Row(true), Row(true)))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField in (true)"),
+      Row(4))
+
+    checkAnswer(sql("select * from carbon_table where booleanField in (false)"),
+      Seq(Row(false), Row(false), Row(false), Row(false)))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField in (false)"),
+      Row(4))
+
+    checkAnswer(sql("select * from carbon_table where booleanField in (true,false)"),
+      Seq(Row(true), Row(true), Row(true), Row(true), Row(false), Row(false), Row(false), Row(false)))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField in (true,false)"),
+      Row(8))
+  }
+
+  test("Filtering table: support boolean, Expression: NotInExpression") {
+    checkAnswer(sql("select * from carbon_table where booleanField not in (false)"),
+      Seq(Row(true), Row(true), Row(true), Row(true)))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField not in (true)"),
+      Row(4))
+
+    checkAnswer(sql("select * from carbon_table where booleanField not in (true)"),
+      Seq(Row(false), Row(false), Row(false), Row(false)))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField not in (true)"),
+      Row(4))
+
+    checkAnswer(sql("select * from carbon_table where booleanField not in (null)"),
+      Seq(Row(true), Row(true), Row(true), Row(true), Row(false), Row(false), Row(false), Row(false)))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField not in (null)"),
+      Row(8))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField not in (true,false)"),
+      Row(0))
+  }
+
+  test("Filtering table: support boolean, Expression: NotEqualsExpression") {
+    checkAnswer(sql("select * from carbon_table where booleanField != false"),
+      Seq(Row(true), Row(true), Row(true), Row(true)))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField != false"),
+      Row(4))
+
+    checkAnswer(sql("select * from carbon_table where booleanField != true"),
+      Seq(Row(false), Row(false), Row(false), Row(false)))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField != true"),
+      Row(4))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField != null"),
+      Row(0))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField != false or booleanField != true"),
+      Row(8))
+  }
+
+  test("Filtering table: support boolean, Expression: GreaterThanEqualToExpression") {
+    checkAnswer(sql("select * from carbon_table where booleanField >= true"),
+      Seq(Row(true), Row(true), Row(true), Row(true)))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField >= true"),
+      Row(4))
+
+    checkAnswer(sql("select * from carbon_table where booleanField >= false"),
+      Seq(Row(true), Row(true), Row(true), Row(true), Row(false), Row(false), Row(false), Row(false)))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField >=false"),
+      Row(8))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField >= null"),
+      Row(0))
+  }
+
+  test("Filtering table: support boolean, Expression: GreaterThanExpression") {
+    checkAnswer(sql("select * from carbon_table where booleanField > false"),
+      Seq(Row(true), Row(true), Row(true), Row(true)))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField > false"),
+      Row(4))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField > false"),
+      Row(4))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField > null"),
+      Row(0))
+  }
+
+  test("Filtering table: support boolean, Expression: LessThanEqualToExpression") {
+    checkAnswer(sql("select * from carbon_table where booleanField <= false"),
+      Seq(Row(false), Row(false), Row(false), Row(false)))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField <= false"),
+      Row(4))
+
+    checkAnswer(sql("select * from carbon_table where booleanField <= true"),
+      Seq(Row(true), Row(true), Row(true), Row(true), Row(false), Row(false), Row(false), Row(false)))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField <= true"),
+      Row(8))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField <= null"),
+      Row(0))
+  }
+
+  test("Filtering table: support boolean, Expression: LessThanExpression") {
+    checkAnswer(sql("select * from carbon_table where booleanField < true"),
+      Seq(Row(false), Row(false), Row(false), Row(false)))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField < true"),
+      Row(4))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField < false"),
+      Row(0))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField < null"),
+      Row(0))
+  }
+
+  test("Filtering table: support boolean, Expression: between") {
+    checkAnswer(sql("select * from carbon_table where booleanField < true and booleanField >= false"),
+      Seq(Row(false), Row(false), Row(false), Row(false)))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField < true and booleanField >= false"),
+      Row(4))
+  }
+
+  test("Filtering table: support boolean, Expression: empty") {
+    checkAnswer(sql("select count(*) from carbon_table where booleanField =''"),
+      Row(0))
+  }
+
+  test("Filtering table: support boolean, Expression: IsNotNull") {
+    checkAnswer(sql("select count(*) from carbon_table where booleanField is not null"),
+      Row(8))
+    checkAnswer(sql("select * from carbon_table where booleanField is not null"),
+      Seq(Row(true), Row(true), Row(true), Row(true), Row(false), Row(false), Row(false), Row(false)))
+  }
+
+  test("Filtering table: support boolean, Expression: IsNull") {
+    checkAnswer(sql("select count(*) from carbon_table where booleanField is null"),
+      Row(3))
+    checkAnswer(sql("select * from carbon_table where booleanField is null"),
+      Seq(Row(null), Row(null), Row(null)))
+  }
+
+  test("Filtering table: support boolean and other data type, and") {
+    checkAnswer(sql("select count(*) from boolean_table where booleanField = false and shortField = 1"),
+      Row(3))
+
+    checkAnswer(sql("select count(*) from boolean_table where booleanField = false and stringField='flink'"),
+      Row(1))
+
+    checkAnswer(sql("select intField,booleanField,stringField from boolean_table where booleanField = false and stringField='flink'"),
+      Row(11, false, "flink"))
+  }
+
+  test("Filtering table: support boolean and other data type, GreaterThanEqualToExpression") {
+    checkAnswer(sql("select count(*) from boolean_table where booleanField <= false and shortField >= 1"),
+      Row(6))
+  }
+
+  test("Filtering table: support boolean, like") {
+    checkAnswer(sql("select * from carbon_table where booleanField like 't%'"),
+      Seq(Row(true), Row(true), Row(true), Row(true)))
+
+    checkAnswer(sql("select * from carbon_table where booleanField like 'tru%'"),
+      Seq(Row(true), Row(true), Row(true), Row(true)))
+
+    checkAnswer(sql("select * from carbon_table where booleanField like '%ue'"),
+      Seq(Row(true), Row(true), Row(true), Row(true)))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField like 't%'"),
+      Row(4))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField like 'T%'"),
+      Row(0))
+
+    checkAnswer(sql("select * from carbon_table where booleanField like 'f%'"),
+      Seq(Row(false), Row(false), Row(false), Row(false)))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField like 'f%'"),
+      Row(4))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField like 'F%'"),
+      Row(0))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField like 'n%'"),
+      Row(0))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField like 'f%' or booleanField like 't%'"),
+      Row(8))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField like '%e'"),
+      Row(8))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField like '%a%'"),
+      Row(4))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField like '%z%'"),
+      Row(0))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField like '%e'"),
+      Row(8))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField like '_rue'"),
+      Row(4))
+
+    checkAnswer(sql("select count(*) from carbon_table where booleanField like 'f___e'"),
+      Row(4))
+  }
+
+  test("Filtering table: support boolean and other data type, two boolean column") {
+    sql("drop table if exists boolean_table2")
+    val rootPath = new File(this.getClass.getResource("/").getPath
+      + "../../../..").getCanonicalPath
+    val booleanLocation2 = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanTwoBooleanColumns.csv"
+    sql(
+      s"""
+         | CREATE TABLE boolean_table2(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${booleanLocation2}'
+         | INTO TABLE boolean_table2
+         | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+       """.stripMargin)
+    checkAnswer(sql("select count(*) from boolean_table2 where booleanField = false and booleanField2 = false"),
+      Row(4))
+    checkAnswer(sql("select count(*) from boolean_table2 where booleanField = true and booleanField2 = true"),
+      Row(3))
+    checkAnswer(sql("select count(*) from boolean_table2 where booleanField = false and booleanField2 = true"),
+      Row(2))
+    checkAnswer(sql("select count(*) from boolean_table2 where booleanField = true and booleanField2 = false"),
+      Row(1))
+    sql("drop table if exists boolean_table2")
+  }
+
+  test("Filtering table: support boolean and other data type, load twice") {
+    sql("drop table if exists boolean_table2")
+    val rootPath = new File(this.getClass.getResource("/").getPath
+      + "../../../..").getCanonicalPath
+    val booleanLocation2 = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanTwoBooleanColumns.csv"
+    sql(
+      s"""
+         | CREATE TABLE boolean_table2(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${booleanLocation2}'
+         | INTO TABLE boolean_table2
+         | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+       """.stripMargin)
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${booleanLocation2}'
+         | INTO TABLE boolean_table2
+         | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+       """.stripMargin)
+    checkAnswer(sql("select count(*) from boolean_table2 where booleanField = false and booleanField2 = false"),
+      Row(8))
+    checkAnswer(sql("select count(*) from boolean_table2 where booleanField = true and booleanField2 = true"),
+      Row(6))
+    checkAnswer(sql("select count(*) from boolean_table2 where booleanField = false and booleanField2 = true"),
+      Row(4))
+    checkAnswer(sql("select count(*) from boolean_table2 where booleanField = true and booleanField2 = false"),
+      Row(2))
+    sql("drop table if exists boolean_table2")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesInsertTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesInsertTest.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesInsertTest.scala
new file mode 100644
index 0000000..2f06900
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesInsertTest.scala
@@ -0,0 +1,948 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.booleantype
+
+import java.io.File
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+class BooleanDataTypesInsertTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
+
+  override def beforeEach(): Unit = {
+    sql("drop table if exists boolean_one_column")
+    sql("drop table if exists boolean_table")
+    sql("drop table if exists boolean_table2")
+    sql("drop table if exists boolean_table3")
+    sql("drop table if exists boolean_table4")
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists hive_table")
+    sql("CREATE TABLE if not exists boolean_one_column(booleanField BOOLEAN) STORED BY 'carbondata'")
+  }
+
+  override def afterAll(): Unit = {
+    sql("drop table if exists boolean_one_column")
+    sql("drop table if exists boolean_table")
+    sql("drop table if exists boolean_table2")
+    sql("drop table if exists boolean_table3")
+    sql("drop table if exists boolean_table4")
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists hive_table")
+  }
+
+  test("Inserting and selecting table: one column boolean, should support") {
+    sql("insert into boolean_one_column values(true)")
+    checkAnswer(
+      sql("select * from boolean_one_column"),
+      Seq(Row(true))
+    )
+  }
+
+  test("Inserting and selecting table: one column boolean and many rows, should support") {
+    sql("insert into boolean_one_column values(true)")
+    sql("insert into boolean_one_column values(True)")
+    sql("insert into boolean_one_column values(TRUE)")
+    sql("insert into boolean_one_column values('true')")
+    sql("insert into boolean_one_column values(False)")
+    sql("insert into boolean_one_column values(false)")
+    sql("insert into boolean_one_column values(FALSE)")
+    sql("insert into boolean_one_column values('false')")
+    sql("insert into boolean_one_column values('tr')")
+    sql("insert into boolean_one_column values(null)")
+    sql("insert into boolean_one_column values('truEe')")
+    sql("insert into boolean_one_column values('falsEe')")
+    sql("insert into boolean_one_column values('t')")
+    sql("insert into boolean_one_column values('f')")
+
+    checkAnswer(
+      sql("select * from boolean_one_column"),
+      Seq(Row(true), Row(true), Row(true), Row(true),
+        Row(false), Row(false), Row(false), Row(false),
+        Row(null), Row(null), Row(null), Row(null), Row(null), Row(null))
+    )
+  }
+
+  test("Inserting and selecting table: create one column boolean table and insert two columns") {
+    sql("insert into boolean_one_column values(true,false)")
+    sql("insert into boolean_one_column values(True)")
+    sql("insert into boolean_one_column values(false,true)")
+
+    checkAnswer(
+      sql("select * from boolean_one_column"),
+      Seq(Row(true), Row(true), Row(false))
+    )
+  }
+
+  test("Inserting and selecting table: two columns boolean and many rows, should support") {
+    sql("CREATE TABLE if not exists boolean_table2(col1 BOOLEAN, col2 BOOLEAN) STORED BY 'carbondata'")
+
+    sql("insert into boolean_table2 values(true,true)")
+    sql("insert into boolean_table2 values(True,false)")
+    sql("insert into boolean_table2 values(TRUE,false)")
+    sql("insert into boolean_table2 values(false,true)")
+    sql("insert into boolean_table2 values(FALSE,false)")
+    sql("insert into boolean_table2 values('false',false)")
+    sql("insert into boolean_table2 values(null,true)")
+
+    checkAnswer(
+      sql("select * from boolean_table2"),
+      Seq(Row(true, true), Row(true, false), Row(true, false),
+        Row(false, true), Row(false, false), Row(false, false), Row(null, true))
+    )
+  }
+
+  test("Inserting and selecting table: two columns and other data type, should support") {
+    sql("CREATE TABLE if not exists boolean_table2(col1 INT, col2 BOOLEAN) STORED BY 'carbondata'")
+
+    sql("insert into boolean_table2 values(1,true)")
+    sql("insert into boolean_table2 values(100,true)")
+    sql("insert into boolean_table2 values(1991,false)")
+    sql("insert into boolean_table2 values(906,false)")
+    sql("insert into boolean_table2 values(218,false)")
+    sql("insert into boolean_table2 values(1011,false)")
+
+    checkAnswer(
+      sql("select * from boolean_table2"),
+      Seq(Row(1, true), Row(100, true), Row(1991, false),
+        Row(906, false), Row(218, false), Row(1011, false))
+    )
+  }
+
+  test("Inserting into table with another table: support boolean data type and other format") {
+    sql(
+      s"""
+         | CREATE TABLE boolean_table(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    sql(
+      s"""
+         | CREATE TABLE boolean_table2(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    sql(
+      s"""
+         | CREATE TABLE boolean_table3(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | stringField STRING,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='')
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE TABLE boolean_table4(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | stringField STRING,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='')
+       """.stripMargin)
+
+    val rootPath = new File(this.getClass.getResource("/").getPath
+      + "../../../..").getCanonicalPath
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanTwoBooleanColumns.csv"
+
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE boolean_table
+         | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+
+    sql("insert into boolean_table2 select * from boolean_table")
+    sql("insert into boolean_table3 select shortField,booleanField,intField,stringField,booleanField2 from boolean_table")
+    sql("insert into boolean_table4 select shortField,booleanField,intField,stringField,booleanField2 from boolean_table where shortField > 3")
+
+    checkAnswer(
+      sql("select booleanField,intField from boolean_table2"),
+      Seq(Row(true, 10), Row(false, 17), Row(false, 11),
+        Row(true, 10), Row(true, 10), Row(true, 14),
+        Row(false, 10), Row(false, 10), Row(false, 16), Row(false, 10))
+    )
+
+    checkAnswer(
+      sql("select booleanField,intField from boolean_table3"),
+      Seq(Row(true, 10), Row(false, 17), Row(false, 11),
+        Row(true, 10), Row(true, 10), Row(true, 14),
+        Row(false, 10), Row(false, 10), Row(false, 16), Row(false, 10))
+    )
+
+    checkAnswer(
+      sql("select booleanField,intField from boolean_table4"),
+      Seq(Row(false, 17), Row(false, 16))
+    )
+
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from boolean_table2"),
+      Seq(Row(true, 10, true), Row(false, 17, true), Row(false, 11, true),
+        Row(true, 10, true), Row(true, 10, true), Row(true, 14, false),
+        Row(false, 10, false), Row(false, 10, false), Row(false, 16, false), Row(false, 10, false))
+    )
+
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from boolean_table3"),
+      Seq(Row(true, 10, true), Row(false, 17, true), Row(false, 11, true),
+        Row(true, 10, true), Row(true, 10, true), Row(true, 14, false),
+        Row(false, 10, false), Row(false, 10, false), Row(false, 16, false), Row(false, 10, false))
+    )
+  }
+
+  test("Inserting with the order of data type in source and target table columns being different") {
+    sql(
+      s"""
+         | CREATE TABLE boolean_table(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    sql(
+      s"""
+         | CREATE TABLE boolean_table2(
+         | booleanField BOOLEAN,
+         | shortField SHORT,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    val rootPath = new File(this.getClass.getResource("/").getPath
+      + "../../../..").getCanonicalPath
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanTwoBooleanColumns.csv"
+
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE boolean_table
+         | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+
+    sql("insert into boolean_table2 select * from boolean_table")
+
+    checkAnswer(
+      sql("select booleanField,intField from boolean_table2"),
+      Seq(Row(null, 10), Row(null, 17), Row(null, 11),
+        Row(null, 10), Row(null, 10), Row(null, 14),
+        Row(null, 10), Row(null, 10), Row(null, 16), Row(null, 10))
+    )
+
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from boolean_table2"),
+      Seq(Row(null, 10, true), Row(null, 17, true), Row(null, 11, true),
+        Row(null, 10, true), Row(null, 10, true), Row(null, 14, false),
+        Row(null, 10, false), Row(null, 10, false), Row(null, 16, false), Row(null, 10, false))
+    )
+  }
+
+  test("Inserting with the number of data type in source and target table columns being different, source more than target") {
+    sql(
+      s"""
+         | CREATE TABLE boolean_table(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    sql(
+      s"""
+         | CREATE TABLE boolean_table2(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    val rootPath = new File(this.getClass.getResource("/").getPath
+      + "../../../..").getCanonicalPath
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanTwoBooleanColumns.csv"
+
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE boolean_table
+         | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+
+    sql("insert into boolean_table2 select * from boolean_table")
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from boolean_table2"),
+      Seq(Row(true, 10, null), Row(false, 17, null), Row(false, 11, null),
+        Row(true, 10, null), Row(true, 10, null), Row(true, 14, null),
+        Row(false, 10, null), Row(false, 10, null), Row(false, 16, null), Row(false, 10, null))
+    )
+  }
+
+  test("Inserting with the number of data type in source and target table columns being different, source less than target") {
+    val exception_insert: Exception =intercept[Exception] {
+      sql(
+        s"""
+           | CREATE TABLE boolean_table(
+           | shortField SHORT,
+           | booleanField BOOLEAN,
+           | intField INT,
+           | bigintField LONG,
+           | doubleField DOUBLE,
+           | stringField STRING,
+           | timestampField TIMESTAMP,
+           | decimalField DECIMAL(18,2),
+           | dateField DATE,
+           | charField CHAR(5),
+           | floatField FLOAT,
+           | complexData ARRAY<STRING>
+           | )
+           | STORED BY 'carbondata'
+           | TBLPROPERTIES('sort_columns'='','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+      sql(
+        s"""
+           | CREATE TABLE boolean_table2(
+           | shortField SHORT,
+           | booleanField BOOLEAN,
+           | intField INT,
+           | bigintField LONG,
+           | doubleField DOUBLE,
+           | stringField STRING,
+           | timestampField TIMESTAMP,
+           | decimalField DECIMAL(18,2),
+           | dateField DATE,
+           | charField CHAR(5),
+           | floatField FLOAT,
+           | complexData ARRAY<STRING>,
+           | booleanField2 BOOLEAN
+           | )
+           | STORED BY 'carbondata'
+           | TBLPROPERTIES('sort_columns'='','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+      val rootPath = new File(this.getClass.getResource("/").getPath
+        + "../../../..").getCanonicalPath
+      val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanTwoBooleanColumns.csv"
+
+      sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '${storeLocation}'
+           | INTO TABLE boolean_table
+           | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+      sql("insert into boolean_table2 select * from boolean_table")
+    }
+    assert(exception_insert.getMessage.contains("Cannot insert into target table because column number are different"))
+  }
+
+  test("Inserting into Hive table from carbon table: support boolean data type and other format") {
+    sql(
+      s"""
+         | CREATE TABLE carbon_table(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    sql(
+      s"""
+         | CREATE TABLE hive_table(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>,
+         | booleanField2 BOOLEAN
+         | )
+       """.stripMargin)
+
+    val rootPath = new File(this.getClass.getResource("/").getPath
+      + "../../../..").getCanonicalPath
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanTwoBooleanColumns.csv"
+
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE carbon_table
+         | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+
+    sql("insert into hive_table select * from carbon_table where shortField = 1 and booleanField = true")
+
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from carbon_table"),
+      Seq(Row(true, 10, true), Row(false, 17, true), Row(false, 11, true),
+        Row(true, 10, true), Row(true, 10, true), Row(true, 14, false),
+        Row(false, 10, false), Row(false, 10, false), Row(false, 16, false), Row(false, 10, false))
+    )
+
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from hive_table"),
+      Seq(Row(true, 10, true), Row(true, 10, true), Row(true, 10, true))
+    )
+
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from carbon_table where exists (select booleanField,intField,booleanField2 " +
+        "from hive_table where carbon_table.intField=hive_table.intField)"),
+      Seq(Row(true, 10, true), Row(true, 10, true), Row(true, 10, true), Row(false, 10, false), Row(false, 10, false), Row(false, 10, false))
+    )
+  }
+
+  test("Inserting into carbon table from Hive table: support boolean data type and other format") {
+    sql(
+      s"""
+         | CREATE TABLE hive_table(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>,
+         | booleanField2 BOOLEAN
+         | )
+         | ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+       """.stripMargin)
+
+    sql(
+      s"""
+         | CREATE TABLE carbon_table(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    val rootPath = new File(this.getClass.getResource("/").getPath
+      + "../../../..").getCanonicalPath
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanTwoBooleanColumns.csv"
+
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE hive_table
+           """.stripMargin)
+
+    sql("insert into carbon_table select * from hive_table where shortField = 1 and booleanField = true")
+
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from hive_table"),
+      Seq(Row(true, 10, true), Row(false, 17, true), Row(false, 11, true),
+        Row(true, 10, true), Row(true, 10, true), Row(true, 14, false),
+        Row(false, 10, false), Row(false, 10, false), Row(false, 16, false), Row(false, 10, false))
+    )
+
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from carbon_table"),
+      Seq(Row(true, 10, true), Row(true, 10, true), Row(true, 10, true))
+    )
+
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from hive_table where exists (select booleanField,intField,booleanField2 " +
+        "from carbon_table where hive_table.intField=carbon_table.intField)"),
+      Seq(Row(true, 10, true), Row(true, 10, true), Row(true, 10, true), Row(false, 10, false), Row(false, 10, false), Row(false, 10, false))
+    )
+  }
+
+  test("Inserting overwrite: one column boolean and many rows, should support") {
+    sql("insert into boolean_one_column values(True)")
+
+    sql("insert overwrite table boolean_one_column values(false)")
+    checkAnswer(
+      sql("select * from boolean_one_column"),
+      Seq(Row(false))
+    )
+
+    sql("insert overwrite table boolean_one_column values(true)")
+    checkAnswer(
+      sql("select * from boolean_one_column"),
+      Seq(Row(true))
+    )
+
+    sql("insert overwrite table boolean_one_column values(null)")
+    checkAnswer(
+      sql("select * from boolean_one_column"),
+      Seq(Row(null))
+    )
+
+    sql("insert overwrite table boolean_one_column values(true)")
+    checkAnswer(
+      sql("select * from boolean_one_column"),
+      Seq(Row(true))
+    )
+
+    sql("insert overwrite table boolean_one_column values('t')")
+    checkAnswer(
+      sql("select * from boolean_one_column"),
+      Seq(Row(null))
+    )
+  }
+
+  test("Inserting overwrite: create one column boolean table and insert two columns") {
+    sql("insert overwrite table boolean_one_column values(true,false)")
+    checkAnswer(
+      sql("select * from boolean_one_column"),
+      Seq(Row(true))
+    )
+    sql("insert overwrite table boolean_one_column values(True)")
+    sql("insert overwrite table boolean_one_column values(false,true)")
+    checkAnswer(
+      sql("select * from boolean_one_column"),
+      Seq(Row(false))
+    )
+  }
+
+  test("Inserting overwrite: two columns boolean and many rows, should support") {
+    sql("CREATE TABLE if not exists boolean_table2(col1 BOOLEAN, col2 BOOLEAN) STORED BY 'carbondata'")
+
+    sql("insert overwrite table boolean_table2 values(true,true)")
+    checkAnswer(
+      sql("select * from boolean_table2"),
+      Seq(Row(true, true))
+    )
+
+    sql("insert overwrite table boolean_table2 values(True,false)")
+    checkAnswer(
+      sql("select * from boolean_table2"),
+      Seq(Row(true, false))
+    )
+    sql("insert overwrite table boolean_table2 values(FALSE,false)")
+    sql("insert overwrite table boolean_table2 values('false',false)")
+    checkAnswer(
+      sql("select * from boolean_table2"),
+      Seq(Row(false, false))
+    )
+    sql("insert overwrite table boolean_table2 values(null,true)")
+    checkAnswer(
+      sql("select * from boolean_table2"),
+      Seq(Row(null, true))
+    )
+  }
+
+  test("Inserting overwrite: two columns and other data type, should support") {
+    sql("CREATE TABLE if not exists boolean_table2(col1 INT, col2 BOOLEAN) STORED BY 'carbondata'")
+    sql("insert overwrite table boolean_table2 values(1,true)")
+    checkAnswer(
+      sql("select * from boolean_table2"),
+      Seq(Row(1, true))
+    )
+    sql("insert overwrite table boolean_table2 values(100,true)")
+    sql("insert overwrite table boolean_table2 values(1991,false)")
+    checkAnswer(
+      sql("select * from boolean_table2"),
+      Seq(Row(1991, false))
+    )
+    sql("insert overwrite table boolean_table2 values(906,false)")
+    checkAnswer(
+      sql("select * from boolean_table2"),
+      Seq(Row(906, false))
+    )
+    sql("insert overwrite table boolean_table2 values(218,false)")
+    checkAnswer(
+      sql("select * from boolean_table2"),
+      Seq(Row(218, false))
+    )
+    sql("insert overwrite table boolean_table2 values(1011,true)")
+    checkAnswer(
+      sql("select * from boolean_table2"),
+      Seq(Row(1011, true))
+    )
+  }
+
+  test("Inserting overwrite: overwrite table with another table: support boolean data type and other format") {
+    sql(
+      s"""
+         | CREATE TABLE boolean_table(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    sql(
+      s"""
+         | CREATE TABLE boolean_table2(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    sql(
+      s"""
+         | CREATE TABLE boolean_table3(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | stringField STRING,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='')
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE TABLE boolean_table4(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | stringField STRING,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='')
+       """.stripMargin)
+
+    val rootPath = new File(this.getClass.getResource("/").getPath
+      + "../../../..").getCanonicalPath
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanTwoBooleanColumns.csv"
+
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE boolean_table
+         | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+
+    sql("insert overwrite table boolean_table2 select * from boolean_table")
+    sql("insert overwrite table boolean_table3 select shortField,booleanField,intField,stringField,booleanField2 from boolean_table")
+    sql("insert overwrite table boolean_table4 select shortField,booleanField,intField,stringField,booleanField2 from boolean_table where shortField > 3")
+
+    checkAnswer(
+      sql("select booleanField,intField from boolean_table2"),
+      Seq(Row(true, 10), Row(false, 17), Row(false, 11),
+        Row(true, 10), Row(true, 10), Row(true, 14),
+        Row(false, 10), Row(false, 10), Row(false, 16), Row(false, 10))
+    )
+
+    checkAnswer(
+      sql("select booleanField,intField from boolean_table3"),
+      Seq(Row(true, 10), Row(false, 17), Row(false, 11),
+        Row(true, 10), Row(true, 10), Row(true, 14),
+        Row(false, 10), Row(false, 10), Row(false, 16), Row(false, 10))
+    )
+
+    checkAnswer(
+      sql("select booleanField,intField from boolean_table4"),
+      Seq(Row(false, 17), Row(false, 16))
+    )
+
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from boolean_table2"),
+      Seq(Row(true, 10, true), Row(false, 17, true), Row(false, 11, true),
+        Row(true, 10, true), Row(true, 10, true), Row(true, 14, false),
+        Row(false, 10, false), Row(false, 10, false), Row(false, 16, false), Row(false, 10, false))
+    )
+
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from boolean_table3"),
+      Seq(Row(true, 10, true), Row(false, 17, true), Row(false, 11, true),
+        Row(true, 10, true), Row(true, 10, true), Row(true, 14, false),
+        Row(false, 10, false), Row(false, 10, false), Row(false, 16, false), Row(false, 10, false))
+    )
+  }
+
+  test("Inserting overwrite: overwrite table Hive table from carbon table: support boolean data type and other format") {
+    sql(
+      s"""
+         | CREATE TABLE carbon_table(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    sql(
+      s"""
+         | CREATE TABLE hive_table(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>,
+         | booleanField2 BOOLEAN
+         | )
+       """.stripMargin)
+
+    val rootPath = new File(this.getClass.getResource("/").getPath
+      + "../../../..").getCanonicalPath
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanTwoBooleanColumns.csv"
+
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE carbon_table
+         | options('FILEHEADER'='shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData,booleanField2')
+           """.stripMargin)
+
+    sql("insert overwrite table hive_table select * from carbon_table where shortField = 1 and booleanField = true")
+
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from carbon_table"),
+      Seq(Row(true, 10, true), Row(false, 17, true), Row(false, 11, true),
+        Row(true, 10, true), Row(true, 10, true), Row(true, 14, false),
+        Row(false, 10, false), Row(false, 10, false), Row(false, 16, false), Row(false, 10, false))
+    )
+
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from hive_table"),
+      Seq(Row(true, 10, true), Row(true, 10, true), Row(true, 10, true))
+    )
+
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from carbon_table where exists (select booleanField,intField,booleanField2 " +
+        "from hive_table where carbon_table.intField=hive_table.intField)"),
+      Seq(Row(true, 10, true), Row(true, 10, true), Row(true, 10, true), Row(false, 10, false), Row(false, 10, false), Row(false, 10, false))
+    )
+  }
+
+  test("Inserting overwrite: overwrite table carbon table from Hive table: support boolean data type and other format") {
+    sql(
+      s"""
+         | CREATE TABLE hive_table(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>,
+         | booleanField2 BOOLEAN
+         | )
+         | ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+       """.stripMargin)
+
+    sql(
+      s"""
+         | CREATE TABLE carbon_table(
+         | shortField SHORT,
+         | booleanField BOOLEAN,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT,
+         | complexData ARRAY<STRING>,
+         | booleanField2 BOOLEAN
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('sort_columns'='','DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    val rootPath = new File(this.getClass.getResource("/").getPath
+      + "../../../..").getCanonicalPath
+    val storeLocation = s"$rootPath/integration/spark2/src/test/resources/bool/supportBooleanTwoBooleanColumns.csv"
+
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${storeLocation}'
+         | INTO TABLE hive_table
+           """.stripMargin)
+
+    sql("insert overwrite table carbon_table select * from hive_table where shortField = 1 and booleanField = true")
+
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from hive_table"),
+      Seq(Row(true, 10, true), Row(false, 17, true), Row(false, 11, true),
+        Row(true, 10, true), Row(true, 10, true), Row(true, 14, false),
+        Row(false, 10, false), Row(false, 10, false), Row(false, 16, false), Row(false, 10, false))
+    )
+
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from carbon_table"),
+      Seq(Row(true, 10, true), Row(true, 10, true), Row(true, 10, true))
+    )
+
+    checkAnswer(
+      sql("select booleanField,intField,booleanField2 from hive_table where exists (select booleanField,intField,booleanField2 " +
+        "from carbon_table where hive_table.intField=carbon_table.intField)"),
+      Seq(Row(true, 10, true), Row(true, 10, true), Row(true, 10, true), Row(false, 10, false), Row(false, 10, false), Row(false, 10, false))
+    )
+  }
+
+}


[3/3] carbondata git commit: [CARBONDATA-1444] Support Boolean data type

Posted by ja...@apache.org.
[CARBONDATA-1444] Support Boolean data type

Spark/Hive table support Boolean data type, the internal table also should support Boolean data type.

Boolean data type Range: TRUE or FALSE. Do not use quotation marks around the TRUE and FALSE literal values. You can write the literal values in uppercase, lowercase, or mixed case. The values queried from a table are always returned in lowercase, true or false.

This closes #1362


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6abdd97a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6abdd97a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6abdd97a

Branch: refs/heads/master
Commit: 6abdd97a42e945a0df61573aa477c9b3ddb7af02
Parents: 228ab2f
Author: xubo245 <60...@qq.com>
Authored: Mon Oct 16 19:59:50 2017 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Fri Oct 20 08:36:09 2017 +0800

----------------------------------------------------------------------
 .../core/datastore/page/ColumnPage.java         |  43 +-
 .../core/datastore/page/LazyColumnPage.java     |   4 +-
 .../page/UnsafeFixLengthColumnPage.java         |   3 +-
 .../page/encoding/ColumnPageEncoderMeta.java    |   6 +-
 .../page/encoding/DefaultEncodingFactory.java   |   5 +-
 .../page/encoding/EncodingFactory.java          |   5 +
 .../page/encoding/bool/BooleanConvert.java      |  63 ++
 .../datastore/page/encoding/rle/RLECodec.java   |   5 +-
 .../statistics/PrimitivePageStatsCollector.java |  24 +-
 .../ThriftWrapperSchemaConverterImpl.java       |   6 +-
 .../impl/AbstractScannedResultCollector.java    |   4 +-
 .../conditional/EqualToExpression.java          |   4 +-
 .../GreaterThanEqualToExpression.java           |   4 +-
 .../conditional/GreaterThanExpression.java      |   4 +-
 .../expression/conditional/InExpression.java    |   4 +-
 .../conditional/LessThanEqualToExpression.java  |   4 +-
 .../conditional/LessThanExpression.java         |   4 +-
 .../conditional/NotEqualsExpression.java        |   4 +-
 .../expression/conditional/NotInExpression.java |   4 +-
 .../carbondata/core/scan/filter/FilterUtil.java |   5 +-
 .../executer/ExcludeFilterExecuterImpl.java     |   4 +-
 .../executer/IncludeFilterExecuterImpl.java     |   4 +-
 .../executer/RowLevelFilterExecuterImpl.java    |   8 +-
 .../vector/MeasureDataVectorProcessor.java      |  61 +-
 .../util/AbstractDataFileFooterConverter.java   |   2 +
 .../core/util/CarbonMetadataUtil.java           |   4 +-
 .../apache/carbondata/core/util/CarbonUtil.java |   9 +-
 .../carbondata/core/util/DataTypeUtil.java      |  31 +-
 .../core/util/comparator/Comparator.java        |  28 +-
 .../ThriftWrapperSchemaConverterImplTest.java   |  12 +-
 .../conditional/EqualToExpressionUnitTest.java  |   2 +-
 .../GreaterThanEqualToExpressionUnitTest.java   |   2 +-
 .../GreaterThanExpressionUnitTest.java          |   2 +-
 .../conditional/InExpressionUnitTest.java       |   2 +-
 .../LessThanEqualToExpressionUnitTest.java      |   2 +-
 .../conditional/LessThanExpressionUnitTest.java |   2 +-
 .../NotEqualsExpressionUnitTest.java            |   2 +-
 .../conditional/NotInExpressionUnitTest.java    |   2 +-
 .../DataCompactionBoundaryConditionsTest.scala  |   3 +
 .../spark/util/DataTypeConverterUtil.scala      |   4 +
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   3 +-
 .../VectorizedCarbonRecordReader.java           |   3 +-
 .../src/test/resources/bool/supportBoolean.csv  |  10 +
 .../resources/bool/supportBooleanBadRecords.csv |  10 +
 .../bool/supportBooleanDifferentFormat.csv      |  20 +
 .../bool/supportBooleanOnlyBoolean.csv          |  11 +
 .../bool/supportBooleanTwoBooleanColumns.csv    |  10 +
 .../bool/supportBooleanWithFileHeader.csv       |  11 +
 .../booleantype/BooleanDataTypesBaseTest.scala  | 157 +++
 .../BooleanDataTypesBigFileTest.scala           | 729 ++++++++++++++
 .../BooleanDataTypesFilterTest.scala            | 416 ++++++++
 .../BooleanDataTypesInsertTest.scala            | 948 +++++++++++++++++++
 .../booleantype/BooleanDataTypesLoadTest.scala  | 744 +++++++++++++++
 .../BooleanDataTypesParameterTest.scala         | 288 ++++++
 .../booleantype/BooleanDataTypesSortTest.scala  | 145 +++
 .../sort/unsafe/UnsafeCarbonRowPage.java        |  12 +-
 .../sort/sortdata/IntermediateFileMerger.java   |   4 +-
 .../processing/sort/sortdata/SortDataRows.java  |   4 +-
 .../sort/sortdata/SortTempFileChunkHolder.java  |   4 +-
 59 files changed, 3848 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
index 012413b..35bc560 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
+import org.apache.carbondata.core.datastore.page.encoding.bool.BooleanConvert;
 import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsCollector;
 import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.memory.MemoryException;
@@ -184,7 +185,9 @@ public abstract class ColumnPage {
       int pageSize) throws MemoryException {
     ColumnPage instance;
     if (unsafe) {
-      if (dataType == DataTypes.BYTE ||
+      if (dataType == DataTypes.BOOLEAN) {
+        instance = new UnsafeFixLengthColumnPage(columnSpec, BYTE, pageSize);
+      } else if (dataType == DataTypes.BYTE ||
           dataType == DataTypes.SHORT ||
           dataType == DataTypes.SHORT_INT ||
           dataType == DataTypes.INT ||
@@ -200,7 +203,7 @@ public abstract class ColumnPage {
         throw new RuntimeException("Unsupported data dataType: " + dataType);
       }
     } else {
-      if (dataType == DataTypes.BYTE) {
+      if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
         instance = newBytePage(columnSpec, new byte[pageSize]);
       } else if (dataType == DataTypes.SHORT) {
         instance = newShortPage(columnSpec, new short[pageSize]);
@@ -344,7 +347,10 @@ public abstract class ColumnPage {
       nullBitSet.set(rowId);
       return;
     }
-    if (dataType == DataTypes.BYTE) {
+    if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
+      if (columnSpec.getSchemaDataType() == DataTypes.BOOLEAN) {
+        value = BooleanConvert.boolean2Byte((Boolean) value);
+      }
       putByte(rowId, (byte) value);
       statsCollector.update((byte) value);
     } else if (dataType == DataTypes.SHORT) {
@@ -411,6 +417,13 @@ public abstract class ColumnPage {
   public abstract void putShortInt(int rowId, int value);
 
   /**
+   * Set boolean value at rowId
+   */
+  public void putBoolean(int rowId, boolean value) {
+    putByte(rowId, BooleanConvert.boolean2Byte(value));
+  }
+
+  /**
    * Set byte array from offset to length at rowId
    */
   public abstract void putBytes(int rowId, byte[] bytes, int offset, int length);
@@ -420,7 +433,9 @@ public abstract class ColumnPage {
    * Set null at rowId
    */
   private void putNull(int rowId) {
-    if (dataType == DataTypes.BYTE) {
+    if (dataType == DataTypes.BOOLEAN) {
+      putBoolean(rowId, false);
+    } else if (dataType == DataTypes.BYTE) {
       putByte(rowId, (byte) 0);
     } else if (dataType == DataTypes.SHORT) {
       putShort(rowId, (short) 0);
@@ -453,6 +468,13 @@ public abstract class ColumnPage {
   public abstract int getShortInt(int rowId);
 
   /**
+   * Get boolean value at rowId
+   */
+  public boolean getBoolean(int rowId) {
+    return BooleanConvert.byte2Boolean(getByte(rowId));
+  }
+
+  /**
    * Get int value at rowId
    */
   public abstract int getInt(int rowId);
@@ -498,6 +520,13 @@ public abstract class ColumnPage {
   public abstract byte[] getShortIntPage();
 
   /**
+   * Get boolean value page
+   */
+  public byte[] getBooleanPage() {
+    return getBytePage();
+  }
+
+  /**
    * Get int value page
    */
   public abstract int[] getIntPage();
@@ -541,7 +570,9 @@ public abstract class ColumnPage {
    * Compress page data using specified compressor
    */
   public byte[] compress(Compressor compressor) throws MemoryException, IOException {
-    if (dataType == DataTypes.BYTE) {
+    if (dataType == DataTypes.BOOLEAN) {
+      return compressor.compressByte(getBooleanPage());
+    } else if (dataType == DataTypes.BYTE) {
       return compressor.compressByte(getBytePage());
     } else if (dataType == DataTypes.SHORT) {
       return compressor.compressShort(getShortPage());
@@ -574,7 +605,7 @@ public abstract class ColumnPage {
     Compressor compressor = CompressorFactory.getInstance().getCompressor(meta.getCompressorName());
     TableSpec.ColumnSpec columnSpec = meta.getColumnSpec();
     DataType storeDataType = meta.getStoreDataType();
-    if (storeDataType == DataTypes.BYTE) {
+    if (storeDataType == DataTypes.BOOLEAN || storeDataType == DataTypes.BYTE) {
       byte[] byteData = compressor.unCompressByte(compressedData, offset, length);
       return newBytePage(columnSpec, byteData);
     } else if (storeDataType == DataTypes.SHORT) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
index cebb3c0..11bdaca 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
@@ -53,7 +53,7 @@ public class LazyColumnPage extends ColumnPage {
   @Override
   public long getLong(int rowId) {
     DataType dataType = columnPage.getDataType();
-    if (dataType == DataTypes.BYTE) {
+    if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
       return converter.decodeLong(columnPage.getByte(rowId));
     } else if (dataType == DataTypes.SHORT) {
       return converter.decodeLong(columnPage.getShort(rowId));
@@ -71,7 +71,7 @@ public class LazyColumnPage extends ColumnPage {
   @Override
   public double getDouble(int rowId) {
     DataType dataType = columnPage.getDataType();
-    if (dataType == DataTypes.BYTE) {
+    if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
       return converter.decodeDouble(columnPage.getByte(rowId));
     } else if (dataType == DataTypes.SHORT) {
       return converter.decodeDouble(columnPage.getShort(rowId));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
index 055c27a..5695b70 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
@@ -56,7 +56,8 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
   UnsafeFixLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize)
       throws MemoryException {
     super(columnSpec, dataType, pageSize);
-    if (dataType == DataTypes.BYTE ||
+    if (dataType == DataTypes.BOOLEAN ||
+        dataType == DataTypes.BYTE ||
         dataType == DataTypes.SHORT ||
         dataType == DataTypes.INT ||
         dataType == DataTypes.LONG ||

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
index daccc32..a38da84 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
@@ -79,8 +79,6 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
     }
   }
 
-
-
   public DataType getStoreDataType() {
     return storeDataType;
   }
@@ -108,7 +106,7 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
 
   private void writeMinMax(DataOutput out) throws IOException {
     DataType dataType = columnSpec.getSchemaDataType();
-    if (dataType == DataTypes.BYTE) {
+    if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
       out.writeByte((byte) getMaxValue());
       out.writeByte((byte) getMinValue());
       out.writeLong(0L); // unique value is obsoleted, maintain for compatibility
@@ -151,7 +149,7 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
 
   private void readMinMax(DataInput in) throws IOException {
     DataType dataType = columnSpec.getSchemaDataType();
-    if (dataType == DataTypes.BYTE) {
+    if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
       this.setMaxValue(in.readByte());
       this.setMinValue(in.readByte());
       in.readLong();  // for non exist value which is obsoleted, it is backward compatibility;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
index d78d144..03af657 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.Compl
 import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.DictDimensionIndexCodec;
 import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.DirectDictDimensionIndexCodec;
 import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.HighCardDictDimensionIndexCodec;
+import org.apache.carbondata.core.datastore.page.encoding.rle.RLECodec;
 import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
@@ -113,7 +114,9 @@ public class DefaultEncodingFactory extends EncodingFactory {
   private ColumnPageEncoder createEncoderForMeasure(ColumnPage columnPage) {
     SimpleStatsResult stats = columnPage.getStatistics();
     DataType dataType = stats.getDataType();
-    if (dataType == DataTypes.BYTE ||
+    if (dataType == DataTypes.BOOLEAN) {
+      return new RLECodec().createEncoder(null);
+    } else if (dataType == DataTypes.BYTE ||
         dataType == DataTypes.SHORT ||
         dataType == DataTypes.INT ||
         dataType == DataTypes.LONG) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
index 180228a..c298525 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
@@ -43,6 +43,7 @@ import org.apache.carbondata.format.Encoding;
 import static org.apache.carbondata.format.Encoding.ADAPTIVE_DELTA_INTEGRAL;
 import static org.apache.carbondata.format.Encoding.ADAPTIVE_FLOATING;
 import static org.apache.carbondata.format.Encoding.ADAPTIVE_INTEGRAL;
+import static org.apache.carbondata.format.Encoding.BOOL_BYTE;
 import static org.apache.carbondata.format.Encoding.DIRECT_COMPRESS;
 import static org.apache.carbondata.format.Encoding.RLE_INTEGRAL;
 
@@ -94,6 +95,10 @@ public abstract class EncodingFactory {
       RLEEncoderMeta metadata = new RLEEncoderMeta();
       metadata.readFields(in);
       return new RLECodec().createDecoder(metadata);
+    } else if (encoding == BOOL_BYTE) {
+      RLEEncoderMeta metadata = new RLEEncoderMeta();
+      metadata.readFields(in);
+      return new RLECodec().createDecoder(metadata);
     } else {
       // for backward compatibility
       ValueEncoderMeta metadata = CarbonUtil.deserializeEncoderMetaV3(encoderMeta);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/bool/BooleanConvert.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/bool/BooleanConvert.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/bool/BooleanConvert.java
new file mode 100644
index 0000000..b373adf
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/bool/BooleanConvert.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.bool;
+
+/**
+ * convert tools for boolean data type
+ */
+public class BooleanConvert {
+
+  public static final byte TRUE_VALUE = 1;
+  public static final byte FALSE_VALUE = 0;
+  /**
+   * convert boolean to byte
+   *
+   * @param data data of boolean data type
+   * @return byte type data by convert
+   */
+  public static byte boolean2Byte(boolean data) {
+    return data ? TRUE_VALUE : FALSE_VALUE;
+  }
+
+  /**
+   * convert byte to boolean
+   *
+   * @param data byte type data
+   * @return boolean type data
+   */
+  public static boolean byte2Boolean(int data) {
+    return data == TRUE_VALUE;
+  }
+
+  /**
+   * parse boolean, true and false to boolean
+   *
+   * @param input string type data
+   * @return Boolean type data
+   */
+  public static Boolean parseBoolean(String input) {
+    String value = input.toLowerCase();
+    if (("false").equals(value)) {
+      return Boolean.FALSE;
+    } else if (("true").equals(value)) {
+      return Boolean.TRUE;
+    } else {
+      throw new NumberFormatException("Not a valid Boolean type");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
index 809bac0..7007084 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
@@ -71,7 +71,8 @@ public class RLECodec implements ColumnPageCodec {
 
   // This codec supports integral type only
   private void validateDataType(DataType dataType) {
-    if (! (dataType == DataTypes.BYTE || dataType == DataTypes.SHORT || dataType == DataTypes.INT ||
+    if (! (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE ||
+        dataType == DataTypes.SHORT || dataType == DataTypes.INT ||
         dataType == DataTypes.LONG)) {
       throw new UnsupportedOperationException(dataType + " is not supported for RLE");
     }
@@ -293,7 +294,7 @@ public class RLECodec implements ColumnPageCodec {
       DataType dataType = columnSpec.getSchemaDataType();
       DataInputStream in = new DataInputStream(new ByteArrayInputStream(input, offset, length));
       ColumnPage resultPage = ColumnPage.newPage(columnSpec, dataType, pageSize);
-      if (dataType == DataTypes.BYTE) {
+      if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
         decodeBytePage(in, resultPage);
       } else if (dataType == DataTypes.SHORT) {
         decodeShortPage(in, resultPage);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
index 304d998..ed92622 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
@@ -20,10 +20,14 @@ package org.apache.carbondata.core.datastore.page.statistics;
 import java.math.BigDecimal;
 
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
+import org.apache.carbondata.core.datastore.page.encoding.bool.BooleanConvert;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 
+import static org.apache.carbondata.core.datastore.page.encoding.bool.BooleanConvert.FALSE_VALUE;
+import static org.apache.carbondata.core.datastore.page.encoding.bool.BooleanConvert.TRUE_VALUE;
+
 /** statics for primitive column page */
 public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, SimpleStatsResult {
   private DataType dataType;
@@ -53,7 +57,7 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
         meta.getScale(), meta.getPrecision());
     // set min max from meta
     DataType dataType = meta.getSchemaDataType();
-    if (dataType == DataTypes.BYTE) {
+    if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
       instance.minByte = (byte) meta.getMinValue();
       instance.maxByte = (byte) meta.getMaxValue();
     } else if (dataType == DataTypes.SHORT) {
@@ -87,7 +91,7 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
         new PrimitivePageStatsCollector(DataType.getDataType(meta.getType()), -1, -1);
     // set min max from meta
     DataType dataType = DataType.getDataType(meta.getType());
-    if (dataType == DataTypes.BYTE) {
+    if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
       instance.minByte = (byte) meta.getMinValue();
       instance.maxByte = (byte) meta.getMaxValue();
     } else if (dataType == DataTypes.SHORT) {
@@ -118,7 +122,10 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
 
   private PrimitivePageStatsCollector(DataType dataType, int scale, int precision) {
     this.dataType = dataType;
-    if (dataType == DataTypes.BYTE) {
+    if (dataType == DataTypes.BOOLEAN) {
+      minByte = TRUE_VALUE;
+      maxByte = FALSE_VALUE;
+    } else if (dataType == DataTypes.BYTE) {
       minByte = Byte.MAX_VALUE;
       maxByte = Byte.MIN_VALUE;
     } else if (dataType == DataTypes.SHORT) {
@@ -148,7 +155,7 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
   @Override
   public void updateNull(int rowId) {
     long value = 0;
-    if (dataType == DataTypes.BYTE) {
+    if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
       update((byte) value);
     } else if (dataType == DataTypes.SHORT) {
       update((short) value);
@@ -270,7 +277,10 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
 
   @Override
   public String toString() {
-    if (dataType == DataTypes.BYTE) {
+    if (dataType == DataTypes.BOOLEAN) {
+      return String.format("min: %s, max: %s ", BooleanConvert.byte2Boolean(minByte),
+              BooleanConvert.byte2Boolean(minByte));
+    } else if (dataType == DataTypes.BYTE) {
       return String.format("min: %s, max: %s, decimal: %s ", minByte, maxByte, decimal);
     } else if (dataType == DataTypes.SHORT) {
       return String.format("min: %s, max: %s, decimal: %s ", minShort, maxShort, decimal);
@@ -286,7 +296,7 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
 
   @Override
   public Object getMin() {
-    if (dataType == DataTypes.BYTE) {
+    if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
       return minByte;
     } else if (dataType == DataTypes.SHORT) {
       return minShort;
@@ -304,7 +314,7 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
 
   @Override
   public Object getMax() {
-    if (dataType == DataTypes.BYTE) {
+    if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
       return maxByte;
     } else if (dataType == DataTypes.SHORT) {
       return maxShort;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index 6b2cb90..7faa7e6 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -127,7 +127,9 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
       return null;
     }
     // data type object maybe created by GSON, use id to compare the type instead of object address
-    if (dataType.getId() == DataTypes.STRING.getId()) {
+    if (dataType.getId() == DataTypes.BOOLEAN.getId()) {
+      return org.apache.carbondata.format.DataType.BOOLEAN;
+    } else if (dataType.getId() == DataTypes.STRING.getId()) {
       return org.apache.carbondata.format.DataType.STRING;
     } else if (dataType.getId() == DataTypes.INT.getId()) {
       return org.apache.carbondata.format.DataType.INT;
@@ -361,6 +363,8 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
       return null;
     }
     switch (dataType) {
+      case BOOLEAN:
+        return DataTypes.BOOLEAN;
       case STRING:
         return DataTypes.STRING;
       case INT:

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
index 1fa42dc..7e24413 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
@@ -91,7 +91,9 @@ public abstract class AbstractScannedResultCollector implements ScannedResultCol
       CarbonMeasure carbonMeasure) {
     if (!dataChunk.getNullBits().get(index)) {
       DataType dataType = carbonMeasure.getDataType();
-      if (dataType == DataTypes.SHORT) {
+      if (dataType == DataTypes.BOOLEAN) {
+        return dataChunk.getBoolean(index);
+      } else if (dataType == DataTypes.SHORT) {
         return (short) dataChunk.getLong(index);
       } else if (dataType == DataTypes.INT) {
         return (int) dataChunk.getLong(index);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/EqualToExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/EqualToExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/EqualToExpression.java
index 780ca89..f143189 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/EqualToExpression.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/EqualToExpression.java
@@ -67,7 +67,9 @@ public class EqualToExpression extends BinaryConditionalExpression {
     }
 
     DataType dataType = val1.getDataType();
-    if (dataType == DataTypes.STRING) {
+    if (dataType == DataTypes.BOOLEAN) {
+      result = val1.getBoolean().equals(val2.getBoolean());
+    } else if (dataType == DataTypes.STRING) {
       result = val1.getString().equals(val2.getString());
     } else if (dataType == DataTypes.SHORT) {
       result = val1.getShort().equals(val2.getShort());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanEqualToExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanEqualToExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanEqualToExpression.java
index 469672e..1472959 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanEqualToExpression.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanEqualToExpression.java
@@ -50,7 +50,9 @@ public class GreaterThanEqualToExpression extends BinaryConditionalExpression {
     }
     boolean result = false;
     DataType dataType = exprResVal1.getDataType();
-    if (dataType == DataTypes.STRING) {
+    if (dataType == DataTypes.BOOLEAN) {
+      result = elRes.getBoolean().compareTo(erRes.getBoolean()) >= 0;
+    } else if (dataType == DataTypes.STRING) {
       result = elRes.getString().compareTo(erRes.getString()) >= 0;
     } else if (dataType == DataTypes.SHORT) {
       result = elRes.getShort() >= (erRes.getShort());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanExpression.java
index 931de6c..b8a8a7c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanExpression.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanExpression.java
@@ -52,7 +52,9 @@ public class GreaterThanExpression extends BinaryConditionalExpression {
     }
     boolean result = false;
     DataType dataType = val1.getDataType();
-    if (dataType == DataTypes.STRING) {
+    if (dataType == DataTypes.BOOLEAN) {
+      result = exprLeftRes.getBoolean().compareTo(exprRightRes.getBoolean()) > 0;
+    } else if (dataType == DataTypes.STRING) {
       result = exprLeftRes.getString().compareTo(exprRightRes.getString()) > 0;
     } else if (dataType == DataTypes.DOUBLE) {
       result = exprLeftRes.getDouble() > (exprRightRes.getDouble());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/InExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/InExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/InExpression.java
index bc714f7..7243741 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/InExpression.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/InExpression.java
@@ -54,7 +54,9 @@ public class InExpression extends BinaryConditionalExpression {
           val = expressionResVal;
         }
         DataType dataType = val.getDataType();
-        if (dataType == DataTypes.STRING) {
+        if (dataType == DataTypes.BOOLEAN) {
+          val = new ExpressionResult(val.getDataType(), expressionResVal.getBoolean());
+        } else if (dataType == DataTypes.STRING) {
           val = new ExpressionResult(val.getDataType(), expressionResVal.getString());
         } else if (dataType == DataTypes.SHORT) {
           val = new ExpressionResult(val.getDataType(), expressionResVal.getShort());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanEqualToExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanEqualToExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanEqualToExpression.java
index f416a6b..6a9fc3c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanEqualToExpression.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanEqualToExpression.java
@@ -50,7 +50,9 @@ public class LessThanEqualToExpression extends BinaryConditionalExpression {
     }
     boolean result = false;
     DataType dataType = exprResValue1.getDataType();
-    if (dataType == DataTypes.STRING) {
+    if (dataType == DataTypes.BOOLEAN) {
+      result = elRes.getBoolean().compareTo(erRes.getBoolean()) <= 0;
+    } else if (dataType == DataTypes.STRING) {
       result = elRes.getString().compareTo(erRes.getString()) <= 0;
     } else if (dataType == DataTypes.SHORT) {
       result = elRes.getShort() <= (erRes.getShort());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanExpression.java
index c0d7c10..4283d83 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanExpression.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanExpression.java
@@ -54,7 +54,9 @@ public class LessThanExpression extends BinaryConditionalExpression {
 
     }
     DataType dataType = val1.getDataType();
-    if (dataType == DataTypes.STRING) {
+    if (dataType == DataTypes.BOOLEAN) {
+      result = elRes.getBoolean().compareTo(erRes.getBoolean()) < 0;
+    } else if (dataType == DataTypes.STRING) {
       result = elRes.getString().compareTo(erRes.getString()) < 0;
     } else if (dataType == DataTypes.SHORT) {
       result = elRes.getShort() < (erRes.getShort());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java
index 8930c94..5055caf 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java
@@ -63,7 +63,9 @@ public class NotEqualsExpression extends BinaryConditionalExpression {
       }
     }
     DataType dataType = val1.getDataType();
-    if (dataType == DataTypes.STRING) {
+    if (dataType == DataTypes.BOOLEAN) {
+      result = !val1.getBoolean().equals(val2.getBoolean());
+    } else if (dataType == DataTypes.STRING) {
       result = !val1.getString().equals(val2.getString());
     } else if (dataType == DataTypes.SHORT) {
       result = val1.getShort().shortValue() != val2.getShort().shortValue();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotInExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotInExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotInExpression.java
index 5f6359b..89a0374 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotInExpression.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotInExpression.java
@@ -75,7 +75,9 @@ public class NotInExpression extends BinaryConditionalExpression {
           val = exprResVal;
         }
         DataType dataType = val.getDataType();
-        if (dataType == DataTypes.STRING) {
+        if (dataType == DataTypes.BOOLEAN) {
+          val = new ExpressionResult(val.getDataType(), exprResVal.getBoolean());
+        } else if (dataType == DataTypes.STRING) {
           val = new ExpressionResult(val.getDataType(), exprResVal.getString());
         } else if (dataType == DataTypes.SHORT) {
           val = new ExpressionResult(val.getDataType(), exprResVal.getShort());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
index cfe2cc8..ecac617 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
@@ -1296,7 +1296,10 @@ public final class FilterUtil {
   public static int compareFilterKeyBasedOnDataType(String dictionaryVal, String memberVal,
       DataType dataType) {
     try {
-      if (dataType == DataTypes.SHORT) {
+      if (dataType == DataTypes.BOOLEAN) {
+        return Boolean.compare((Boolean.parseBoolean(dictionaryVal)),
+                (Boolean.parseBoolean(memberVal)));
+      } else if (dataType == DataTypes.SHORT) {
         return Short.compare((Short.parseShort(dictionaryVal)), (Short.parseShort(memberVal)));
       } else if (dataType == DataTypes.INT) {
         return Integer.compare((Integer.parseInt(dictionaryVal)), (Integer.parseInt(memberVal)));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
index 4013578..7a80c88 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
@@ -124,7 +124,9 @@ public class ExcludeFilterExecuterImpl implements FilterExecuter {
   }
 
   private DataType getMeasureDataType(MeasureColumnResolvedFilterInfo msrColumnEvaluatorInfo) {
-    if (msrColumnEvaluatorInfo.getType() == DataTypes.SHORT) {
+    if (msrColumnEvaluatorInfo.getType() == DataTypes.BOOLEAN) {
+      return DataTypes.BOOLEAN;
+    } else if (msrColumnEvaluatorInfo.getType() == DataTypes.SHORT) {
       return DataTypes.SHORT;
     } else if (msrColumnEvaluatorInfo.getType() == DataTypes.INT) {
       return DataTypes.INT;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
index 04fce4a..843da71 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
@@ -148,7 +148,9 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
   }
 
   private DataType getMeasureDataType(MeasureColumnResolvedFilterInfo msrColumnEvaluatorInfo) {
-    if (msrColumnEvaluatorInfo.getType() == DataTypes.SHORT) {
+    if (msrColumnEvaluatorInfo.getType() == DataTypes.BOOLEAN) {
+      return DataTypes.BOOLEAN;
+    } else if (msrColumnEvaluatorInfo.getType() == DataTypes.SHORT) {
       return DataTypes.SHORT;
     } else if (msrColumnEvaluatorInfo.getType() == DataTypes.INT) {
       return DataTypes.INT;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
index c243368..dbf8d4c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
@@ -351,7 +351,9 @@ public class RowLevelFilterExecuterImpl implements FilterExecuter {
     for (int i = 0; i < msrColEvalutorInfoList.size(); i++) {
       MeasureColumnResolvedFilterInfo msrColumnEvalutorInfo = msrColEvalutorInfoList.get(i);
       DataType dataType = msrColumnEvalutorInfo.getType();
-      if (dataType == DataTypes.SHORT) {
+      if (dataType == DataTypes.BOOLEAN) {
+        msrType = DataTypes.BOOLEAN;
+      } else if (dataType == DataTypes.SHORT) {
         msrType = DataTypes.SHORT;
       } else if (dataType == DataTypes.INT) {
         msrType = DataTypes.INT;
@@ -376,7 +378,9 @@ public class RowLevelFilterExecuterImpl implements FilterExecuter {
       ColumnPage columnPage =
           blockChunkHolder.getMeasureRawDataChunk()[measureBlocksIndex[0]]
               .convertToColumnPage(pageIndex);
-      if (msrType == DataTypes.SHORT) {
+      if (msrType == DataTypes.BOOLEAN) {
+        msrValue = columnPage.getBoolean(index);
+      } else if (msrType == DataTypes.SHORT) {
         msrValue = (short) columnPage.getLong(index);
       } else if (msrType == DataTypes.INT) {
         msrValue = (int) columnPage.getLong(index);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
index 9b01e1f..cf6c88b 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
@@ -89,6 +89,63 @@ public class MeasureDataVectorProcessor {
     }
   }
 
+  /**
+   * Fill Measure Vector For Boolean data type
+   */
+  public static class BooleanMeasureVectorFiller implements MeasureVectorFiller {
+
+    @Override
+    public void fillMeasureVector(ColumnPage dataChunk, ColumnVectorInfo info) {
+      int offset = info.offset;
+      int len = offset + info.size;
+      int vectorOffset = info.vectorOffset;
+      CarbonColumnVector vector = info.vector;
+      BitSet nullBitSet = dataChunk.getNullBits();
+      if (nullBitSet.isEmpty()) {
+        for (int i = offset; i < len; i++) {
+          vector.putBoolean(vectorOffset, dataChunk.getBoolean(i));
+          vectorOffset++;
+        }
+      } else {
+        for (int i = offset; i < len; i++) {
+          if (nullBitSet.get(i)) {
+            vector.putNull(vectorOffset);
+          } else {
+            vector.putBoolean(vectorOffset, dataChunk.getBoolean(i));
+          }
+          vectorOffset++;
+        }
+      }
+    }
+
+    @Override
+    public void fillMeasureVectorForFilter(int[] rowMapping,
+        ColumnPage dataChunk, ColumnVectorInfo info) {
+      int offset = info.offset;
+      int len = offset + info.size;
+      int vectorOffset = info.vectorOffset;
+      CarbonColumnVector vector = info.vector;
+      BitSet nullBitSet = dataChunk.getNullBits();
+      if (nullBitSet.isEmpty()) {
+        for (int i = offset; i < len; i++) {
+          int currentRow = rowMapping[i];
+          vector.putBoolean(vectorOffset, dataChunk.getBoolean(currentRow));
+          vectorOffset++;
+        }
+      } else {
+        for (int i = offset; i < len; i++) {
+          int currentRow = rowMapping[i];
+          if (nullBitSet.get(currentRow)) {
+            vector.putNull(vectorOffset);
+          } else {
+            vector.putBoolean(vectorOffset, dataChunk.getBoolean(currentRow));
+          }
+          vectorOffset++;
+        }
+      }
+    }
+  }
+
   public static class ShortMeasureVectorFiller implements MeasureVectorFiller {
 
     @Override
@@ -307,7 +364,9 @@ public class MeasureDataVectorProcessor {
   public static class MeasureVectorFillerFactory {
 
     public static MeasureVectorFiller getMeasureVectorFiller(DataType dataType) {
-      if (dataType == DataTypes.SHORT) {
+      if (dataType == DataTypes.BOOLEAN) {
+        return new BooleanMeasureVectorFiller();
+      } else if (dataType == DataTypes.SHORT) {
         return new ShortMeasureVectorFiller();
       } else if (dataType == DataTypes.INT) {
         return new IntegralMeasureVectorFiller();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
index 1020e5b..15f85d6 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
@@ -352,6 +352,8 @@ public abstract class AbstractDataFileFooterConverter {
   protected DataType thriftDataTyopeToWrapperDataType(
       org.apache.carbondata.format.DataType dataTypeThrift) {
     switch (dataTypeThrift) {
+      case BOOLEAN:
+        return DataTypes.BOOLEAN;
       case STRING:
         return DataTypes.STRING;
       case SHORT:

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
index 2c9ef66..d232d61 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
@@ -344,7 +344,9 @@ public class CarbonMetadataUtil {
   private static int compareMeasureData(byte[] first, byte[] second, DataType dataType) {
     ByteBuffer firstBuffer = null;
     ByteBuffer secondBuffer = null;
-    if (dataType == DataTypes.DOUBLE) {
+    if (dataType == DataTypes.BOOLEAN) {
+      return first[0] - second[0];
+    } else if (dataType == DataTypes.DOUBLE) {
       firstBuffer = ByteBuffer.allocate(8);
       firstBuffer.put(first);
       secondBuffer = ByteBuffer.allocate(8);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 2924c09..ef3e71d 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -1938,11 +1938,10 @@ public final class CarbonUtil {
    */
   public static byte[] getValueAsBytes(DataType dataType, Object value) {
     ByteBuffer b;
-    if (dataType == DataTypes.BYTE) {
-      b = ByteBuffer.allocate(8);
-      b.putLong((byte) value);
-      b.flip();
-      return b.array();
+    if (dataType == DataTypes.BYTE || dataType == DataTypes.BOOLEAN) {
+      byte[] bytes = new byte[1];
+      bytes[0] = (byte) value;
+      return bytes;
     } else if (dataType == DataTypes.SHORT) {
       b = ByteBuffer.allocate(8);
       b.putLong((short) value);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index b8cd59d..01e34a7 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.bool.BooleanConvert;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
 import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -102,7 +103,9 @@ public final class DataTypeUtil {
    */
   public static Object getMeasureValueBasedOnDataType(String msrValue, DataType dataType,
       CarbonMeasure carbonMeasure) {
-    if (dataType == DataTypes.DECIMAL) {
+    if (dataType == DataTypes.BOOLEAN) {
+      return BooleanConvert.parseBoolean(msrValue);
+    } else if (dataType == DataTypes.DECIMAL) {
       BigDecimal bigDecimal =
           new BigDecimal(msrValue).setScale(carbonMeasure.getScale(), RoundingMode.HALF_UP);
       return normalizeDecimalValue(bigDecimal, carbonMeasure.getPrecision());
@@ -126,7 +129,9 @@ public final class DataTypeUtil {
       return null;
     }
     ByteBuffer bb = ByteBuffer.wrap(data);
-    if (dataType == DataTypes.SHORT) {
+    if (dataType == DataTypes.BOOLEAN) {
+      return BooleanConvert.byte2Boolean(bb.get());
+    } else if (dataType == DataTypes.SHORT) {
       return (short) bb.getLong();
     } else if (dataType == DataTypes.INT) {
       return (int) bb.getLong();
@@ -141,7 +146,9 @@ public final class DataTypeUtil {
 
   public static Object getMeasureObjectBasedOnDataType(ColumnPage measurePage, int index,
       DataType dataType, CarbonMeasure carbonMeasure) {
-    if (dataType == DataTypes.SHORT) {
+    if (dataType == DataTypes.BOOLEAN) {
+      return measurePage.getBoolean(index);
+    } else if (dataType == DataTypes.SHORT) {
       return (short) measurePage.getLong(index);
     } else if (dataType == DataTypes.INT) {
       return (int) measurePage.getLong(index);
@@ -240,6 +247,9 @@ public final class DataTypeUtil {
   public static DataType getDataType(String dataTypeStr) {
     DataType dataType = null;
     switch (dataTypeStr) {
+      case "BOOLEAN":
+        dataType = DataTypes.BOOLEAN;
+        break;
       case "DATE":
         dataType = DataTypes.DATE;
         break;
@@ -303,7 +313,12 @@ public final class DataTypeUtil {
       return null;
     }
     try {
-      if (actualDataType == DataTypes.INT) {
+      if (actualDataType == DataTypes.BOOLEAN) {
+        if (data.isEmpty()) {
+          return null;
+        }
+        return BooleanConvert.parseBoolean(data);
+      } else if (actualDataType == DataTypes.INT) {
         if (data.isEmpty()) {
           return null;
         }
@@ -366,7 +381,9 @@ public final class DataTypeUtil {
 
   public static byte[] getBytesBasedOnDataTypeForNoDictionaryColumn(String dimensionValue,
       DataType actualDataType, String dateFormat) {
-    if (actualDataType == DataTypes.STRING) {
+    if (actualDataType == DataTypes.BOOLEAN) {
+      return ByteUtil.toBytes(BooleanConvert.parseBoolean(dimensionValue));
+    } else if (actualDataType == DataTypes.STRING) {
       return ByteUtil.toBytes(dimensionValue);
     } else if (actualDataType == DataTypes.BOOLEAN) {
       return ByteUtil.toBytes(Boolean.parseBoolean(dimensionValue));
@@ -411,7 +428,9 @@ public final class DataTypeUtil {
       return null;
     }
     try {
-      if (actualDataType == DataTypes.STRING) {
+      if (actualDataType == DataTypes.BOOLEAN) {
+        return ByteUtil.toBoolean(dataInBytes);
+      } else if (actualDataType == DataTypes.STRING) {
         return getDataTypeConverter().convertFromByteToUTF8String(dataInBytes);
       } else if (actualDataType == DataTypes.BOOLEAN) {
         return ByteUtil.toBoolean(dataInBytes);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/main/java/org/apache/carbondata/core/util/comparator/Comparator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/comparator/Comparator.java b/core/src/main/java/org/apache/carbondata/core/util/comparator/Comparator.java
index a43ed0f..d1beb16 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/comparator/Comparator.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/comparator/Comparator.java
@@ -26,7 +26,9 @@ import org.apache.carbondata.core.util.ByteUtil;
 public final class Comparator {
 
   public static SerializableComparator getComparator(DataType dataType) {
-    if (dataType == DataTypes.INT) {
+    if (dataType == DataTypes.BOOLEAN) {
+      return new BooleanSerializableComparator();
+    } else if (dataType == DataTypes.INT) {
       return new IntSerializableComparator();
     } else if (dataType == DataTypes.SHORT) {
       return new ShortSerializableComparator();
@@ -49,7 +51,9 @@ public final class Comparator {
    * @return
    */
   public static SerializableComparator getComparatorByDataTypeForMeasure(DataType dataType) {
-    if (dataType == DataTypes.INT) {
+    if (dataType == DataTypes.BOOLEAN) {
+      return new BooleanSerializableComparator();
+    } else if (dataType == DataTypes.INT) {
       return new IntSerializableComparator();
     } else if (dataType == DataTypes.SHORT) {
       return new ShortSerializableComparator();
@@ -71,6 +75,26 @@ class ByteArraySerializableComparator implements SerializableComparator {
   }
 }
 
+class BooleanSerializableComparator implements SerializableComparator {
+  @Override
+  public int compare(Object key1, Object key2) {
+    if (key1 == null && key2 == null) {
+      return 0;
+    } else if (key1 == null) {
+      return -1;
+    } else if (key2 == null) {
+      return 1;
+    }
+    if (Boolean.compare((boolean) key1, (boolean) key2) < 0) {
+      return -1;
+    } else if (Boolean.compare((boolean) key1, (boolean) key2) > 0) {
+      return 1;
+    } else {
+      return 0;
+    }
+  }
+}
+
 class IntSerializableComparator implements SerializableComparator {
   @Override public int compare(Object key1, Object key2) {
     if (key1 == null && key2 == null) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
index 8bce684..4a3ef32 100644
--- a/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
@@ -414,7 +414,11 @@ public class ThriftWrapperSchemaConverterImplTest {
     assertEquals(expectedResult, actualResult);
   }
 
-  @Test public void testFromWrapperToExternalColumnSchema() {
+  @Test public void testFromWrapperToExternalColumnSchemaForBooleanDataType() {
+    org.apache.carbondata.format.ColumnSchema thriftColumnSchema =
+            new org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.BOOLEAN,
+                    "columnName", "1", true, encoders, true);
+    thriftColumnSchema.setSchemaOrdinal(1);
     ColumnSchema wrapperColumnSchema = new ColumnSchema();
 
     new MockUp<ColumnSchema>() {
@@ -1304,7 +1308,7 @@ public class ThriftWrapperSchemaConverterImplTest {
     encoders.add(null);
     org.apache.carbondata.format.ColumnSchema thriftColumnSchema = null;
     thriftColumnSchema =
-        new org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.STRING,
+        new org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.BOOLEAN,
             "columnName", "1", true, encoders, true);
     thriftColumnSchema.setSchemaOrdinal(1);
 
@@ -1327,7 +1331,7 @@ public class ThriftWrapperSchemaConverterImplTest {
       }
 
       @Mock public DataType getDataType() {
-        return DataTypes.BOOLEAN;
+        return DataTypes.STRING;
       }
 
       @Mock public String getColumnName() {
@@ -1445,7 +1449,7 @@ public class ThriftWrapperSchemaConverterImplTest {
       }
 
       @Mock public DataType getDataType() {
-        return DataTypes.BOOLEAN;
+        return DataTypes.STRING;
       }
 
       @Mock public String getColumnName() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/EqualToExpressionUnitTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/EqualToExpressionUnitTest.java b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/EqualToExpressionUnitTest.java
index 0abfb18..51bb0fe 100644
--- a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/EqualToExpressionUnitTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/EqualToExpressionUnitTest.java
@@ -182,7 +182,7 @@ public class EqualToExpressionUnitTest {
     }
   }
 
-  @Test(expected = FilterUnsupportedException.class) public void testForEqualToExpressionForDefaultCase()
+  @Test public void testForEqualToExpressionForDefaultCase()
       throws FilterUnsupportedException, FilterIllegalMemberException {
     ColumnExpression right = new ColumnExpression("contact", DataTypes.BOOLEAN);
     right.setColIndex(0);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanEqualToExpressionUnitTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanEqualToExpressionUnitTest.java b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanEqualToExpressionUnitTest.java
index 35d8886..0375a6d 100644
--- a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanEqualToExpressionUnitTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanEqualToExpressionUnitTest.java
@@ -189,7 +189,7 @@ public class GreaterThanEqualToExpressionUnitTest {
     }
   }
 
-  @Test(expected = FilterUnsupportedException.class) public void testForGreaterThanEqualToExpressionWithDefaultCase()
+  @Test public void testForGreaterThanEqualToExpressionWithDefaultCase()
       throws FilterUnsupportedException, FilterIllegalMemberException {
     ColumnExpression right = new ColumnExpression("contact", DataTypes.BOOLEAN);
     right.setColIndex(0);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanExpressionUnitTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanExpressionUnitTest.java b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanExpressionUnitTest.java
index 7d6a168..1940069 100644
--- a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanExpressionUnitTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanExpressionUnitTest.java
@@ -279,7 +279,7 @@ public class GreaterThanExpressionUnitTest {
     assertTrue(result.getBoolean());
   }
 
-  @Test(expected = FilterUnsupportedException.class) public void testForGreaterThanExpressionWithDefaultCase()
+  @Test public void testForGreaterThanExpressionWithDefaultCase()
       throws FilterUnsupportedException, FilterIllegalMemberException {
     ColumnExpression right = new ColumnExpression("contact", DataTypes.BOOLEAN);
     right.setColIndex(0);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/InExpressionUnitTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/InExpressionUnitTest.java b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/InExpressionUnitTest.java
index 2fd6585..d31ec31 100644
--- a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/InExpressionUnitTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/InExpressionUnitTest.java
@@ -224,7 +224,7 @@ public class InExpressionUnitTest {
     assertTrue(result.getBoolean());
   }
 
-  @Test(expected = FilterUnsupportedException.class) public void testForInExpressionWithDefaultCase()
+  @Test public void testForInExpressionWithDefaultCase()
       throws FilterUnsupportedException, FilterIllegalMemberException {
     ColumnExpression left = new ColumnExpression("contact", DataTypes.BOOLEAN);
     left.setColIndex(0);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/LessThanEqualToExpressionUnitTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/LessThanEqualToExpressionUnitTest.java b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/LessThanEqualToExpressionUnitTest.java
index 7000ceb..a2a5d2a 100644
--- a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/LessThanEqualToExpressionUnitTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/LessThanEqualToExpressionUnitTest.java
@@ -282,7 +282,7 @@ public class LessThanEqualToExpressionUnitTest {
     assertTrue(result.getBoolean());
   }
 
-  @Test(expected = FilterUnsupportedException.class) public void testForLessThanEqualToExpressionWithDefaultCase()
+  @Test public void testForLessThanEqualToExpressionWithDefaultCase()
       throws FilterUnsupportedException, FilterIllegalMemberException {
     ColumnExpression right = new ColumnExpression("contact", DataTypes.BOOLEAN);
     right.setColIndex(0);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/LessThanExpressionUnitTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/LessThanExpressionUnitTest.java b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/LessThanExpressionUnitTest.java
index 889ebbd..d385dc1 100644
--- a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/LessThanExpressionUnitTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/LessThanExpressionUnitTest.java
@@ -279,7 +279,7 @@ public class LessThanExpressionUnitTest {
     assertTrue(result.getBoolean());
   }
 
-  @Test(expected = FilterUnsupportedException.class) public void testForLessThanExpressionWithDefaultCase()
+  @Test public void testForLessThanExpressionWithDefaultCase()
       throws FilterUnsupportedException, FilterIllegalMemberException {
     ColumnExpression right = new ColumnExpression("contact", DataTypes.BOOLEAN);
     right.setColIndex(0);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpressionUnitTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpressionUnitTest.java b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpressionUnitTest.java
index 1cb008c..e7fa544 100644
--- a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpressionUnitTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpressionUnitTest.java
@@ -247,7 +247,7 @@ public class NotEqualsExpressionUnitTest {
     }
   }
 
-  @Test(expected = FilterUnsupportedException.class) public void testForNotEqualsExpressionWithDefaultCase()
+  @Test public void testForNotEqualsExpressionWithDefaultCase()
       throws FilterUnsupportedException, FilterIllegalMemberException {
     ColumnExpression right = new ColumnExpression("contact", DataTypes.BOOLEAN);
     right.setColIndex(0);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/NotInExpressionUnitTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/NotInExpressionUnitTest.java b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/NotInExpressionUnitTest.java
index 230e305..5758625 100644
--- a/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/NotInExpressionUnitTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/scan/expression/conditional/NotInExpressionUnitTest.java
@@ -223,7 +223,7 @@ public class NotInExpressionUnitTest {
     assertTrue(result.getBoolean());
   }
 
-  @Test(expected = FilterUnsupportedException.class) public void testDefaultCaseForNotInExpression()
+  @Test public void testDefaultCaseForNotInExpression()
       throws FilterUnsupportedException, FilterIllegalMemberException {
     ColumnExpression left = new ColumnExpression("contact", DataTypes.BOOLEAN);
     left.setColIndex(0);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala
index 5fadfb7..31cb4d0 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala
@@ -99,6 +99,9 @@ class DataCompactionBoundaryConditionsTest extends QueryTest with BeforeAndAfter
     sql("drop table if exists  boundarytest")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+    CarbonProperties.getInstance().
+      addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
+        CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
index 027c654..89a3ac3 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
@@ -26,6 +26,7 @@ object DataTypeConverterUtil {
 
   def convertToCarbonType(dataType: String): DataType = {
     dataType.toLowerCase match {
+      case "boolean" => DataTypes.BOOLEAN
       case "string" => DataTypes.STRING
       case "int" => DataTypes.INT
       case "integer" => DataTypes.INT
@@ -48,6 +49,7 @@ object DataTypeConverterUtil {
 
   def convertToCarbonTypeForSpark2(dataType: String): DataType = {
     dataType.toLowerCase match {
+      case "booleantype" => DataTypes.BOOLEAN
       case "stringtype" => DataTypes.STRING
       case "inttype" => DataTypes.INT
       case "integertype" => DataTypes.INT
@@ -79,6 +81,7 @@ object DataTypeConverterUtil {
 
   def convertToString(dataType: DataType): String = {
     dataType match {
+      case DataTypes.BOOLEAN => "boolean"
       case DataTypes.STRING => "string"
       case DataTypes.SHORT => "smallint"
       case DataTypes.INT => "int"
@@ -106,6 +109,7 @@ object DataTypeConverterUtil {
     dataType match {
       case "string" => ThriftDataType.STRING
       case "int" => ThriftDataType.INT
+      case "boolean" => ThriftDataType.BOOLEAN
       case "short" => ThriftDataType.SHORT
       case "long" | "bigint" => ThriftDataType.LONG
       case "double" => ThriftDataType.DOUBLE

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 16301d6..4649082 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -163,6 +163,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   protected val FLOAT = carbonKeyWord("FLOAT")
   protected val SHORT = carbonKeyWord("SHORT")
   protected val INT = carbonKeyWord("INT")
+  protected val BOOLEAN = carbonKeyWord("BOOLEAN")
   protected val BIGINT = carbonKeyWord("BIGINT")
   protected val ARRAY = carbonKeyWord("ARRAY")
   protected val STRUCT = carbonKeyWord("STRUCT")
@@ -954,7 +955,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   protected lazy val dimCol: Parser[Field] = anyFieldDef
 
   protected lazy val primitiveTypes =
-    STRING ^^^ "string" | INTEGER ^^^ "integer" |
+    STRING ^^^ "string" |BOOLEAN ^^^ "boolean" | INTEGER ^^^ "integer" |
     TIMESTAMP ^^^ "timestamp" | NUMERIC ^^^ "numeric" |
     BIGINT ^^^ "bigint" | (SHORT | SMALLINT) ^^^ "smallint" |
     INT ^^^ "int" | DOUBLE ^^^ "double" | FLOAT ^^^ "double" | decimalType |

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index 1183d94..1d3783e 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -206,7 +206,8 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
     for (int i = 0; i < queryMeasures.size(); i++) {
       QueryMeasure msr = queryMeasures.get(i);
       DataType dataType = msr.getMeasure().getDataType();
-      if (dataType == DataTypes.SHORT || dataType == DataTypes.INT || dataType == DataTypes.LONG) {
+      if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.SHORT ||
+          dataType == DataTypes.INT || dataType == DataTypes.LONG) {
         fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
             CarbonScalaUtil.convertCarbonToSparkDataType(msr.getMeasure().getDataType()), true,
             null);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/integration/spark2/src/test/resources/bool/supportBoolean.csv
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/resources/bool/supportBoolean.csv b/integration/spark2/src/test/resources/bool/supportBoolean.csv
new file mode 100644
index 0000000..25e57d7
--- /dev/null
+++ b/integration/spark2/src/test/resources/bool/supportBoolean.csv
@@ -0,0 +1,10 @@
+1,true,10,1100,48.4,spark,2015/4/23 12:01:01,1.23,2015/4/23,aaa,2.5,'foo'#'bar'#'world'
+5,false,17,1140,43.4,spark,2015/7/27 12:01:02,3.45,2015/7/27,bbb,2.5,'foo'#'bar'#'world'
+1,false,11,1100,44.4,flink,2015/5/23 12:01:03,23.23,2015/5/23,ccc,2.5,'foo'#'bar'#'world'
+1,true,10,1150,43.4,spark,2015/7/24 12:01:04,254.12,2015/7/24,ddd,2.5,'foo'#'bar'#'world'
+1,true,10,1100,47.4,spark,2015/7/23 12:01:05,876.14,2015/7/23,eeee,3.5,'foo'#'bar'#'world'
+3,true,14,1160,43.4,hive,2015/7/26 12:01:06,3454.32,2015/7/26,ff,2.5,'foo'#'bar'#'world'
+2,false,10,1100,43.4,impala,2015/7/23 12:01:07,456.98,2015/7/23,ggg,2.5,'foo'#'bar'#'world'
+1,false,10,1100,43.4,spark,2015/5/23 12:01:08,32.53,2015/5/23,hhh,2.5,'foo'#'bar'#'world'
+4,false,16,1130,42.4,impala,2015/7/23 12:01:09,67.23,2015/7/23,iii,2.5,'foo'#'bar'#'world'
+1,false,10,1100,43.4,spark,2015/7/23 12:01:10,832.23,2015/7/23,jjj,2.5,'foo'#'bar'#'world'

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/integration/spark2/src/test/resources/bool/supportBooleanBadRecords.csv
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/resources/bool/supportBooleanBadRecords.csv b/integration/spark2/src/test/resources/bool/supportBooleanBadRecords.csv
new file mode 100644
index 0000000..c9c769f
--- /dev/null
+++ b/integration/spark2/src/test/resources/bool/supportBooleanBadRecords.csv
@@ -0,0 +1,10 @@
+1,true,10,1100,48.4,spark,2015/4/23 12:01:01,1.23,2015/4/23,aaa,2.5,'foo'#'bar'#'world',true
+5,falsee,17,1140,43.4,spark,2015/7/27 12:01:02,3.45,2015/7/27,bbb,2.5,'foo'#'bar'#'world',true
+1,f,11,1100,44.4,flink,2015/5/23 12:01:03,23.23,2015/5/23,ccc,2.5,'foo'#'bar'#'world',true
+1,truee,10,1150,43.4,spark,2015/7/24 12:01:04,254.12,2015/7/24,ddd,2.5,'foo'#'bar'#'world',true
+1,truea,10,1100,47.4,spark,2015/7/23 12:01:05,876.14,2015/7/23,eeee,3.5,'foo'#'bar'#'world',true
+3,true,14,1160,43.4,hive,2015/7/26 12:01:06,3454.32,2015/7/26,ff,2.5,'foo'#'bar'#'world',falsee
+2,false,10,1100,43.4,impala,2015/7/23 12:01:07,456.98,2015/7/23,ggg,2.5,'foo'#'bar'#'world',falsea
+1,false,10,1100,43.4,spark,2015/5/23 12:01:08,32.53,2015/5/23,hhh,2.5,'foo'#'bar'#'world',falsef
+4,false,16,1130,42.4,impala,2015/7/23 12:01:09,67.23,2015/7/23,iii,2.5,'foo'#'bar'#'world',falsea
+1,false,10,1100,43.4,spark,2015/7/23 12:01:10,832.23,2015/7/23,jjj,2.5,'foo'#'bar'#'world',false
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/integration/spark2/src/test/resources/bool/supportBooleanDifferentFormat.csv
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/resources/bool/supportBooleanDifferentFormat.csv b/integration/spark2/src/test/resources/bool/supportBooleanDifferentFormat.csv
new file mode 100644
index 0000000..032e92b
--- /dev/null
+++ b/integration/spark2/src/test/resources/bool/supportBooleanDifferentFormat.csv
@@ -0,0 +1,20 @@
+True
+TRUE
+true
+"true"
+false
+False
+FALSE
+"FALSE"
+
+null
+NULL
+TRUEA
+true"
+'true'
+truee
+'false'
+falsee
+FFALSE
+f
+t
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/integration/spark2/src/test/resources/bool/supportBooleanOnlyBoolean.csv
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/resources/bool/supportBooleanOnlyBoolean.csv b/integration/spark2/src/test/resources/bool/supportBooleanOnlyBoolean.csv
new file mode 100644
index 0000000..3ce21d8
--- /dev/null
+++ b/integration/spark2/src/test/resources/bool/supportBooleanOnlyBoolean.csv
@@ -0,0 +1,11 @@
+True
+TRUE
+true
+"true"
+false
+False
+FALSE
+"FALSE"
+
+null
+NULL
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/integration/spark2/src/test/resources/bool/supportBooleanTwoBooleanColumns.csv
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/resources/bool/supportBooleanTwoBooleanColumns.csv b/integration/spark2/src/test/resources/bool/supportBooleanTwoBooleanColumns.csv
new file mode 100644
index 0000000..696befb
--- /dev/null
+++ b/integration/spark2/src/test/resources/bool/supportBooleanTwoBooleanColumns.csv
@@ -0,0 +1,10 @@
+1,true,10,1100,48.4,spark,2015/4/23 12:01:01,1.23,2015/4/23,aaa,2.5,'foo'#'bar'#'world',true
+5,false,17,1140,43.4,spark,2015/7/27 12:01:02,3.45,2015/7/27,bbb,2.5,'foo'#'bar'#'world',true
+1,false,11,1100,44.4,flink,2015/5/23 12:01:03,23.23,2015/5/23,ccc,2.5,'foo'#'bar'#'world',true
+1,true,10,1150,43.4,spark,2015/7/24 12:01:04,254.12,2015/7/24,ddd,2.5,'foo'#'bar'#'world',true
+1,true,10,1100,47.4,spark,2015/7/23 12:01:05,876.14,2015/7/23,eeee,3.5,'foo'#'bar'#'world',true
+3,true,14,1160,43.4,hive,2015/7/26 12:01:06,3454.32,2015/7/26,ff,2.5,'foo'#'bar'#'world',false
+2,false,10,1100,43.4,impala,2015/7/23 12:01:07,456.98,2015/7/23,ggg,2.5,'foo'#'bar'#'world',false
+1,false,10,1100,43.4,spark,2015/5/23 12:01:08,32.53,2015/5/23,hhh,2.5,'foo'#'bar'#'world',false
+4,false,16,1130,42.4,impala,2015/7/23 12:01:09,67.23,2015/7/23,iii,2.5,'foo'#'bar'#'world',false
+1,false,10,1100,43.4,spark,2015/7/23 12:01:10,832.23,2015/7/23,jjj,2.5,'foo'#'bar'#'world',false
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6abdd97a/integration/spark2/src/test/resources/bool/supportBooleanWithFileHeader.csv
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/resources/bool/supportBooleanWithFileHeader.csv b/integration/spark2/src/test/resources/bool/supportBooleanWithFileHeader.csv
new file mode 100644
index 0000000..237eade
--- /dev/null
+++ b/integration/spark2/src/test/resources/bool/supportBooleanWithFileHeader.csv
@@ -0,0 +1,11 @@
+shortField,booleanField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData
+1,true,10,1100,48.4,spark,2015/4/23 12:01:01,1.23,2015/4/23,aaa,2.5,'foo'#'bar'#'world'
+5,false,17,1140,43.4,spark,2015/7/27 12:01:02,3.45,2015/7/27,bbb,2.5,'foo'#'bar'#'world'
+1,false,11,1100,44.4,flink,2015/5/23 12:01:03,23.23,2015/5/23,ccc,2.5,'foo'#'bar'#'world'
+1,true,10,1150,43.4,spark,2015/7/24 12:01:04,254.12,2015/7/24,ddd,2.5,'foo'#'bar'#'world'
+1,true,10,1100,47.4,spark,2015/7/23 12:01:05,876.14,2015/7/23,eeee,3.5,'foo'#'bar'#'world'
+3,true,14,1160,43.4,hive,2015/7/26 12:01:06,3454.32,2015/7/26,ff,2.5,'foo'#'bar'#'world'
+2,false,10,1100,43.4,impala,2015/7/23 12:01:07,456.98,2015/7/23,ggg,2.5,'foo'#'bar'#'world'
+1,false,10,1100,43.4,spark,2015/5/23 12:01:08,32.53,2015/5/23,hhh,2.5,'foo'#'bar'#'world'
+4,false,16,1130,42.4,impala,2015/7/23 12:01:09,67.23,2015/7/23,iii,2.5,'foo'#'bar'#'world'
+1,false,10,1100,43.4,spark,2015/7/23 12:01:10,832.23,2015/7/23,jjj,2.5,'foo'#'bar'#'world'