You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2016/12/15 09:07:48 UTC

[1/3] incubator-carbondata git commit: support datatype: date and char

Repository: incubator-carbondata
Updated Branches:
  refs/heads/master bbb5919a6 -> ecf29472e


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/directdictionary/TimestampDataTypeDirectDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/directdictionary/TimestampDataTypeDirectDictionaryTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/directdictionary/TimestampDataTypeDirectDictionaryTestCase.scala
index 4f8bf1a..f166025 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/directdictionary/TimestampDataTypeDirectDictionaryTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/directdictionary/TimestampDataTypeDirectDictionaryTestCase.scala
@@ -50,16 +50,16 @@ class TimestampDataTypeDirectDictionaryTest extends QueryTest with BeforeAndAfte
           TimeStampGranularityConstants.TIME_GRAN_SEC.toString
         )
       CarbonProperties.getInstance().addProperty("carbon.direct.dictionary", "true")
+      sql("drop table if exists directDictionaryTable")
+      sql("drop table if exists directDictionaryTable_hive")
       sql(
-        "CREATE TABLE if not exists directDictionaryTable (empno int,doj Timestamp, " +
-          "salary int) " +
+        "CREATE TABLE if not exists directDictionaryTable (empno int,doj Timestamp, salary int) " +
           "STORED BY 'org.apache.carbondata.format'"
       )
 
       sql(
-        "CREATE TABLE if not exists directDictionaryTable_hive (empno int,doj Timestamp, " +
-        "salary int) " +
-        "row format delimited fields terminated by ','"
+        "CREATE TABLE if not exists directDictionaryTable_hive (empno int,doj Timestamp, salary int) " +
+          "row format delimited fields terminated by ','"
       )
 
       CarbonProperties.getInstance()
@@ -68,8 +68,8 @@ class TimestampDataTypeDirectDictionaryTest extends QueryTest with BeforeAndAfte
         .getCanonicalPath
       val csvFilePath = currentDirectory + "/src/test/resources/datasample.csv"
       sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE directDictionaryTable OPTIONS" +
-        "('DELIMITER'= ',', 'QUOTECHAR'= '\"')");
-      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE directDictionaryTable_hive");
+        "('DELIMITER'= ',', 'QUOTECHAR'= '\"')")
+      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE directDictionaryTable_hive")
     } catch {
       case x: Throwable => CarbonProperties.getInstance()
         .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
@@ -97,8 +97,8 @@ class TimestampDataTypeDirectDictionaryTest extends QueryTest with BeforeAndAfte
 
   test("test direct dictionary for not equals condition") {
     checkAnswer(
-      sql("select doj from directDictionaryTable where doj != '2016-04-14 15:00:09.0'"),
-      Seq(Row(Timestamp.valueOf("2016-03-14 15:00:09.0"))
+      sql("select doj from directDictionaryTable where doj != '2016-04-14 15:00:09'"),
+      Seq(Row(Timestamp.valueOf("2016-03-14 15:00:09"))
       )
     )
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/directdictionary/TimestampDataTypeDirectDictionaryWithNoDictTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/directdictionary/TimestampDataTypeDirectDictionaryWithNoDictTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/directdictionary/TimestampDataTypeDirectDictionaryWithNoDictTestCase.scala
index 22678af..991b1bf 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/directdictionary/TimestampDataTypeDirectDictionaryWithNoDictTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/directdictionary/TimestampDataTypeDirectDictionaryWithNoDictTestCase.scala
@@ -88,8 +88,8 @@ class TimestampDataTypeDirectDictionaryWithNoDictTestCase extends QueryTest with
     )
 
   }
-  
-    test("select doj from directDictionaryTable with greater than filter") {
+
+  test("select doj from directDictionaryTable with greater than filter") {
     checkAnswer(
       sql("select doj from directDictionaryTable where doj>'2016-03-14 15:00:09'"),
       Seq(Row(Timestamp.valueOf("2016-04-14 15:00:09")))
@@ -99,7 +99,7 @@ class TimestampDataTypeDirectDictionaryWithNoDictTestCase extends QueryTest with
 
 
   override def afterAll {
-     sql("drop table directDictionaryTable")
+    sql("drop table directDictionaryTable")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
     CarbonProperties.getInstance().addProperty("carbon.direct.dictionary", "false")

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/directdictionary/TimestampDataTypeNullDataTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/directdictionary/TimestampDataTypeNullDataTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/directdictionary/TimestampDataTypeNullDataTest.scala
index f1a9ccb..9b7359f 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/directdictionary/TimestampDataTypeNullDataTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/directdictionary/TimestampDataTypeNullDataTest.scala
@@ -50,7 +50,7 @@ class TimestampDataTypeNullDataTest extends QueryTest with BeforeAndAfterAll {
         )
       sql(
         """CREATE TABLE IF NOT EXISTS timestampTyeNullData
-                     (ID Int, date Timestamp, country String,
+                     (ID Int, dateField Timestamp, country String,
                      name String, phonetype String, serialname String, salary Int)
                     STORED BY 'org.apache.carbondata.format'"""
       )
@@ -68,18 +68,18 @@ class TimestampDataTypeNullDataTest extends QueryTest with BeforeAndAfterAll {
     }
   }
 
-  test("SELECT max(date) FROM timestampTyeNullData where date is not null") {
+  test("SELECT max(dateField) FROM timestampTyeNullData where dateField is not null") {
     checkAnswer(
-      sql("SELECT max(date) FROM timestampTyeNullData where date is not null"),
+      sql("SELECT max(dateField) FROM timestampTyeNullData where dateField is not null"),
       Seq(Row(Timestamp.valueOf("2015-07-23 00:00:00.0"))
       )
     )
   }
-    test("SELECT * FROM timestampTyeNullData where date is null") {
-      checkAnswer(
-        sql("SELECT date FROM timestampTyeNullData where date is null"),
-        Seq(Row(null)
-        ))
+  test("SELECT * FROM timestampTyeNullData where dateField is null") {
+    checkAnswer(
+      sql("SELECT dateField FROM timestampTyeNullData where dateField is null"),
+      Seq(Row(null)
+      ))
   }
 
   override def afterAll {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
index 65d583c..1bfcdea 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
@@ -39,11 +39,13 @@ public class SparkRowReadSupportImpl extends AbstractDictionaryDecodedReadSuppor
 
   @Override public Row readRow(Object[] data) {
     for (int i = 0; i < dictionaries.length; i++) {
-      if (dictionaries[i] == null) {
+      if (dictionaries[i] == null && data[i] != null) {
         if (carbonColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {
           //convert the long to timestamp in case of direct dictionary column
           if (DataType.TIMESTAMP == carbonColumns[i].getDataType()) {
-            data[i] = new Timestamp((long) data[i] / 1000);
+            data[i] = new Timestamp((long) data[i]);
+          } else if(DataType.DATE == carbonColumns[i].getDataType()) {
+            data[i] = new java.sql.Date((int) data[i]);
           }
         } else if(dataTypes[i].equals(DataType.INT)) {
           data[i] = ((Long)(data[i])).intValue();

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index 2468962..5db5d14 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -134,6 +134,7 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
       case DoubleType => CarbonType.DOUBLE.getName
       case BooleanType => CarbonType.DOUBLE.getName
       case TimestampType => CarbonType.TIMESTAMP.getName
+      case DateType => CarbonType.DATE.getName
       case other => sys.error(s"unsupported type: $other")
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 940c6d7..3b63021 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -104,6 +104,7 @@ case class CarbonDictionaryDecoder(
           DecimalType(precision, scale)
         }
       case DataType.TIMESTAMP => TimestampType
+      case DataType.DATE => DateType
       case DataType.STRUCT =>
         CarbonMetastoreTypes
           .toDataType(s"struct<${ relation.getStructChildren(carbonDimension.getColName) }>")
@@ -261,6 +262,7 @@ class CarbonDecoderRDD(
           DecimalType(precision, scale)
         }
       case DataType.TIMESTAMP => TimestampType
+      case DataType.DATE => DateType
       case DataType.STRUCT =>
         CarbonMetastoreTypes
             .toDataType(s"struct<${ relation.getStructChildren(carbonDimension.getColName) }>")

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index b14a95c..2ba8a03 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -102,12 +102,10 @@ class CarbonSource extends CreatableRelationProvider
 
   private def createTableIfNotExists(sparkSession: SparkSession, parameters: Map[String, String],
                                      dataSchema: StructType): String = {
-    val (dbName, tableName) = parameters.get("path") match {
-      case Some(path) =>
-        val p = path.split(File.separator)
-        ("default", p(p.length - 1))
-      case _ => throw new Exception("do not have dbname and tablename for carbon table")
-    }
+
+    val dbName: String = parameters.getOrElse("dbName", CarbonCommonConstants.DATABASE_DEFAULT_NAME)
+    val tableName: String = parameters.getOrElse("tableName", "default_table")
+
     try {
       CarbonEnv.get.carbonMetastore.lookupRelation(Option(dbName), tableName)(sparkSession)
       CarbonEnv.get.carbonMetastore.storePath + s"/$dbName/$tableName"
@@ -134,7 +132,8 @@ class CarbonSource extends CreatableRelationProvider
         val cm = TableCreator.prepareTableModel(false, Option(dbName), tableName, fields, Nil, map)
         CreateTable(cm).run(sparkSession)
         CarbonEnv.get.carbonMetastore.storePath + s"/$dbName/$tableName"
-      case _ => throw new Exception("do not have dbname and tablename for carbon table")
+      case ex: Exception =>
+        throw new Exception("do not have dbname and tablename for carbon table", ex)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
index e375710..1faaafa 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
@@ -39,7 +39,7 @@ object TableCreator {
 
   // detects whether double or decimal column is part of dictionary_exclude
   def isStringAndTimestampColDictionaryExclude(columnDataType: String): Boolean = {
-    val dataTypes = Array("string", "timestamp")
+    val dataTypes = Array("string", "timestamp", "date", "stringtype", "timestamptype", "datetype")
     dataTypes.exists(x => x.equalsIgnoreCase(columnDataType))
   }
 
@@ -47,7 +47,7 @@ object TableCreator {
   def isDetectAsDimentionDatatype(dimensionDatatype: String): Boolean = {
     val dimensionType =
       Array("string", "stringtype", "array", "arraytype", "struct",
-        "structtype", "timestamp", "timestamptype")
+        "structtype", "timestamp", "timestamptype", "date", "datetype")
     dimensionType.exists(x => x.equalsIgnoreCase(dimensionDatatype))
   }
 
@@ -111,7 +111,8 @@ object TableCreator {
     fields.foreach(field => {
 
       if (dictExcludeCols.toSeq.exists(x => x.equalsIgnoreCase(field.column))) {
-        if (DataTypeUtil.getDataType(field.dataType.get.toUpperCase()) != DataType.TIMESTAMP) {
+        if (DataTypeUtil.getDataType(field.dataType.get.toUpperCase()) != DataType.TIMESTAMP &&
+            DataTypeUtil.getDataType(field.dataType.get.toUpperCase()) != DataType.DATE) {
           noDictionaryDims :+= field.column
         }
         dimFields += field

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index 24601f4..9467804 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -609,6 +609,7 @@ object CarbonMetastoreTypes extends RegexParsers {
     fixedDecimalType |
     "decimal" ^^^ "decimal" ^^^ DecimalType(18, 2) |
     "varchar\\((\\d+)\\)".r ^^^ StringType |
+    "date" ^^^ DateType |
     "timestamp" ^^^ TimestampType
 
   protected lazy val fixedDecimalType: Parser[DataType] =
@@ -668,6 +669,7 @@ object CarbonMetastoreTypes extends RegexParsers {
       case BooleanType => "boolean"
       case DecimalType() => "decimal"
       case TimestampType => "timestamp"
+      case DateType => "date"
     }
   }
 }


[3/3] incubator-carbondata git commit: [CARBONDATA-535]Support data type: date and char This closes #411

Posted by ja...@apache.org.
[CARBONDATA-535]Support data type: date and char  This closes #411


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/ecf29472
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/ecf29472
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/ecf29472

Branch: refs/heads/master
Commit: ecf29472ee73745a1c89a37f7d3dd5603c4ac9bf
Parents: bbb5919 d73f4bf
Author: jackylk <ja...@huawei.com>
Authored: Thu Dec 15 17:04:53 2016 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Thu Dec 15 17:04:53 2016 +0800

----------------------------------------------------------------------
 .../cache/dictionary/ColumnDictionaryInfo.java  |   1 +
 .../ThriftWrapperSchemaConverterImpl.java       |   4 +
 .../DirectDictionaryGenerator.java              |   2 +-
 .../DirectDictionaryKeyGeneratorFactory.java    |   4 +
 .../DateDirectDictionaryGenerator.java          | 162 +++++++++++++++++++
 .../util/AbstractDataFileFooterConverter.java   |   2 +
 .../carbondata/core/util/DataTypeUtil.java      |   5 +
 .../sortindex/CarbonDictionarySortModel.java    |   1 +
 .../scan/complextypes/PrimitiveQueryType.java   |   2 +
 .../scan/expression/ExpressionResult.java       |  99 ++++++++++--
 .../scan/expression/LiteralExpression.java      |   4 +-
 .../conditional/EqualToExpression.java          |   1 +
 .../GreaterThanEqualToExpression.java           |   1 +
 .../conditional/GreaterThanExpression.java      |   1 +
 .../expression/conditional/InExpression.java    |   1 +
 .../conditional/LessThanEqualToExpression.java  |   1 +
 .../conditional/LessThanExpression.java         |   1 +
 .../conditional/NotEqualsExpression.java        |   1 +
 .../expression/conditional/NotInExpression.java |   1 +
 .../carbondata/scan/filter/FilterUtil.java      |  13 +-
 .../resolver/ConditionalFilterResolverImpl.java |   3 +-
 .../resolver/RestructureFilterResolverImpl.java |   6 +-
 .../visitor/CustomTypeDictionaryVisitor.java    |  15 +-
 .../visitor/ResolvedFilterInfoVisitorIntf.java  |   2 +-
 .../carbondata/core/util/DataTypeUtilTest.java  |   1 +
 .../DictionaryBasedResultCollectorTest.java     |   9 +-
 .../scan/expression/ExpressionResultTest.java   |   4 +-
 .../carbondata/examples/CarbonExample.scala     |   2 +-
 examples/spark2/src/main/resources/data.csv     |  20 +--
 .../carbondata/examples/CarbonExample.scala     |  23 ++-
 format/src/main/thrift/schema.thrift            |   1 +
 .../carbondata/spark/util/CarbonScalaUtil.scala |   1 +
 .../spark/util/DataTypeConverterUtil.scala      |   5 +-
 .../execution/command/carbonTableSchema.scala   |   2 +-
 .../readsupport/SparkRowReadSupportImpl.java    |  16 +-
 .../spark/CarbonDataFrameWriter.scala           |   1 +
 .../spark/sql/CarbonDictionaryDecoder.scala     |   1 +
 .../org/apache/spark/sql/CarbonSqlParser.scala  |  21 ++-
 .../apache/spark/sql/hive/CarbonMetastore.scala |   5 +-
 .../src/test/resources/datasamplefordate.csv    |   4 +
 .../spark/src/test/resources/datasamplenull.csv |   2 +-
 .../DateDataTypeDirectDictionaryTest.scala      | 154 ++++++++++++++++++
 ...TypeDirectDictionaryWithNoDictTestCase.scala | 101 ++++++++++++
 .../DateDataTypeNullDataTest.scala              |  88 ++++++++++
 ...estampDataTypeDirectDictionaryTestCase.scala |  18 +--
 ...TypeDirectDictionaryWithNoDictTestCase.scala |   6 +-
 .../TimestampDataTypeNullDataTest.scala         |  16 +-
 .../readsupport/SparkRowReadSupportImpl.java    |   6 +-
 .../spark/sql/CarbonDataFrameWriter.scala       |   1 +
 .../spark/sql/CarbonDictionaryDecoder.scala     |   2 +
 .../org/apache/spark/sql/CarbonSource.scala     |  13 +-
 .../org/apache/spark/sql/TableCreator.scala     |   7 +-
 .../apache/spark/sql/hive/CarbonMetastore.scala |   2 +
 53 files changed, 768 insertions(+), 97 deletions(-)
----------------------------------------------------------------------



[2/3] incubator-carbondata git commit: support datatype: date and char

Posted by ja...@apache.org.
support datatype: date and char


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/d73f4bfe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/d73f4bfe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/d73f4bfe

Branch: refs/heads/master
Commit: d73f4bfe82e8be8970f41ce04707859e5b9bcce9
Parents: bbb5919
Author: QiangCai <qi...@qq.com>
Authored: Wed Dec 7 14:29:29 2016 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Thu Dec 15 17:04:16 2016 +0800

----------------------------------------------------------------------
 .../cache/dictionary/ColumnDictionaryInfo.java  |   1 +
 .../ThriftWrapperSchemaConverterImpl.java       |   4 +
 .../DirectDictionaryGenerator.java              |   2 +-
 .../DirectDictionaryKeyGeneratorFactory.java    |   4 +
 .../DateDirectDictionaryGenerator.java          | 162 +++++++++++++++++++
 .../util/AbstractDataFileFooterConverter.java   |   2 +
 .../carbondata/core/util/DataTypeUtil.java      |   5 +
 .../sortindex/CarbonDictionarySortModel.java    |   1 +
 .../scan/complextypes/PrimitiveQueryType.java   |   2 +
 .../scan/expression/ExpressionResult.java       |  99 ++++++++++--
 .../scan/expression/LiteralExpression.java      |   4 +-
 .../conditional/EqualToExpression.java          |   1 +
 .../GreaterThanEqualToExpression.java           |   1 +
 .../conditional/GreaterThanExpression.java      |   1 +
 .../expression/conditional/InExpression.java    |   1 +
 .../conditional/LessThanEqualToExpression.java  |   1 +
 .../conditional/LessThanExpression.java         |   1 +
 .../conditional/NotEqualsExpression.java        |   1 +
 .../expression/conditional/NotInExpression.java |   1 +
 .../carbondata/scan/filter/FilterUtil.java      |  13 +-
 .../resolver/ConditionalFilterResolverImpl.java |   3 +-
 .../resolver/RestructureFilterResolverImpl.java |   6 +-
 .../visitor/CustomTypeDictionaryVisitor.java    |  15 +-
 .../visitor/ResolvedFilterInfoVisitorIntf.java  |   2 +-
 .../carbondata/core/util/DataTypeUtilTest.java  |   1 +
 .../DictionaryBasedResultCollectorTest.java     |   9 +-
 .../scan/expression/ExpressionResultTest.java   |   4 +-
 .../carbondata/examples/CarbonExample.scala     |   2 +-
 examples/spark2/src/main/resources/data.csv     |  20 +--
 .../carbondata/examples/CarbonExample.scala     |  23 ++-
 format/src/main/thrift/schema.thrift            |   1 +
 .../carbondata/spark/util/CarbonScalaUtil.scala |   1 +
 .../spark/util/DataTypeConverterUtil.scala      |   5 +-
 .../execution/command/carbonTableSchema.scala   |   2 +-
 .../readsupport/SparkRowReadSupportImpl.java    |  16 +-
 .../spark/CarbonDataFrameWriter.scala           |   1 +
 .../spark/sql/CarbonDictionaryDecoder.scala     |   1 +
 .../org/apache/spark/sql/CarbonSqlParser.scala  |  21 ++-
 .../apache/spark/sql/hive/CarbonMetastore.scala |   5 +-
 .../src/test/resources/datasamplefordate.csv    |   4 +
 .../spark/src/test/resources/datasamplenull.csv |   2 +-
 .../DateDataTypeDirectDictionaryTest.scala      | 154 ++++++++++++++++++
 ...TypeDirectDictionaryWithNoDictTestCase.scala | 101 ++++++++++++
 .../DateDataTypeNullDataTest.scala              |  88 ++++++++++
 ...estampDataTypeDirectDictionaryTestCase.scala |  18 +--
 ...TypeDirectDictionaryWithNoDictTestCase.scala |   6 +-
 .../TimestampDataTypeNullDataTest.scala         |  16 +-
 .../readsupport/SparkRowReadSupportImpl.java    |   6 +-
 .../spark/sql/CarbonDataFrameWriter.scala       |   1 +
 .../spark/sql/CarbonDictionaryDecoder.scala     |   2 +
 .../org/apache/spark/sql/CarbonSource.scala     |  13 +-
 .../org/apache/spark/sql/TableCreator.scala     |   7 +-
 .../apache/spark/sql/hive/CarbonMetastore.scala |   2 +
 53 files changed, 768 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ColumnDictionaryInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ColumnDictionaryInfo.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ColumnDictionaryInfo.java
index c19f244..1d2eb8b 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ColumnDictionaryInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ColumnDictionaryInfo.java
@@ -285,6 +285,7 @@ public class ColumnDictionaryInfo extends AbstractColumnDictionaryInfo {
         case BOOLEAN:
           return Boolean
               .compare((Boolean.parseBoolean(dictionaryVal)), (Boolean.parseBoolean(memberVal)));
+        case DATE:
         case TIMESTAMP:
           SimpleDateFormat parser = new SimpleDateFormat(CarbonProperties.getInstance()
               .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/core/src/main/java/org/apache/carbondata/core/carbon/metadata/converter/ThriftWrapperSchemaConverterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index c1020e3..7d5386e 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -132,6 +132,8 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
         return org.apache.carbondata.format.DataType.DOUBLE;
       case DECIMAL:
         return org.apache.carbondata.format.DataType.DECIMAL;
+      case DATE:
+        return org.apache.carbondata.format.DataType.DATE;
       case TIMESTAMP:
         return org.apache.carbondata.format.DataType.TIMESTAMP;
       case ARRAY:
@@ -306,6 +308,8 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
         return DataType.DECIMAL;
       case TIMESTAMP:
         return DataType.TIMESTAMP;
+      case DATE:
+        return DataType.DATE;
       case ARRAY:
         return DataType.ARRAY;
       case STRUCT:

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/DirectDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/DirectDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/DirectDictionaryGenerator.java
index 2eab97d..9b8a230 100644
--- a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/DirectDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/DirectDictionaryGenerator.java
@@ -48,7 +48,7 @@ public interface DirectDictionaryGenerator {
    * filter member first it should be converted in date form as per above format and needs to
    * retrieve time stamp.
    *
-   * @param member The member string value
+   * @param memberStr The member string value
    * @return returns dictionary/ surrogate value
    */
   int generateDirectSurrogateKey(String memberStr, String format);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/DirectDictionaryKeyGeneratorFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/DirectDictionaryKeyGeneratorFactory.java b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/DirectDictionaryKeyGeneratorFactory.java
index f7148b3..790fa2e 100644
--- a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/DirectDictionaryKeyGeneratorFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/DirectDictionaryKeyGeneratorFactory.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.core.keygenerator.directdictionary;
 
 import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator;
 import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.TimeStampDirectDictionaryGenerator;
 import org.apache.carbondata.core.util.CarbonProperties;
 
@@ -45,6 +46,9 @@ public final class DirectDictionaryKeyGeneratorFactory {
       String dateFormat) {
     DirectDictionaryGenerator directDictionaryGenerator = null;
     switch (dataType) {
+      case DATE:
+        directDictionaryGenerator = new DateDirectDictionaryGenerator(dateFormat);
+        break;
       case TIMESTAMP:
         directDictionaryGenerator = new TimeStampDirectDictionaryGenerator(dateFormat);
         break;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
new file mode 100644
index 0000000..f3a1225
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.keygenerator.directdictionary.timestamp;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+/**
+ * The class provides the method to generate dictionary key and getting the actual value from
+ * the dictionaryKey for direct dictionary column for TIMESTAMP type.
+ */
+public class DateDirectDictionaryGenerator implements DirectDictionaryGenerator {
+
+  static final int cutOffDate = Integer.MAX_VALUE >> 1;
+  static final long SECONDS_PER_DAY = 60 * 60 * 24L;
+  static final long  MILLIS_PER_DAY = SECONDS_PER_DAY * 1000L;
+
+
+  private ThreadLocal<SimpleDateFormat> simpleDateFormatLocal = new ThreadLocal<>();
+
+  private String dateFormat;
+
+  /**
+   * Logger instance
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DateDirectDictionaryGenerator.class.getName());
+
+
+  public DateDirectDictionaryGenerator(String dateFormat) {
+    this.dateFormat = dateFormat;
+    initialize();
+  }
+
+  public DateDirectDictionaryGenerator() {
+    this(CarbonProperties.getInstance()
+            .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+                    CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT));
+  }
+
+  /**
+   * The method take member String as input and converts
+   * and returns the dictionary key
+   *
+   * @param memberStr date format string
+   * @return dictionary value
+   */
+  @Override public int generateDirectSurrogateKey(String memberStr) {
+    if (null == memberStr || memberStr.trim().isEmpty() || memberStr
+        .equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
+      return 1;
+    }
+    return getDirectSurrogateForMember(memberStr);
+  }
+
+  /**
+   * The method take member String as input and converts
+   * and returns the dictionary key
+   *
+   * @param memberStr date format string
+   * @return dictionary value
+   */
+  public int generateDirectSurrogateKey(String memberStr, String format) {
+    if (null == format) {
+      return generateDirectSurrogateKeyForNonTimestampType(memberStr);
+    } else {
+      if (null == memberStr || memberStr.trim().isEmpty() || memberStr
+          .equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
+        return 1;
+      }
+      return getDirectSurrogateForMember(memberStr);
+    }
+  }
+
+  private int getDirectSurrogateForMember(String memberStr) {
+    Date dateToStr = null;
+    try {
+      SimpleDateFormat simpleDateFormat = simpleDateFormatLocal.get();
+      if (null == simpleDateFormat) {
+        initialize();
+        simpleDateFormat = simpleDateFormatLocal.get();
+      }
+      dateToStr = simpleDateFormat.parse(memberStr);
+    } catch (ParseException e) {
+      LOGGER.debug(
+          "Cannot convert " + memberStr + " to Time/Long type value. Value considered as null." + e
+              .getMessage());
+      dateToStr = null;
+    }
+    //adding +2 to reserve the first cuttOffDiff value for null or empty date
+    if (null == dateToStr) {
+      return 1;
+    } else {
+      return generateKey(dateToStr.getTime());
+    }
+  }
+
+  /**
+   * The method take dictionary key as input and returns the
+   *
+   * @param key
+   * @return member value/actual value Date
+   */
+  @Override public Object getValueFromSurrogate(int key) {
+    if (key == 1) {
+      return null;
+    }
+    return key - cutOffDate;
+  }
+
+  private int generateDirectSurrogateKeyForNonTimestampType(String memberStr) {
+    long timeValue = -1;
+    try {
+      timeValue = Long.valueOf(memberStr) / 1000;
+    } catch (NumberFormatException e) {
+      LOGGER.debug(
+          "Cannot convert " + memberStr + " Long type value. Value considered as null." + e
+              .getMessage());
+    }
+    if (timeValue == -1) {
+      return 1;
+    } else {
+      return generateKey(timeValue);
+    }
+  }
+
+  private int generateKey(long timeValue) {
+    int key = (int)Math.floor((double)timeValue / MILLIS_PER_DAY) + cutOffDate;
+    return key;
+  }
+
+  public void initialize(){
+    if (simpleDateFormatLocal.get() == null) {
+      simpleDateFormatLocal.set(new SimpleDateFormat(dateFormat));
+      simpleDateFormatLocal.get().setLenient(false);
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
index 7f50c34..83763e6 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
@@ -322,6 +322,8 @@ public abstract class AbstractDataFileFooterConverter {
         return DataType.DOUBLE;
       case DECIMAL:
         return DataType.DECIMAL;
+      case DATE:
+        return DataType.DATE;
       case TIMESTAMP:
         return DataType.TIMESTAMP;
       case ARRAY:

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index 5808d61..775238d 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -68,6 +68,7 @@ public final class DataTypeUtil {
     dataTypeDisplayNames.put(DataType.ARRAY.toString(), DataType.ARRAY.getName());
     dataTypeDisplayNames.put(DataType.STRUCT.toString(), DataType.STRUCT.getName());
     dataTypeDisplayNames.put(DataType.TIMESTAMP.toString(), DataType.TIMESTAMP.getName());
+    dataTypeDisplayNames.put(DataType.DATE.toString(), DataType.DATE.getName());
     dataTypeDisplayNames.put(DataType.SHORT.toString(), DataType.SHORT.getName());
     dataTypeDisplayNames.put(DataType.STRING.toString(), DataType.STRING.getName());
   }
@@ -182,6 +183,9 @@ public final class DataTypeUtil {
   public static DataType getDataType(String dataTypeStr) {
     DataType dataType = null;
     switch (dataTypeStr) {
+      case "DATE":
+        dataType = DataType.DATE;
+        break;
       case "TIMESTAMP":
         dataType = DataType.TIMESTAMP;
         break;
@@ -250,6 +254,7 @@ public final class DataTypeUtil {
             return null;
           }
           return Long.parseLong(data);
+        case DATE:
         case TIMESTAMP:
           if (data.isEmpty()) {
             return null;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortModel.java b/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortModel.java
index 0d3040a..c90d41c 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortModel.java
@@ -102,6 +102,7 @@ public class CarbonDictionarySortModel implements Comparable<CarbonDictionarySor
           return -1;
         }
         return val1.compareTo(val2);
+      case DATE:
       case TIMESTAMP:
         SimpleDateFormat parser = new SimpleDateFormat(CarbonProperties.getInstance()
             .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/core/src/main/java/org/apache/carbondata/scan/complextypes/PrimitiveQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/complextypes/PrimitiveQueryType.java b/core/src/main/java/org/apache/carbondata/scan/complextypes/PrimitiveQueryType.java
index 11f4651..d5a5acc 100644
--- a/core/src/main/java/org/apache/carbondata/scan/complextypes/PrimitiveQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/scan/complextypes/PrimitiveQueryType.java
@@ -139,6 +139,8 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery
         return BooleanType$.MODULE$;
       case TIMESTAMP:
         return TimestampType$.MODULE$;
+      case DATE:
+        return DateType$.MODULE$;
       default:
         return IntegerType$.MODULE$;
     }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/core/src/main/java/org/apache/carbondata/scan/expression/ExpressionResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/expression/ExpressionResult.java b/core/src/main/java/org/apache/carbondata/scan/expression/ExpressionResult.java
index 0da0cb2..407a1d7 100644
--- a/core/src/main/java/org/apache/carbondata/scan/expression/ExpressionResult.java
+++ b/core/src/main/java/org/apache/carbondata/scan/expression/ExpressionResult.java
@@ -40,11 +40,18 @@ public class ExpressionResult implements Comparable<ExpressionResult> {
 
   private List<ExpressionResult> expressionResults;
 
+  private boolean isLiteral = false;
+
   public ExpressionResult(DataType dataType, Object value) {
     this.dataType = dataType;
     this.value = value;
   }
 
+  public ExpressionResult(DataType dataType, Object value, boolean isLiteral) {
+    this(dataType, value);
+    this.isLiteral = isLiteral;
+  }
+
   public ExpressionResult(List<ExpressionResult> expressionResults) {
     this.expressionResults = expressionResults;
   }
@@ -79,10 +86,20 @@ public class ExpressionResult implements Comparable<ExpressionResult> {
             return ((Double) value).intValue();
           }
           return (Integer) value;
+        case DATE:
+          if (value instanceof java.sql.Date) {
+            return (int) (((java.sql.Date) value).getTime());
+          } else {
+            return (Integer) value;
+          }
         case TIMESTAMP:
           if (value instanceof Timestamp) {
-            return (int) (((Timestamp) value).getTime() % 1000);
+            return (int) (((Timestamp) value).getTime());
           } else {
+            if (isLiteral) {
+              Long l = (Long) value /1000;
+              return l.intValue();
+            }
             return (Integer) value;
           }
         default:
@@ -119,11 +136,22 @@ public class ExpressionResult implements Comparable<ExpressionResult> {
           }
           return (Short) value;
 
+        case DATE:
+
+          if (value instanceof java.sql.Date) {
+            return (short) (((java.sql.Date) value).getTime());
+          } else {
+            return (Short) value;
+          }
         case TIMESTAMP:
 
           if (value instanceof Timestamp) {
-            return (short) (((Timestamp) value).getTime() % 1000);
+            return (short) (((Timestamp) value).getTime());
           } else {
+            if (isLiteral) {
+              Long l = ((long)value/1000);
+              return l.shortValue();
+            }
             return (Short) value;
           }
 
@@ -144,16 +172,24 @@ public class ExpressionResult implements Comparable<ExpressionResult> {
     }
     try {
       switch (this.getDataType()) {
+        case DATE:
         case TIMESTAMP:
           SimpleDateFormat parser = new SimpleDateFormat(CarbonProperties.getInstance()
               .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
                   CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT));
           if (value instanceof Timestamp) {
             return parser.format((Timestamp) value);
-          } else {
-            return parser.format(new Timestamp((long) value / 1000));
+          } else if (value instanceof java.sql.Date) {
+            return parser.format((java.sql.Date) value);
+          } else if (value instanceof Long) {
+            if (isLiteral) {
+              return parser.format(new Timestamp((long) value/1000));
+            }
+            return parser.format(new Timestamp((long) value));
+          } else if (value instanceof Integer) {
+            return parser.format(new java.sql.Date((int)value));
           }
-
+          return value.toString();
         default:
           return value.toString();
       }
@@ -183,10 +219,20 @@ public class ExpressionResult implements Comparable<ExpressionResult> {
           return ((Long) value).doubleValue();
         case DOUBLE:
           return (Double) value;
+        case DATE:
+          if (value instanceof java.sql.Date) {
+            return (double) ((java.sql.Date) value).getTime();
+          } else {
+            return (Double) (value);
+          }
         case TIMESTAMP:
           if (value instanceof Timestamp) {
-            return (double) ((Timestamp) value).getTime() * 1000;
+            return (double) ((Timestamp) value).getTime();
           } else {
+            if (isLiteral) {
+              Long l = (Long) value/1000;
+              return l.doubleValue();
+            }
             return (Double) (value);
           }
         default:
@@ -219,9 +265,15 @@ public class ExpressionResult implements Comparable<ExpressionResult> {
           return (Long) value;
         case DOUBLE:
           return (Long) value;
+        case DATE:
+          if (value instanceof java.sql.Date) {
+            return ((java.sql.Date) value).getTime();
+          } else {
+            return (Long) value;
+          }
         case TIMESTAMP:
           if (value instanceof Timestamp) {
-            return 1000 * ((Timestamp) value).getTime();
+            return ((Timestamp) value).getTime();
           } else {
             return (Long) value;
           }
@@ -259,10 +311,19 @@ public class ExpressionResult implements Comparable<ExpressionResult> {
           return new BigDecimal(value.toString());
         case DECIMAL:
           return new BigDecimal(value.toString());
+        case DATE:
+          if (value instanceof java.sql.Date) {
+            return new BigDecimal(((java.sql.Date) value).getTime());
+          } else {
+            return new BigDecimal((long) value);
+          }
         case TIMESTAMP:
           if (value instanceof Timestamp) {
-            return new BigDecimal(1000 * ((Timestamp) value).getTime());
+            return new BigDecimal(((Timestamp) value).getTime());
           } else {
+            if (isLiteral) {
+              return new BigDecimal((long)value/1000);
+            }
             return new BigDecimal((long) value);
           }
         default:
@@ -292,7 +353,7 @@ public class ExpressionResult implements Comparable<ExpressionResult> {
           Date dateToStr;
           try {
             dateToStr = parser.parse(value.toString());
-            return dateToStr.getTime() * 1000;
+            return dateToStr.getTime();
           } catch (ParseException e) {
             throw new FilterIllegalMemberException(
                 "Cannot convert" + this.getDataType().name() + " to Time/Long type value");
@@ -304,10 +365,19 @@ public class ExpressionResult implements Comparable<ExpressionResult> {
           return (Long) value;
         case DOUBLE:
           return (Long) value;
+        case DATE:
+          if (value instanceof java.sql.Date) {
+            return ((Date) value).getTime();
+          } else {
+            return (Long) value;
+          }
         case TIMESTAMP:
           if (value instanceof Timestamp) {
-            return ((Timestamp) value).getTime() * 1000;
+            return ((Timestamp) value).getTime();
           } else {
+            if (isLiteral) {
+              return (Long) value/1000;
+            }
             return (Long) value;
           }
         default:
@@ -350,7 +420,7 @@ public class ExpressionResult implements Comparable<ExpressionResult> {
   public List<ExpressionResult> getList() {
     if (null == expressionResults) {
       List<ExpressionResult> a = new ArrayList<ExpressionResult>(20);
-      a.add(new ExpressionResult(dataType, value));
+      a.add(new ExpressionResult(dataType, value, isLiteral));
       return a;
     } else {
       return expressionResults;
@@ -361,11 +431,12 @@ public class ExpressionResult implements Comparable<ExpressionResult> {
     List<String> evaluateResultListFinal = new ArrayList<String>(20);
     List<ExpressionResult> evaluateResultList = getList();
     for (ExpressionResult result : evaluateResultList) {
-      if (result.getString() == null) {
+      String resultString = result.getString();
+      if (resultString == null) {
         evaluateResultListFinal.add(CarbonCommonConstants.MEMBER_DEFAULT_VAL);
         continue;
       }
-      evaluateResultListFinal.add(result.getString());
+      evaluateResultListFinal.add(resultString);
     }
     return evaluateResultListFinal;
   }
@@ -411,6 +482,7 @@ public class ExpressionResult implements Comparable<ExpressionResult> {
           result = this.getInt().equals(objToCompare.getInt());
           break;
         case LONG:
+        case DATE:
         case TIMESTAMP:
           result = this.getLong().equals(objToCompare.getLong());
           break;
@@ -448,6 +520,7 @@ public class ExpressionResult implements Comparable<ExpressionResult> {
           java.math.BigDecimal val1 = this.getDecimal();
           java.math.BigDecimal val2 = o.getDecimal();
           return val1.compareTo(val2);
+        case DATE:
         case TIMESTAMP:
           SimpleDateFormat parser = new SimpleDateFormat(CarbonProperties.getInstance()
               .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/core/src/main/java/org/apache/carbondata/scan/expression/LiteralExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/expression/LiteralExpression.java b/core/src/main/java/org/apache/carbondata/scan/expression/LiteralExpression.java
index 671c209..4ffdd9b 100644
--- a/core/src/main/java/org/apache/carbondata/scan/expression/LiteralExpression.java
+++ b/core/src/main/java/org/apache/carbondata/scan/expression/LiteralExpression.java
@@ -38,12 +38,12 @@ public class LiteralExpression extends LeafExpression {
   }
 
   @Override public ExpressionResult evaluate(RowIntf value) {
-    ExpressionResult expressionResult = new ExpressionResult(dataType, this.value);
+    ExpressionResult expressionResult = new ExpressionResult(dataType, this.value, true);
     return expressionResult;
   }
 
   public ExpressionResult getExpressionResult() {
-    ExpressionResult expressionResult = new ExpressionResult(dataType, this.value);
+    ExpressionResult expressionResult = new ExpressionResult(dataType, this.value, true);
     return expressionResult;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/core/src/main/java/org/apache/carbondata/scan/expression/conditional/EqualToExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/expression/conditional/EqualToExpression.java b/core/src/main/java/org/apache/carbondata/scan/expression/conditional/EqualToExpression.java
index 011b29f..4070565 100644
--- a/core/src/main/java/org/apache/carbondata/scan/expression/conditional/EqualToExpression.java
+++ b/core/src/main/java/org/apache/carbondata/scan/expression/conditional/EqualToExpression.java
@@ -80,6 +80,7 @@ public class EqualToExpression extends BinaryConditionalExpression {
       case DOUBLE:
         result = FilterUtil.nanSafeEqualsDoubles(val1.getDouble(), val2.getDouble());
         break;
+      case DATE:
       case TIMESTAMP:
         result = val1.getTime().equals(val2.getTime());
         break;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/core/src/main/java/org/apache/carbondata/scan/expression/conditional/GreaterThanEqualToExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/expression/conditional/GreaterThanEqualToExpression.java b/core/src/main/java/org/apache/carbondata/scan/expression/conditional/GreaterThanEqualToExpression.java
index 98c438a..2818041 100644
--- a/core/src/main/java/org/apache/carbondata/scan/expression/conditional/GreaterThanEqualToExpression.java
+++ b/core/src/main/java/org/apache/carbondata/scan/expression/conditional/GreaterThanEqualToExpression.java
@@ -63,6 +63,7 @@ public class GreaterThanEqualToExpression extends BinaryConditionalExpression {
       case DOUBLE:
         result = elRes.getDouble() >= (erRes.getDouble());
         break;
+      case DATE:
       case TIMESTAMP:
         result = elRes.getTime() >= (erRes.getTime());
         break;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/core/src/main/java/org/apache/carbondata/scan/expression/conditional/GreaterThanExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/expression/conditional/GreaterThanExpression.java b/core/src/main/java/org/apache/carbondata/scan/expression/conditional/GreaterThanExpression.java
index a477d7b..ef89368 100644
--- a/core/src/main/java/org/apache/carbondata/scan/expression/conditional/GreaterThanExpression.java
+++ b/core/src/main/java/org/apache/carbondata/scan/expression/conditional/GreaterThanExpression.java
@@ -65,6 +65,7 @@ public class GreaterThanExpression extends BinaryConditionalExpression {
       case INT:
         result = exprLeftRes.getInt() > (exprRightRes.getInt());
         break;
+      case DATE:
       case TIMESTAMP:
         result = exprLeftRes.getTime() > (exprRightRes.getTime());
         break;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/core/src/main/java/org/apache/carbondata/scan/expression/conditional/InExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/expression/conditional/InExpression.java b/core/src/main/java/org/apache/carbondata/scan/expression/conditional/InExpression.java
index 1e0d3b9..abfd78d 100644
--- a/core/src/main/java/org/apache/carbondata/scan/expression/conditional/InExpression.java
+++ b/core/src/main/java/org/apache/carbondata/scan/expression/conditional/InExpression.java
@@ -70,6 +70,7 @@ public class InExpression extends BinaryConditionalExpression {
           case LONG:
             val = new ExpressionResult(val.getDataType(), expressionResVal.getLong());
             break;
+          case DATE:
           case TIMESTAMP:
             val = new ExpressionResult(val.getDataType(), expressionResVal.getTime());
             break;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/core/src/main/java/org/apache/carbondata/scan/expression/conditional/LessThanEqualToExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/expression/conditional/LessThanEqualToExpression.java b/core/src/main/java/org/apache/carbondata/scan/expression/conditional/LessThanEqualToExpression.java
index e78a112..9739efe 100644
--- a/core/src/main/java/org/apache/carbondata/scan/expression/conditional/LessThanEqualToExpression.java
+++ b/core/src/main/java/org/apache/carbondata/scan/expression/conditional/LessThanEqualToExpression.java
@@ -63,6 +63,7 @@ public class LessThanEqualToExpression extends BinaryConditionalExpression {
       case DOUBLE:
         result = elRes.getDouble() <= (erRes.getDouble());
         break;
+      case DATE:
       case TIMESTAMP:
         result = elRes.getTime() <= (erRes.getTime());
         break;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/core/src/main/java/org/apache/carbondata/scan/expression/conditional/LessThanExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/expression/conditional/LessThanExpression.java b/core/src/main/java/org/apache/carbondata/scan/expression/conditional/LessThanExpression.java
index 5b19afe..18f50f4 100644
--- a/core/src/main/java/org/apache/carbondata/scan/expression/conditional/LessThanExpression.java
+++ b/core/src/main/java/org/apache/carbondata/scan/expression/conditional/LessThanExpression.java
@@ -67,6 +67,7 @@ public class LessThanExpression extends BinaryConditionalExpression {
       case DOUBLE:
         result = elRes.getDouble() < (erRes.getDouble());
         break;
+      case DATE:
       case TIMESTAMP:
         result = elRes.getTime() < (erRes.getTime());
         break;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/core/src/main/java/org/apache/carbondata/scan/expression/conditional/NotEqualsExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/expression/conditional/NotEqualsExpression.java b/core/src/main/java/org/apache/carbondata/scan/expression/conditional/NotEqualsExpression.java
index 9181946..dbfaea5 100644
--- a/core/src/main/java/org/apache/carbondata/scan/expression/conditional/NotEqualsExpression.java
+++ b/core/src/main/java/org/apache/carbondata/scan/expression/conditional/NotEqualsExpression.java
@@ -77,6 +77,7 @@ public class NotEqualsExpression extends BinaryConditionalExpression {
       case DOUBLE:
         result = val1.getDouble().doubleValue() != val2.getDouble().doubleValue();
         break;
+      case DATE:
       case TIMESTAMP:
         result = val1.getTime().longValue() != val2.getTime().longValue();
         break;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/core/src/main/java/org/apache/carbondata/scan/expression/conditional/NotInExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/expression/conditional/NotInExpression.java b/core/src/main/java/org/apache/carbondata/scan/expression/conditional/NotInExpression.java
index 2ed0797..0328abd 100644
--- a/core/src/main/java/org/apache/carbondata/scan/expression/conditional/NotInExpression.java
+++ b/core/src/main/java/org/apache/carbondata/scan/expression/conditional/NotInExpression.java
@@ -65,6 +65,7 @@ public class NotInExpression extends BinaryConditionalExpression {
           case DOUBLE:
             val = new ExpressionResult(val.getDataType(), exprResVal.getDouble());
             break;
+          case DATE:
           case TIMESTAMP:
             val = new ExpressionResult(val.getDataType(), exprResVal.getTime());
             break;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/core/src/main/java/org/apache/carbondata/scan/filter/FilterUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/filter/FilterUtil.java b/core/src/main/java/org/apache/carbondata/scan/filter/FilterUtil.java
index 3f71ced..c1985bc 100644
--- a/core/src/main/java/org/apache/carbondata/scan/filter/FilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/scan/filter/FilterUtil.java
@@ -221,8 +221,8 @@ public final class FilterUtil {
    */
   public static boolean checkIfDataTypeNotTimeStamp(Expression expression) {
     if (expression.getFilterExpressionType() == ExpressionType.LITERAL) {
-      if (!(((LiteralExpression) expression).getLiteralExpDataType()
-          == DataType.TIMESTAMP)) {
+      DataType dataType = ((LiteralExpression) expression).getLiteralExpDataType();
+      if (!(dataType == DataType.TIMESTAMP || dataType == DataType.DATE)) {
         return true;
       }
     }
@@ -680,8 +680,9 @@ public final class FilterUtil {
    * Method will return the start key based on KeyGenerator for the respective
    * filter resolved instance.
    *
-   * @param dimColResolvedFilterInfo
-   * @param segmentProperties
+   * @param dimensionFilter
+   * @param startKey
+   * @param startKeyList
    * @return long[] start key
    */
   public static void getStartKey(Map<CarbonDimension, List<DimColumnFilterInfo>> dimensionFilter,
@@ -706,7 +707,6 @@ public final class FilterUtil {
    * all dimension and the indexes which will help to read the respective filter value.
    *
    * @param dimColResolvedFilterInfo
-   * @param segmentProperties
    * @param setOfStartKeyByteArray
    * @return
    */
@@ -761,7 +761,6 @@ public final class FilterUtil {
    * all dimension and the indexes which will help to read the respective filter value.
    *
    * @param dimColResolvedFilterInfo
-   * @param segmentProperties
    * @param setOfEndKeyByteArray
    * @return end key array
    */
@@ -1123,6 +1122,7 @@ public final class FilterUtil {
         case BOOLEAN:
           return Boolean
               .compare((Boolean.parseBoolean(dictionaryVal)), (Boolean.parseBoolean(memberVal)));
+        case DATE:
         case TIMESTAMP:
           SimpleDateFormat parser = new SimpleDateFormat(CarbonProperties.getInstance()
               .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
@@ -1247,6 +1247,7 @@ public final class FilterUtil {
           java.math.BigDecimal val1 = new BigDecimal(filterMember1);
           java.math.BigDecimal val2 = new BigDecimal(filterMember2);
           return val1.compareTo(val2);
+        case DATE:
         case TIMESTAMP:
           if (CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(filterMember1)) {
             return 1;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/core/src/main/java/org/apache/carbondata/scan/filter/resolver/ConditionalFilterResolverImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/filter/resolver/ConditionalFilterResolverImpl.java b/core/src/main/java/org/apache/carbondata/scan/filter/resolver/ConditionalFilterResolverImpl.java
index 612ea6f..b8e0b79 100644
--- a/core/src/main/java/org/apache/carbondata/scan/filter/resolver/ConditionalFilterResolverImpl.java
+++ b/core/src/main/java/org/apache/carbondata/scan/filter/resolver/ConditionalFilterResolverImpl.java
@@ -103,7 +103,8 @@ public class ConditionalFilterResolverImpl implements FilterResolverIntf {
         metadata.setColumnExpression(columnExpression);
         metadata.setExpression(leftExp);
         metadata.setIncludeFilter(isIncludeFilter);
-        if (columnExpression.getDataType().equals(DataType.TIMESTAMP)) {
+        if (columnExpression.getDataType().equals(DataType.TIMESTAMP) ||
+            columnExpression.getDataType().equals(DataType.DATE)) {
           isExpressionResolve = true;
         } else {
           // if imei=imei comes in filter condition then we need to

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/core/src/main/java/org/apache/carbondata/scan/filter/resolver/RestructureFilterResolverImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/filter/resolver/RestructureFilterResolverImpl.java b/core/src/main/java/org/apache/carbondata/scan/filter/resolver/RestructureFilterResolverImpl.java
index a674698..15c015a 100644
--- a/core/src/main/java/org/apache/carbondata/scan/filter/resolver/RestructureFilterResolverImpl.java
+++ b/core/src/main/java/org/apache/carbondata/scan/filter/resolver/RestructureFilterResolverImpl.java
@@ -79,7 +79,8 @@ public class RestructureFilterResolverImpl implements FilterResolverIntf {
       Expression right = binaryConditionalExpression.getRight();
       if (left instanceof ColumnExpression) {
         ColumnExpression columnExpression = (ColumnExpression) left;
-        if (columnExpression.getDataType().equals(DataType.TIMESTAMP)) {
+        if (columnExpression.getDataType().equals(DataType.TIMESTAMP) ||
+            columnExpression.getDataType().equals(DataType.DATE) ) {
           isExpressionResolve = true;
         } else {
           // If imei=imei comes in filter condition then we need to
@@ -99,7 +100,8 @@ public class RestructureFilterResolverImpl implements FilterResolverIntf {
         }
       } else if (right instanceof ColumnExpression) {
         ColumnExpression columnExpression = (ColumnExpression) right;
-        if (columnExpression.getDataType().equals(DataType.TIMESTAMP)) {
+        if (columnExpression.getDataType().equals(DataType.TIMESTAMP) ||
+            columnExpression.getDataType().equals(DataType.DATE)) {
           isExpressionResolve = true;
         } else {
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/core/src/main/java/org/apache/carbondata/scan/filter/resolver/resolverinfo/visitor/CustomTypeDictionaryVisitor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/filter/resolver/resolverinfo/visitor/CustomTypeDictionaryVisitor.java b/core/src/main/java/org/apache/carbondata/scan/filter/resolver/resolverinfo/visitor/CustomTypeDictionaryVisitor.java
index 5cc5170..95420be 100644
--- a/core/src/main/java/org/apache/carbondata/scan/filter/resolver/resolverinfo/visitor/CustomTypeDictionaryVisitor.java
+++ b/core/src/main/java/org/apache/carbondata/scan/filter/resolver/resolverinfo/visitor/CustomTypeDictionaryVisitor.java
@@ -33,7 +33,6 @@ import org.apache.carbondata.scan.expression.ColumnExpression;
 import org.apache.carbondata.scan.expression.exception.FilterIllegalMemberException;
 import org.apache.carbondata.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.scan.filter.DimColumnFilterInfo;
-import org.apache.carbondata.scan.filter.FilterUtil;
 import org.apache.carbondata.scan.filter.resolver.metadata.FilterResolverMetadata;
 import org.apache.carbondata.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 
@@ -61,10 +60,8 @@ public class CustomTypeDictionaryVisitor implements ResolvedFilterInfoVisitorInt
     } catch (FilterIllegalMemberException e) {
       throw new FilterUnsupportedException(e);
     }
-    boolean isNotTimestampType = FilterUtil.checkIfDataTypeNotTimeStamp(metadata.getExpression());
     resolvedFilterObject = getDirectDictionaryValKeyMemberForFilter(metadata.getTableIdentifier(),
-        metadata.getColumnExpression(), evaluateResultListFinal, metadata.isIncludeFilter(),
-        isNotTimestampType);
+        metadata.getColumnExpression(), evaluateResultListFinal, metadata.isIncludeFilter());
     if (!metadata.isIncludeFilter() && null != resolvedFilterObject && !resolvedFilterObject
         .getFilterList().contains(CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY)) {
       // Adding default surrogate key of null member inorder to not display the same while
@@ -78,12 +75,12 @@ public class CustomTypeDictionaryVisitor implements ResolvedFilterInfoVisitorInt
 
   private DimColumnFilterInfo getDirectDictionaryValKeyMemberForFilter(
       AbsoluteTableIdentifier tableIdentifier, ColumnExpression columnExpression,
-      List<String> evaluateResultListFinal, boolean isIncludeFilter, boolean isNotTimestampType) {
+      List<String> evaluateResultListFinal, boolean isIncludeFilter) {
     List<Integer> surrogates = new ArrayList<Integer>(20);
     DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
         .getDirectDictionaryGenerator(columnExpression.getDimension().getDataType());
     // Reading the dictionary value direct
-    getSurrogateValuesForDictionary(evaluateResultListFinal, surrogates, isNotTimestampType,
+    getSurrogateValuesForDictionary(evaluateResultListFinal, surrogates,
         directDictionaryGenerator);
 
     Collections.sort(surrogates);
@@ -97,14 +94,10 @@ public class CustomTypeDictionaryVisitor implements ResolvedFilterInfoVisitorInt
   }
 
   private void getSurrogateValuesForDictionary(List<String> evaluateResultListFinal,
-      List<Integer> surrogates, boolean isNotTimestampType,
-      DirectDictionaryGenerator directDictionaryGenerator) {
+      List<Integer> surrogates, DirectDictionaryGenerator directDictionaryGenerator) {
     String timeFormat = CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
             CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT);
-    if (isNotTimestampType) {
-      timeFormat = null;
-    }
     for (String filterMember : evaluateResultListFinal) {
       surrogates
           .add(directDictionaryGenerator.generateDirectSurrogateKey(filterMember, timeFormat));

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/core/src/main/java/org/apache/carbondata/scan/filter/resolver/resolverinfo/visitor/ResolvedFilterInfoVisitorIntf.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/filter/resolver/resolverinfo/visitor/ResolvedFilterInfoVisitorIntf.java b/core/src/main/java/org/apache/carbondata/scan/filter/resolver/resolverinfo/visitor/ResolvedFilterInfoVisitorIntf.java
index 19ad3aa..1b2610d 100644
--- a/core/src/main/java/org/apache/carbondata/scan/filter/resolver/resolverinfo/visitor/ResolvedFilterInfoVisitorIntf.java
+++ b/core/src/main/java/org/apache/carbondata/scan/filter/resolver/resolverinfo/visitor/ResolvedFilterInfoVisitorIntf.java
@@ -33,7 +33,7 @@ public interface ResolvedFilterInfoVisitorIntf {
    *
    * @param visitableObj
    * @param metadata
-   * @throws QueryExecutionException
+   * @throws FilterUnsupportedException
    */
   void populateFilterResolvedInfo(DimColumnResolvedFilterInfo visitableObj,
       FilterResolverMetadata metadata) throws FilterUnsupportedException;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java
index d3a3604..6c68a73 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java
@@ -77,6 +77,7 @@ public class DataTypeUtilTest {
 
   @Test public void testGetDataType() {
     assertEquals(DataType.TIMESTAMP, getDataType("TIMESTAMP"));
+    assertEquals(DataType.DATE, getDataType("DATE"));
     assertEquals(DataType.STRING, getDataType("STRING"));
     assertEquals(DataType.INT, getDataType("INT"));
     assertEquals(DataType.SHORT, getDataType("SMALLINT"));

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/core/src/test/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollectorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollectorTest.java b/core/src/test/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollectorTest.java
index 8900cef..7f21d90 100644
--- a/core/src/test/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollectorTest.java
+++ b/core/src/test/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollectorTest.java
@@ -162,9 +162,12 @@ public class DictionaryBasedResultCollectorTest {
     new MockUp<DirectDictionaryKeyGeneratorFactory>() {
       @SuppressWarnings("unused") @Mock DirectDictionaryGenerator getDirectDictionaryGenerator(
           DataType dataType) {
-        if (dataType == DataType.TIMESTAMP) return new TimeStampDirectDictionaryGenerator(
-            CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT);
-        else return null;
+        if (dataType == DataType.TIMESTAMP || dataType == DataType.DATE) {
+          return new TimeStampDirectDictionaryGenerator(
+                  CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT);
+        }  else {
+          return null;
+        }
       }
     };
     new MockUp<TimeStampDirectDictionaryGenerator>() {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/core/src/test/java/org/apache/carbondata/scan/expression/ExpressionResultTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/scan/expression/ExpressionResultTest.java b/core/src/test/java/org/apache/carbondata/scan/expression/ExpressionResultTest.java
index 14eb3d6..dd29191 100644
--- a/core/src/test/java/org/apache/carbondata/scan/expression/ExpressionResultTest.java
+++ b/core/src/test/java/org/apache/carbondata/scan/expression/ExpressionResultTest.java
@@ -295,7 +295,7 @@ public class ExpressionResultTest {
     Date dateToStr;
     try {
       dateToStr = parser.parse(value.toString());
-      return dateToStr.getTime() * 1000;
+      return dateToStr.getTime();
     } catch (ParseException e) {
       throw new FilterIllegalMemberException("Cannot convert value to Time/Long type value");
     }
@@ -464,7 +464,7 @@ public class ExpressionResultTest {
     ExpressionResult expressionResult =
         new ExpressionResult(DataType.TIMESTAMP, "2016-11-07 10:15:09");
     int actualValue = expressionResult.compareTo(obj);
-    int expectedValue = -1;
+    int expectedValue = 0;
     assertEquals(expectedValue, actualValue);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/examples/spark/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala b/examples/spark/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
index f98d46d..c6c103b 100644
--- a/examples/spark/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
+++ b/examples/spark/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
@@ -37,7 +37,7 @@ object CarbonExample {
     cc.sql("""
            CREATE TABLE IF NOT EXISTS t3
            (ID Int, date Timestamp, country String,
-           name String, phonetype String, serialname String, salary Int)
+           name String, phonetype String, serialname char(10), salary Int)
            STORED BY 'carbondata'
            """)
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/examples/spark2/src/main/resources/data.csv
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/resources/data.csv b/examples/spark2/src/main/resources/data.csv
index b44672f..2722edd 100644
--- a/examples/spark2/src/main/resources/data.csv
+++ b/examples/spark2/src/main/resources/data.csv
@@ -1,10 +1,10 @@
-1,10,100,48.4,spark,2015/4/23,1.23
-5,17,140,43.4,spark,2015/7/27,3.45
-1,11,100,44.4,flink,2015/5/23,23.23
-1,10,150,43.4,spark,2015/7/24,254.12
-1,10,100,47.4,spark,2015/7/23,876.14
-3,14,160,43.4,hive,2015/7/26,3454.32
-2,10,100,43.4,impala,2015/7/23,456.98
-1,10,100,43.4,spark,2015/5/23,32.53
-4,16,130,42.4,impala,2015/7/23,67.23
-1,10,100,43.4,spark,2015/7/23,832.23
\ No newline at end of file
+1,10,100,48.4,spark,2015/4/23 12:01:01,1.23,2015/4/23 11:01:01,aaa
+5,17,140,43.4,spark,2015/7/27 12:01:02,3.45,2015/7/27 11:01:02,bbb
+1,11,100,44.4,flink,2015/5/23 12:01:03,23.23,2015/5/23 11:01:03,ccc
+1,10,150,43.4,spark,2015/7/24 12:01:04,254.12,2015/7/24 11:01:04,ddd
+1,10,100,47.4,spark,2015/7/23 12:01:05,876.14,2015/7/23 11:01:05,eeee
+3,14,160,43.4,hive,2015/7/26 12:01:06,3454.32,2015/7/26 11:01:06,ff
+2,10,100,43.4,impala,2015/7/23 12:01:07,456.98,2015/7/23 11:01:07,ggg
+1,10,100,43.4,spark,2015/5/23 12:01:08,32.53,2015/5/23 11:01:08,hhh
+4,16,130,42.4,impala,2015/7/23 12:01:09,67.23,2015/7/23 11:01:09,iii
+1,10,100,43.4,spark,2015/7/23 12:01:10,832.23,2015/7/23 11:01:10,jjj

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
index 0aeb8be..4aff45a 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
@@ -71,8 +71,13 @@ object CarbonExample {
          |    doubleField double,
          |    stringField string,
          |    timestampField timestamp,
-         |    decimalField decimal(18,2))
+         |    decimalField decimal(18,2),
+         |    dateField date,
+         |    charField char(5)
+         | )
          | USING org.apache.spark.sql.CarbonSource
+         | OPTIONS('DICTIONARY_INCLUDE'='dateField, charField',
+         |   'dbName'='default', 'tableName'='carbon_table')
        """.stripMargin)
 
     // val prop = s"$rootPath/conf/dataload.properties.template"
@@ -89,7 +94,9 @@ object CarbonExample {
          |    doubleField double,
          |    stringField string,
          |    timestampField string,
-         |    decimalField decimal(18,2))
+         |    decimalField decimal(18,2),
+         |    dateField string,
+         |    charField char(5))
          |    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
        """.stripMargin)
 
@@ -108,7 +115,8 @@ object CarbonExample {
       s"""
          | INSERT INTO TABLE carbon_table
          | SELECT shortField, intField, bigintField, doubleField, stringField,
-         | from_unixtime(unix_timestamp(timestampField,'yyyy/M/dd')) timestampField, decimalField
+         | from_unixtime(unix_timestamp(timestampField,'yyyy/M/dd')) timestampField, decimalField,
+         | cast(to_date(from_unixtime(unix_timestamp(dateField,'yyyy/M/dd'))) as date), charField
          | FROM csv_table
        """.stripMargin)
 
@@ -124,6 +132,15 @@ object CarbonExample {
               """).show
 
     spark.sql("""
+             SELECT *
+             FROM carbon_table where date_format(dateField, "yyyy-MM-dd") = "2015-07-23"
+              """).show
+
+    spark.sql("""
+             select count(stringField) from carbon_table
+              """.stripMargin).show
+
+    spark.sql("""
            SELECT sum(intField), stringField
            FROM carbon_table
            GROUP BY stringField

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/format/src/main/thrift/schema.thrift
----------------------------------------------------------------------
diff --git a/format/src/main/thrift/schema.thrift b/format/src/main/thrift/schema.thrift
index 1df59a5..377c372 100644
--- a/format/src/main/thrift/schema.thrift
+++ b/format/src/main/thrift/schema.thrift
@@ -33,6 +33,7 @@ enum DataType {
 	DOUBLE = 4,
 	DECIMAL = 5,
 	TIMESTAMP = 6,
+	DATE = 7,
 	ARRAY = 20,
 	STRUCT = 21,
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 1cdd497..091281c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -76,6 +76,7 @@ object CarbonScalaUtil {
       case CarbonDataType.BOOLEAN => BooleanType
       case CarbonDataType.DECIMAL => DecimalType.SYSTEM_DEFAULT
       case CarbonDataType.TIMESTAMP => TimestampType
+      case CarbonDataType.DATE => DateType
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
index 5e656d9..570d8ed 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
@@ -22,7 +22,7 @@ import org.apache.carbondata.core.carbon.metadata.datatype.DataType
 object DataTypeConverterUtil {
   def convertToCarbonType(dataType: String): DataType = {
     dataType.toLowerCase match {
-      case "string" => DataType.STRING
+      case "string" | "char" => DataType.STRING
       case "int" => DataType.INT
       case "integer" => DataType.INT
       case "tinyint" => DataType.SHORT
@@ -33,6 +33,7 @@ object DataTypeConverterUtil {
       case "double" => DataType.DOUBLE
       case "decimal" => DataType.DECIMAL
       case "timestamp" => DataType.TIMESTAMP
+      case "date" => DataType.DATE
       case "array" => DataType.ARRAY
       case "struct" => DataType.STRUCT
       case _ => convertToCarbonTypeForSpark2(dataType)
@@ -52,6 +53,7 @@ object DataTypeConverterUtil {
       case "doubletype" => DataType.DOUBLE
       case "decimaltype" => DataType.DECIMAL
       case "timestamptype" => DataType.TIMESTAMP
+      case "datetype" => DataType.DATE
       case "arraytype" => DataType.ARRAY
       case "structtype" => DataType.STRUCT
       case _ => sys.error(s"Unsupported data type: $dataType")
@@ -67,6 +69,7 @@ object DataTypeConverterUtil {
       case DataType.DOUBLE => "double"
       case DataType.DECIMAL => "decimal"
       case DataType.TIMESTAMP => "timestamp"
+      case DataType.DATE => "date"
       case DataType.ARRAY => "array"
       case DataType.STRUCT => "struct"
     }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 7416c90..531b691 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -151,7 +151,7 @@ class TableNewProcessor(cm: TableModel, sqlContext: SQLContext) {
     if (highCardinalityDims.contains(colName)) {
       encoders.remove(encoders.remove(Encoding.DICTIONARY))
     }
-    if (dataType == DataType.TIMESTAMP) {
+    if (dataType == DataType.TIMESTAMP || dataType == DataType.DATE) {
       encoders.add(Encoding.DIRECT_DICTIONARY)
     }
     val colPropMap = new java.util.HashMap[String, String]()

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java b/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
index 42c67b9..68f923d 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.carbondata.spark.readsupport;
 
+import java.sql.Date;
 import java.sql.Timestamp;
 
 import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
@@ -41,16 +42,25 @@ public class SparkRowReadSupportImpl extends AbstractDictionaryDecodedReadSuppor
 
   @Override public Row readRow(Object[] data) {
     for (int i = 0; i < dictionaries.length; i++) {
+      if (data[i] == null) {
+        continue;
+      }
       if (dictionaries[i] != null) {
         data[i] = DataTypeUtil
             .getDataBasedOnDataType(dictionaries[i].getDictionaryValueForKey((int) data[i]),
                 dataTypes[i]);
+        if (data[i] == null) {
+          continue;
+        }
         switch (dataTypes[i]) {
           case STRING:
             data[i] = UTF8String.fromString(data[i].toString());
             break;
           case TIMESTAMP:
-            data[i] = new Timestamp((long) data[i] / 1000);
+            data[i] = new Timestamp((long) data[i]);
+            break;
+          case DATE:
+            data[i] = new Date((long) data[i]);
             break;
           case LONG:
             data[i] = data[i];
@@ -61,7 +71,9 @@ public class SparkRowReadSupportImpl extends AbstractDictionaryDecodedReadSuppor
       else if (carbonColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {
         //convert the long to timestamp in case of direct dictionary column
         if (DataType.TIMESTAMP == carbonColumns[i].getDataType()) {
-          data[i] = new Timestamp((long) data[i] / 1000);
+          data[i] = new Timestamp((long) data[i]);
+        } else if (DataType.DATE == carbonColumns[i].getDataType()) {
+          data[i] = new Date((long) data[i]);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
index 41595d5..0954374 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
@@ -147,6 +147,7 @@ class CarbonDataFrameWriter(val dataFrame: DataFrame) {
       case DoubleType => CarbonType.DOUBLE.getName
       case BooleanType => CarbonType.DOUBLE.getName
       case TimestampType => CarbonType.TIMESTAMP.getName
+      case DateType => CarbonType.DATE.getName
       case other => sys.error(s"unsupported type: $other")
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 397d479..dfc1bc9 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -108,6 +108,7 @@ case class CarbonDictionaryDecoder(
           DecimalType(precision, scale)
         }
       case DataType.TIMESTAMP => TimestampType
+      case DataType.DATE => DateType
       case DataType.STRUCT =>
         CarbonMetastoreTypes
           .toDataType(s"struct<${ relation.getStructChildren(carbonDimension.getColName) }>")

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
index 3f371aa..21864d1 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
@@ -144,6 +144,8 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
   protected val STRING = carbonKeyWord("STRING")
   protected val INTEGER = carbonKeyWord("INTEGER")
   protected val TIMESTAMP = carbonKeyWord("TIMESTAMP")
+  protected val DATE = carbonKeyWord("DATE")
+  protected val CHAR = carbonKeyWord("CHAR")
   protected val NUMERIC = carbonKeyWord("NUMERIC")
   protected val DECIMAL = carbonKeyWord("DECIMAL")
   protected val DOUBLE = carbonKeyWord("DOUBLE")
@@ -381,6 +383,9 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
                     f.scale = scale
                     f.dataType = Some("decimal")
                   }
+                  if(f.dataType.getOrElse("").startsWith("char")) {
+                    f.dataType = Some("char")
+                  }
                   fields ++= Seq(f)
                 }
               }
@@ -815,7 +820,8 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
     fields.foreach(field => {
 
       if (dictExcludeCols.toSeq.exists(x => x.equalsIgnoreCase(field.column))) {
-        if (DataTypeUtil.getDataType(field.dataType.get.toUpperCase()) != DataType.TIMESTAMP) {
+        val dataType = DataTypeUtil.getDataType(field.dataType.get.toUpperCase())
+        if (dataType != DataType.TIMESTAMP && dataType != DataType.DATE ) {
           noDictionaryDims :+= field.column
         }
         dimFields += field
@@ -853,7 +859,7 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
    * @param dimensionDatatype
    */
   def isDetectAsDimentionDatatype(dimensionDatatype: String): Boolean = {
-    val dimensionType = Array("string", "array", "struct", "timestamp")
+    val dimensionType = Array("string", "array", "struct", "timestamp", "date", "char")
     dimensionType.exists(x => x.equalsIgnoreCase(dimensionDatatype))
   }
 
@@ -1090,7 +1096,16 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
     STRING ^^^ "string" | INTEGER ^^^ "integer" |
     TIMESTAMP ^^^ "timestamp" | NUMERIC ^^^ "numeric" |
     BIGINT ^^^ "bigint" | SHORT ^^^ "smallint" |
-    INT ^^^ "int" | DOUBLE ^^^ "double" | decimalType
+    INT ^^^ "int" | DOUBLE ^^^ "double" | decimalType | DATE ^^^ "date" | charType
+
+  /**
+   * Matching the decimal(10,0) data type and returning the same.
+   */
+  private lazy val charType =
+    CHAR ~ ("(" ~>numericLit <~ ")").? ^^ {
+      case char ~ digit =>
+        s"$char($digit)"
+    }
 
   /**
    * Matching the decimal(10,0) data type and returning the same.

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index b065850..62803c7 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -498,7 +498,9 @@ object CarbonMetastoreTypes extends RegexParsers {
     fixedDecimalType |
     "decimal" ^^^ "decimal" ^^^ DecimalType(18, 2) |
     "varchar\\((\\d+)\\)".r ^^^ StringType |
-    "timestamp" ^^^ TimestampType
+    "timestamp" ^^^ TimestampType |
+    "date" ^^^ DateType |
+    "char\\((\\d+)\\)".r ^^^ StringType
 
   protected lazy val fixedDecimalType: Parser[DataType] =
     "decimal" ~> "(" ~> "^[1-9]\\d*".r ~ ("," ~> "^[0-9]\\d*".r <~ ")") ^^ {
@@ -556,6 +558,7 @@ object CarbonMetastoreTypes extends RegexParsers {
       case BinaryType => "binary"
       case BooleanType => "boolean"
       case DecimalType() => "decimal"
+      case DateType => "date"
       case TimestampType => "timestamp"
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/integration/spark/src/test/resources/datasamplefordate.csv
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/datasamplefordate.csv b/integration/spark/src/test/resources/datasamplefordate.csv
new file mode 100644
index 0000000..70d32d2
--- /dev/null
+++ b/integration/spark/src/test/resources/datasamplefordate.csv
@@ -0,0 +1,4 @@
+empno,doj,salary
+11,2016-04-14,5040.56
+12,2016-03-14,1040.56
+13,,1040.56

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/integration/spark/src/test/resources/datasamplenull.csv
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/datasamplenull.csv b/integration/spark/src/test/resources/datasamplenull.csv
index 475bbe1..fc7e248 100644
--- a/integration/spark/src/test/resources/datasamplenull.csv
+++ b/integration/spark/src/test/resources/datasamplenull.csv
@@ -1,3 +1,3 @@
-ID,date,country,name,phonetype,serialname,salary
+ID,dateField,country,name,phonetype,serialname,salary
 1,2015/7/23,china,aaa1,phone197,ASD69643,15000
 2,,china,aaa2,phone756,ASD42892,15001
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/directdictionary/DateDataTypeDirectDictionaryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/directdictionary/DateDataTypeDirectDictionaryTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/directdictionary/DateDataTypeDirectDictionaryTest.scala
new file mode 100644
index 0000000..e5cf72d
--- /dev/null
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/directdictionary/DateDataTypeDirectDictionaryTest.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.directdictionary
+
+import java.io.File
+import java.sql.Date
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.CarbonHiveContext.{sql, _}
+import org.apache.spark.sql.common.util.QueryTest
+import org.apache.spark.sql.hive.HiveContext
+import org.scalatest.BeforeAndAfterAll
+
+
+/**
+  * Test Class for detailed query on timestamp datatypes
+  *
+  *
+  */
+class DateDataTypeDirectDictionaryTest extends QueryTest with BeforeAndAfterAll {
+  var hiveContext: HiveContext = _
+
+  override def beforeAll {
+    try {
+      CarbonProperties.getInstance().addProperty("carbon.direct.dictionary", "true")
+      sql("drop table if exists directDictionaryTable ")
+      sql("drop table if exists directDictionaryTable_hive ")
+      sql(
+        "CREATE TABLE if not exists directDictionaryTable (empno int,doj date, " +
+          "salary int) " +
+          "STORED BY 'org.apache.carbondata.format'"
+      )
+
+      sql(
+        "CREATE TABLE if not exists directDictionaryTable_hive (empno int,doj date, " +
+          "salary int) " +
+          "row format delimited fields terminated by ','"
+      )
+
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy-MM-dd")
+      val currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../")
+        .getCanonicalPath
+      val csvFilePath = currentDirectory + "/src/test/resources/datasamplefordate.csv"
+      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE directDictionaryTable OPTIONS" +
+        "('DELIMITER'= ',', 'QUOTECHAR'= '\"')" )
+      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE directDictionaryTable_hive")
+      sql("select * from directDictionaryTable_hive").show(false)
+    } catch {
+      case x: Throwable =>
+        x.printStackTrace()
+        CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+    }
+  }
+
+  test("test direct dictionary for not null condition") {
+    checkAnswer(
+      sql("select doj from directDictionaryTable where doj is not null"),
+      Seq(Row(Date.valueOf("2016-03-14")),
+        Row(Date.valueOf("2016-04-14"))
+      )
+    )
+  }
+
+  test("test direct dictionary for getting all the values") {
+    checkAnswer(
+      sql("select doj from directDictionaryTable"),
+      Seq(Row(Date.valueOf("2016-03-14")),
+        Row(Date.valueOf("2016-04-14")),
+        Row(null)
+      )
+    )
+  }
+
+  test("test direct dictionary for not equals condition") {
+    checkAnswer(
+      sql("select doj from directDictionaryTable where doj != '2016-04-14 00:00:00'"),
+      Seq(Row(Date.valueOf("2016-03-14"))
+      )
+    )
+  }
+
+  test("test direct dictionary for null condition") {
+    checkAnswer(
+      sql("select doj from directDictionaryTable where doj is null"),
+      Seq(Row(null)
+      )
+    )
+  }
+
+  test("select doj from directDictionaryTable with equals filter") {
+    checkAnswer(
+      sql("select doj from directDictionaryTable where doj = '2016-03-14 00:00:00'"),
+      Seq(Row(Date.valueOf("2016-03-14")))
+    )
+
+  }
+
+  test("select doj from directDictionaryTable with regexp_replace equals filter") {
+    checkAnswer(
+      sql("select doj from directDictionaryTable where regexp_replace(doj, '-', '/') = '2016/03/14'"),
+      Seq(Row(Date.valueOf("2016-03-14")))
+    )
+  }
+
+  test("select doj from directDictionaryTable with regexp_replace NOT IN filter") {
+    checkAnswer(
+      sql("select doj from directDictionaryTable where regexp_replace(doj, '-', '/') NOT IN ('2016/03/14')"),
+      sql("select doj from directDictionaryTable_hive where regexp_replace(doj, '-', '/') NOT IN ('2016/03/14')")
+    )
+  }
+
+  test("select doj from directDictionaryTable with greater than filter") {
+    checkAnswer(
+      sql("select doj from directDictionaryTable where doj > '2016-03-14 00:00:00'"),
+      Seq(Row(Date.valueOf("2016-04-14")))
+    )
+  }
+
+  test("select count(doj) from directDictionaryTable") {
+    checkAnswer(
+      sql("select count(doj) from directDictionaryTable"),
+      Seq(Row(2))
+    )
+  }
+
+  override def afterAll {
+    sql("drop table directDictionaryTable")
+    sql("drop table directDictionaryTable_hive")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+    CarbonProperties.getInstance().addProperty("carbon.direct.dictionary", "false")
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/directdictionary/DateDataTypeDirectDictionaryWithNoDictTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/directdictionary/DateDataTypeDirectDictionaryWithNoDictTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/directdictionary/DateDataTypeDirectDictionaryWithNoDictTestCase.scala
new file mode 100644
index 0000000..db2461e
--- /dev/null
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/directdictionary/DateDataTypeDirectDictionaryWithNoDictTestCase.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.directdictionary
+
+import java.io.File
+import java.sql.{Date, Timestamp}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.CarbonHiveContext._
+import org.apache.spark.sql.common.util.QueryTest
+import org.apache.spark.sql.hive.HiveContext
+import org.scalatest.BeforeAndAfterAll
+
+
+/**
+  * Test Class for detailed query on timestamp datatypes
+  *
+  *
+  */
+class DateDataTypeDirectDictionaryWithNoDictTestCase extends QueryTest with BeforeAndAfterAll {
+  var hiveContext: HiveContext = _
+
+  override def beforeAll {
+    try {
+      CarbonProperties.getInstance().addProperty("carbon.direct.dictionary", "true")
+      sql(
+        """
+         CREATE TABLE IF NOT EXISTS directDictionaryTable
+        (empno String, doj Date, salary Int)
+         STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('DICTIONARY_EXCLUDE'='empno')"""
+      )
+
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss")
+      val currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../")
+        .getCanonicalPath
+      val csvFilePath = currentDirectory + "/src/test/resources/datasample.csv"
+      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE directDictionaryTable OPTIONS"
+        + "('DELIMITER'= ',', 'QUOTECHAR'= '\"')");
+    } catch {
+      case x: Throwable =>
+        x.printStackTrace()
+        CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+    }
+  }
+
+  test("select doj from directDictionaryTable") {
+    checkAnswer(
+      sql("select doj from directDictionaryTable"),
+      Seq(Row(Date.valueOf("2016-03-14")),
+        Row(Date.valueOf("2016-04-14")),
+        Row(null)
+      )
+    )
+  }
+
+
+  test("select doj from directDictionaryTable with equals filter") {
+    checkAnswer(
+      sql("select doj from directDictionaryTable where doj='2016-03-14 15:00:09'"),
+      Seq(Row(Date.valueOf("2016-03-14")))
+    )
+
+  }
+
+  test("select doj from directDictionaryTable with greater than filter") {
+    checkAnswer(
+      sql("select doj from directDictionaryTable where doj>'2016-03-14 15:00:09'"),
+      Seq(Row(Date.valueOf("2016-04-14")))
+    )
+
+  }
+
+
+  override def afterAll {
+    sql("drop table directDictionaryTable")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+    CarbonProperties.getInstance().addProperty("carbon.direct.dictionary", "false")
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d73f4bfe/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/directdictionary/DateDataTypeNullDataTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/directdictionary/DateDataTypeNullDataTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/directdictionary/DateDataTypeNullDataTest.scala
new file mode 100644
index 0000000..82b6783
--- /dev/null
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/directdictionary/DateDataTypeNullDataTest.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.directdictionary
+
+import java.io.File
+import java.sql.{Date, Timestamp}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.TimeStampGranularityConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.CarbonHiveContext._
+import org.apache.spark.sql.common.util.QueryTest
+import org.apache.spark.sql.hive.HiveContext
+import org.scalatest.BeforeAndAfterAll
+
+
+/**
+  * Test Class for detailed query on timestamp datatypes
+  *
+  *
+  */
+class DateDataTypeNullDataTest extends QueryTest with BeforeAndAfterAll {
+  var hiveContext: HiveContext = _
+
+  override def beforeAll {
+    try {
+      sql(
+        """CREATE TABLE IF NOT EXISTS timestampTyeNullData
+                     (ID Int, dateField date, country String,
+                     name String, phonetype String, serialname String, salary Int)
+                    STORED BY 'org.apache.carbondata.format'"""
+      )
+
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+      val currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../")
+        .getCanonicalPath
+      val csvFilePath = currentDirectory + "/src/test/resources/datasamplenull.csv"
+      sql("LOAD DATA LOCAL INPATH '" + csvFilePath + "' INTO TABLE timestampTyeNullData").collect();
+
+    } catch {
+      case x: Throwable =>
+        x.printStackTrace()
+        CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+    }
+  }
+
+  test("SELECT max(dateField) FROM timestampTyeNullData where dateField is not null") {
+    checkAnswer(
+      sql("SELECT max(dateField) FROM timestampTyeNullData where dateField is not null"),
+      Seq(Row(Date.valueOf("2015-07-23"))
+      )
+    )
+  }
+  test("SELECT * FROM timestampTyeNullData where dateField is null") {
+    checkAnswer(
+      sql("SELECT dateField FROM timestampTyeNullData where dateField is null"),
+      Seq(Row(null)
+      ))
+  }
+
+  override def afterAll {
+    sql("drop table timestampTyeNullData")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+    CarbonProperties.getInstance().addProperty("carbon.direct.dictionary", "false")
+  }
+
+}
\ No newline at end of file