You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/06/08 11:41:12 UTC
[47/50] [abbrv] carbondata git commit: [CARBONDATA-2554] Added support for logical type
[CARBONDATA-2554] Added support for logical type
Added support for date and timestamp logical types in AvroCarbonWriter.
This closes #2347
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2f234869
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2f234869
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2f234869
Branch: refs/heads/spark-2.3
Commit: 2f2348690964ac87c2f38939280958f2469d212d
Parents: 27d7059
Author: kunal642 <ku...@gmail.com>
Authored: Mon May 28 11:41:59 2018 +0530
Committer: kumarvishal09 <ku...@gmail.com>
Committed: Tue Jun 5 11:52:09 2018 +0530
----------------------------------------------------------------------
.../DirectDictionaryGenerator.java | 2 +
.../DateDirectDictionaryGenerator.java | 2 +-
.../TimeStampDirectDictionaryGenerator.java | 2 +-
.../TestNonTransactionalCarbonTable.scala | 145 ++++++++++++++++++-
.../processing/datatypes/PrimitiveDataType.java | 44 +++++-
.../loading/dictionary/DirectDictionary.java | 4 +
.../InputProcessorStepWithNoConverterImpl.java | 24 ++-
.../carbondata/sdk/file/AvroCarbonWriter.java | 71 ++++++++-
8 files changed, 279 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f234869/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/DirectDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/DirectDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/DirectDictionaryGenerator.java
index 469fe1e..2139f31 100644
--- a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/DirectDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/DirectDictionaryGenerator.java
@@ -40,6 +40,8 @@ public interface DirectDictionaryGenerator {
*/
Object getValueFromSurrogate(int key);
+ int generateKey(long value);
+
/**
* The method generate and returns the dictionary / surrogate key for direct dictionary column
* This Method is called while executing filter queries for getting direct surrogate members.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f234869/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
index c49af9c..329e260 100644
--- a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
@@ -163,7 +163,7 @@ public class DateDirectDictionaryGenerator implements DirectDictionaryGenerator
}
}
- private int generateKey(long timeValue) {
+ public int generateKey(long timeValue) {
if (timeValue < MIN_VALUE || timeValue > MAX_VALUE) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Value for date type column is not in valid range. Value considered as null.");
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f234869/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
index d218e99..c7a4194 100644
--- a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
@@ -206,7 +206,7 @@ public class TimeStampDirectDictionaryGenerator implements DirectDictionaryGener
}
}
- private int generateKey(long timeValue) {
+ public int generateKey(long timeValue) {
long time = (timeValue - cutOffTimeStamp) / granularityFactor;
int keyValue = -1;
if (time >= (long) Integer.MIN_VALUE && time <= (long) Integer.MAX_VALUE) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f234869/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index 5beb9c4..095d12d 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -17,7 +17,7 @@
package org.apache.carbondata.spark.testsuite.createTable
-import java.sql.Timestamp
+import java.sql.{Date, Timestamp}
import java.io.{File, FileFilter, IOException}
import java.util
import java.util.concurrent.TimeUnit
@@ -42,6 +42,7 @@ import scala.concurrent.duration.Duration
import org.apache.avro
import org.apache.commons.lang.CharEncoding
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import tech.allegro.schema.json2avro.converter.JsonAvroConverter
import org.apache.carbondata.core.metadata.datatype.{DataTypes, StructField}
@@ -2151,4 +2152,146 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
writer.close()
}
+ test("test logical type date") {
+ sql("drop table if exists sdkOutputTable")
+ FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
+ val schema1 =
+ """{
+ | "namespace": "com.apache.schema",
+ | "type": "record",
+ | "name": "StudentActivity",
+ | "fields": [
+ | {
+ | "name": "id",
+ | "type": {"type" : "int", "logicalType": "date"}
+ | },
+ | {
+ | "name": "course_details",
+ | "type": {
+ | "name": "course_details",
+ | "type": "record",
+ | "fields": [
+ | {
+ | "name": "course_struct_course_time",
+ | "type": {"type" : "int", "logicalType": "date"}
+ | }
+ | ]
+ | }
+ | }
+ | ]
+ |}""".stripMargin
+
+ val json1 =
+ """{"id": 101, "course_details": { "course_struct_course_time":10}}""".stripMargin
+ val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+ val converter = new JsonAvroConverter
+ val record = converter
+ .convertToGenericDataRecord(json1.getBytes(CharEncoding.UTF_8), nn)
+
+ val writer = CarbonWriter.builder
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ writer.write(record)
+ writer.close()
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable(dateType date, course_details struct<course_struct_course_time: date>) STORED BY
+ |'carbondata' LOCATION
+ |'$writerPath' """.stripMargin)
+ checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(java.sql.Date.valueOf("1970-04-12"), Row(java.sql.Date.valueOf("1970-01-11")))))
+ }
+
+ test("test logical type timestamp-millis") {
+ sql("drop table if exists sdkOutputTable")
+ FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
+ val schema1 =
+ """{
+ | "namespace": "com.apache.schema",
+ | "type": "record",
+ | "name": "StudentActivity",
+ | "fields": [
+ | {
+ | "name": "id",
+ | "type": {"type" : "long", "logicalType": "timestamp-millis"}
+ | },
+ | {
+ | "name": "course_details",
+ | "type": {
+ | "name": "course_details",
+ | "type": "record",
+ | "fields": [
+ | {
+ | "name": "course_struct_course_time",
+ | "type": {"type" : "long", "logicalType": "timestamp-millis"}
+ | }
+ | ]
+ | }
+ | }
+ | ]
+ |}""".stripMargin
+
+ val json1 =
+ """{"id": 172800000,"course_details": { "course_struct_course_time":172800000}}""".stripMargin
+
+ val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+ val converter = new JsonAvroConverter
+ val record = converter
+ .convertToGenericDataRecord(json1.getBytes(CharEncoding.UTF_8), nn)
+
+ val writer = CarbonWriter.builder
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ writer.write(record)
+ writer.close()
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable(dateType timestamp, course_details struct<course_struct_course_time: timestamp>) STORED BY
+ |'carbondata' LOCATION
+ |'$writerPath' """.stripMargin)
+ checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Timestamp.valueOf("1970-01-02 16:00:00"), Row(Timestamp.valueOf("1970-01-02 16:00:00")))))
+ }
+
+ test("test logical type-micros timestamp") {
+ sql("drop table if exists sdkOutputTable")
+ FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
+ val schema1 =
+ """{
+ | "namespace": "com.apache.schema",
+ | "type": "record",
+ | "name": "StudentActivity",
+ | "fields": [
+ | {
+ | "name": "id",
+ | "type": {"type" : "long", "logicalType": "timestamp-micros"}
+ | },
+ | {
+ | "name": "course_details",
+ | "type": {
+ | "name": "course_details",
+ | "type": "record",
+ | "fields": [
+ | {
+ | "name": "course_struct_course_time",
+ | "type": {"type" : "long", "logicalType": "timestamp-micros"}
+ | }
+ | ]
+ | }
+ | }
+ | ]
+ |}""".stripMargin
+
+ val json1 =
+ """{"id": 172800000000,"course_details": { "course_struct_course_time":172800000000}}""".stripMargin
+
+ val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+ val converter = new JsonAvroConverter
+ val record = converter
+ .convertToGenericDataRecord(json1.getBytes(CharEncoding.UTF_8), nn)
+
+ val writer = CarbonWriter.builder
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ writer.write(record)
+ writer.close()
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable(dateType timestamp, course_details struct<course_struct_course_time: timestamp>) STORED BY
+ |'carbondata' LOCATION
+ |'$writerPath' """.stripMargin)
+ checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Timestamp.valueOf("1970-01-02 16:00:00"), Row(Timestamp.valueOf("1970-01-02 16:00:00")))))
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f234869/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index 7450b82..3a477ce 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -288,7 +288,11 @@ public class PrimitiveDataType implements GenericDataType<Object> {
logHolder.setReason(message);
}
} else {
- surrogateKey = dictionaryGenerator.getOrGenerateKey(parsedValue);
+ if (dictionaryGenerator instanceof DirectDictionary && input instanceof Long) {
+ surrogateKey = ((DirectDictionary) dictionaryGenerator).generateKey((long) input);
+ } else {
+ surrogateKey = dictionaryGenerator.getOrGenerateKey(parsedValue);
+ }
if (surrogateKey == CarbonCommonConstants.INVALID_SURROGATE_KEY) {
surrogateKey = CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY;
message = CarbonDataProcessorUtil
@@ -316,15 +320,36 @@ public class PrimitiveDataType implements GenericDataType<Object> {
if (!this.carbonDimension.getUseActualData()) {
byte[] value = null;
if (isDirectDictionary) {
- int surrogateKey = dictionaryGenerator.getOrGenerateKey(parsedValue);
+ int surrogateKey;
+ // If the input is a long value then this means that logical type was provided by
+ // the user using AvroCarbonWriter. In this case directly generate surrogate key
+ // using dictionaryGenerator.
+ if (dictionaryGenerator instanceof DirectDictionary && input instanceof Long) {
+ surrogateKey = ((DirectDictionary) dictionaryGenerator).generateKey((long) input);
+ } else {
+ surrogateKey = dictionaryGenerator.getOrGenerateKey(parsedValue);
+ }
if (surrogateKey == CarbonCommonConstants.INVALID_SURROGATE_KEY) {
value = new byte[0];
} else {
value = ByteUtil.toBytes(surrogateKey);
}
} else {
- value = DataTypeUtil.getBytesBasedOnDataTypeForNoDictionaryColumn(parsedValue,
- this.carbonDimension.getDataType(), dateFormat);
+ // If the input is a long value then this means that logical type was provided by
+ // the user using AvroCarbonWriter. In this case directly generate Bytes from value.
+ if (this.carbonDimension.getDataType().equals(DataTypes.DATE)
+ || this.carbonDimension.getDataType().equals(DataTypes.TIMESTAMP)
+ && input instanceof Long) {
+ if (dictionaryGenerator != null) {
+ value = ByteUtil.toBytes(((DirectDictionary) dictionaryGenerator)
+ .generateKey((long) input));
+ } else {
+ value = ByteUtil.toBytes(Long.parseLong(parsedValue));
+ }
+ } else {
+ value = DataTypeUtil.getBytesBasedOnDataTypeForNoDictionaryColumn(parsedValue,
+ this.carbonDimension.getDataType(), dateFormat);
+ }
if (this.carbonDimension.getDataType() == DataTypes.STRING
&& value.length > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
throw new CarbonDataLoadingException("Dataload failed, String size cannot exceed "
@@ -333,8 +358,15 @@ public class PrimitiveDataType implements GenericDataType<Object> {
}
updateValueToByteStream(dataOutputStream, value);
} else {
- Object value = DataTypeUtil.getDataDataTypeForNoDictionaryColumn(parsedValue,
- this.carbonDimension.getDataType(), dateFormat);
+ Object value;
+ if (dictionaryGenerator instanceof DirectDictionary
+ && input instanceof Long) {
+ value = ByteUtil.toBytes(
+ ((DirectDictionary) dictionaryGenerator).generateKey((long) input));
+ } else {
+ value = DataTypeUtil.getDataDataTypeForNoDictionaryColumn(parsedValue,
+ this.carbonDimension.getDataType(), dateFormat);
+ }
if (this.carbonDimension.getDataType() == DataTypes.STRING
&& value.toString().length() > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
throw new CarbonDataLoadingException("Dataload failed, String size cannot exceed "
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f234869/processing/src/main/java/org/apache/carbondata/processing/loading/dictionary/DirectDictionary.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/dictionary/DirectDictionary.java b/processing/src/main/java/org/apache/carbondata/processing/loading/dictionary/DirectDictionary.java
index 165e5a4..33dc8e3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/dictionary/DirectDictionary.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/dictionary/DirectDictionary.java
@@ -46,6 +46,10 @@ public class DirectDictionary implements BiDictionary<Integer, Object> {
return dictionaryGenerator.generateDirectSurrogateKey(value.toString());
}
+ public Integer generateKey(long value) {
+ return dictionaryGenerator.generateKey(value);
+ }
+
@Override
public Object getValue(Integer key) {
return dictionaryGenerator.getValueFromSurrogate(key);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f234869/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
index c99a413..5f7a94c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
@@ -28,6 +28,8 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.encoder.Encoding;
@@ -215,6 +217,10 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
private Map<Integer, GenericDataType> dataFieldsWithComplexDataType;
+ private DirectDictionaryGenerator dateDictionaryGenerator;
+
+ private DirectDictionaryGenerator timestampDictionaryGenerator;
+
public InputProcessorIterator(List<CarbonIterator<Object[]>> inputIterators, int batchSize,
boolean preFetch, AtomicLong rowCounter, int[] orderOfData, boolean[] noDictionaryMapping,
DataType[] dataTypes, CarbonDataLoadConfiguration configuration,
@@ -313,7 +319,23 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
throw new CarbonDataLoadingException("Loading Exception", e);
}
} else {
- newData[i] = data[orderOfData[i]];
+ DataType dataType = dataFields[i].getColumn().getDataType();
+ if (dataType == DataTypes.DATE && data[orderOfData[i]] instanceof Long) {
+ if (dateDictionaryGenerator == null) {
+ dateDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
+ .getDirectDictionaryGenerator(dataType, dataFields[i].getDateFormat());
+ }
+ newData[i] = dateDictionaryGenerator.generateKey((long) data[orderOfData[i]]);
+ } else if (dataType == DataTypes.TIMESTAMP && data[orderOfData[i]] instanceof Long) {
+ if (timestampDictionaryGenerator == null) {
+ timestampDictionaryGenerator =
+ DirectDictionaryKeyGeneratorFactory
+ .getDirectDictionaryGenerator(dataType, dataFields[i].getTimestampFormat());
+ }
+ newData[i] = timestampDictionaryGenerator.generateKey((long) data[orderOfData[i]]);
+ } else {
+ newData[i] = data[orderOfData[i]];
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f234869/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
index 8bbf364..edecd6b 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
@@ -24,15 +24,21 @@ import java.util.Random;
import java.util.UUID;
import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.datatype.StructField;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
import org.apache.carbondata.processing.loading.complexobjects.ArrayObject;
import org.apache.carbondata.processing.loading.complexobjects.StructObject;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
@@ -55,6 +61,8 @@ public class AvroCarbonWriter extends CarbonWriter {
private TaskAttemptContext context;
private ObjectArrayWritable writable;
private Schema avroSchema;
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(CarbonTable.class.getName());
AvroCarbonWriter(CarbonLoadModel loadModel) throws IOException {
Configuration hadoopConf = new Configuration();
@@ -88,10 +96,35 @@ public class AvroCarbonWriter extends CarbonWriter {
private Object avroFieldToObject(Schema.Field avroField, Object fieldValue) {
Object out;
Schema.Type type = avroField.schema().getType();
+ LogicalType logicalType = avroField.schema().getLogicalType();
switch (type) {
- case BOOLEAN:
case INT:
+ if (logicalType != null) {
+ if (logicalType instanceof LogicalTypes.Date) {
+ int dateIntValue = (int) fieldValue;
+ out = dateIntValue * DateDirectDictionaryGenerator.MILLIS_PER_DAY;
+ } else {
+ LOGGER.warn("Actual type: INT, Logical Type: " + logicalType.getName());
+ out = fieldValue;
+ }
+ } else {
+ out = fieldValue;
+ }
+ break;
+ case BOOLEAN:
case LONG:
+ if (logicalType != null && !(logicalType instanceof LogicalTypes.TimestampMillis)) {
+ if (logicalType instanceof LogicalTypes.TimestampMicros) {
+ long dateIntValue = (long) fieldValue;
+ out = dateIntValue / 1000L;
+ } else {
+ LOGGER.warn("Actual type: INT, Logical Type: " + logicalType.getName());
+ out = fieldValue;
+ }
+ } else {
+ out = fieldValue;
+ }
+ break;
case DOUBLE:
case STRING:
out = fieldValue;
@@ -177,13 +210,27 @@ public class AvroCarbonWriter extends CarbonWriter {
String FieldName = avroField.name();
Schema childSchema = avroField.schema();
Schema.Type type = childSchema.getType();
+ LogicalType logicalType = childSchema.getLogicalType();
switch (type) {
case BOOLEAN:
return new Field(FieldName, DataTypes.BOOLEAN);
case INT:
- return new Field(FieldName, DataTypes.INT);
+ if (logicalType instanceof LogicalTypes.Date) {
+ return new Field(FieldName, DataTypes.DATE);
+ } else {
+ LOGGER.warn("Unsupported logical type. Considering Data Type as INT for " + childSchema
+ .getName());
+ return new Field(FieldName, DataTypes.INT);
+ }
case LONG:
- return new Field(FieldName, DataTypes.LONG);
+ if (logicalType instanceof LogicalTypes.TimestampMillis
+ || logicalType instanceof LogicalTypes.TimestampMicros) {
+ return new Field(FieldName, DataTypes.TIMESTAMP);
+ } else {
+ LOGGER.warn("Unsupported logical type. Considering Data Type as LONG for " + childSchema
+ .getName());
+ return new Field(FieldName, DataTypes.LONG);
+ }
case DOUBLE:
return new Field(FieldName, DataTypes.DOUBLE);
case STRING:
@@ -221,13 +268,27 @@ public class AvroCarbonWriter extends CarbonWriter {
private static StructField prepareSubFields(String FieldName, Schema childSchema) {
Schema.Type type = childSchema.getType();
+ LogicalType logicalType = childSchema.getLogicalType();
switch (type) {
case BOOLEAN:
return new StructField(FieldName, DataTypes.BOOLEAN);
case INT:
- return new StructField(FieldName, DataTypes.INT);
+ if (logicalType instanceof LogicalTypes.Date) {
+ return new StructField(FieldName, DataTypes.DATE);
+ } else {
+ LOGGER.warn("Unsupported logical type. Considering Data Type as INT for " + childSchema
+ .getName());
+ return new StructField(FieldName, DataTypes.INT);
+ }
case LONG:
- return new StructField(FieldName, DataTypes.LONG);
+ if (logicalType instanceof LogicalTypes.TimestampMillis
+ || logicalType instanceof LogicalTypes.TimestampMicros) {
+ return new StructField(FieldName, DataTypes.TIMESTAMP);
+ } else {
+ LOGGER.warn("Unsupported logical type. Considering Data Type as LONG for " + childSchema
+ .getName());
+ return new StructField(FieldName, DataTypes.LONG);
+ }
case DOUBLE:
return new StructField(FieldName, DataTypes.DOUBLE);
case STRING: