Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/12/20 06:35:29 UTC

carbondata git commit: [SDV] Add datasource test cases for Spark File Format

Repository: carbondata
Updated Branches:
  refs/heads/master e01916b6b -> d7ff3e688


[SDV] Add datasource test cases for Spark File Format

Added datasource test cases for Spark File Format.

This closes #2951
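
In short, the new suites verify that carbon files written by the SDK can be
exposed through Spark's datasource DDL. A minimal sketch of the pattern under
test (the table name and path are illustrative; the version check mirrors the
test code below):

    // Spark 2.1 passes the path through OPTIONS; 2.2 and above use LOCATION.
    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
      sql(s"CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath')")
    } else {
      sql(s"CREATE TABLE sdkOutputTable USING carbon LOCATION '$writerPath'")
    }
    // The same files can also be queried directly, without creating a table:
    sql(s"SELECT * FROM carbon.`$writerPath`")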


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d7ff3e68
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d7ff3e68
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d7ff3e68

Branch: refs/heads/master
Commit: d7ff3e6880a8dbedbd838a89ad5f7c5f0233da9f
Parents: e01916b
Author: shivamasn <sh...@gmail.com>
Authored: Mon Nov 26 16:04:08 2018 +0530
Committer: kunal642 <ku...@gmail.com>
Committed: Thu Dec 20 12:02:38 2018 +0530

----------------------------------------------------------------------
 integration/spark-common-cluster-test/pom.xml   |    6 +
 .../CarbonV1toV3CompatabilityTestCase.scala     |   17 +-
 ...ableUsingSparkCarbonFileFormatTestCase.scala |  336 +++++
 .../SparkCarbonDataSourceTestCase.scala         | 1380 ++++++++++++++++++
 .../cluster/sdv/suite/SDVSuites.scala           |    5 +-
 .../spark/sql/common/util/QueryTest.scala       |    5 +-
 6 files changed, 1742 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7ff3e68/integration/spark-common-cluster-test/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/pom.xml b/integration/spark-common-cluster-test/pom.xml
index 8648edf..a3ec125 100644
--- a/integration/spark-common-cluster-test/pom.xml
+++ b/integration/spark-common-cluster-test/pom.xml
@@ -37,6 +37,12 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-spark-datasource</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.carbondata</groupId>
       <artifactId>carbondata-spark-common</artifactId>
       <version>${project.version}</version>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7ff3e68/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/CarbonV1toV3CompatabilityTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/CarbonV1toV3CompatabilityTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/CarbonV1toV3CompatabilityTestCase.scala
index f34b657..900c658 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/CarbonV1toV3CompatabilityTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/CarbonV1toV3CompatabilityTestCase.scala
@@ -31,15 +31,22 @@ import org.apache.carbondata.core.util.CarbonProperties
 class CarbonV1toV3CompatabilityTestCase extends QueryTest with BeforeAndAfterAll {
 
   var localspark: CarbonSession = null
-  val storeLocation = s"${TestQueryExecutor.integrationPath}/spark-common-test/src/test/resources/Data/v1_version/store"
-  val metaLocation = s"${TestQueryExecutor.integrationPath}/spark-common-test/src/test/resources/Data/v1_version"
+  val storeLocation = s"${
+    TestQueryExecutor
+      .integrationPath
+  }/spark-common-test/src/test/resources/Data/v1_version/store"
+  val metaLocation = s"${
+    TestQueryExecutor
+      .integrationPath
+  }/spark-common-test/src/test/resources/Data/v1_version"
 
   override def beforeAll {
     sqlContext.sparkSession.stop()
     CarbonEnv.carbonEnvMap.clear()
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION, storeLocation)
     import org.apache.spark.sql.CarbonSession._
-    println(s"store path for CarbonV1toV3CompatabilityTestCase is $storeLocation and metastore is $metaLocation")
+    println(s"store path for CarbonV1toV3CompatabilityTestCase is $storeLocation and metastore is" +
+            s" $metaLocation")
     localspark = SparkSession
       .builder()
       .master("local")
@@ -49,8 +56,8 @@ class CarbonV1toV3CompatabilityTestCase extends QueryTest with BeforeAndAfterAll
     println("store path : " + CarbonProperties.getStorePath)
     localspark.sparkContext.setLogLevel("WARN")
     hiveClient.runSqlHive(
-        s"ALTER TABLE default.t3 SET SERDEPROPERTIES" +
-        s"('tablePath'='$storeLocation/default/t3', 'dbname'='default', 'tablename'='t3')")
+      s"ALTER TABLE default.t3 SET SERDEPROPERTIES" +
+      s"('tablePath'='$storeLocation/default/t3', 'dbname'='default', 'tablename'='t3')")
     localspark.sql("show tables").show()
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7ff3e68/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/datasource/CreateTableUsingSparkCarbonFileFormatTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/datasource/CreateTableUsingSparkCarbonFileFormatTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/datasource/CreateTableUsingSparkCarbonFileFormatTestCase.scala
new file mode 100644
index 0000000..b96fe10
--- /dev/null
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/datasource/CreateTableUsingSparkCarbonFileFormatTestCase.scala
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cluster.sdv.generated.datasource
+
+import java.io.File
+import java.text.SimpleDateFormat
+import java.util.{Date, Random}
+
+import org.apache.commons.io.FileUtils
+import org.apache.commons.lang.RandomStringUtils
+import org.scalatest.BeforeAndAfterAll
+import org.apache.spark.util.SparkUtil
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.QueryTest
+import org.apache.spark.sql.test.TestQueryExecutor
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+
+class CreateTableUsingSparkCarbonFileFormatTestCase extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll(): Unit = {
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+  }
+
+  override def afterAll(): Unit = {
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+  }
+
+  val writerPath = s"${TestQueryExecutor.projectPath}/integration/spark-common-cluster-test/src/test/resources/SparkCarbonFileFormat/WriterOutput/"
+
+  def buildTestData(): Any = {
+
+    FileUtils.deleteDirectory(new File(writerPath))
+
+    val schema = new StringBuilder()
+      .append("[ \n")
+      .append("   {\"name\":\"string\"},\n")
+      .append("   {\"age\":\"int\"},\n")
+      .append("   {\"height\":\"double\"}\n")
+      .append("]")
+      .toString()
+
+    try {
+      val builder = CarbonWriter.builder()
+      val writer =
+        builder.outputPath(writerPath).withCsvInput(Schema.parseJson(schema))
+          .writtenBy("CreateTableUsingSparkCarbonFileFormatTestCase").build()
+      var i = 0
+      while (i < 100) {
+        writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
+        i += 1
+      }
+      writer.close()
+    } catch {
+      case _: Throwable => None
+    }
+  }
+
+  def cleanTestData() = {
+    FileUtils.deleteDirectory(new File(writerPath))
+  }
+
+  def deleteIndexFile(path: String, extension: String): Unit = {
+    val file: CarbonFile = FileFactory
+      .getCarbonFile(path, FileFactory.getFileType(path))
+
+    for (eachDir <- file.listFiles) {
+      if (!eachDir.isDirectory) {
+        if (eachDir.getName.endsWith(extension)) {
+          CarbonUtil.deleteFoldersAndFilesSilent(eachDir)
+        }
+      } else {
+        deleteIndexFile(eachDir.getPath, extension)
+      }
+    }
+  }
+
+  test(
+    "Run SQL directly and read carbondata files (SDK writer output) using the " +
+    "SparkCarbonFileFormat") {
+    buildTestData()
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    //data source file format
+    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+      //data source file format
+      sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """)
+    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+      //data source file format
+      sql(
+        s"""CREATE TABLE sdkOutputTable USING carbon LOCATION
+           |'$writerPath' """.stripMargin)
+    }
+
+    val directSQL = sql(s"""select * FROM  carbon.`$writerPath`""".stripMargin)
+    checkAnswer(sql("select * from sdkOutputTable"), directSQL)
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
+  // TODO: Make the sparkCarbonFileFormat to work without index file
+  test("Read sdk writer output file without Carbondata file should fail") {
+    buildTestData()
+    deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    val exception = intercept[Exception] {
+      //    data source file format
+      if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+        //data source file format
+        sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """)
+      } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+        //data source file format
+        sql(
+          s"""CREATE TABLE sdkOutputTable USING carbon LOCATION
+             |'$writerPath' """.stripMargin)
+      }
+    }
+    assert(exception.getMessage()
+      .contains("CarbonData file is not present in the table location"))
+
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
+  test("Read sdk writer output file without index file should not fail") {
+    buildTestData()
+    deleteIndexFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+      //data source file format
+      sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """)
+    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+      //data source file format
+      sql(
+        s"""CREATE TABLE sdkOutputTable USING carbon LOCATION
+           |'$writerPath' """.stripMargin)
+    }
+    //org.apache.spark.SparkException: Index file not present to read the carbondata file
+    assert(sql("select * from sdkOutputTable").collect().length == 100)
+    assert(sql("select * from sdkOutputTable where name='robot0'").collect().length == 1)
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+  test("Read data having multi blocklet") {
+    buildTestDataMuliBlockLet(750000, 50000)
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+      //data source file format
+      sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """)
+    } else {
+      //data source file format
+      sql(
+        s"""CREATE TABLE sdkOutputTable USING carbon LOCATION
+           |'$writerPath' """.stripMargin)
+    }
+    checkAnswer(sql("select count(*) from sdkOutputTable"), Seq(Row(800000)))
+    checkAnswer(sql(
+      "select count(*) from sdkOutputTable where from_email='Email for testing min max for " +
+      "allowed chars'"),
+      Seq(Row(50000)))
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    clearDataMapCache
+    cleanTestData()
+  }
+
+  def buildTestDataMuliBlockLet(recordsInBlocklet1: Int, recordsInBlocklet2: Int): Unit = {
+    FileUtils.deleteDirectory(new File(writerPath))
+    val fields = new Array[Field](8)
+    fields(0) = new Field("myid", DataTypes.INT)
+    fields(1) = new Field("event_id", DataTypes.STRING)
+    fields(2) = new Field("eve_time", DataTypes.DATE)
+    fields(3) = new Field("ingestion_time", DataTypes.TIMESTAMP)
+    fields(4) = new Field("alldate", DataTypes.createArrayType(DataTypes.DATE))
+    fields(5) = new Field("subject", DataTypes.STRING)
+    fields(6) = new Field("from_email", DataTypes.STRING)
+    fields(7) = new Field("sal", DataTypes.DOUBLE)
+    import scala.collection.JavaConverters._
+    val emailDataBlocklet1 = "FromEmail"
+    val emailDataBlocklet2 = "Email for testing min max for allowed chars"
+    try {
+      val options = Map("bad_records_action" -> "FORCE", "complex_delimiter_level_1" -> "$").asJava
+      val writer = CarbonWriter.builder().outputPath(writerPath).withBlockletSize(16)
+        .sortBy(Array("myid", "ingestion_time", "event_id")).withLoadOptions(options)
+        .withCsvInput(new Schema(fields)).writtenBy("CreateTableUsingSparkCarbonFileFormatTestCase")
+        .build()
+      val timeF = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+      val date_F = new SimpleDateFormat("yyyy-MM-dd")
+      for (i <- 1 to recordsInBlocklet1) {
+        val time = new Date(System.currentTimeMillis())
+        writer
+          .write(Array("" + i,
+            "event_" + i,
+            "" + date_F.format(time),
+            "" + timeF.format(time),
+            "" + date_F.format(time) + "$" + date_F.format(time),
+            "Subject_0",
+            emailDataBlocklet1,
+            "" + new Random().nextDouble()))
+      }
+      for (i <- 1 to recordsInBlocklet2) {
+        val time = new Date(System.currentTimeMillis())
+        writer
+          .write(Array("" + i,
+            "event_" + i,
+            "" + date_F.format(time),
+            "" + timeF.format(time),
+            "" + date_F.format(time) + "$" + date_F.format(time),
+            "Subject_0",
+            emailDataBlocklet2,
+            "" + new Random().nextDouble()))
+      }
+      writer.close()
+    }
+  }
+
+  private def clearDataMapCache(): Unit = {
+    if (!sqlContext.sparkContext.version.startsWith("2.1")) {
+      val mapSize = DataMapStoreManager.getInstance().getAllDataMaps.size()
+      DataMapStoreManager.getInstance()
+        .clearDataMaps(AbsoluteTableIdentifier.from(writerPath))
+      assert(mapSize > DataMapStoreManager.getInstance().getAllDataMaps.size())
+    }
+  }
+
+  test("Test with long string columns") {
+    FileUtils.deleteDirectory(new File(writerPath))
+    // here we specify the long string column as varchar
+    val schema = new StringBuilder()
+      .append("[ \n")
+      .append("   {\"name\":\"string\"},\n")
+      .append("   {\"address\":\"varchar\"},\n")
+      .append("   {\"age\":\"int\"},\n")
+      .append("   {\"note\":\"varchar\"}\n")
+      .append("]")
+      .toString()
+    val builder = CarbonWriter.builder()
+    val writer = builder.outputPath(writerPath).withCsvInput(Schema.parseJson(schema))
+      .writtenBy("CreateTableUsingSparkCarbonFileFormatTestCase").build()
+    val totalRecordsNum = 3
+    for (i <- 0 until totalRecordsNum) {
+      // write a varchar with 75,000 length
+      writer
+        .write(Array[String](s"name_$i",
+          RandomStringUtils.randomAlphabetic(75000),
+          i.toString,
+          RandomStringUtils.randomAlphabetic(75000)))
+    }
+    writer.close()
+
+    //--------------- data source external table with schema ---------------------------
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+    if (sqlContext.sparkContext.version.startsWith("2.1")) {
+      //data source file format
+      sql(
+        s"""CREATE TABLE sdkOutputTable (name string, address string, age int, note string)
+           |USING carbon OPTIONS (PATH '$writerPath', "long_String_columns" "address, note") """
+          .stripMargin)
+    } else {
+      //data source file format
+      sql(
+        s"""CREATE TABLE sdkOutputTable (name string, address string, age int, note string) USING
+           | carbon
+           |OPTIONS("long_String_columns"="address, note") LOCATION
+           |'$writerPath' """.stripMargin)
+    }
+    checkAnswer(sql("select count(*) from sdkOutputTable where age = 0"), Seq(Row(1)))
+    checkAnswer(sql(
+      "SELECT COUNT(*) FROM (select address,age,note from sdkOutputTable where length(address)" +
+      "=75000 and length(note)=75000)"),
+      Seq(Row(totalRecordsNum)))
+    checkAnswer(sql("select name from  sdkOutputTable where age = 2"), Seq(Row("name_2")))
+    sql("DROP TABLE sdkOutputTable")
+
+    //--------------- data source external table without schema ---------------------------
+    sql("DROP TABLE IF EXISTS sdkOutputTableWithoutSchema")
+    if (sqlContext.sparkContext.version.startsWith("2.1")) {
+      //data source file format
+      sql(
+        s"""CREATE TABLE sdkOutputTableWithoutSchema USING carbon OPTIONS (PATH
+           |'$writerPath', "long_String_columns" "address, note") """.stripMargin)
+    } else {
+      //data source file format
+      sql(
+        s"""CREATE TABLE sdkOutputTableWithoutSchema USING carbon OPTIONS
+           |("long_String_columns"="address, note") LOCATION '$writerPath' """.stripMargin)
+    }
+    checkAnswer(sql("select count(*) from sdkOutputTableWithoutSchema where age = 0"), Seq(Row(1)))
+    checkAnswer(sql(
+      "SELECT COUNT(*) FROM (select address,age,note from sdkOutputTableWithoutSchema where " +
+      "length(address)=75000 and length(note)=75000)"),
+      Seq(Row(totalRecordsNum)))
+    checkAnswer(sql("select name from sdkOutputTableWithoutSchema where age = 2"),
+      Seq(Row("name_2")))
+    sql("DROP TABLE sdkOutputTableWithoutSchema")
+    clearDataMapCache
+    cleanTestData()
+  }
+}
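
The data read by these tests comes from the Carbon SDK writer; condensed from
buildTestData above (the schema and row values are illustrative):

    // Write 100 CSV-style rows to writerPath; close() flushes the carbondata
    // and index files that the tests read back through the file format.
    val schema = Schema.parseJson(
      """[{"name":"string"},{"age":"int"},{"height":"double"}]""")
    val writer = CarbonWriter.builder()
      .outputPath(writerPath)
      .withCsvInput(schema)
      .writtenBy("CreateTableUsingSparkCarbonFileFormatTestCase")
      .build()
    for (i <- 0 until 100) {
      writer.write(Array("robot" + i, i.toString, (i.toDouble / 2).toString))
    }
    writer.close()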

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7ff3e68/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/datasource/SparkCarbonDataSourceTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/datasource/SparkCarbonDataSourceTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/datasource/SparkCarbonDataSourceTestCase.scala
new file mode 100644
index 0000000..8f41ba7
--- /dev/null
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/datasource/SparkCarbonDataSourceTestCase.scala
@@ -0,0 +1,1380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.cluster.sdv.generated.datasource
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, File, InputStream}
+
+import scala.collection.mutable
+
+import org.apache.avro
+import org.apache.avro.file.DataFileWriter
+import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
+import org.apache.avro.io.{DecoderFactory, Encoder}
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.common.util.QueryTest
+import org.apache.spark.sql.test.TestQueryExecutor
+import org.junit.Assert
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.datatype.{DataTypes, StructField}
+import org.apache.carbondata.hadoop.testutil.StoreCreator
+import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}
+
+class SparkCarbonDataSourceTestCase extends QueryTest with BeforeAndAfterAll {
+
+  val warehouse1 = s"${TestQueryExecutor.projectPath}/integration/spark-datasource/target/warehouse"
+
+  test("test write using dataframe") {
+    import sqlContext.implicits._
+    val df = sqlContext.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, "b", x))
+      .toDF("c1", "c2", "number")
+    sql("drop table if exists testformat")
+    // Saves dataframe to carbon file
+    df.write
+      .format("carbon").saveAsTable("testformat")
+    assert(sql("select * from testformat").count() == 10)
+    assert(sql("select * from testformat where c1='a0'").count() == 1)
+    checkAnswer(sql("select c1 from testformat where number = 7"), Seq(Row("a7")))
+    sql("drop table if exists testformat")
+  }
+
+  test("test write using ddl") {
+    import sqlContext.implicits._
+    val df = sqlContext.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, "b", x))
+      .toDF("c1", "c2", "number")
+    sql("drop table if exists testparquet")
+    sql("drop table if exists testformat")
+    // Saves dataframe to carbon file
+    df.write
+      .format("parquet").saveAsTable("testparquet")
+    sql("create table carbon_table(c1 string, c2 string, number int) using carbon")
+    sql("insert into carbon_table select * from testparquet")
+    checkAnswer(sql("select * from carbon_table where c1='a1'"),
+      sql("select * from testparquet where c1='a1'"))
+    if (!sqlContext.sparkContext.version.startsWith("2.1")) {
+      val mapSize = DataMapStoreManager.getInstance().getAllDataMaps.size()
+      DataMapStoreManager.getInstance()
+        .clearDataMaps(AbsoluteTableIdentifier.from(warehouse1 + "/carbon_table"))
+      assert(mapSize >= DataMapStoreManager.getInstance().getAllDataMaps.size())
+    }
+    sql("drop table if exists testparquet")
+    sql("drop table if exists testformat")
+  }
+
+  test("test read with df write") {
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
+    import sqlContext.implicits._
+    val df = sqlContext.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, "b", x))
+      .toDF("c1", "c2", "number")
+
+    // Saves dataframe to carbon file
+    df.write.format("carbon").save(warehouse1 + "/test_folder/")
+
+    val frame = sqlContext.read.format("carbon").load(warehouse1 + "/test_folder")
+    assert(frame.count() == 10)
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
+  }
+
+  test("test write using subfolder") {
+    if (!sqlContext.sparkContext.version.startsWith("2.1")) {
+      FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
+      import sqlContext.implicits._
+      val df = sqlContext.sparkContext.parallelize(1 to 10)
+        .map(x => ("a" + x % 10, "b", x))
+        .toDF("c1", "c2", "number")
+
+      // Saves dataframe to carbon file
+      df.write.format("carbon").save(warehouse1 + "/test_folder/" + System.nanoTime())
+      df.write.format("carbon").save(warehouse1 + "/test_folder/" + System.nanoTime())
+      df.write.format("carbon").save(warehouse1 + "/test_folder/" + System.nanoTime())
+
+      val frame = sqlContext.read.format("carbon").load(warehouse1 + "/test_folder")
+      assert(frame.where("c1='a1'").count() == 3)
+
+      val mapSize = DataMapStoreManager.getInstance().getAllDataMaps.size()
+      DataMapStoreManager.getInstance()
+        .clearDataMaps(AbsoluteTableIdentifier.from(warehouse1 + "/test_folder"))
+      assert(mapSize > DataMapStoreManager.getInstance().getAllDataMaps.size())
+      FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
+    }
+  }
+
+  test("test write using partition ddl") {
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists testparquet")
+    import sqlContext.implicits._
+    val df = sqlContext.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, "b", x))
+      .toDF("c1", "c2", "number")
+
+    // Saves dataframe to carbon file
+    df.write
+      .format("parquet").partitionBy("c2").saveAsTable("testparquet")
+    sql(
+      "create table carbon_table(c1 string, c2 string, number int) using carbon  PARTITIONED by " +
+      "(c2)")
+    sql("insert into carbon_table select * from testparquet")
+    // TODO fix in 2.1
+    if (!sqlContext.sparkContext.version.startsWith("2.1")) {
+      assert(sql("select * from carbon_table").count() == 10)
+      checkAnswer(sql("select * from carbon_table"),
+        sql("select * from testparquet"))
+    }
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists testparquet")
+  }
+
+  test("test write with struct type") {
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists parquet_table")
+    import sqlContext.implicits._
+    val df = sqlContext.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, ("b", "c"), x))
+      .toDF("c1", "c2", "number")
+
+    df.write
+      .format("parquet").saveAsTable("parquet_table")
+    sql(
+      "create table carbon_table(c1 string, c2 struct<a1:string, a2:string>, number int) using " +
+      "carbon")
+    sql("insert into carbon_table select * from parquet_table")
+    assert(sql("select * from carbon_table").count() == 10)
+    checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table"))
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists parquet_table")
+  }
+
+  test("test write with array type") {
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists parquet_table")
+    import sqlContext.implicits._
+    val df = sqlContext.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, Array("b", "c"), x))
+      .toDF("c1", "c2", "number")
+
+    df.write
+      .format("parquet").saveAsTable("parquet_table")
+    sql("create table carbon_table(c1 string, c2 array<string>, number int) using carbon")
+    sql("insert into carbon_table select * from parquet_table")
+    assert(sql("select * from carbon_table").count() == 10)
+    checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table"))
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists parquet_table")
+  }
+
+  test("test write with nested array and struct type") {
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists parquet_table")
+    import sqlContext.implicits._
+    val df = sqlContext.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, Array(("1", "2"), ("3", "4")), x))
+      .toDF("c1", "c2", "number")
+
+    df.write
+      .format("parquet").saveAsTable("parquet_table")
+    sql(
+      "create table carbon_table(c1 string, c2 array<struct<a1:string, a2:string>>, number int) " +
+      "using carbon")
+    sql("insert into carbon_table select * from parquet_table")
+    assert(sql("select * from carbon_table").count() == 10)
+    checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table"))
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists parquet_table")
+  }
+
+  test("test write with nested struct and array type") {
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists parquet_table")
+    import sqlContext.implicits._
+    val df = sqlContext.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, (Array("1", "2"), ("3", "4")), x))
+      .toDF("c1", "c2", "number")
+
+    df.write
+      .format("parquet").saveAsTable("parquet_table")
+    sql(
+      "create table carbon_table(c1 string, c2 struct<a1:array<string>, a2:struct<a1:string, " +
+      "a2:string>>, number int) using carbon")
+    sql("insert into carbon_table select * from parquet_table")
+    assert(sql("select * from carbon_table").count() == 10)
+    checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table"))
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists parquet_table")
+  }
+
+  test("test write with array type with value as nested map type") {
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists parquet_table")
+    import sqlContext.implicits._
+    val df = sqlContext.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, Array(Map("b" -> "c")), x))
+      .toDF("c1", "c2", "number")
+
+    df.write
+      .format("parquet").saveAsTable("parquet_table")
+    sql(
+      "create table carbon_table(c1 string, c2 array<map<string,string>>, number int) using carbon")
+    sql("insert into carbon_table select * from parquet_table")
+    assert(sql("select * from carbon_table").count() == 10)
+    checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table"))
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists parquet_table")
+  }
+
+  test("test write with array type with value as nested array<array<map>> type") {
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists parquet_table")
+    import sqlContext.implicits._
+    val df = sqlContext.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, Array(Array(Map("b" -> "c"))), x))
+      .toDF("c1", "c2", "number")
+
+    df.write
+      .format("parquet").saveAsTable("parquet_table")
+    sql(
+      "create table carbon_table(c1 string, c2 array<array<map<string,string>>>, number int) " +
+      "using carbon")
+    sql("insert into carbon_table select * from parquet_table")
+    assert(sql("select * from carbon_table").count() == 10)
+    checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table"))
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists parquet_table")
+  }
+
+  test("test write with struct type with value as nested map type") {
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists parquet_table")
+    import sqlContext.implicits._
+    val df = sqlContext.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, ("a", Map("b" -> "c")), x))
+      .toDF("c1", "c2", "number")
+
+    df.write
+      .format("parquet").saveAsTable("parquet_table")
+    sql(
+      "create table carbon_table(c1 string, c2 struct<a1:string, a2:map<string,string>>, number " +
+      "int) using carbon")
+    sql("insert into carbon_table select * from parquet_table")
+    assert(sql("select * from carbon_table").count() == 10)
+    checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table"))
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists parquet_table")
+  }
+
+  test("test write with struct type with value as nested struct<array<map>> type") {
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists parquet_table")
+    import sqlContext.implicits._
+    val df = sqlContext.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, ("a", Array(Map("b" -> "c"))), x))
+      .toDF("c1", "c2", "number")
+
+    df.write
+      .format("parquet").saveAsTable("parquet_table")
+    sql(
+      "create table carbon_table(c1 string, c2 struct<a1:string, a2:array<map<string,string>>>, " +
+      "number int) using carbon")
+    sql("insert into carbon_table select * from parquet_table")
+    assert(sql("select * from carbon_table").count() == 10)
+    checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table"))
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists parquet_table")
+  }
+
+  test("test write with map type") {
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists parquet_table")
+    import sqlContext.implicits._
+    val df = sqlContext.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, Map("b" -> "c"), x))
+      .toDF("c1", "c2", "number")
+
+    df.write
+      .format("parquet").saveAsTable("parquet_table")
+    sql("create table carbon_table(c1 string, c2 map<string, string>, number int) using carbon")
+    sql("insert into carbon_table select * from parquet_table")
+    assert(sql("select * from carbon_table").count() == 10)
+    checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table"))
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists parquet_table")
+  }
+
+  test("test write with map type with Int data type as key") {
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists parquet_table")
+    import sqlContext.implicits._
+    val df = sqlContext.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, Map(99 -> "c"), x))
+      .toDF("c1", "c2", "number")
+
+    df.write
+      .format("parquet").saveAsTable("parquet_table")
+    sql("create table carbon_table(c1 string, c2 map<int, string>, number int) using carbon")
+    sql("insert into carbon_table select * from parquet_table")
+    assert(sql("select * from carbon_table").count() == 10)
+    checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table"))
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists parquet_table")
+  }
+
+  test("test write with map type with value as nested map type") {
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists parquet_table")
+    import sqlContext.implicits._
+    val df = sqlContext.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, Map("a" -> Map("b" -> "c")), x))
+      .toDF("c1", "c2", "number")
+
+    df.write
+      .format("parquet").saveAsTable("parquet_table")
+    sql(
+      "create table carbon_table(c1 string, c2 map<string, map<string, string>>, number int) " +
+      "using carbon")
+    sql("insert into carbon_table select * from parquet_table")
+    assert(sql("select * from carbon_table").count() == 10)
+    checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table"))
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists parquet_table")
+  }
+
+  test("test write with map type with value as nested struct type") {
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists parquet_table")
+    import sqlContext.implicits._
+    val df = sqlContext.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, Map("a" -> ("b", "c")), x))
+      .toDF("c1", "c2", "number")
+
+    df.write
+      .format("parquet").saveAsTable("parquet_table")
+    sql(
+      "create table carbon_table(c1 string, c2 map<string, struct<a1:string, a2:string>>, number " +
+      "int) using carbon")
+    sql("insert into carbon_table select * from parquet_table")
+    assert(sql("select * from carbon_table").count() == 10)
+    checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table"))
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists parquet_table")
+  }
+
+  test("test write with map type with value as nested array type") {
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists parquet_table")
+    import sqlContext.implicits._
+    val df = sqlContext.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, Map("a" -> Array("b", "c")), x))
+      .toDF("c1", "c2", "number")
+
+    df.write
+      .format("parquet").saveAsTable("parquet_table")
+    sql(
+      "create table carbon_table(c1 string, c2 map<string, array<string>>, number int) using " +
+      "carbon")
+    sql("insert into carbon_table select * from parquet_table")
+    assert(sql("select * from carbon_table").count() == 10)
+    checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table"))
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists parquet_table")
+  }
+
+  test("test write using ddl and options") {
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists testparquet")
+    import sqlContext.implicits._
+    val df = sqlContext.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, "b", x))
+      .toDF("c1", "c2", "number")
+
+    // Saves dataframe to carbon file
+    df.write
+      .format("parquet").saveAsTable("testparquet")
+    sql(
+      "create table carbon_table(c1 string, c2 string, number int) using carbon options" +
+      "('table_blocksize'='256','inverted_index'='c1')")
+    checkExistence(sql("describe formatted carbon_table"), true, "table_blocksize")
+    checkExistence(sql("describe formatted carbon_table"), true, "inverted_index")
+    sql("insert into carbon_table select * from testparquet")
+    checkAnswer(sql("select * from carbon_table"), sql("select * from testparquet"))
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists testparquet")
+  }
+
+  test("test read with nested struct and array type without creating table") {
+    FileFactory
+      .deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_carbon_folder"))
+    sql("drop table if exists parquet_table")
+    import sqlContext.implicits._
+    val df = sqlContext.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, (Array("1", "2"), ("3", "4")), x))
+      .toDF("c1", "c2", "number")
+
+    df.write
+      .format("parquet").saveAsTable("parquet_table")
+    val frame = sql("select * from parquet_table")
+    frame.write.format("carbon").save(warehouse1 + "/test_carbon_folder")
+    val dfread = sqlContext.read.format("carbon").load(warehouse1 + "/test_carbon_folder")
+    FileFactory
+      .deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_carbon_folder"))
+    sql("drop table if exists parquet_table")
+  }
+
+  test("test read and write with date datatype") {
+    sql("drop table if exists date_table")
+    sql("drop table if exists date_parquet_table")
+    sql("create table date_table(empno int, empname string, projdate Date) using carbon")
+    sql("insert into  date_table select 11, 'ravi', '2017-11-11'")
+    sql("create table date_parquet_table(empno int, empname string, projdate Date) using parquet")
+    sql("insert into  date_parquet_table select 11, 'ravi', '2017-11-11'")
+    checkAnswer(sql("select * from date_table"), sql("select * from date_parquet_table"))
+    sql("drop table if exists date_table")
+    sql("drop table if exists date_parquet_table")
+  }
+
+  test("test read and write with date datatype with wrong format") {
+    sql("drop table if exists date_table")
+    sql("drop table if exists date_parquet_table")
+    sql("create table date_table(empno int, empname string, projdate Date) using carbon")
+    sql("insert into  date_table select 11, 'ravi', '11-11-2017'")
+    sql("create table date_parquet_table(empno int, empname string, projdate Date) using parquet")
+    sql("insert into  date_parquet_table select 11, 'ravi', '11-11-2017'")
+    checkAnswer(sql("select * from date_table"), sql("select * from date_parquet_table"))
+    sql("drop table if exists date_table")
+    sql("drop table if exists date_parquet_table")
+  }
+
+  test("test read and write with timestamp datatype") {
+    sql("drop table if exists date_table")
+    sql("drop table if exists date_parquet_table")
+    sql("create table date_table(empno int, empname string, projdate timestamp) using carbon")
+    sql("insert into  date_table select 11, 'ravi', '2017-11-11 00:00:01'")
+    sql(
+      "create table date_parquet_table(empno int, empname string, projdate timestamp) using " +
+      "parquet")
+    sql("insert into  date_parquet_table select 11, 'ravi', '2017-11-11 00:00:01'")
+    checkAnswer(sql("select * from date_table"), sql("select * from date_parquet_table"))
+    sql("drop table if exists date_table")
+    sql("drop table if exists date_parquet_table")
+  }
+
+  test("test read and write with timestamp datatype with wrong format") {
+    sql("drop table if exists date_table")
+    sql("drop table if exists date_parquet_table")
+    sql("create table date_table(empno int, empname string, projdate timestamp) using carbon")
+    sql("insert into  date_table select 11, 'ravi', '11-11-2017 00:00:01'")
+    sql(
+      "create table date_parquet_table(empno int, empname string, projdate timestamp) using " +
+      "parquet")
+    sql("insert into  date_parquet_table select 11, 'ravi', '11-11-2017 00:00:01'")
+    checkAnswer(sql("select * from date_table"), sql("select * from date_parquet_table"))
+    sql("drop table if exists date_table")
+    sql("drop table if exists date_parquet_table")
+  }
+
+  test("test write with array type with filter") {
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists parquet_table")
+    import sqlContext.implicits._
+    val df = sqlContext.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, Array("b", "c"), x))
+      .toDF("c1", "c2", "number")
+
+    df.write
+      .format("parquet").saveAsTable("parquet_table")
+    sql("create table carbon_table(c1 string, c2 array<string>, number int) using carbon")
+    sql("insert into carbon_table select * from parquet_table")
+    assert(sql("select * from carbon_table").count() == 10)
+    checkAnswer(sql("select * from carbon_table where c1='a1' and c2[0]='b'"),
+      sql("select * from parquet_table where c1='a1' and c2[0]='b'"))
+    checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table"))
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists parquet_table")
+  }
+
+  test("test write with struct type with filter") {
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists parquet_table")
+    import sqlContext.implicits._
+    val df = sqlContext.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, (Array("1", "2"), ("3", "4")), Array(("1", 1), ("2", 2)), x))
+      .toDF("c1", "c2", "c3", "number")
+
+    df.write
+      .format("parquet").saveAsTable("parquet_table")
+    sql(
+      "create table carbon_table(c1 string, c2 struct<a1:array<string>, a2:struct<a1:string, " +
+      "a2:string>>, c3 array<struct<a1:string, a2:int>>, number int) using carbon")
+    sql("insert into carbon_table select * from parquet_table")
+    assert(sql("select * from carbon_table").count() == 10)
+    checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table"))
+    checkAnswer(sql("select * from carbon_table where c2.a1[0]='1' and c1='a1'"),
+      sql("select * from parquet_table where c2._1[0]='1' and c1='a1'"))
+    checkAnswer(sql("select * from carbon_table where c2.a1[0]='1' and c3[0].a2=1"),
+      sql("select * from parquet_table where c2._1[0]='1' and c3[0]._2=1"))
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists parquet_table")
+  }
+
+  test("test read with df write string issue") {
+    sql("drop table if exists test123")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
+    import sqlContext.implicits._
+    val df = sqlContext.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, "b", x.toShort, x, x.toLong, x.toDouble, BigDecimal.apply(x),
+        Array(x + 1,
+          x), ("b", BigDecimal.apply(x))))
+      .toDF("c1", "c2", "shortc", "intc", "longc", "doublec", "bigdecimalc", "arrayc", "structc")
+
+    // Saves dataframe to carbon file
+    df.write.format("carbon").save(warehouse1 + "/test_folder/")
+    if (!sqlContext.sparkContext.version.startsWith("2.1")) {
+      sql(s"create table test123 (c1 string, c2 string, shortc smallint,intc int, longc bigint,  " +
+          s"doublec double, bigdecimalc decimal(38,18), arrayc array<int>, structc " +
+          s"struct<_1:string, _2:decimal(38,18)>) using carbon location '$warehouse1/test_folder/'")
+
+      checkAnswer(sql("select * from test123"),
+        sqlContext.read.format("carbon").load(warehouse1 + "/test_folder/"))
+    }
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
+    sql("drop table if exists test123")
+  }
+
+  test("test read with df write with empty data") {
+    sql("drop table if exists test123")
+    sql("drop table if exists test123_par")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
+    // Saves dataframe to carbon file
+    if (!sqlContext.sparkContext.version.startsWith("2.1")) {
+      sql(s"create table test123 (c1 string, c2 string, arrayc array<int>, structc " +
+          s"struct<_1:string, _2:decimal(38,18)>, shortc smallint,intc int, longc bigint,  " +
+          s"doublec double, bigdecimalc decimal(38,18)) using carbon location " +
+          s"'$warehouse1/test_folder/'")
+
+      sql(s"create table test123_par (c1 string, c2 string, arrayc array<int>, structc " +
+          s"struct<_1:string, _2:decimal(38,18)>, shortc smallint,intc int, longc bigint,  " +
+          s"doublec double, bigdecimalc decimal(38,18)) using carbon location " +
+          s"'$warehouse1/test_folder/'")
+      checkAnswer(sql("select count(*) from test123"),
+        sql("select count(*) from test123_par"))
+    }
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
+    sql("drop table if exists test123")
+    sql("drop table if exists test123_par")
+  }
+
+  test("test write with nosort columns") {
+    sql("drop table if exists test123")
+    sql("drop table if exists test123_par")
+    import sqlContext.implicits._
+    val df = sqlContext.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, "b", x.toShort, x, x.toLong, x.toDouble, BigDecimal.apply(x),
+        Array(x + 1,
+          x), ("b", BigDecimal.apply(x))))
+      .toDF("c1", "c2", "shortc", "intc", "longc", "doublec", "bigdecimalc", "arrayc", "structc")
+
+    // Saves dataframe to carbon file
+    df.write.format("parquet").saveAsTable("test123_par")
+    if (!sqlContext.sparkContext.version.startsWith("2.1")) {
+      sql(s"create table test123 (c1 string, c2 string, shortc smallint,intc int, longc bigint,  " +
+          s"doublec double, bigdecimalc decimal(38,18), arrayc array<int>, structc " +
+          s"struct<_1:string, _2:decimal(38,18)>) using carbon options('sort_columns'='') " +
+          s"location '$warehouse1/test_folder/'")
+
+      sql(s"insert into test123 select * from test123_par")
+      checkAnswer(sql("select * from test123"), sql(s"select * from test123_par"))
+    }
+    sql("drop table if exists test123")
+    sql("drop table if exists test123_par")
+  }
+
+  test("test complex columns mismatch") {
+    sql("drop table if exists array_com_hive")
+    sql(s"drop table if exists array_com")
+    sql(
+      "create table array_com_hive (CUST_ID string, YEAR int, MONTH int, AGE int, GENDER string, " +
+      "EDUCATED string, IS_MARRIED string, ARRAY_INT array<int>,ARRAY_STRING array<string>," +
+      "ARRAY_DATE array<timestamp>,CARD_COUNT int,DEBIT_COUNT int, CREDIT_COUNT int, DEPOSIT " +
+      "double, HQ_DEPOSIT double) row format delimited fields terminated by ',' collection items " +
+      "terminated by '$'")
+    val sourceFile = FileFactory
+      .getPath(s"$resourcesPath" + "../../../../../spark-datasource/src/test/resources/Array.csv")
+      .toString
+    sql(s"load data local inpath '$sourceFile' into table array_com_hive")
+    sql(
+      "create table Array_com (CUST_ID string, YEAR int, MONTH int, AGE int, GENDER string, " +
+      "EDUCATED string, IS_MARRIED string, ARRAY_INT array<int>,ARRAY_STRING array<string>," +
+      "ARRAY_DATE array<timestamp>,CARD_COUNT int,DEBIT_COUNT int, CREDIT_COUNT int, DEPOSIT " +
+      "double, HQ_DEPOSIT double) using carbon")
+    sql("insert into Array_com select * from array_com_hive")
+    checkAnswer(sql("select * from Array_com order by CUST_ID ASC limit 3"),
+      sql("select * from array_com_hive order by CUST_ID ASC limit 3"))
+    sql("drop table if exists array_com_hive")
+    sql(s"drop table if exists array_com")
+  }
+
+  test("test complex columns fail while insert ") {
+    sql("drop table if exists STRUCT_OF_ARRAY_com_hive")
+    sql(s"drop table if exists STRUCT_OF_ARRAY_com")
+    sql(
+      " create table STRUCT_OF_ARRAY_com_hive (CUST_ID string, YEAR int, MONTH int, AGE int, " +
+      "GENDER string, EDUCATED string, IS_MARRIED string, STRUCT_OF_ARRAY struct<ID: int," +
+      "CHECK_DATE: timestamp ,SNo: array<int>,sal1: array<double>,state: array<string>," +
+      "date1: array<timestamp>>,CARD_COUNT int,DEBIT_COUNT int, CREDIT_COUNT int, DEPOSIT float, " +
+      "HQ_DEPOSIT double) row format delimited fields terminated by ',' collection items " +
+      "terminated by '$' map keys terminated by '&'")
+    val sourceFile = FileFactory
+      .getPath(
+        s"$resourcesPath" + "../../../../../spark-datasource/src/test/resources/structofarray.csv")
+      .toString
+    sql(s"load data local inpath '$sourceFile' into table STRUCT_OF_ARRAY_com_hive")
+    sql(
+      "create table STRUCT_OF_ARRAY_com (CUST_ID string, YEAR int, MONTH int, AGE int, GENDER " +
+      "string, EDUCATED string, IS_MARRIED string, STRUCT_OF_ARRAY struct<ID: int," +
+      "CHECK_DATE: timestamp,SNo: array<int>,sal1: array<double>,state: array<string>," +
+      "date1: array<timestamp>>,CARD_COUNT int,DEBIT_COUNT int, CREDIT_COUNT int, DEPOSIT double," +
+      " HQ_DEPOSIT double) using carbon")
+    sql(" insert into STRUCT_OF_ARRAY_com select * from STRUCT_OF_ARRAY_com_hive")
+    checkAnswer(sql("select * from STRUCT_OF_ARRAY_com  order by CUST_ID ASC"),
+      sql("select * from STRUCT_OF_ARRAY_com_hive  order by CUST_ID ASC"))
+    sql("drop table if exists STRUCT_OF_ARRAY_com_hive")
+    sql(s"drop table if exists STRUCT_OF_ARRAY_com")
+  }
+
+  test("test partition error in carbon") {
+    sql("drop table if exists carbon_par")
+    sql("drop table if exists parquet_par")
+    sql(
+      "create table carbon_par (name string, age int, country string) using carbon partitioned by" +
+      " (country)")
+    sql("insert into carbon_par select 'b', '12', 'aa'")
+    sql(
+      "create table parquet_par (name string, age int, country string) using carbon partitioned " +
+      "by (country)")
+    sql("insert into parquet_par select 'b', '12', 'aa'")
+    checkAnswer(sql("select * from carbon_par"), sql("select * from parquet_par"))
+    sql("drop table if exists carbon_par")
+    sql("drop table if exists parquet_par")
+  }
+
+  test("test write and create table with sort columns not allow") {
+    sql("drop table if exists test123")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
+    import sqlContext.implicits._
+    val df = sqlContext.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, "b", "c" + x, "d" + x, x.toShort, x, x.toLong, x.toDouble, BigDecimal
+        .apply(x)))
+      .toDF("c1", "c2", "c3", "c4", "shortc", "intc", "longc", "doublec", "bigdecimalc")
+
+    // Saves dataframe to carbon file
+    df.write.format("carbon").save(s"$warehouse1/test_folder/")
+    if (!sqlContext.sparkContext.version.startsWith("2.1")) {
+      intercept[UnsupportedOperationException] {
+        sql(s"create table test123 using carbon options('sort_columns'='shortc,c2') location " +
+            s"'$warehouse1/test_folder/'")
+      }
+    }
+    sql("drop table if exists test123")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
+  }
+
+  test("validate if path not specified during table creation") {
+    sql("drop table if exists test123")
+    val ex = intercept[AnalysisException] {
+      sql(s"create table test123 using carbon options('sort_columns'='shortc,c2')")
+    }
+    assert(ex.getMessage().contains("Unable to infer schema for carbon"))
+  }
+
+  test("test double boundary") {
+    sql("drop table if exists par")
+    sql("drop table if exists car")
+
+    sql("create table par (c1 string, c2 double, n int) using parquet")
+    sql("create table car (c1 string, c2 double, n int) using carbon")
+    sql("insert into par select 'a', 1.7986931348623157E308, 215565665556")
+    sql("insert into car select 'a', 1.7986931348623157E308, 215565665556")
+
+    checkAnswer(sql("select * from car"), sql("select * from par"))
+    sql("drop table if exists par")
+    sql("drop table if exists car")
+  }
+
+  test("test write using multi subfolder") {
+    if (!sqlContext.sparkContext.version.startsWith("2.1")) {
+      FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
+      import sqlContext.implicits._
+      val df = sqlContext.sparkContext.parallelize(1 to 10)
+        .map(x => ("a" + x % 10, "b", x))
+        .toDF("c1", "c2", "number")
+
+      // Saves dataframe to carbon file
+      df.write.format("carbon").save(warehouse1 + "/test_folder/1/" + System.nanoTime())
+      df.write.format("carbon").save(warehouse1 + "/test_folder/2/" + System.nanoTime())
+      df.write.format("carbon").save(warehouse1 + "/test_folder/3/" + System.nanoTime())
+
+      val frame = sqlContext.read.format("carbon").load(warehouse1 + "/test_folder")
+      assert(frame.count() == 30)
+      assert(frame.where("c1='a1'").count() == 3)
+      val mapSize = DataMapStoreManager.getInstance().getAllDataMaps.size()
+      DataMapStoreManager.getInstance()
+        .clearDataMaps(AbsoluteTableIdentifier.from(warehouse1 + "/test_folder"))
+      assert(mapSize > DataMapStoreManager.getInstance().getAllDataMaps.size())
+      FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
+    }
+  }
+
+  test("test read using old data") {
+    val store = new StoreCreator(new File(warehouse1).getAbsolutePath,
+      new File(warehouse1 + "../../../../../hadoop/src/test/resources/data.csv").getCanonicalPath,
+      false)
+    store.createCarbonStore()
+    FileFactory
+      .deleteAllFilesOfDir(new File(warehouse1 + "/testdb/testtable/Fact/Part0/Segment_0/0"))
+    val dfread = sqlContext.read.format("carbon")
+      .load(warehouse1 + "/testdb/testtable/Fact/Part0/Segment_0")
+    sql("drop table if exists parquet_table")
+  }
+
+  test("test write sdk and read with spark using different sort order data") {
+    sql("drop table if exists sdkout")
+    FileFactory.deleteAllFilesOfDir(new File(warehouse1 + "/sdk"))
+    buildTestDataOtherDataType(5, Array("age", "address"), warehouse1 + "/sdk")
+    sql(s"create table sdkout using carbon options(path='$warehouse1/sdk')")
+    assert(sql("select * from sdkout").collect().length == 5)
+    buildTestDataOtherDataType(5, Array("name", "salary"), warehouse1 + "/sdk")
+    sql("refresh table sdkout")
+    assert(sql("select * from sdkout where name = 'name1'").collect().length == 2)
+    assert(sql("select * from sdkout where salary=100").collect().length == 2)
+    buildTestDataOtherDataType(5, Array("name", "age"), warehouse1 + "/sdk")
+    sql("refresh table sdkout")
+    assert(sql("select * from sdkout where name='name0'").collect().length == 3)
+    assert(sql("select * from sdkout").collect().length == 15)
+    assert(sql("select * from sdkout where salary=100").collect().length == 3)
+    assert(sql("select * from sdkout where address='address1'").collect().length == 3)
+    buildTestDataOtherDataType(5, Array("name", "salary"), warehouse1 + "/sdk")
+    sql("refresh table sdkout")
+    assert(sql("select * from sdkout where name='name0'").collect().length == 4)
+    assert(sql("select * from sdkout").collect().length == 20)
+    assert(sql("select * from sdkout where salary=100").collect().length == 4)
+    assert(sql("select * from sdkout where address='address1'").collect().length == 4)
+    FileFactory.deleteAllFilesOfDir(new File(warehouse1 + "/sdk"))
+  }
+
+  test("test Float data type by giving schema explicitly and desc formatted") {
+    sql("drop table if exists sdkout")
+    FileFactory.deleteAllFilesOfDir(new File(warehouse1 + "/sdk1"))
+    buildTestDataOtherDataType(5, Array("age", "address"), warehouse1 + "/sdk1")
+    sql(s"create table sdkout(male boolean, age int, height double, name string, address " +
+        s"string," +
+        s"salary long, floatField float, bytefield byte) using carbon options " +
+        s"(path='$warehouse1/sdk1')")
+    assert(sql("desc formatted sdkout").collect().take(7).reverse.head.get(1).equals("float"))
+    assert(sql("desc formatted sdkout").collect().take(8).reverse.head.get(1).equals
+    ("tinyint"))
+  }
+
+  test("test select * on table with float data type") {
+    sql("drop table if exists sdkout")
+    FileFactory.deleteAllFilesOfDir(new File(warehouse1 + "/sdk1"))
+    buildTestDataOtherDataType(11, Array("age", "address"), warehouse1 + "/sdk1")
+    sql(s"create table sdkout(male boolean, age int, height double, name string, address " +
+        s"string," +
+        s"salary long, floatField float, bytefield byte) using carbon options " +
+        s"(path='$warehouse1/sdk1')")
+    checkAnswer(sql("select * from par_table"), sql("select * from sdkout"))
+    checkAnswer(sql("select floatfield from par_table"), sql("select floatfield from sdkout"))
+  }
+
+  test("test various filters on float data") {
+    sql("drop table if exists sdkout")
+    FileFactory.deleteAllFilesOfDir(new File(warehouse1 + "/sdk1"))
+    buildTestDataOtherDataType(11, Array("age", "address"), warehouse1 + "/sdk1")
+    sql(s"create table sdkout(male boolean, age int, height double, name string, address " +
+        s"string," +
+        s"salary long, floatField float, bytefield byte) using carbon options " +
+        s"(path='$warehouse1/sdk1')")
+    checkAnswer(sql("select * from par_table where floatfield < 10"),
+      sql("select * from sdkout where floatfield < 10"))
+    checkAnswer(sql("select * from par_table where floatfield > 5.3"),
+      sql("select * from sdkout where floatfield > 5.3"))
+    checkAnswer(sql("select * from par_table where floatfield >= 4.1"),
+      sql("select * from sdkout where floatfield >= 4.1"))
+    checkAnswer(sql("select * from par_table where floatfield != 5.5"),
+      sql("select * from sdkout where floatfield != 5.5"))
+    checkAnswer(sql("select * from par_table where floatfield <= 5"),
+      sql("select * from sdkout where floatfield <= 5"))
+    checkAnswer(sql("select * from par_table where floatfield >= 5"),
+      sql("select * from sdkout where floatfield >= 5"))
+    checkAnswer(sql("select * from par_table where floatfield IN ('5.5','6.6')"),
+      sql("select * from sdkout where floatfield IN ('5.5','6.6')"))
+    checkAnswer(sql("select * from par_table where floatfield NOT IN ('5.5','6.6')"),
+      sql("select * from sdkout where floatfield NOT IN ('5.5','6.6')"))
+    checkAnswer(sql("select * from par_table where floatfield = cast('6.6' as float)"),
+      sql("select * from sdkout where floatfield = cast('6.6' as float)"))
+  }
+
+  test("test select * on table with byte data type") {
+    sql("drop table if exists sdkout")
+    FileFactory.deleteAllFilesOfDir(new File(warehouse1 + "/sdk1"))
+    buildTestDataOtherDataType(11, Array("age", "address"), warehouse1 + "/sdk1")
+    sql(s"create table sdkout(male boolean, age int, height double, name string, address " +
+        s"string," +
+        s"salary long, floatField float, bytefield byte) using carbon options " +
+        s"(path='$warehouse1/sdk1')")
+    checkAnswer(sql("select * from par_table"), sql("select * from sdkout"))
+    checkAnswer(sql("select byteField from par_table"), sql("select bytefield from sdkout"))
+  }
+
+  test("test various filters on byte data") {
+    sql("drop table if exists sdkout")
+    FileFactory.deleteAllFilesOfDir(new File(warehouse1 + "/sdk1"))
+    buildTestDataOtherDataType(11, Array("age", "address"), warehouse1 + "/sdk1")
+    sql(s"create table sdkout(male boolean, age int, height double, name string, address " +
+        s"string," +
+        s"salary long, floatField float, bytefield byte) using carbon options " +
+        s"(path='$warehouse1/sdk1')")
+    checkAnswer(sql("select * from par_table where bytefield < 10"),
+      sql("select * from sdkout where bytefield < 10"))
+    checkAnswer(sql("select * from par_table where bytefield > 5"),
+      sql("select * from sdkout where bytefield > 5"))
+    checkAnswer(sql("select * from par_table where bytefield >= 4"),
+      sql("select * from sdkout where bytefield >= 4"))
+    checkAnswer(sql("select * from par_table where bytefield != 5"),
+      sql("select * from sdkout where bytefield != 5"))
+    checkAnswer(sql("select * from par_table where bytefield <= 5"),
+      sql("select * from sdkout where bytefield <= 5"))
+    checkAnswer(sql("select * from par_table where bytefield >= 5"),
+      sql("select * from sdkout where bytefield >= 5"))
+    checkAnswer(sql("select * from par_table where bytefield IN ('5','6')"),
+      sql("select * from sdkout where bytefield IN ('5','6')"))
+    checkAnswer(sql("select * from par_table where bytefield NOT IN ('5','6')"),
+      sql("select * from sdkout where bytefield NOT IN ('5','6')"))
+  }
+
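+  // The '$' character in the CSV rows below is the complex-type delimiter, so
+  // "3$3.312" populates struct<bytefield, floatfield>.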
+  test("test struct of float type and byte type") {
+    import scala.collection.JavaConverters._
+    val path = FileFactory.getPath(warehouse1 + "/sdk1").toString
+    FileFactory.deleteAllFilesOfDir(new File(warehouse1 + "/sdk1"))
+    sql("drop table if exists complextable")
+    val fields = List(new StructField("byteField", DataTypes.BYTE),
+      new StructField("floatField", DataTypes.FLOAT))
+    val structType = Array(new Field("stringfield", DataTypes.STRING),
+      new Field("structField", "struct", fields.asJava))
+
+    try {
+      val builder = CarbonWriter.builder()
+      val writer =
+        builder.outputPath(path)
+          .uniqueIdentifier(System.nanoTime()).withBlockSize(2)
+          .withCsvInput(new Schema(structType)).writtenBy("SparkCarbonDataSourceTestCase").build()
+
+      var i = 0
+      while (i < 11) {
+        val array = Array[String](s"name$i", s"$i" + "$" + s"$i.${ i }12")
+        writer.write(array)
+        i += 1
+      }
+      writer.close()
+      sql("create table complextable (stringfield string, structfield struct<bytefield: " +
+          "byte, floatfield: float>) " +
+          s"using carbon location '$path'")
+    } catch {
+      case ex: Exception => throw new RuntimeException(ex)
+    }
+    checkAnswer(sql("select * from complextable limit 1"), Seq(Row("name0", Row(0
+      .asInstanceOf[Byte], 0.012.asInstanceOf[Float]))))
+    checkAnswer(sql("select * from complextable where structfield.bytefield > 9"), Seq(Row
+    ("name10", Row(10.asInstanceOf[Byte], 10.1012.asInstanceOf[Float]))))
+    checkAnswer(sql("select * from complextable where structfield.bytefield > 9"), Seq(Row
+    ("name10", Row(10.asInstanceOf[Byte], 10.1012.asInstanceOf[Float]))))
+    checkAnswer(sql("select * from complextable where structfield.floatfield > 9.912"), Seq
+    (Row
+    ("name10", Row(10.asInstanceOf[Byte], 10.1012.asInstanceOf[Float]))))
+    checkAnswer(sql("select * from complextable where structfield.floatfield > 9.912 and " +
+                    "structfield.bytefield < 11"),
+      Seq(Row("name10", Row(10.asInstanceOf[Byte], 10.1012.asInstanceOf[Float]))))
+  }
+
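+  // Writes SDK data sorted on the byte column and verifies it matches a parquet
+  // table queried with an explicit "order by bytefield".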
+  test("test bytefield as sort column") {
+    val path = FileFactory.getPath(warehouse1 + "/sdk1").toString
+    FileFactory.deleteAllFilesOfDir(new File(warehouse1 + "/sdk1"))
+    val fields: Array[Field] = new Array[Field](6)
+    // six columns; bytefield is used as the sort column below
+    fields(0) = new Field("age", DataTypes.INT)
+    fields(1) = new Field("height", DataTypes.DOUBLE)
+    fields(2) = new Field("name", DataTypes.STRING)
+    fields(3) = new Field("address", DataTypes.STRING)
+    fields(4) = new Field("salary", DataTypes.LONG)
+    fields(5) = new Field("bytefield", DataTypes.BYTE)
+
+    try {
+      val builder = CarbonWriter.builder()
+      val writer =
+        builder.outputPath(path)
+          .uniqueIdentifier(System.nanoTime()).withBlockSize(2).sortBy(Array("bytefield"))
+          .withCsvInput(new Schema(fields)).writtenBy("SparkCarbonDataSourceTestCase").build()
+
+      var i = 0
+      while (i < 11) {
+        val array = Array[String](
+          String.valueOf(i),
+          String.valueOf(i.toDouble / 2),
+          "name" + i,
+          "address" + i,
+          (i * 100).toString,
+          s"${ 10 - i }")
+        writer.write(array)
+        i += 1
+      }
+      writer.close()
+      sql("drop table if exists sorted_par")
+      sql("drop table if exists sort_table")
+      sql(s"create table sort_table (age int, height double, name string, address string," +
+          s" salary long, bytefield byte) using carbon location '$path'")
+      FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(s"$warehouse1/../warehouse2"))
+      sql(s"create table sorted_par(age int, height double, name string, address " +
+          s"string," +
+          s"salary long, bytefield byte) using parquet location " +
+          s"'$warehouse1/../warehouse2'")
+      (0 to 10).foreach {
+        i =>
+          sql(s"insert into sorted_par select '$i', ${ i.toDouble / 2 }, 'name$i', " +
+              s"'address$i', ${ i * 100 }, '${ 10 - i }'")
+      }
+      checkAnswer(sql("select * from sorted_par order by bytefield"),
+        sql("select * from sort_table"))
+    } catch {
+      case ex: Exception => throw new RuntimeException(ex)
+    }
+  }
+
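+  // Array elements in the CSV rows below are also '$'-separated, e.g. "1$2"
+  // becomes array(1, 2).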
+  test("test array of float type and byte type") {
+    import scala.collection.JavaConverters._
+    val path = FileFactory.getPath(warehouse1 + "/sdk1").toString
+    FileFactory.deleteAllFilesOfDir(new File(warehouse1 + "/sdk1"))
+    sql("drop table if exists complextable")
+    val structType =
+      Array(new Field("stringfield", DataTypes.STRING),
+        new Field("bytearray", "array", List(new StructField("byteField", DataTypes.BYTE))
+          .asJava),
+        new Field("floatarray", "array", List(new StructField("floatfield", DataTypes.FLOAT))
+          .asJava))
+
+    try {
+      val builder = CarbonWriter.builder()
+      val writer =
+        builder.outputPath(path)
+          .uniqueIdentifier(System.nanoTime()).withBlockSize(2)
+          .withCsvInput(new Schema(structType)).writtenBy("SparkCarbonDataSourceTestCase").build()
+
+      var i = 0
+      while (i < 10) {
+        val array = Array[String](s"name$i",
+          s"$i" + "$" + s"${ i * 2 }",
+          s"${ i / 2 }" + "$" + s"${ i / 3 }")
+        writer.write(array)
+        i += 1
+      }
+      writer.close()
+      sql(s"create table complextable (stringfield string, bytearray " +
+          s"array<byte>, floatarray array<float>) using carbon " +
+          s"location " +
+          s"'$path'")
+    } catch {
+      case ex: Exception => throw new RuntimeException(ex)
+    }
+    checkAnswer(sql("select * from complextable limit 1"), Seq(Row("name0", mutable
+      .WrappedArray.make(Array[Byte](0, 0)), mutable.WrappedArray.make(Array[Float](0.0f, 0.0f)))))
+    checkAnswer(sql("select * from complextable where bytearray[0] = 1"), Seq(Row("name1",
+      mutable.WrappedArray.make(Array[Byte](1, 2)), mutable.WrappedArray.make(Array[Float](0.0f,
+        0.0f)))))
+    checkAnswer(sql("select * from complextable where bytearray[0] > 8"), Seq(Row("name9",
+      mutable.WrappedArray.make(Array[Byte](9, 18)), mutable.WrappedArray.make(Array[Float](4.0f,
+        3.0f)))))
+    checkAnswer(sql(
+      "select * from complextable where floatarray[0] IN (4.0) and stringfield = 'name8'"), Seq(Row
+    ("name8",
+      mutable.WrappedArray.make(Array[Byte](8, 16)), mutable.WrappedArray.make(Array[Float](4.0f,
+      2.0f)))))
+  }
+
+  private def createParquetTable(): Unit = {
+    val path = FileFactory.getUpdatedFilePath(s"$warehouse1/../warehouse2")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(s"$path"))
+    sql(s"create table par_table(male boolean, age int, height double, name string, address " +
+        s"string," +
+        s"salary long, floatField float, bytefield byte) using parquet location '$path'")
+    (0 to 10).foreach {
+      i =>
+        sql(s"insert into par_table select 'true','$i', ${ i.toDouble / 2 }, 'name$i', " +
+            s"'address$i', ${ i * 100 }, $i.$i, '$i'")
+    }
+  }
+
+  // prepare SDK writer output with a different schema and an optional column-count limit
+  def buildTestDataOtherDataType(rows: Int,
+      sortColumns: Array[String],
+      writerPath: String,
+      colCount: Int = -1): Unit = {
+    var fields: Array[Field] = new Array[Field](8)
+    // eight columns: boolean, int, double, string, string, long, float and byte
+    fields(0) = new Field("male", DataTypes.BOOLEAN)
+    fields(1) = new Field("age", DataTypes.INT)
+    fields(2) = new Field("height", DataTypes.DOUBLE)
+    fields(3) = new Field("name", DataTypes.STRING)
+    fields(4) = new Field("address", DataTypes.STRING)
+    fields(5) = new Field("salary", DataTypes.LONG)
+    fields(6) = new Field("floatField", DataTypes.FLOAT)
+    fields(7) = new Field("bytefield", DataTypes.BYTE)
+
+    if (colCount > 0) {
+      val fieldsToWrite: Array[Field] = new Array[Field](colCount)
+      var i = 0
+      while (i < colCount) {
+        fieldsToWrite(i) = fields(i)
+        i += 1
+      }
+      fields = fieldsToWrite
+    }
+
+    try {
+      val builder = CarbonWriter.builder()
+      val writer =
+        builder.outputPath(writerPath)
+          .uniqueIdentifier(System.nanoTime()).withBlockSize(2).sortBy(sortColumns)
+          .withCsvInput(new Schema(fields)).writtenBy("SparkCarbonDataSourceTestCase").build()
+
+      var i = 0
+      while (i < rows) {
+        val array = Array[String]("true",
+          String.valueOf(i),
+          String.valueOf(i.toDouble / 2),
+          "name" + i,
+          "address" + i,
+          (i * 100).toString,
+          s"$i.$i", s"$i")
+        if (colCount > 0) {
+          writer.write(array.slice(0, colCount))
+        } else {
+          writer.write(array)
+        }
+        i += 1
+      }
+      writer.close()
+    } catch {
+      case ex: Exception => throw new RuntimeException(ex)
+    }
+  }
+
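+  // Builds an Avro schema whose struct field nests array<map<string, string>>
+  // and writes the same record `rows` times through the SDK Avro writer.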
+  def buildStructSchemaWithNestedArrayOfMapTypeAsValue(writerPath: String, rows: Int): Unit = {
+    FileFactory.deleteAllFilesOfDir(new File(writerPath))
+    val mySchema =
+      """
+        |{
+        |  "name": "address",
+        |  "type": "record",
+        |  "fields": [
+        |    {
+        |      "name": "name",
+        |      "type": "string"
+        |    },
+        |    {
+        |      "name": "age",
+        |      "type": "int"
+        |    },
+        |    {
+        |      "name": "structRecord",
+        |      "type": {
+        |        "type": "record",
+        |        "name": "my_address",
+        |        "fields": [
+        |          {
+        |            "name": "street",
+        |            "type": "string"
+        |          },
+        |          {
+        |            "name": "houseDetails",
+        |            "type": {
+        |               "type": "array",
+        |               "items": {
+        |                   "name": "memberDetails",
+        |                   "type": "map",
+        |                   "values": "string"
+        |                }
+        |             }
+        |          }
+        |        ]
+        |      }
+        |    }
+        |  ]
+        |}
+      """.stripMargin
+    val json =
+      """ {"name":"bob", "age":10, "structRecord": {"street":"street1",
+        |"houseDetails": [{"101": "Rahul", "102": "Pawan"}]}} """
+        .stripMargin
+    WriteFilesWithAvroWriter(writerPath, rows, mySchema, json)
+  }
+
+  test("test external table with struct type with value as nested struct<array<map>> type") {
+    val writerPath: String = FileFactory.getUpdatedFilePath(warehouse1 + "/sdk1")
+    val rowCount = 3
+    buildStructSchemaWithNestedArrayOfMapTypeAsValue(writerPath, rowCount)
+    sql("drop table if exists carbon_external")
+    sql(s"create table carbon_external using carbon location '$writerPath'")
+    assert(sql("select * from carbon_external").count() == rowCount)
+    sql("drop table if exists carbon_external")
+  }
+
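+  // 33000 rows exceed carbon's default 32000-row page size, so the data spans
+  // more than one page per column.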
+  test("test byte and float for multiple pages") {
+    val path = FileFactory.getPath(warehouse1 + "/sdk1").toString
+    FileFactory.deleteAllFilesOfDir(new File(warehouse1 + "/sdk1"))
+    sql("drop table if exists multi_page")
+    val fields: Array[Field] = new Array[Field](3)
+    // three columns: string, float and byte
+    fields(0) = new Field("a", DataTypes.STRING)
+    fields(1) = new Field("b", DataTypes.FLOAT)
+    fields(2) = new Field("c", DataTypes.BYTE)
+
+    try {
+      val builder = CarbonWriter.builder()
+      val writer =
+        builder.outputPath(path)
+          .uniqueIdentifier(System.nanoTime()).withBlockSize(2)
+          .withCsvInput(new Schema(fields)).writtenBy("SparkCarbonDataSourceTestCase").build()
+
+      var i = 0
+      while (i < 33000) {
+        val array = Array[String](
+          String.valueOf(i),
+          s"$i.3200", "32")
+        writer.write(array)
+        i += 1
+      }
+      writer.close()
+      sql(s"create table multi_page (a string, b float, c byte) using carbon location " +
+          s"'$path'")
+      assert(sql("select * from multi_page").count() == 33000)
+    } catch {
+      case ex: Exception => throw new RuntimeException(ex)
+    } finally {
+      FileFactory.deleteAllFilesOfDir(new File(warehouse1 + "/sdk1"))
+    }
+  }
+
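+  // Verifies that inserts into a partition added with an explicit location behave
+  // the same for carbon and parquet tables.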
+  test("test partition issue with add location") {
+    sql("drop table if exists partitionTable_obs")
+    sql("drop table if exists partitionTable_obs_par")
+    sql(s"create table partitionTable_obs (id int,name String,email String) using carbon " +
+        s"partitioned by(email) ")
+    sql(s"create table partitionTable_obs_par (id int,name String,email String) using parquet " +
+        s"partitioned by(email) ")
+    sql("insert into partitionTable_obs select 1,'huawei','abc'")
+    sql("insert into partitionTable_obs select 1,'huawei','bcd'")
+    sql(s"alter table partitionTable_obs add partition (email='def') location " +
+        s"'$warehouse1/test_folder121/'")
+    sql("insert into partitionTable_obs select 1,'huawei','def'")
+
+    sql("insert into partitionTable_obs_par select 1,'huawei','abc'")
+    sql("insert into partitionTable_obs_par select 1,'huawei','bcd'")
+    sql(s"alter table partitionTable_obs_par add partition (email='def') location " +
+        s"'$warehouse1/test_folder122/'")
+    sql("insert into partitionTable_obs_par select 1,'huawei','def'")
+
+    checkAnswer(sql("select * from partitionTable_obs"),
+      sql("select * from partitionTable_obs_par"))
+    sql("drop table if exists partitionTable_obs")
+    sql("drop table if exists partitionTable_obs_par")
+  }
+
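+  // Covers multi-column partitioning with insert overwrite into an explicitly
+  // added partition, comparing carbon against the equivalent parquet table.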
+  test("test multiple partition  select issue") {
+    sql("drop table if exists t_carbn01b_hive")
+    sql(s"drop table if exists t_carbn01b")
+    sql(
+      "create table t_carbn01b_hive(Qty_day_avg INT,Qty_total INT,Sell_price BIGINT,Sell_pricep " +
+      "DOUBLE,Profit DECIMAL(3,2),Item_code String,Item_name String,Outlet_name String," +
+      "Create_date String,Active_status String,Item_type_cd INT, Update_time TIMESTAMP, " +
+      "Discount_price DOUBLE)  using parquet partitioned by (Active_status,Item_type_cd, " +
+      "Update_time, Discount_price)")
+    sql(
+      "create table t_carbn01b(Qty_day_avg INT,Qty_total INT,Sell_price BIGINT,Sell_pricep " +
+      "DOUBLE,Profit DECIMAL(3,2),Item_code String,Item_name String,Outlet_name String," +
+      "Create_date String,Active_status String,Item_type_cd INT, Update_time TIMESTAMP, " +
+      "Discount_price DOUBLE)  using carbon partitioned by (Active_status,Item_type_cd, " +
+      "Update_time, Discount_price)")
+    sql(
+      "insert into t_carbn01b partition(Active_status, Item_type_cd,Update_time,Discount_price) " +
+      "select * from t_carbn01b_hive")
+    sql(
+      "alter table t_carbn01b add partition (active_status='xyz',Item_type_cd=12," +
+      "Update_time=NULL,Discount_price='3000')")
+    sql(
+      "insert overwrite table t_carbn01b select 'xyz', 12, 74,3000,20000000,121.5,4.99,2.44," +
+      "'RE3423ee','dddd', 'ssss','2012-01-02 23:04:05.12', '2012-01-20'")
+    sql(
+      "insert overwrite table t_carbn01b_hive select 'xyz', 12, 74,3000,20000000,121.5,4.99,2.44," +
+      "'RE3423ee','dddd', 'ssss','2012-01-02 23:04:05.12', '2012-01-20'")
+    checkAnswer(sql("select * from t_carbn01b_hive"), sql("select * from t_carbn01b"))
+    sql("drop table if exists t_carbn01b_hive")
+    sql(s"drop table if exists t_carbn01b")
+  }
+
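+  // 1.4E-38 sits near the lower bound of the normal float range; both formats
+  // must round-trip it identically.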
+  test("Test Float value by having negative exponents") {
+    sql("DROP TABLE IF EXISTS float_p")
+    sql("DROP TABLE IF EXISTS float_c")
+    sql("CREATE TABLE float_p(f float) using parquet")
+    sql("CREATE TABLE float_c(f float) using carbon")
+    sql("INSERT INTO float_p select \"1.4E-3\"")
+    sql("INSERT INTO float_p select \"1.4E-38\"")
+    sql("INSERT INTO float_c select \"1.4E-3\"")
+    sql("INSERT INTO float_c select \"1.4E-38\"")
+    checkAnswer(sql("SELECT * FROM float_p"),
+      sql("SELECT * FROM float_c"))
+    sql("DROP TABLE float_p")
+    sql("DROP TABLE float_c")
+  }
+
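+  // Drops and recreates the carbon table with a changed schema, then re-inserts,
+  // to ensure no stale table state survives the drop.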
+  test("test fileformat flow with drop and query on same table") {
+    sql("drop table if exists fileformat_drop")
+    sql("drop table if exists fileformat_drop_hive")
+    sql(
+      "create table fileformat_drop (imei string,AMSize string,channelsId string,ActiveCountry " +
+      "string, Activecity string,gamePointId double,deviceInformationId double,productionDate " +
+      "Timestamp,deliveryDate timestamp,deliverycharge double) using carbon options" +
+      "('table_blocksize'='1','LOCAL_DICTIONARY_ENABLE'='TRUE'," +
+      "'LOCAL_DICTIONARY_THRESHOLD'='1000')")
+    sql(
+      "create table fileformat_drop_hive(imei string,deviceInformationId double,AMSize string," +
+      "channelsId string,ActiveCountry string,Activecity string,gamePointId double,productionDate" +
+      " Timestamp,deliveryDate timestamp,deliverycharge double)row format delimited FIELDS " +
+      "terminated by ',' LINES terminated by '\n' stored as textfile")
+    val sourceFile = FileFactory
+      .getPath(s"$resourcesPath" +
+               "../../../../../spark-datasource/src/test/resources/vardhandaterestruct.csv")
+      .toString
+    sql(s"load data local inpath '$sourceFile' into table fileformat_drop_hive")
+    sql(
+      "insert into fileformat_drop select imei ,deviceInformationId ,AMSize ,channelsId ," +
+      "ActiveCountry ,Activecity ,gamePointId ,productionDate ,deliveryDate ,deliverycharge from " +
+      "fileformat_drop_hive")
+    assert(sql("select count(*) from fileformat_drop where imei='1AA10000'").collect().length == 1)
+
+    sql("drop table if exists fileformat_drop")
+    sql(
+      "create table fileformat_drop (imei string,deviceInformationId double,AMSize string," +
+      "channelsId string,ActiveCountry string,Activecity string,gamePointId float,productionDate " +
+      "timestamp,deliveryDate timestamp,deliverycharge decimal(10,2)) using carbon options" +
+      "('table_blocksize'='1','LOCAL_DICTIONARY_ENABLE'='true'," +
+      "'local_dictionary_threshold'='1000')")
+    sql(
+      "insert into fileformat_drop select imei ,deviceInformationId ,AMSize ,channelsId ," +
+      "ActiveCountry ,Activecity ,gamePointId ,productionDate ,deliveryDate ,deliverycharge from " +
+      "fileformat_drop_hive")
+    assert(sql("select count(*) from fileformat_drop where imei='1AA10000'").collect().length == 1)
+    sql("drop table if exists fileformat_drop")
+    sql("drop table if exists fileformat_drop_hive")
+  }
+
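+  // The bogus inverted_index column passes table creation and is only validated
+  // when data is written, so the error is expected on the insert.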
+  test("validate the columns not present in schema") {
+    sql("drop table if exists validate")
+    sql(
+      "create table validate (name string, age int, address string) using carbon options" +
+      "('inverted_index'='abc')")
+    val ex = intercept[Exception] {
+      sql("insert into validate select 'abc',4,'def'")
+    }
+    assert(ex.getMessage
+      .contains("column: abc specified in inverted index columns does not exist in schema"))
+  }
+
+  override protected def beforeAll(): Unit = {
+    drop
+    createParquetTable()
+  }
+
+  override def afterAll(): Unit = {
+    drop
+  }
+
+  private def drop = {
+    sql("drop table if exists testformat")
+    sql("drop table if exists carbon_table")
+    sql("drop table if exists testparquet")
+    sql("drop table if exists par_table")
+    sql("drop table if exists sdkout")
+    sql("drop table if exists validate")
+  }
+
+  private def jsonToAvro(json: String, avroSchema: String): GenericRecord = {
+    var input: InputStream = null
+    var writer: DataFileWriter[GenericRecord] = null
+    var output: ByteArrayOutputStream = null
+    try {
+      val schema = new org.apache.avro.Schema.Parser().parse(avroSchema)
+      val reader = new GenericDatumReader[GenericRecord](schema)
+      input = new ByteArrayInputStream(json.getBytes())
+      output = new ByteArrayOutputStream()
+      val din = new DataInputStream(input)
+      writer = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord]())
+      writer.create(schema, output)
+      val decoder = DecoderFactory.get().jsonDecoder(schema, din)
+      reader.read(null, decoder)
+    } finally {
+      if (input != null) {
+        input.close()
+      }
+      if (writer != null) {
+        writer.close()
+      }
+    }
+  }
+
+  def WriteFilesWithAvroWriter(writerPath: String,
+      rows: Int,
+      mySchema: String,
+      json: String): Unit = {
+    // conversion to GenericData.Record
+    val nn = new avro.Schema.Parser().parse(mySchema)
+    val record = jsonToAvro(json, mySchema)
+    try {
+      val writer = CarbonWriter.builder
+        .outputPath(writerPath)
+        .uniqueIdentifier(System.currentTimeMillis()).withAvroInput(nn).writtenBy("DataSource")
+        .build()
+      var i = 0
+      while (i < rows) {
+        writer.write(record)
+        i = i + 1
+      }
+      writer.close()
+    } catch {
+      case e: Exception =>
+        e.printStackTrace()
+        Assert.fail(e.getMessage)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7ff3e68/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala
index d117d69..f2ae2cb 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala
@@ -20,6 +20,7 @@ import org.apache.spark.sql.test.TestQueryExecutor
 import org.scalatest.{BeforeAndAfterAll, Suites}
 
 import org.apache.carbondata.cluster.sdv.generated._
+import org.apache.carbondata.cluster.sdv.generated.datasource.{CreateTableUsingSparkCarbonFileFormatTestCase, SparkCarbonDataSourceTestCase}
 import org.apache.carbondata.cluster.sdv.register.TestRegisterCarbonTable
 import org.apache.carbondata.spark.testsuite.localdictionary.LoadTableWithLocalDictionaryTestCase
 
@@ -174,7 +175,9 @@ class SDVSuites3 extends Suites with BeforeAndAfterAll {
  */
 class SDVSuites4 extends Suites with BeforeAndAfterAll {
 
-  val suites =     new CarbonV1toV3CompatabilityTestCase :: Nil
+  val suites =     new CreateTableUsingSparkCarbonFileFormatTestCase ::
+                   new SparkCarbonDataSourceTestCase ::
+                   new CarbonV1toV3CompatabilityTestCase :: Nil
 
   override val nestedSuites = suites.toIndexedSeq
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7ff3e68/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
index 49aa7ff..3ee75ad 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -23,10 +23,11 @@ import java.util.{Locale, TimeZone}
 import org.apache.carbondata.common.logging.LogServiceFactory
 import scala.collection.JavaConversions._
 
+import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.command.LoadDataCommand
-import org.apache.spark.sql.hive.{CarbonSessionCatalog}
+import org.apache.spark.sql.hive.CarbonSessionCatalog
 import org.apache.spark.sql.test.{ResourceRegisterAndCopier, TestQueryExecutor}
 import org.apache.spark.sql.{CarbonSession, DataFrame, Row, SQLContext}
 import org.scalatest.Suite
@@ -138,6 +139,8 @@ class QueryTest extends PlanTest with Suite {
 
   val sqlContext: SQLContext = TestQueryExecutor.INSTANCE.sqlContext
 
+  sqlContext.sparkSession.experimental.extraOptimizations = Seq(new CarbonFileIndexReplaceRule)
+
   val resourcesPath = TestQueryExecutor.resourcesPath
 
   val hiveClient = sqlContext.sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]