Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/06/08 11:40:47 UTC

[22/50] [abbrv] carbondata git commit: [CARBONDATA-2481] Adding SDV for SDKwriter

[CARBONDATA-2481] Adding SDV for SDKwriter

Adding SDV test cases for the SDK writer

This closes #2308
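
For context, the new tests drive the SDK writer through its builder API and then
read the output back through external tables. A minimal sketch of the CSV path
they exercise (output path and row values here are illustrative, not taken from
the patch):

    import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}

    // the schema is a JSON array of {column -> type} pairs, as in the tests
    val schema = Schema.parseJson(
      """[ {"name":"string"}, {"age":"int"}, {"height":"double"} ]""")
    val writer = CarbonWriter.builder()
      .outputPath("/tmp/WriterOutput1")       // illustrative output dir
      .isTransactionalTable(false)            // SDK output is nontransactional
      .sortBy(Array("name"))
      .uniqueIdentifier(System.currentTimeMillis)
      .buildWriterForCSVInput(schema)
    writer.write(Array("abc0", "0", "0.0"))   // one CSV row as a String array
    writer.close()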


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6cc86db8
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6cc86db8
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6cc86db8

Branch: refs/heads/spark-2.3
Commit: 6cc86db8f9a245827b9bcf72e15884722154a616
Parents: cf666c1
Author: Indhumathi27 <in...@gmail.com>
Authored: Fri May 11 10:29:42 2018 +0530
Committer: kunal642 <ku...@gmail.com>
Committed: Thu May 24 17:13:38 2018 +0530

----------------------------------------------------------------------
 integration/spark-common-cluster-test/pom.xml   |  12 +
 .../sdv/generated/SDKwriterTestCase.scala       | 732 +++++++++++++++++++
 .../cluster/sdv/suite/SDVSuites.scala           |   1 +
 3 files changed, 745 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/6cc86db8/integration/spark-common-cluster-test/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/pom.xml b/integration/spark-common-cluster-test/pom.xml
index 44453b3..d8aecc2 100644
--- a/integration/spark-common-cluster-test/pom.xml
+++ b/integration/spark-common-cluster-test/pom.xml
@@ -68,6 +68,18 @@
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-store-sdk</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>tech.allegro.schema.json2avro</groupId>
+      <artifactId>converter</artifactId>
+      <version>0.2.5</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

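The two new dependencies back these tests: carbondata-store-sdk supplies the
CarbonWriter builder API, and json2avro's JsonAvroConverter turns the JSON
fixtures into Avro GenericData.Record values. A minimal sketch of that Avro
path, mirroring writeFilesWithAvroWriter in the test class below (schema, JSON
and output path are illustrative):

    import org.apache.avro
    import org.apache.carbondata.sdk.file.CarbonWriter
    import org.apache.commons.lang.CharEncoding
    import tech.allegro.schema.json2avro.converter.JsonAvroConverter

    val avroSchema = new avro.Schema.Parser().parse(
      """{"name": "person", "type": "record", "fields": [
        |  {"name": "name", "type": "string"}, {"name": "age", "type": "int"}]}"""
        .stripMargin)
    // json2avro converts one JSON document into a GenericData.Record
    val record = new JsonAvroConverter().convertToGenericDataRecord(
      """{"name": "bob", "age": 10}""".getBytes(CharEncoding.UTF_8), avroSchema)
    val writer = CarbonWriter.builder()
      .outputPath("/tmp/WriterOutput1")       // illustrative output dir
      .isTransactionalTable(false)
      .buildWriterForAvroInput(avroSchema)
    writer.write(record)
    writer.close()
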
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6cc86db8/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
new file mode 100644
index 0000000..012091d
--- /dev/null
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
@@ -0,0 +1,732 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cluster.sdv.generated
+
+
+import java.util
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterEach
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.avro
+import org.apache.commons.lang.CharEncoding
+import org.junit.Assert
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.sdk.file.{AvroCarbonWriter, CarbonWriter, Schema}
+
+/**
+ * Test class for the SDK writer: verifies that files written by the SDK
+ * CarbonWriter (CSV and Avro input) can be read back through external tables.
+ */
+
+class SDKwriterTestCase extends QueryTest with BeforeAndAfterEach {
+
+  val writerPath = s"$resourcesPath/SparkCarbonFileFormat/WriterOutput1/"
+
+  override def beforeEach(): Unit = {
+    sql("DROP TABLE IF EXISTS sdkTable1")
+    sql("DROP TABLE IF EXISTS sdkTable2")
+    sql("DROP TABLE IF EXISTS table1")
+    cleanTestData()
+  }
+
+  override def afterEach(): Unit = {
+    sql("DROP TABLE IF EXISTS sdkTable1")
+    sql("DROP TABLE IF EXISTS sdkTable2")
+    sql("DROP TABLE IF EXISTS table1")
+    cleanTestData()
+  }
+
+  def cleanTestData(): Unit = {
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
+  }
+
+  def buildTestDataSingleFile(): Any = {
+    buildTestData(3, false, null)
+  }
+
+  def buildTestDataWithBadRecordForce(writerPath: String): Any = {
+    val options = Map("bAd_RECords_action" -> "FORCE").asJava
+    buildTestData(3, false, options)
+  }
+
+  def buildTestDataWithBadRecordFail(writerPath: String): Any = {
+    val options = Map("bAd_RECords_action" -> "FAIL").asJava
+    buildTestData(15001, false, options)
+  }
+
+  def buildTestData(rows: Int,
+      persistSchema: Boolean,
+      options: util.Map[String, String]): Any = {
+    buildTestData(rows, persistSchema, options, List("name"), writerPath)
+  }
+
+  // prepare sdk writer output
+  def buildTestData(rows: Int,
+      persistSchema: Boolean,
+      options: util.Map[String, String],
+      sortColumns: List[String],
+      writerPath: String): Any = {
+    val schema = new StringBuilder()
+      .append("[ \n")
+      .append("   {\"name\":\"string\"},\n")
+      .append("   {\"age\":\"int\"},\n")
+      .append("   {\"height\":\"double\"}\n")
+      .append("]")
+      .toString()
+
+    try {
+      val builder = CarbonWriter.builder()
+      val writer =
+        if (persistSchema) {
+          builder.persistSchemaFile(true)
+          builder
+            .sortBy(sortColumns.toArray)
+            .outputPath(writerPath)
+            .isTransactionalTable(false)
+            .uniqueIdentifier(System.currentTimeMillis)
+            .buildWriterForCSVInput(Schema.parseJson(schema))
+        } else {
+          if (options != null) {
+            builder.outputPath(writerPath)
+              .isTransactionalTable(false)
+              .sortBy(sortColumns.toArray)
+              .uniqueIdentifier(
+                System.currentTimeMillis).withBlockSize(2).withLoadOptions(options)
+              .buildWriterForCSVInput(Schema.parseJson(schema))
+          } else {
+            builder.outputPath(writerPath)
+              .isTransactionalTable(false)
+              .sortBy(sortColumns.toArray)
+              .uniqueIdentifier(
+                System.currentTimeMillis).withBlockSize(2)
+              .buildWriterForCSVInput(Schema.parseJson(schema))
+          }
+        }
+      var i = 0
+      while (i < rows) {
+        if ((options != null) && (i < 3)) {
+          // writing a bad record
+          writer.write(Array[String]("abc" + i, String.valueOf(i.toDouble / 2), "abc"))
+        } else {
+          writer.write(Array[String]("abc" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
+        }
+        i += 1
+      }
+      if (options != null) {
+        // keep one valid record, else no carbondata file will be generated
+        writer.write(Array[String]("abc" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
+      }
+      writer.close()
+    } catch {
+      case ex: Exception => throw new RuntimeException(ex)
+    }
+  }
+
+  def buildTestDataWithBadRecordIgnore(writerPath: String): Any = {
+    val options = Map("bAd_RECords_action" -> "IGNORE").asJava
+    buildTestData(3, false, options)
+  }
+
+  def buildTestDataWithBadRecordRedirect(writerPath: String): Any = {
+    val options = Map("bAd_RECords_action" -> "REDIRECT").asJava
+    buildTestData(3, false, options)
+  }
+
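+  // Recursively deletes every file under `path` with the given extension; used
+  // by the "without carbondata and carbonindex files" test to simulate SDK
+  // output whose data/index files were removed.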
+  def deleteFile(path: String, extension: String): Unit = {
+    val file: CarbonFile = FileFactory
+      .getCarbonFile(path, FileFactory.getFileType(path))
+
+    for (eachDir <- file.listFiles) {
+      if (!eachDir.isDirectory) {
+        if (eachDir.getName.endsWith(extension)) {
+          CarbonUtil.deleteFoldersAndFilesSilent(eachDir)
+        }
+      } else {
+        deleteFile(eachDir.getPath, extension)
+      }
+    }
+  }
+
+  test("test create External Table with WriterPath") {
+    buildTestDataSingleFile()
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    checkAnswer(sql("select * from sdkTable"), Seq(Row("abc0", 0, 0.0),
+      Row("abc1", 1, 0.5),
+      Row("abc2", 2, 1.0)))
+  }
+
+  test("test create External Table with Comment") {
+    buildTestDataSingleFile()
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable comment 'this is comment' STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    checkAnswer(sql("select * from sdkTable"), Seq(Row("abc0", 0, 0.0),
+      Row("abc1", 1, 0.5),
+      Row("abc2", 2, 1.0)))
+  }
+
+  test("test create External Table and test files written from sdk writer") {
+    buildTestDataSingleFile()
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkTable")
+
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable(name string,age int) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    checkAnswer(sql("select * from sdkTable"), Seq(Row("abc0", 0, 0.0),
+      Row("abc1", 1, 0.5),
+      Row("abc2", 2, 1.0)))
+
+    checkAnswer(sql("select name from sdkTable"), Seq(Row("abc0"),
+      Row("abc1"),
+      Row("abc2")))
+
+    checkAnswer(sql("select age from sdkTable"), Seq(Row(0), Row(1), Row(2)))
+    checkAnswer(sql("select * from sdkTable where age > 1 and age < 8"),
+      Seq(Row("abc2", 2, 1.0)))
+
+    checkAnswer(sql("select * from sdkTable where name = 'abc2'"),
+      Seq(Row("abc2", 2, 1.0)))
+
+    checkAnswer(sql("select * from sdkTable where name like '%b%' limit 2"),
+      Seq(Row("abc0", 0, 0.0),
+        Row("abc1", 1, 0.5)))
+
+    checkAnswer(sql("select sum(age) from sdkTable where name like 'abc%'"), Seq(Row(3)))
+    checkAnswer(sql("select count(*) from sdkTable where name like 'abc%' "), Seq(Row(3)))
+    checkAnswer(sql("select count(*) from sdkTable"), Seq(Row(3)))
+
+  }
+
+  test("test create External Table and test insert into external table") {
+    buildTestDataSingleFile()
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkTable")
+
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable(name string,age int) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    checkAnswer(sql(s"""select count(*) from sdkTable where age = 1"""),
+      Seq(Row(1)))
+
+    sql("insert into sdktable select 'def0',1,5.5")
+    sql("insert into sdktable select 'def1',5,6.6")
+
+    checkAnswer(sql(s"""select count(*) from sdkTable where age = 1"""),
+      Seq(Row(2)))
+  }
+
+  test("test create External Table and test insert into normal table with different schema") {
+    buildTestDataSingleFile()
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkTable")
+    sql("DROP TABLE IF EXISTS table1")
+
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    sql(
+      "create table if not exists table1 (name string, age int) STORED BY 'carbondata'")
+    sql("insert into table1 select * from sdkTable")
+    checkAnswer(sql("select * from table1"), Seq(Row("abc0", 0),
+      Row("abc1", 1),
+      Row("abc2", 2)))
+  }
+
+  test("test Insert into External Table from another External Table with Same Schema") {
+    buildTestDataSingleFile()
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkTable1")
+    sql("DROP TABLE IF EXISTS sdkTable2")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable1(name string,age int) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable2(name string,age int) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    sql("insert into sdkTable1 select *from sdkTable2")
+    checkAnswer(sql("select count(*) from sdkTable1"), Seq(Row(6)))
+  }
+
+  test("test create External Table with Schema with partition, external table should " +
+       "ignore schema and partition") {
+    buildTestDataSingleFile()
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkTable")
+
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable(name string) PARTITIONED BY (age int) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    checkAnswer(sql("select * from sdkTable"), Seq(Row("abc0", 0, 0.0),
+      Row("abc1", 1, 0.5),
+      Row("abc2", 2, 1.0)))
+  }
+
+  test("test External Table with insert overwrite") {
+    buildTestDataSingleFile()
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkTable")
+    sql("DROP TABLE IF EXISTS table1")
+
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable(name string) PARTITIONED BY (age int) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    checkAnswer(sql("select * from sdkTable"), Seq(Row("abc0", 0, 0.0),
+      Row("abc1", 1, 0.5),
+      Row("abc2", 2, 1.0)))
+
+    sql(
+      "create table if not exists table1 (name string, age int, height double) STORED BY 'org" +
+      ".apache.carbondata.format'")
+    sql(s"""insert into table1 values ("aaaaa", 12, 20)""")
+
+    checkAnswer(sql(s"""select count(*) from sdkTable where age = 1"""),
+      Seq(Row(1)))
+
+    sql("insert overwrite table sdkTable select * from table1")
+
+    checkAnswer(sql(s"""select count(*) from sdkTable where age = 1"""),
+      Seq(Row(0)))
+  }
+
+  test("test create External Table with Table properties should ignore tblproperties") {
+    buildTestDataSingleFile()
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkTable")
+
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable(name string,age int) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' TBLPROPERTIES('sort_scope'='batch_sort') """.stripMargin)
+
+    checkExistence(sql("Describe formatted sdkTable "), false, "batch_sort")
+  }
+
+  test("Read sdk writer output file and test without carbondata and carbonindex files should fail")
+  {
+    buildTestDataSingleFile()
+    deleteFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
+    deleteFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkTable")
+
+    val exception = intercept[Exception] {
+      // creating an external table on a folder without data/index files must fail
+      sql(
+        s"""CREATE EXTERNAL TABLE sdkTable STORED BY 'carbondata' LOCATION
+           |'$writerPath' """.stripMargin)
+    }
+    assert(exception.getMessage()
+      .contains("Operation not allowed: Invalid table path provided:"))
+  }
+
+  test("test create External Table and test CTAS") {
+    buildTestDataSingleFile()
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkTable")
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    checkAnswer(sql("select * from sdkTable"), Seq(Row("abc0", 0, 0.0),
+      Row("abc1", 1, 0.5),
+      Row("abc2", 2, 1.0)))
+
+    sql("create table table1 stored by 'carbondata' as select *from sdkTable")
+
+    checkAnswer(sql("select * from table1"), Seq(Row("abc0", 0, 0.0),
+      Row("abc1", 1, 0.5),
+      Row("abc2", 2, 1.0)))
+  }
+
+  test("test create External Table and test JOIN on External Tables") {
+    buildTestDataSingleFile()
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkTable")
+    sql("DROP TABLE IF EXISTS sdkTable1")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable1 STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    checkAnswer(sql("select * from sdkTable JOIN sdkTable1 on (sdkTable.age=sdkTable1.age)"),
+      Seq(Row("abc0", 0, 0.0, "abc0", 0, 0.0),
+        Row("abc1", 1, 0.5, "abc1", 1, 0.5),
+        Row("abc2", 2, 1.0, "abc2", 2, 1.0)))
+    checkAnswer(sql(
+      "select * from sdkTable LEFT OUTER JOIN sdkTable1 on (sdkTable.age=sdkTable1.age)"),
+      Seq(Row("abc0", 0, 0.0, "abc0", 0, 0.0),
+        Row("abc1", 1, 0.5, "abc1", 1, 0.5),
+        Row("abc2", 2, 1.0, "abc2", 2, 1.0)))
+    checkAnswer(sql(
+      "select * from sdkTable RIGHT OUTER JOIN sdkTable1 on (sdkTable.age=sdkTable1.age)"),
+      Seq(Row("abc0", 0, 0.0, "abc0", 0, 0.0),
+        Row("abc1", 1, 0.5, "abc1", 1, 0.5),
+        Row("abc2", 2, 1.0, "abc2", 2, 1.0)))
+  }
+
+  test("test create external table and test bad record") {
+    //1. Action = FORCE
+    buildTestDataWithBadRecordForce(writerPath)
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    checkAnswer(sql("select * from sdkTable"), Seq(
+      Row("abc0", null, null),
+      Row("abc1", null, null),
+      Row("abc2", null, null),
+      Row("abc3", 3, 1.5)))
+
+    sql("DROP TABLE sdkTable")
+    cleanTestData()
+
+    //2. Action = REDIRECT
+    buildTestDataWithBadRecordRedirect(writerPath)
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    checkAnswer(sql("select * from sdkTable"), Seq(
+      Row("abc3", 3, 1.5)))
+
+    sql("DROP TABLE sdkTable")
+    cleanTestData()
+
+    //3. Action = IGNORE
+    buildTestDataWithBadRecordIgnore(writerPath)
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    checkAnswer(sql("select * from sdkTable"), Seq(
+      Row("abc3", 3, 1.5)))
+
+  }
+
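+  // Avro test-data builders: each pairs an Avro record schema with a matching
+  // JSON document and writes `rows` copies of it through the SDK Avro writer
+  // via writeFilesWithAvroWriter.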
+  def buildAvroTestDataStructType(): Any = {
+    buildAvroTestDataStruct(3, null)
+  }
+
+  def buildAvroTestDataStruct(rows: Int,
+      options: util.Map[String, String]): Any = {
+
+    val mySchema =
+      """
+        |{"name": "address",
+        | "type": "record",
+        | "fields": [
+        |  { "name": "name", "type": "string"},
+        |  { "name": "age", "type": "int"},
+        |  { "name": "address",  "type": {
+        |    "type" : "record",  "name" : "my_address",
+        |        "fields" : [
+        |    {"name": "street", "type": "string"},
+        |    {"name": "city", "type": "string"}]}}
+        |]}
+      """.stripMargin
+
+    val json = """ {"name":"bob", "age":10, "address" : {"street":"abc", "city":"bang"}} """
+    writeFilesWithAvroWriter(rows, mySchema, json)
+  }
+
+  def buildAvroTestDataBothStructArrayType(): Any = {
+    buildAvroTestDataStructWithArrayType(3, null)
+  }
+
+  def buildAvroTestDataStructWithArrayType(rows: Int,
+      options: util.Map[String, String]): Any = {
+
+    val mySchema =
+      """
+        |{"name": "address",
+        | "type": "record",
+        | "fields": [
+        |  { "name": "name", "type": "string"},
+        |  { "name": "age", "type": "int"},
+        |  { "name": "address", "type": {
+        |    "type" : "record", "name" : "my_address",
+        |    "fields" : [
+        |      {"name": "street", "type": "string"},
+        |      {"name": "city", "type": "string"}]}},
+        |  { "name": "doorNum", "type": {
+        |    "type" : "array",
+        |    "items": {"name": "EachdoorNums", "type": "int", "default": -1}}}
+        |]}
+      """.stripMargin
+
+    val json =
+      """ {"name":"bob", "age":10,
+        |"address" : {"street":"abc", "city":"bang"},
+        |"doorNum" : [1,2,3,4]}""".stripMargin
+    writeFilesWithAvroWriter(rows, mySchema, json)
+  }
+
+  private def writeFilesWithAvroWriter(rows: Int,
+      mySchema: String,
+      json: String): Unit = {
+    // conversion to GenericData.Record
+    val nn = new avro.Schema.Parser().parse(mySchema)
+    val converter = new JsonAvroConverter
+    val record = converter
+      .convertToGenericDataRecord(json.getBytes(CharEncoding.UTF_8), nn)
+
+    try {
+      val writer = CarbonWriter.builder
+        .outputPath(writerPath).isTransactionalTable(false)
+        .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn)
+      var i = 0
+      while (i < rows) {
+        writer.write(record)
+        i = i + 1
+      }
+      writer.close()
+    }
+    catch {
+      case e: Exception => {
+        e.printStackTrace()
+        Assert.fail(e.getMessage)
+      }
+    }
+  }
+
+  def buildAvroTestDataArrayOfStructType(): Any = {
+    buildAvroTestDataArrayOfStruct(3, null)
+  }
+
+  def buildAvroTestDataArrayOfStruct(rows: Int,
+      options: util.Map[String, String]): Any = {
+
+    val mySchema =
+      """ {
+        |	"name": "address",
+        |	"type": "record",
+        |	"fields": [
+        |		{
+        |			"name": "name",
+        |			"type": "string"
+        |		},
+        |		{
+        |			"name": "age",
+        |			"type": "int"
+        |		},
+        |		{
+        |			"name": "doorNum",
+        |			"type": {
+        |				"type": "array",
+        |				"items": {
+        |					"type": "record",
+        |					"name": "my_address",
+        |					"fields": [
+        |						{
+        |							"name": "street",
+        |							"type": "string"
+        |						},
+        |						{
+        |							"name": "city",
+        |							"type": "string"
+        |						}
+        |					]
+        |				}
+        |			}
+        |		}
+        |	]
+        |} """.stripMargin
+    val json =
+      """ {"name":"bob","age":10,"doorNum" :
+        |[{"street":"abc","city":"city1"},
+        |{"street":"def","city":"city2"},
+        |{"street":"ghi","city":"city3"},
+        |{"street":"jkl","city":"city4"}]} """.stripMargin
+    writeFilesWithAvroWriter(rows, mySchema, json)
+  }
+
+  def buildAvroTestDataStructOfArrayType(): Any = {
+    buildAvroTestDataStructOfArray(3, null)
+  }
+
+  def buildAvroTestDataStructOfArray(rows: Int,
+      options: util.Map[String, String]): Any = {
+
+    val mySchema =
+      """ {
+        |	"name": "address",
+        |	"type": "record",
+        |	"fields": [
+        |		{
+        |			"name": "name",
+        |			"type": "string"
+        |		},
+        |		{
+        |			"name": "age",
+        |			"type": "int"
+        |		},
+        |		{
+        |			"name": "address",
+        |			"type": {
+        |				"type": "record",
+        |				"name": "my_address",
+        |				"fields": [
+        |					{
+        |						"name": "street",
+        |						"type": "string"
+        |					},
+        |					{
+        |						"name": "city",
+        |						"type": "string"
+        |					},
+        |					{
+        |						"name": "doorNum",
+        |						"type": {
+        |							"type": "array",
+        |							"items": {
+        |								"name": "EachdoorNums",
+        |								"type": "int",
+        |								"default": -1
+        |							}
+        |						}
+        |					}
+        |				]
+        |			}
+        |		}
+        |	]
+        |} """.stripMargin
+
+    val json =
+      """ {
+        |	"name": "bob",
+        |	"age": 10,
+        |	"address": {
+        |		"street": "abc",
+        |		"city": "bang",
+        |		"doorNum": [
+        |			1,
+        |			2,
+        |			3,
+        |			4
+        |		]
+        |	}
+        |} """.stripMargin
+    writeFilesWithAvroWriter(rows, mySchema, json)
+  }
+
+  test("Read sdk writer Avro output Record Type for nontransactional table") {
+    buildAvroTestDataStructType()
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    checkAnswer(sql("select * from sdkTable"), Seq(
+      Row("bob", 10, Row("abc", "bang")),
+      Row("bob", 10, Row("abc", "bang")),
+      Row("bob", 10, Row("abc", "bang"))))
+
+  }
+
+  test("Read sdk writer Avro output with both Array and Struct Type for nontransactional table") {
+    buildAvroTestDataBothStructArrayType()
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    checkAnswer(sql("select * from sdkTable"), Seq(
+      Row("bob", 10, Row("abc", "bang"), mutable.WrappedArray.newBuilder[Int].+=(1, 2, 3, 4)),
+      Row("bob", 10, Row("abc", "bang"), mutable.WrappedArray.newBuilder[Int].+=(1, 2, 3, 4)),
+      Row("bob", 10, Row("abc", "bang"), mutable.WrappedArray.newBuilder[Int].+=(1, 2, 3, 4))))
+  }
+
+  test("Read sdk writer Avro output with Array of struct for external table") {
+    buildAvroTestDataArrayOfStructType()
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    checkAnswer(sql(s"""select count(*) from sdkTable"""),
+      Seq(Row(3)))
+  }
+
+  test("Read sdk writer Avro output with struct of Array for nontransactional table") {
+    buildAvroTestDataStructOfArrayType()
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    checkAnswer(sql(s"""select count(*) from sdkTable"""),
+      Seq(Row(3)))
+  }
+}

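Reading SDK writer output back is a single DDL statement, as the tests above
repeat (sketch; writerPath is whatever directory the writer produced):

    sql(s"CREATE EXTERNAL TABLE sdkTable STORED BY 'carbondata' LOCATION '$writerPath'")
    sql("select count(*) from sdkTable").show()
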
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6cc86db8/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala
index 2f7d98b..c5aceaa 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala
@@ -150,6 +150,7 @@ class SDVSuites3 extends Suites with BeforeAndAfterAll {
                     new LuceneTestCase ::
                     new TimeSeriesPreAggregateTestCase :: 
                     new TestPartitionWithGlobalSort ::
+                    new SDKwriterTestCase ::
                     new SetParameterTestCase ::
                     new PartitionWithPreAggregateTestCase :: Nil