You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) is not preserved in this version.
Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/09/10 06:40:05 UTC

carbondata git commit: [CARBONDATA-2876] Fix Avro decimal datatype with precision and scale

Repository: carbondata
Updated Branches:
  refs/heads/master 0483b46e9 -> 9ebab5748


[CARBONDATA-2876] Fix Avro decimal datatype with precision and scale

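1. Apply the schema's precision and scale to the field value for the Avro decimal logical type (values whose precision exceeds the declared precision are rejected).
2. If the Avro schema is a union containing multiple record types or multiple enum types, also check the value's schema when matching it to a union branch.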

This closes #2687


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9ebab574
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9ebab574
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9ebab574

Branch: refs/heads/master
Commit: 9ebab5748969398cf12969eedd4701c30bc028cd
Parents: 0483b46
Author: Indhumathi27 <in...@gmail.com>
Authored: Mon Sep 3 17:35:01 2018 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Mon Sep 10 12:14:24 2018 +0530

----------------------------------------------------------------------
 ...ansactionalCarbonTableWithAvroDataType.scala | 470 ++++++++++++++++++-
 .../carbondata/sdk/file/AvroCarbonWriter.java   |  77 ++-
 2 files changed, 505 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/9ebab574/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
index 29aa2de..dc13b16 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
@@ -18,8 +18,14 @@
 package org.apache.carbondata.spark.testsuite.createTable
 
 import java.io.File
+import java.nio.ByteBuffer
+import javax.xml.bind.DatatypeConverter
+
 import scala.collection.mutable
 
+import org.apache.avro.Conversions.DecimalConversion
+import org.apache.avro.{LogicalTypes, Schema}
+import org.apache.avro.generic.GenericData
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
@@ -46,6 +52,8 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
 
   writerPath = writerPath.replace("\\", "/")
 
+  val decimalConversion = new DecimalConversion
+
   override def beforeAll(): Unit = {
     sql("DROP TABLE IF EXISTS sdkOutputTable")
     CarbonProperties.getInstance()
@@ -678,7 +686,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
         |	"name": "StudentActivity",
         |	"fields": [
         |      {
-        |		  "name": "enum_field", "type": [{
+        |		  "name": "union_field", "type": [{
         |          "namespace": "org.example.avro",
         |          "name": "dec",
         |          "type": "bytes",
@@ -689,15 +697,27 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
         |	}]
         |}""".stripMargin
 
-    val json1 =
-      """{"enum_field":{"bytes":"1010"}}""".stripMargin
-
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+    val decimalConversion = new DecimalConversion
+    val logicalType = LogicalTypes.decimal(10, 2)
+    val decimal = new java.math.BigDecimal("1010").setScale(2)
+    //get unscaled 2's complement bytearray
+    val bytes =
+      decimalConversion.toBytes(decimal, nn.getField("union_field").schema, logicalType)
+    val data = DatatypeConverter.printBase64Binary(bytes.array())
+    val json1 =
+      s"""{"union_field":{"bytes":"$data"}}""".stripMargin
     val record = testUtil.jsonToAvro(json1, schema1)
+    val data1 = new String(record.get(0).asInstanceOf[ByteBuffer].array(),
+      CarbonCommonConstants.DEFAULT_CHARSET_CLASS)
+    val bytes1 = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(data1))
+    val avroRec = new GenericData. Record(nn)
+    avroRec.put("union_field", bytes1)
+
 
     val writer = CarbonWriter.builder
       .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
-    writer.write(record)
+    writer.write(avroRec)
     writer.close()
     sql(
       s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY
@@ -711,16 +731,16 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
     val schema1 =
       """
-        |{"name": "address",
+        |{"name": "struct_field",
         | "type": "record",
         | "fields": [
-        |  { "name": "name", "type": "string"},
-        |  { "name": "age", "type": "float"},
-        |  { "name": "address",  "type": {
-        |    "type" : "record",  "name" : "my_address",
+        |  { "name": "record1", "type": "string"},
+        |  { "name": "record2", "type": "float"},
+        |  { "name": "struct_field_decimal",  "type": {
+        |    "type" : "record",  "name" : "my_record",
         |        "fields" : [
-        |    {"name": "street", "type": "string"},
-        |    {"name": "city", "type": {"type" : "bytes",
+        |    {"name": "record3", "type": "string"},
+        |    {"name": "record4", "type": {"type" : "bytes",
         |                     "logicalType": "decimal",
         |                     "precision": 4,
         |                     "scale": 2
@@ -728,14 +748,46 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
         |]}
       """.stripMargin
 
-    val json1 = """ {"name":"bob", "age":10.24, "address" : {"street":"abc", "city":"32"}} """
-
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+
+    val logicalType = LogicalTypes.decimal(4, 2)
+    val decimal1 = new java.math.BigDecimal("32").setScale(2)
+    //get unscaled 2's complement bytearray
+    val bytes =
+      decimalConversion.toBytes(decimal1, nn.getField("struct_field_decimal").schema, logicalType)
+    val data = DatatypeConverter.printBase64Binary(bytes.array())
+    val json1 = s""" {"record1":"bob", "record2":10.24, "struct_field_decimal" : {"record3":"abc", "record4":"$data"}} """
     val record = testUtil.jsonToAvro(json1, schema1)
 
+    val jsonData = new String(record.get(2).asInstanceOf[GenericData.Record].get(1)
+      .asInstanceOf[ByteBuffer].array(),
+      CarbonCommonConstants.DEFAULT_CHARSET_CLASS)
+    val bytesValue = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(jsonData))
+    val mySchema =
+      """
+        |{"name": "struct_field_decimal",
+        | "type": "record",
+        | "fields": [
+        |  { "name": "record3", "type": "string"},
+        |  { "name": "record4", "type": {"type" : "bytes",
+        |                     "logicalType": "decimal",
+        |                     "precision": 4,
+        |                     "scale": 2
+        |                    }}
+        |]}
+      """.stripMargin
+    val schema = new org.apache.avro.Schema.Parser().parse(mySchema)
+    val genericByteArray = new GenericData.Record(schema)
+    genericByteArray.put("record3", "abc")
+    genericByteArray.put("record4", bytesValue)
+    val avroRec = new GenericData.Record(nn)
+    avroRec.put("record1", "bob")
+    avroRec.put("record2", 10.24)
+    avroRec.put("struct_field_decimal", genericByteArray)
+
     val writer = CarbonWriter.builder
       .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
-    writer.write(record)
+    writer.write(avroRec)
     writer.close()
     sql(
       s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY
@@ -761,11 +813,11 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
         |      "type": "int"
         |      },
         |      {
-        |      "name": "address",
+        |      "name": "dec_fields",
         |      "type": {
         |      "type": "array",
         |      "items": {
-        |      "name": "street",
+        |      "name": "dec_field",
         |      "type": "bytes",
         |      "logicalType": "decimal",
         |      "precision": 4,
@@ -774,14 +826,40 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
         |  }
       """.stripMargin
 
-    val json1: String = """ {"name": "bob","age": 10,"address": ["32", "42"]} """
-
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+    val logicalType = LogicalTypes.decimal(4, 1)
+    val decimal1 = new java.math.BigDecimal("32").setScale(1)
+    val decimal2 = new java.math.BigDecimal("42").setScale(1)
+    //get unscaled 2's complement bytearray
+    val bytes1 =
+      decimalConversion.toBytes(decimal1, nn.getField("dec_fields").schema, logicalType)
+    val bytes2 =
+      decimalConversion.toBytes(decimal2, nn.getField("dec_fields").schema, logicalType)
+    val data1 = DatatypeConverter.printBase64Binary(bytes1.array())
+    val data2 = DatatypeConverter.printBase64Binary(bytes2.array())
+    val json1: String = s""" {"name": "bob","age": 10,"dec_fields":["$data1","$data2"]} """
     val record = testUtil.jsonToAvro(json1, schema1)
 
+    val jsonData1 = new String(record.get(2).asInstanceOf[GenericData.Array[ByteBuffer]].get(0)
+      .array(),
+      CarbonCommonConstants.DEFAULT_CHARSET_CLASS)
+    val jsonData2 = new String(record.get(2).asInstanceOf[GenericData.Array[ByteBuffer]].get(1)
+      .array(),
+      CarbonCommonConstants.DEFAULT_CHARSET_CLASS)
+    val bytesValue1 = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(jsonData1))
+    val bytesValue2 = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(jsonData2))
+    val genericByteArray = new GenericData.Array[ByteBuffer](2,
+      Schema.createArray(Schema.create(Schema.Type.BYTES)))
+    genericByteArray.add(bytesValue1)
+    genericByteArray.add(bytesValue2)
+    val avroRec = new GenericData.Record(nn)
+    avroRec.put("name", "bob")
+    avroRec.put("age", 10)
+    avroRec.put("dec_fields", genericByteArray)
+
     val writer = CarbonWriter.builder
       .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
-    writer.write(record)
+    writer.write(avroRec)
     writer.close()
     sql(
       s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY
@@ -884,4 +962,356 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(1728000, Row(1728000))))
   }
 
+  test("test logical type decimal through Json") {
+    sql("drop table if exists sdkOutputTable")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
+    val schema1 =
+      """{
+        |	"namespace": "com.apache.schema",
+        |	"type": "record",
+        |	"name": "StudentActivity",
+        |	"fields": [
+        |		{
+        |			"name": "id",
+        |						"type": {"type" : "bytes",
+        |                     "logicalType": "decimal",
+        |                     "precision": 5,
+        |                     "scale": 2
+        |                    }
+        |}
+        |	]
+        |}""".stripMargin
+    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+    val logicalType = LogicalTypes.decimal(5, 2)
+    val decimal = new java.math.BigDecimal("12.8").setScale(2)
+    //get unscaled 2's complement bytearray
+    val bytes =
+      decimalConversion.toBytes(decimal, nn.getField("id").schema, logicalType)
+    val data = DatatypeConverter.printBase64Binary(bytes.array())
+    val json1 =
+      s"""{"id":"$data"}""".stripMargin
+    val record = testUtil.jsonToAvro(json1, schema1)
+    val data1 = new String(record.get(0).asInstanceOf[ByteBuffer].array(),
+      CarbonCommonConstants.DEFAULT_CHARSET_CLASS)
+    val bytes1 = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(data1))
+    val avroRec = new GenericData. Record(nn)
+    avroRec.put("id", bytes1)
+    val writer = CarbonWriter.builder
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+    writer.write(avroRec)
+    writer.close()
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(decimal)))
+  }
+
+  test("test logical type decimal through Json with big decimal value") {
+    sql("drop table if exists sdkOutputTable")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
+    val schema1 =
+      """{
+        |	"namespace": "com.apache.schema",
+        |	"type": "record",
+        |	"name": "StudentActivity",
+        |	"fields": [
+        |		{
+        |			"name": "dec_field",
+        |						"type": {"type" : "bytes",
+        |                     "logicalType": "decimal",
+        |                     "precision": 30,
+        |                     "scale": 10
+        |                    }
+        |}
+        |	]
+        |}""".stripMargin
+    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+    val logicalType = LogicalTypes.decimal(30, 10)
+    val decimal = new java.math.BigDecimal("12672346879023.845789").setScale(10)
+    //get unscaled 2's complement bytearray
+    val bytes =
+      decimalConversion.toBytes(decimal, nn.getField("dec_field").schema, logicalType)
+    val data = DatatypeConverter.printBase64Binary(bytes.array())
+    val json1 =
+      s"""{"dec_field":"$data"}""".stripMargin
+    val record = testUtil.jsonToAvro(json1, schema1)
+    val data1 = new String(record.get(0).asInstanceOf[ByteBuffer].array(),
+      CarbonCommonConstants.DEFAULT_CHARSET_CLASS)
+    val bytes1 = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(data1))
+    val avroRec = new GenericData. Record(nn)
+    avroRec.put("dec_field", bytes1)
+    val writer = CarbonWriter.builder
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+    writer.write(avroRec)
+    writer.close()
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(decimal)))
+  }
+
+  test("test logical type decimal through Json with negative decimal value") {
+    sql("drop table if exists sdkOutputTable")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
+    val schema1 =
+      """{
+        |	"namespace": "com.apache.schema",
+        |	"type": "record",
+        |	"name": "StudentActivity",
+        |	"fields": [
+        |		{
+        |			"name": "dec_field",
+        |						"type": {"type" : "bytes",
+        |                     "logicalType": "decimal",
+        |                     "precision": 30,
+        |                     "scale": 6
+        |                    }
+        |}
+        |	]
+        |}""".stripMargin
+    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+    val logicalType = LogicalTypes.decimal(30, 6)
+    val decimal = new java.math.BigDecimal("-12672346879023.845").setScale(6)
+    //get unscaled 2's complement bytearray
+    val bytes =
+      decimalConversion.toBytes(decimal, nn.getField("dec_field").schema, logicalType)
+    val data = DatatypeConverter.printBase64Binary(bytes.array())
+    val json1 =
+      s"""{"dec_field":"$data"}""".stripMargin
+    val record = testUtil.jsonToAvro(json1, schema1)
+    val data1 = new String(record.get(0).asInstanceOf[ByteBuffer].array(),
+      CarbonCommonConstants.DEFAULT_CHARSET_CLASS)
+    val bytes1 = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(data1))
+    val avroRec = new GenericData. Record(nn)
+    avroRec.put("dec_field", bytes1)
+    val writer = CarbonWriter.builder
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+    writer.write(avroRec)
+    writer.close()
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(decimal)))
+  }
+
+  test("test logical type decimal through Avro") {
+    sql("drop table if exists sdkOutputTable")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
+    val schema1 =
+      """{
+        |	"namespace": "com.apache.schema",
+        |	"type": "record",
+        |	"name": "StudentActivity",
+        |	"fields": [
+        |		{
+        |			"name": "dec_field",
+        |						"type": {"type" : "bytes",
+        |                     "logicalType": "decimal",
+        |                     "precision": 5,
+        |                     "scale": 2
+        |                    }
+        |}
+        |	]
+        |}""".stripMargin
+    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+    val logicalType = LogicalTypes.decimal(5, 2)
+    val decimal = new java.math.BigDecimal("12.8").setScale(2)
+    //get unscaled 2's complement bytearray
+    val bytes =
+      decimalConversion.toBytes(decimal, nn.getField("dec_field").schema, logicalType)
+    val data = DatatypeConverter.printBase64Binary(bytes.array())
+    val json1 =
+      s"""{"dec_field":"$data"}""".stripMargin
+    val avroRec = new GenericData. Record(nn)
+    avroRec.put("dec_field", bytes)
+    val writer = CarbonWriter.builder
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+    writer.write(avroRec)
+    writer.close()
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(decimal)))
+  }
+
+  test("test logical type decimal with data having greater precision than specified precision") {
+    sql("drop table if exists sdkOutputTable")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
+    val schema1 =
+      """{
+        |	"namespace": "com.apache.schema",
+        |	"type": "record",
+        |	"name": "StudentActivity",
+        |	"fields": [
+        |		{
+        |			"name": "dec_field",
+        |						"type": {"type" : "bytes",
+        |                     "logicalType": "decimal",
+        |                     "precision": 5,
+        |                     "scale": 2
+        |                    }
+        |}
+        |	]
+        |}""".stripMargin
+    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+    val logicalType = LogicalTypes.decimal(5, 2)
+    val decimal = new java.math.BigDecimal("1218").setScale(2)
+    //get unscaled 2's complement bytearray
+    val bytes =
+      decimalConversion.toBytes(decimal, nn.getField("dec_field").schema, logicalType)
+    val data = DatatypeConverter.printBase64Binary(bytes.array())
+    val json1 =
+      s"""{"dec_field":"$data"}""".stripMargin
+    val avroRec = new GenericData. Record(nn)
+    avroRec.put("dec_field", bytes)
+    val exception1 = intercept[Exception] {
+    val writer = CarbonWriter.builder
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+    writer.write(avroRec)
+    writer.close()
+    }
+    assert(exception1.getMessage
+      .contains("Data Loading failed as value Precision 6 is greater than specified Precision 5 in Avro Schema"))
+  }
+
+  test("test union with multiple record type") {
+    sql("drop table if exists sdkOutputTable")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
+    val schema1 =
+      """{
+        |	"namespace": "test.avro",
+        |	"type": "record",
+        |	"name": "NewCar2",
+        |	"fields": [
+        |      {
+        |		  "name": "optionalExtra",
+        |    "type": ["null",{
+        |       "type":"record",
+        |       "name":"Stereo",
+        |       "fields" :[{
+        |       "name":"make",
+        |       "type":"string"
+        |       },
+        |       {
+        |       "name":"speakers",
+        |       "type":"int"
+        |       }]
+        |       },{
+        |       "type":"record",
+        |       "name":"LeatherTrim",
+        |       "fields":[{
+        |       "name":"colour",
+        |       "type":"string"
+        |       }]
+        |       }],
+        |       "default":null
+        |       }]
+        |
+        |}""".stripMargin
+
+    val json1 =
+      """{"optionalExtra":{"test.avro.LeatherTrim":{"colour":"ab"}}}""".stripMargin
+
+    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+    val record = testUtil.jsonToAvro(json1, schema1)
+
+    val writer = CarbonWriter.builder
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+    writer.write(record)
+    writer.close()
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Row(Row(null,null),Row("ab")))))
+  }
+
+  test("test union with multiple Enum type") {
+    sql("drop table if exists sdkOutputTable")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
+    val schema1 =
+      """{
+        |	"namespace": "test.avro",
+        |	"type": "record",
+        |	"name": "Union_data3",
+        |	"fields": [
+        |      {
+        |		  "name": "enum_record",
+        |    "type":
+        |    ["long","null","string",
+        |    {"type":"enum","name":"t1","symbols":["red","blue","yellow"]},
+        |    {"type":"enum","name":"t2","symbols":["sun","mon","tue","wed","thu","fri","sat"]},
+        |    "int"
+        |    ]}]
+        |}""".stripMargin
+
+    val json1 =
+      """{"enum_record":{"test.avro.t2":"sun"}}""".stripMargin
+
+    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+    val record = testUtil.jsonToAvro(json1, schema1)
+
+    val writer = CarbonWriter.builder
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+    writer.write(record)
+    writer.close()
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Row(null,null,null,"sun",null))))
+  }
+
+  test("test spark file format") {
+    sql("drop table if exists sdkOutputTable")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
+    val schema1 =
+      """{
+        |	"namespace": "com.apache.schema",
+        |	"type": "record",
+        |	"name": "StudentActivity",
+        |	"fields": [
+        |      {
+        |		  "name": "union_field", "type": [{
+        |          "namespace": "org.example.avro",
+        |          "name": "dec",
+        |          "type": "bytes",
+        |         "logicalType": "decimal",
+        |                     "precision": 10,
+        |                     "scale": 2
+        |        },"int"]
+        |	}]
+        |}""".stripMargin
+
+    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+    val decimalConversion = new DecimalConversion
+    val logicalType = LogicalTypes.decimal(10, 2)
+    val decimal = new java.math.BigDecimal("1010").setScale(2)
+    //get unscaled 2's complement bytearray
+    val bytes =
+      decimalConversion.toBytes(decimal, nn.getField("union_field").schema, logicalType)
+    val data = DatatypeConverter.printBase64Binary(bytes.array())
+    val json1 =
+      s"""{"union_field":{"bytes":"$data"}}""".stripMargin
+    val record = testUtil.jsonToAvro(json1, schema1)
+    val data1 = new String(record.get(0).asInstanceOf[ByteBuffer].array(),
+      CarbonCommonConstants.DEFAULT_CHARSET_CLASS)
+    val bytes1 = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(data1))
+    val avroRec = new GenericData. Record(nn)
+    avroRec.put("union_field", bytes1)
+
+
+    val writer = CarbonWriter.builder
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+    writer.write(avroRec)
+    writer.close()
+    sql(s"create table sdkOutputTable(union_field struct<union_field0:decimal(10,2),union_field1:int>) " +
+        s"using carbon options(path='$writerPath')")
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Row(decimal,null))))
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9ebab574/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
index 14dbe16..dd70cc9 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.sdk.file;
 
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -31,7 +32,6 @@ import java.util.UUID;
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
@@ -43,6 +43,7 @@ import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
 import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
 import org.apache.carbondata.processing.loading.complexobjects.ArrayObject;
 import org.apache.carbondata.processing.loading.complexobjects.StructObject;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 
 import org.apache.avro.LogicalType;
@@ -103,7 +104,7 @@ public class AvroCarbonWriter extends CarbonWriter {
   }
 
   private Object avroFieldToObject(Schema.Field avroField, Object fieldValue) {
-    Object out;
+    Object out = null;
     Schema.Type type = avroField.schema().getType();
     LogicalType logicalType = avroField.schema().getLogicalType();
     switch (type) {
@@ -118,7 +119,7 @@ public class AvroCarbonWriter extends CarbonWriter {
           while (iterator.hasNext()) {
             // size is 2 because map will have key and value
             Object[] mapChildObjects = new Object[2];
-            Map.Entry mapEntry = (HashMap.Entry) iterator.next();
+            Map.Entry mapEntry = (Map.Entry) iterator.next();
             // evaluate key
             Object keyObject = avroFieldToObject(
                 new Schema.Field(avroField.name(), Schema.create(Schema.Type.STRING),
@@ -198,7 +199,10 @@ public class AvroCarbonWriter extends CarbonWriter {
           if (unionField.getType().equals(Schema.Type.NULL)) {
             continue;
           }
-          if (checkFieldValueType(unionField.getType(), fieldValue)) {
+          // Union may not contain more than one schema with the same type,
+          // except for the named types record,fixed and enum
+          // hence check for schema also in case of union of multiple record or enum or fixed type
+          if (validateUnionFieldValue(unionField.getType(), fieldValue, unionField)) {
             values[j] = avroFieldToObjectForUnionType(unionField, fieldValue, avroField);
             break;
           }
@@ -206,6 +210,15 @@ public class AvroCarbonWriter extends CarbonWriter {
         }
         out = new StructObject(values);
         break;
+      case BYTES:
+        // DECIMAL type is defined in Avro as a BYTE type with the logicalType property
+        // set to "decimal" and a specified precision and scale
+        if (logicalType instanceof LogicalTypes.Decimal) {
+          out = extractDecimalValue(fieldValue,
+              ((LogicalTypes.Decimal) avroField.schema().getLogicalType()).getScale(),
+              ((LogicalTypes.Decimal) avroField.schema().getLogicalType()).getPrecision());
+        }
+        break;
       default:
         out = avroPrimitiveFieldToObject(type, logicalType, fieldValue);
     }
@@ -218,9 +231,10 @@ public class AvroCarbonWriter extends CarbonWriter {
    *
    * @param type
    * @param fieldValue
+   * @param unionField
    * @return
    */
-  private boolean checkFieldValueType(Schema.Type type, Object fieldValue) {
+  private boolean validateUnionFieldValue(Schema.Type type, Object fieldValue, Schema unionField) {
     switch (type) {
       case INT:
         return (fieldValue instanceof Integer);
@@ -235,7 +249,8 @@ public class AvroCarbonWriter extends CarbonWriter {
       case FLOAT:
         return (fieldValue instanceof Float);
       case RECORD:
-        return (fieldValue instanceof GenericData.Record);
+        return (fieldValue instanceof GenericData.Record && unionField
+            .equals(((GenericData.Record) fieldValue).getSchema()));
       case ARRAY:
         return (fieldValue instanceof GenericData.Array || fieldValue instanceof ArrayList);
       case BYTES:
@@ -243,7 +258,8 @@ public class AvroCarbonWriter extends CarbonWriter {
       case MAP:
         return (fieldValue instanceof HashMap);
       case ENUM:
-        return (fieldValue instanceof GenericData.EnumSymbol);
+        return (fieldValue instanceof GenericData.EnumSymbol && unionField
+            .equals(((GenericData.EnumSymbol) fieldValue).getSchema()));
       default:
         return false;
     }
@@ -251,7 +267,7 @@ public class AvroCarbonWriter extends CarbonWriter {
 
   private Object avroPrimitiveFieldToObject(Schema.Type type, LogicalType logicalType,
       Object fieldValue) {
-    Object out = null;
+    Object out;
     switch (type) {
       case INT:
         if (logicalType != null) {
@@ -290,15 +306,6 @@ public class AvroCarbonWriter extends CarbonWriter {
         // also carbon internally needs float as double
         out = Double.parseDouble(fieldValue.toString());
         break;
-      case BYTES:
-        // DECIMAL type is defined in Avro as a BYTE type with the logicalType property
-        // set to "decimal" and a specified precision and scale
-        // As binary type is not supported yet,value will be null
-        if (logicalType instanceof LogicalTypes.Decimal) {
-          out = new BigDecimal(new String(((ByteBuffer) fieldValue).array(),
-              CarbonCommonConstants.DEFAULT_CHARSET_CLASS));
-        }
-        break;
       case NULL:
         out = null;
         break;
@@ -319,7 +326,7 @@ public class AvroCarbonWriter extends CarbonWriter {
    */
   private Object avroFieldToObjectForUnionType(Schema avroField, Object fieldValue,
       Schema.Field avroFields) {
-    Object out;
+    Object out = null;
     Schema.Type type = avroField.getType();
     LogicalType logicalType = avroField.getLogicalType();
     switch (type) {
@@ -383,7 +390,7 @@ public class AvroCarbonWriter extends CarbonWriter {
             while (iterator.hasNext()) {
               // size is 2 because map will have key and value
               Object[] mapChildObjects = new Object[2];
-              Map.Entry mapEntry = (HashMap.Entry) iterator.next();
+              Map.Entry mapEntry = (Map.Entry) iterator.next();
               // evaluate key
               Object keyObject = avroFieldToObject(
                   new Schema.Field(avroFields.name(), Schema.create(Schema.Type.STRING),
@@ -407,12 +414,32 @@ public class AvroCarbonWriter extends CarbonWriter {
           out = null;
         }
         break;
+      case BYTES:
+        // DECIMAL type is defined in Avro as a BYTE type with the logicalType property
+        // set to "decimal" and a specified precision and scale
+        if (logicalType instanceof LogicalTypes.Decimal) {
+          out = extractDecimalValue(fieldValue,
+              ((LogicalTypes.Decimal) avroField.getLogicalType()).getScale(),
+              ((LogicalTypes.Decimal) avroField.getLogicalType()).getPrecision());
+        }
+        break;
       default:
         out = avroPrimitiveFieldToObject(type, logicalType, fieldValue);
     }
     return out;
   }
 
+  private Object extractDecimalValue(Object fieldValue, int scale, int precision) {
+    BigDecimal dataValue = new BigDecimal(new BigInteger(((ByteBuffer) fieldValue).array()), scale);
+    if (!(dataValue.precision() > precision)) {
+      return dataValue;
+    } else {
+      throw new CarbonDataLoadingException(
+          "Data Loading failed as value Precision " + dataValue.precision()
+              + " is greater than specified Precision " + precision + " in Avro Schema");
+    }
+  }
+
   /**
    * converts avro schema to carbon schema required by carbonWriter
    *
@@ -525,8 +552,10 @@ public class AvroCarbonWriter extends CarbonWriter {
           int precision = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getPrecision();
           int scale = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getScale();
           return new Field(fieldName, DataTypes.createDecimalType(precision, scale));
+        } else {
+          throw new UnsupportedOperationException(
+              "carbon not support " + type.toString() + " avro type yet");
         }
-        return null;
       case NULL:
         return null;
       default:
@@ -621,8 +650,10 @@ public class AvroCarbonWriter extends CarbonWriter {
           int precision = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getPrecision();
           int scale = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getScale();
           return new StructField(fieldName, DataTypes.createDecimalType(precision, scale));
+        } else {
+          throw new UnsupportedOperationException(
+              "carbon not support " + type.toString() + " avro type yet");
         }
-        return null;
       case NULL:
         return null;
       default:
@@ -714,8 +745,10 @@ public class AvroCarbonWriter extends CarbonWriter {
           int precision = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getPrecision();
           int scale = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getScale();
           return DataTypes.createDecimalType(precision, scale);
+        } else {
+          throw new UnsupportedOperationException(
+              "carbon not support " + childSchema.getType().toString() + " avro type yet");
         }
-        return null;
       case NULL:
         return null;
       default: