You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/06/05 06:22:39 UTC

carbondata git commit: [CARBONDATA-2554] Added support for logical type

Repository: carbondata
Updated Branches:
  refs/heads/master 27d705998 -> 2f2348690


[CARBONDATA-2554] Added support for logical type

Added support for the Avro date, timestamp-millis and timestamp-micros logical types in AvroCarbonWriter.

This closes #2347


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2f234869
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2f234869
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2f234869

Branch: refs/heads/master
Commit: 2f2348690964ac87c2f38939280958f2469d212d
Parents: 27d7059
Author: kunal642 <ku...@gmail.com>
Authored: Mon May 28 11:41:59 2018 +0530
Committer: kumarvishal09 <ku...@gmail.com>
Committed: Tue Jun 5 11:52:09 2018 +0530

----------------------------------------------------------------------
 .../DirectDictionaryGenerator.java              |   2 +
 .../DateDirectDictionaryGenerator.java          |   2 +-
 .../TimeStampDirectDictionaryGenerator.java     |   2 +-
 .../TestNonTransactionalCarbonTable.scala       | 145 ++++++++++++++++++-
 .../processing/datatypes/PrimitiveDataType.java |  44 +++++-
 .../loading/dictionary/DirectDictionary.java    |   4 +
 .../InputProcessorStepWithNoConverterImpl.java  |  24 ++-
 .../carbondata/sdk/file/AvroCarbonWriter.java   |  71 ++++++++-
 8 files changed, 279 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f234869/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/DirectDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/DirectDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/DirectDictionaryGenerator.java
index 469fe1e..2139f31 100644
--- a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/DirectDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/DirectDictionaryGenerator.java
@@ -40,6 +40,8 @@ public interface DirectDictionaryGenerator {
    */
   Object getValueFromSurrogate(int key);
 
+  /**
+   * The method generates and returns the dictionary / surrogate key for a direct dictionary
+   * column when the value is already available as a long instead of a formatted string.
+   *
+   * @param value the long value to convert into a surrogate key
+   * @return the generated surrogate key
+   */
+  int generateKey(long value);
+
   /**
    * The method generate and returns the dictionary / surrogate key for direct dictionary column
    * This Method is called while executing filter queries for getting direct surrogate members.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f234869/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
index c49af9c..329e260 100644
--- a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
@@ -163,7 +163,7 @@ public class DateDirectDictionaryGenerator implements DirectDictionaryGenerator
     }
   }
 
-  private int generateKey(long timeValue) {
+  public int generateKey(long timeValue) {
     if (timeValue < MIN_VALUE || timeValue > MAX_VALUE) {
       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug("Value for date type column is not in valid range. Value considered as null.");

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f234869/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
index d218e99..c7a4194 100644
--- a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
@@ -206,7 +206,7 @@ public class TimeStampDirectDictionaryGenerator implements DirectDictionaryGener
     }
   }
 
-  private int generateKey(long timeValue) {
+  public int generateKey(long timeValue) {
     long time = (timeValue - cutOffTimeStamp) / granularityFactor;
     int keyValue = -1;
     if (time >= (long) Integer.MIN_VALUE && time <= (long) Integer.MAX_VALUE) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f234869/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index 5beb9c4..095d12d 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -17,7 +17,7 @@
 
 package org.apache.carbondata.spark.testsuite.createTable
 
-import java.sql.Timestamp
+import java.sql.{Date, Timestamp}
 import java.io.{File, FileFilter, IOException}
 import java.util
 import java.util.concurrent.TimeUnit
@@ -42,6 +42,7 @@ import scala.concurrent.duration.Duration
 
 import org.apache.avro
 import org.apache.commons.lang.CharEncoding
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import tech.allegro.schema.json2avro.converter.JsonAvroConverter
 
 import org.apache.carbondata.core.metadata.datatype.{DataTypes, StructField}
@@ -2151,4 +2152,146 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     writer.close()
   }
 
+  // Writes one Avro record whose top-level field and nested struct field both use the
+  // "date" logical type (backed by int), then reads it back through an external table
+  // declared with DATE columns.
+  test("test logical type date") {
+    sql("drop table if exists sdkOutputTable")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
+    val schema1 =
+      """{
+        |	"namespace": "com.apache.schema",
+        |	"type": "record",
+        |	"name": "StudentActivity",
+        |	"fields": [
+        |		{
+        |			"name": "id",
+        |						"type": {"type" : "int", "logicalType": "date"}
+        |		},
+        |		{
+        |			"name": "course_details",
+        |			"type": {
+        |				"name": "course_details",
+        |				"type": "record",
+        |				"fields": [
+        |					{
+        |						"name": "course_struct_course_time",
+        |						"type": {"type" : "int", "logicalType": "date"}
+        |					}
+        |				]
+        |			}
+        |		}
+        |	]
+        |}""".stripMargin
+
+    // The values are day counts: 101 for the top-level field and 10 for the nested field.
+    val json1 =
+      """{"id": 101, "course_details": { "course_struct_course_time":10}}""".stripMargin
+    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+    val converter = new JsonAvroConverter
+    val record = converter
+      .convertToGenericDataRecord(json1.getBytes(CharEncoding.UTF_8), nn)
+
+    val writer = CarbonWriter.builder
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+    writer.write(record)
+    writer.close()
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable(dateType date, course_details struct<course_struct_course_time: date>) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    // 101 days after 1970-01-01 is 1970-04-12; 10 days after 1970-01-01 is 1970-01-11.
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(java.sql.Date.valueOf("1970-04-12"), Row(java.sql.Date.valueOf("1970-01-11")))))
+  }
+
+  // Writes one Avro record whose top-level field and nested struct field both use the
+  // "timestamp-millis" logical type (backed by long), then reads it back through an
+  // external table declared with TIMESTAMP columns.
+  test("test logical type timestamp-millis") {
+    sql("drop table if exists sdkOutputTable")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
+    val schema1 =
+      """{
+        |	"namespace": "com.apache.schema",
+        |	"type": "record",
+        |	"name": "StudentActivity",
+        |	"fields": [
+        |		{
+        |			"name": "id",
+        |						"type": {"type" : "long", "logicalType": "timestamp-millis"}
+        |		},
+        |		{
+        |			"name": "course_details",
+        |			"type": {
+        |				"name": "course_details",
+        |				"type": "record",
+        |				"fields": [
+        |					{
+        |						"name": "course_struct_course_time",
+        |						"type": {"type" : "long", "logicalType": "timestamp-millis"}
+        |					}
+        |				]
+        |			}
+        |		}
+        |	]
+        |}""".stripMargin
+
+    // 172800000 milliseconds is exactly two days (2 * 24 * 60 * 60 * 1000).
+    val json1 =
+      """{"id": 172800000,"course_details": { "course_struct_course_time":172800000}}""".stripMargin
+
+    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+    val converter = new JsonAvroConverter
+    val record = converter
+      .convertToGenericDataRecord(json1.getBytes(CharEncoding.UTF_8), nn)
+
+    val writer = CarbonWriter.builder
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+    writer.write(record)
+    writer.close()
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable(dateType timestamp, course_details struct<course_struct_course_time: timestamp>) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    // NOTE(review): two days after the epoch is 1970-01-03 00:00:00 in UTC, so the expected
+    // 1970-01-02 16:00:00 presumably relies on the suite running with a UTC-8 default
+    // time zone -- confirm against the test base class.
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Timestamp.valueOf("1970-01-02 16:00:00"), Row(Timestamp.valueOf("1970-01-02 16:00:00")))))
+  }
+
+  // Writes one Avro record whose top-level field and nested struct field both use the
+  // "timestamp-micros" logical type (backed by long), then reads it back through an
+  // external table declared with TIMESTAMP columns.
+  test("test logical type-micros timestamp") {
+    sql("drop table if exists sdkOutputTable")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
+    val schema1 =
+      """{
+        |	"namespace": "com.apache.schema",
+        |	"type": "record",
+        |	"name": "StudentActivity",
+        |	"fields": [
+        |		{
+        |			"name": "id",
+        |						"type": {"type" : "long", "logicalType": "timestamp-micros"}
+        |		},
+        |		{
+        |			"name": "course_details",
+        |			"type": {
+        |				"name": "course_details",
+        |				"type": "record",
+        |				"fields": [
+        |					{
+        |						"name": "course_struct_course_time",
+        |						"type": {"type" : "long", "logicalType": "timestamp-micros"}
+        |					}
+        |				]
+        |			}
+        |		}
+        |	]
+        |}""".stripMargin
+
+    // 172800000000 microseconds is exactly two days, i.e. the same instant that the
+    // timestamp-millis test writes as 172800000 milliseconds.
+    val json1 =
+      """{"id": 172800000000,"course_details": { "course_struct_course_time":172800000000}}""".stripMargin
+
+    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+    val converter = new JsonAvroConverter
+    val record = converter
+      .convertToGenericDataRecord(json1.getBytes(CharEncoding.UTF_8), nn)
+
+    val writer = CarbonWriter.builder
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+    writer.write(record)
+    writer.close()
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable(dateType timestamp, course_details struct<course_struct_course_time: timestamp>) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    // NOTE(review): two days after the epoch is 1970-01-03 00:00:00 in UTC, so the expected
+    // 1970-01-02 16:00:00 presumably relies on the suite running with a UTC-8 default
+    // time zone -- confirm against the test base class.
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Timestamp.valueOf("1970-01-02 16:00:00"), Row(Timestamp.valueOf("1970-01-02 16:00:00")))))
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f234869/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index 7450b82..3a477ce 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -288,7 +288,11 @@ public class PrimitiveDataType implements GenericDataType<Object> {
           logHolder.setReason(message);
         }
       } else {
-        surrogateKey = dictionaryGenerator.getOrGenerateKey(parsedValue);
+        if (dictionaryGenerator instanceof DirectDictionary && input instanceof Long) {
+          surrogateKey = ((DirectDictionary) dictionaryGenerator).generateKey((long) input);
+        } else {
+          surrogateKey = dictionaryGenerator.getOrGenerateKey(parsedValue);
+        }
         if (surrogateKey == CarbonCommonConstants.INVALID_SURROGATE_KEY) {
           surrogateKey = CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY;
           message = CarbonDataProcessorUtil
@@ -316,15 +320,36 @@ public class PrimitiveDataType implements GenericDataType<Object> {
           if (!this.carbonDimension.getUseActualData()) {
             byte[] value = null;
             if (isDirectDictionary) {
-              int surrogateKey = dictionaryGenerator.getOrGenerateKey(parsedValue);
+              int surrogateKey;
+              // If the input is a long value then this means that logical type was provided by
+              // the user using AvroCarbonWriter. In this case directly generate surrogate key
+              // using dictionaryGenerator.
+              if (dictionaryGenerator instanceof DirectDictionary && input instanceof Long) {
+                surrogateKey = ((DirectDictionary) dictionaryGenerator).generateKey((long) input);
+              } else {
+                surrogateKey = dictionaryGenerator.getOrGenerateKey(parsedValue);
+              }
               if (surrogateKey == CarbonCommonConstants.INVALID_SURROGATE_KEY) {
                 value = new byte[0];
               } else {
                 value = ByteUtil.toBytes(surrogateKey);
               }
             } else {
-              value = DataTypeUtil.getBytesBasedOnDataTypeForNoDictionaryColumn(parsedValue,
-                  this.carbonDimension.getDataType(), dateFormat);
+              // If the input is a long value then this means that logical type was provided by
+              // the user using AvroCarbonWriter. In this case directly generate Bytes from value.
+              if (this.carbonDimension.getDataType().equals(DataTypes.DATE)
+                  || this.carbonDimension.getDataType().equals(DataTypes.TIMESTAMP)
+                  && input instanceof Long) {
+                if (dictionaryGenerator != null) {
+                  value = ByteUtil.toBytes(((DirectDictionary) dictionaryGenerator)
+                      .generateKey((long) input));
+                } else {
+                  value = ByteUtil.toBytes(Long.parseLong(parsedValue));
+                }
+              } else {
+                value = DataTypeUtil.getBytesBasedOnDataTypeForNoDictionaryColumn(parsedValue,
+                    this.carbonDimension.getDataType(), dateFormat);
+              }
               if (this.carbonDimension.getDataType() == DataTypes.STRING
                   && value.length > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
                 throw new CarbonDataLoadingException("Dataload failed, String size cannot exceed "
@@ -333,8 +358,15 @@ public class PrimitiveDataType implements GenericDataType<Object> {
             }
             updateValueToByteStream(dataOutputStream, value);
           } else {
-            Object value = DataTypeUtil.getDataDataTypeForNoDictionaryColumn(parsedValue,
-                this.carbonDimension.getDataType(), dateFormat);
+            Object value;
+            if (dictionaryGenerator instanceof DirectDictionary
+                && input instanceof Long) {
+              value = ByteUtil.toBytes(
+                  ((DirectDictionary) dictionaryGenerator).generateKey((long) input));
+            } else {
+              value = DataTypeUtil.getDataDataTypeForNoDictionaryColumn(parsedValue,
+                  this.carbonDimension.getDataType(), dateFormat);
+            }
             if (this.carbonDimension.getDataType() == DataTypes.STRING
                 && value.toString().length() > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
               throw new CarbonDataLoadingException("Dataload failed, String size cannot exceed "

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f234869/processing/src/main/java/org/apache/carbondata/processing/loading/dictionary/DirectDictionary.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/dictionary/DirectDictionary.java b/processing/src/main/java/org/apache/carbondata/processing/loading/dictionary/DirectDictionary.java
index 165e5a4..33dc8e3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/dictionary/DirectDictionary.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/dictionary/DirectDictionary.java
@@ -46,6 +46,10 @@ public class DirectDictionary implements BiDictionary<Integer, Object> {
     return dictionaryGenerator.generateDirectSurrogateKey(value.toString());
   }
 
+  /**
+   * Generates the surrogate key for a value that is already available as a long by
+   * delegating to the wrapped direct dictionary generator.
+   *
+   * @param value the long value to convert into a surrogate key
+   * @return the surrogate key produced by the underlying generator
+   */
+  public Integer generateKey(long value) {
+    return Integer.valueOf(dictionaryGenerator.generateKey(value));
+  }
+
   @Override
   public Object getValue(Integer key) {
     return dictionaryGenerator.getValueFromSurrogate(key);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f234869/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
index c99a413..5f7a94c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
@@ -28,6 +28,8 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
@@ -215,6 +217,10 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
 
     private Map<Integer, GenericDataType> dataFieldsWithComplexDataType;
 
+    // Direct dictionary generator used for DATE columns; created lazily the first time a
+    // DATE value arrives as a Long.
+    private DirectDictionaryGenerator dateDictionaryGenerator;
+
+    // Direct dictionary generator used for TIMESTAMP columns; created lazily the first time
+    // a TIMESTAMP value arrives as a Long.
+    private DirectDictionaryGenerator timestampDictionaryGenerator;
+
     public InputProcessorIterator(List<CarbonIterator<Object[]>> inputIterators, int batchSize,
         boolean preFetch, AtomicLong rowCounter, int[] orderOfData, boolean[] noDictionaryMapping,
         DataType[] dataTypes, CarbonDataLoadConfiguration configuration,
@@ -313,7 +319,23 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
               throw new CarbonDataLoadingException("Loading Exception", e);
             }
           } else {
-            newData[i] = data[orderOfData[i]];
+            DataType dataType = dataFields[i].getColumn().getDataType();
+            if (dataType == DataTypes.DATE && data[orderOfData[i]] instanceof Long) {
+              if (dateDictionaryGenerator == null) {
+                dateDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
+                    .getDirectDictionaryGenerator(dataType, dataFields[i].getDateFormat());
+              }
+              newData[i] = dateDictionaryGenerator.generateKey((long) data[orderOfData[i]]);
+            } else if (dataType == DataTypes.TIMESTAMP && data[orderOfData[i]] instanceof Long) {
+              if (timestampDictionaryGenerator == null) {
+                timestampDictionaryGenerator =
+                    DirectDictionaryKeyGeneratorFactory
+                        .getDirectDictionaryGenerator(dataType, dataFields[i].getTimestampFormat());
+              }
+              newData[i] = timestampDictionaryGenerator.generateKey((long) data[orderOfData[i]]);
+            } else {
+              newData[i] = data[orderOfData[i]];
+            }
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f234869/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
index 8bbf364..edecd6b 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
@@ -24,15 +24,21 @@ import java.util.Random;
 import java.util.UUID;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.datatype.StructField;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
 import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
 import org.apache.carbondata.processing.loading.complexobjects.ArrayObject;
 import org.apache.carbondata.processing.loading.complexobjects.StructObject;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.hadoop.conf.Configuration;
@@ -55,6 +61,8 @@ public class AvroCarbonWriter extends CarbonWriter {
   private TaskAttemptContext context;
   private ObjectArrayWritable writable;
   private Schema avroSchema;
+  // Logger is keyed by this writer's own class so that its warnings (for example about
+  // unsupported Avro logical types) are attributed to AvroCarbonWriter in the logs.
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(AvroCarbonWriter.class.getName());
 
   AvroCarbonWriter(CarbonLoadModel loadModel) throws IOException {
     Configuration hadoopConf = new Configuration();
@@ -88,10 +96,35 @@ public class AvroCarbonWriter extends CarbonWriter {
   private Object avroFieldToObject(Schema.Field avroField, Object fieldValue) {
     Object out;
     Schema.Type type = avroField.schema().getType();
+    LogicalType logicalType = avroField.schema().getLogicalType();
     switch (type) {
-      case BOOLEAN:
       case INT:
+        if (logicalType != null) {
+          if (logicalType instanceof LogicalTypes.Date) {
+            int dateIntValue = (int) fieldValue;
+            out = dateIntValue * DateDirectDictionaryGenerator.MILLIS_PER_DAY;
+          } else {
+            LOGGER.warn("Actual type: INT, Logical Type: " + logicalType.getName());
+            out = fieldValue;
+          }
+        } else {
+          out = fieldValue;
+        }
+        break;
+      case BOOLEAN:
       case LONG:
+        if (logicalType != null && !(logicalType instanceof LogicalTypes.TimestampMillis)) {
+          if (logicalType instanceof LogicalTypes.TimestampMicros) {
+            long dateIntValue = (long) fieldValue;
+            out = dateIntValue / 1000L;
+          } else {
+            LOGGER.warn("Actual type: INT, Logical Type: " + logicalType.getName());
+            out = fieldValue;
+          }
+        } else {
+          out = fieldValue;
+        }
+        break;
       case DOUBLE:
       case STRING:
         out = fieldValue;
@@ -177,13 +210,27 @@ public class AvroCarbonWriter extends CarbonWriter {
     String FieldName = avroField.name();
     Schema childSchema = avroField.schema();
     Schema.Type type = childSchema.getType();
+    LogicalType logicalType = childSchema.getLogicalType();
     switch (type) {
       case BOOLEAN:
         return new Field(FieldName, DataTypes.BOOLEAN);
       case INT:
-        return new Field(FieldName, DataTypes.INT);
+        if (logicalType instanceof LogicalTypes.Date) {
+          return new Field(FieldName, DataTypes.DATE);
+        } else {
+          LOGGER.warn("Unsupported logical type. Considering Data Type as INT for " + childSchema
+              .getName());
+          return new Field(FieldName, DataTypes.INT);
+        }
       case LONG:
-        return new Field(FieldName, DataTypes.LONG);
+        if (logicalType instanceof LogicalTypes.TimestampMillis
+            || logicalType instanceof LogicalTypes.TimestampMicros) {
+          return new Field(FieldName, DataTypes.TIMESTAMP);
+        } else {
+          LOGGER.warn("Unsupported logical type. Considering Data Type as LONG for " + childSchema
+              .getName());
+          return new Field(FieldName, DataTypes.LONG);
+        }
       case DOUBLE:
         return new Field(FieldName, DataTypes.DOUBLE);
       case STRING:
@@ -221,13 +268,27 @@ public class AvroCarbonWriter extends CarbonWriter {
 
   private static StructField prepareSubFields(String FieldName, Schema childSchema) {
     Schema.Type type = childSchema.getType();
+    LogicalType logicalType = childSchema.getLogicalType();
     switch (type) {
       case BOOLEAN:
         return new StructField(FieldName, DataTypes.BOOLEAN);
       case INT:
-        return new StructField(FieldName, DataTypes.INT);
+        if (logicalType instanceof LogicalTypes.Date) {
+          return new StructField(FieldName, DataTypes.DATE);
+        } else {
+          LOGGER.warn("Unsupported logical type. Considering Data Type as INT for " + childSchema
+              .getName());
+          return new StructField(FieldName, DataTypes.INT);
+        }
       case LONG:
-        return new StructField(FieldName, DataTypes.LONG);
+        if (logicalType instanceof LogicalTypes.TimestampMillis
+            || logicalType instanceof LogicalTypes.TimestampMicros) {
+          return new StructField(FieldName, DataTypes.TIMESTAMP);
+        } else {
+          LOGGER.warn("Unsupported logical type. Considering Data Type as LONG for " + childSchema
+              .getName());
+          return new StructField(FieldName, DataTypes.LONG);
+        }
       case DOUBLE:
         return new StructField(FieldName, DataTypes.DOUBLE);
       case STRING: