You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/12/20 06:35:29 UTC
carbondata git commit: [SDV] Add datasource testcases for Spark File
Format
Repository: carbondata
Updated Branches:
refs/heads/master e01916b6b -> d7ff3e688
[SDV] Add datasource testcases for Spark File Format
Added datasource test cases for Spark File Format.
This closes #2951
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d7ff3e68
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d7ff3e68
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d7ff3e68
Branch: refs/heads/master
Commit: d7ff3e6880a8dbedbd838a89ad5f7c5f0233da9f
Parents: e01916b
Author: shivamasn <sh...@gmail.com>
Authored: Mon Nov 26 16:04:08 2018 +0530
Committer: kunal642 <ku...@gmail.com>
Committed: Thu Dec 20 12:02:38 2018 +0530
----------------------------------------------------------------------
integration/spark-common-cluster-test/pom.xml | 6 +
.../CarbonV1toV3CompatabilityTestCase.scala | 17 +-
...ableUsingSparkCarbonFileFormatTestCase.scala | 336 +++++
.../SparkCarbonDataSourceTestCase.scala | 1380 ++++++++++++++++++
.../cluster/sdv/suite/SDVSuites.scala | 5 +-
.../spark/sql/common/util/QueryTest.scala | 5 +-
6 files changed, 1742 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7ff3e68/integration/spark-common-cluster-test/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/pom.xml b/integration/spark-common-cluster-test/pom.xml
index 8648edf..a3ec125 100644
--- a/integration/spark-common-cluster-test/pom.xml
+++ b/integration/spark-common-cluster-test/pom.xml
@@ -37,6 +37,12 @@
<dependencies>
<dependency>
<groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-spark-datasource</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-spark-common</artifactId>
<version>${project.version}</version>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7ff3e68/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/CarbonV1toV3CompatabilityTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/CarbonV1toV3CompatabilityTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/CarbonV1toV3CompatabilityTestCase.scala
index f34b657..900c658 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/CarbonV1toV3CompatabilityTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/CarbonV1toV3CompatabilityTestCase.scala
@@ -31,15 +31,22 @@ import org.apache.carbondata.core.util.CarbonProperties
class CarbonV1toV3CompatabilityTestCase extends QueryTest with BeforeAndAfterAll {
var localspark: CarbonSession = null
- val storeLocation = s"${TestQueryExecutor.integrationPath}/spark-common-test/src/test/resources/Data/v1_version/store"
- val metaLocation = s"${TestQueryExecutor.integrationPath}/spark-common-test/src/test/resources/Data/v1_version"
+ val storeLocation = s"${
+ TestQueryExecutor
+ .integrationPath
+ }/spark-common-test/src/test/resources/Data/v1_version/store"
+ val metaLocation = s"${
+ TestQueryExecutor
+ .integrationPath
+ }/spark-common-test/src/test/resources/Data/v1_version"
override def beforeAll {
sqlContext.sparkSession.stop()
CarbonEnv.carbonEnvMap.clear()
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION, storeLocation)
import org.apache.spark.sql.CarbonSession._
- println(s"store path for CarbonV1toV3CompatabilityTestCase is $storeLocation and metastore is $metaLocation")
+ println(s"store path for CarbonV1toV3CompatabilityTestCase is $storeLocation and metastore is" +
+ s" $metaLocation")
localspark = SparkSession
.builder()
.master("local")
@@ -49,8 +56,8 @@ class CarbonV1toV3CompatabilityTestCase extends QueryTest with BeforeAndAfterAll
println("store path : " + CarbonProperties.getStorePath)
localspark.sparkContext.setLogLevel("WARN")
hiveClient.runSqlHive(
- s"ALTER TABLE default.t3 SET SERDEPROPERTIES" +
- s"('tablePath'='$storeLocation/default/t3', 'dbname'='default', 'tablename'='t3')")
+ s"ALTER TABLE default.t3 SET SERDEPROPERTIES" +
+ s"('tablePath'='$storeLocation/default/t3', 'dbname'='default', 'tablename'='t3')")
localspark.sql("show tables").show()
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7ff3e68/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/datasource/CreateTableUsingSparkCarbonFileFormatTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/datasource/CreateTableUsingSparkCarbonFileFormatTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/datasource/CreateTableUsingSparkCarbonFileFormatTestCase.scala
new file mode 100644
index 0000000..b96fe10
--- /dev/null
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/datasource/CreateTableUsingSparkCarbonFileFormatTestCase.scala
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cluster.sdv.generated.datasource
+
+import java.io.File
+import java.text.SimpleDateFormat
+import java.util.{Date, Random}
+
+import org.apache.commons.io.FileUtils
+import org.apache.commons.lang.RandomStringUtils
+import org.scalatest.BeforeAndAfterAll
+import org.apache.spark.util.SparkUtil
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.QueryTest
+import org.apache.spark.sql.test.TestQueryExecutor
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+
+class CreateTableUsingSparkCarbonFileFormatTestCase extends QueryTest with BeforeAndAfterAll {
+
+ // Runs once before the suite: drops sdkOutputTable if an earlier run left it behind.
+ override def beforeAll(): Unit = {
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+ }
+
+ // Runs once after the suite: drops sdkOutputTable so it does not outlive the suite.
+ override def afterAll(): Unit = {
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+ }
+
+ // Directory the SDK CarbonWriter writes into; the tests create their tables over this path.
+ val writerPath = s"${TestQueryExecutor.projectPath}/integration/spark-common-cluster-test/src/test/resources/SparkCarbonFileFormat/WriterOutput/"
+
+ /**
+  * Rebuilds the SDK writer fixture under `writerPath`.
+  *
+  * The directory is deleted first, then 100 rows ("robot" + i, i, i / 2.0) are written with the
+  * schema (name string, age int, height double).
+  *
+  * Failures are no longer swallowed: an error while building the fixture propagates to the
+  * calling test, and the writer is closed even when a write throws.
+  */
+ def buildTestData(): Any = {
+   FileUtils.deleteDirectory(new File(writerPath))
+
+   val schema = new StringBuilder()
+     .append("[ \n")
+     .append(" {\"name\":\"string\"},\n")
+     .append(" {\"age\":\"int\"},\n")
+     .append(" {\"height\":\"double\"}\n")
+     .append("]")
+     .toString()
+
+   val writer = CarbonWriter.builder()
+     .outputPath(writerPath)
+     .withCsvInput(Schema.parseJson(schema))
+     .writtenBy("CreateTableUsingSparkCarbonFileFormatTestCase")
+     .build()
+   try {
+     (0 until 100).foreach { i =>
+       writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
+     }
+   } finally {
+     // Always release the writer, whether or not every row was written.
+     writer.close()
+   }
+ }
+
+ // Deletes the writer output directory together with everything inside it.
+ def cleanTestData(): Unit = FileUtils.deleteDirectory(new File(writerPath))
+
+ /**
+  * Recursively deletes every file below `path` whose name ends with `extension`.
+  *
+  * @param path      directory to walk
+  * @param extension file-name suffix that selects the files to delete
+  */
+ def deleteIndexFile(path: String, extension: String): Unit = {
+   val root: CarbonFile = FileFactory.getCarbonFile(path, FileFactory.getFileType(path))
+   root.listFiles.foreach { entry =>
+     if (entry.isDirectory) {
+       // Descend into sub-directories.
+       deleteIndexFile(entry.getPath, extension)
+     } else if (entry.getName.endsWith(extension)) {
+       CarbonUtil.deleteFoldersAndFilesSilent(entry)
+     }
+   }
+ }
+
+ // A table created over the SDK output must return the same rows as querying the path directly.
+ test(
+   "Running SQL directly and read carbondata files (sdk Writer Output) using the " +
+   "SparkCarbonFileFormat ") {
+   buildTestData()
+   assert(new File(writerPath).exists())
+   sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+   // Spark 2.1 receives the path as an option; 2.2 and above use a LOCATION clause.
+   val createTableDdl =
+     if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+       Some(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """)
+     } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+       Some(
+         s"""CREATE TABLE sdkOutputTable USING carbon LOCATION
+            |'$writerPath' """.stripMargin)
+     } else {
+       None
+     }
+   createTableDdl.foreach(ddl => sql(ddl))
+
+   val fromPath = sql(s"""select * FROM carbon.`$writerPath`""".stripMargin)
+   checkAnswer(sql("select * from sdkOutputTable"), fromPath)
+
+   sql("DROP TABLE sdkOutputTable")
+   // Dropping the table must leave the files on disk.
+   assert(new File(writerPath).exists())
+   cleanTestData()
+ }
+
+ // Creating a table over a location whose FACT_FILE_EXT files were removed must be rejected.
+ test("Read sdk writer output file without Carbondata file should fail") {
+   buildTestData()
+   deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
+   assert(new File(writerPath).exists())
+   sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+   // Issues the CREATE TABLE statement that matches the running Spark version.
+   def createTable(): Unit = {
+     if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+       sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """)
+     } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+       sql(
+         s"""CREATE TABLE sdkOutputTable USING carbon LOCATION
+            |'$writerPath' """.stripMargin)
+     }
+   }
+
+   val failure = intercept[Exception] {
+     createTable()
+   }
+   val message = failure.getMessage()
+   assert(message.contains("CarbonData file is not present in the table location"))
+
+   // The failed CREATE TABLE must leave the directory on disk.
+   assert(new File(writerPath).exists())
+   cleanTestData()
+ }
+
+ // Reading must still work after the files ending in UPDATE_INDEX_FILE_EXT have been removed.
+ test("Read sdk writer output file without index file should not fail") {
+   buildTestData()
+   deleteIndexFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
+   assert(new File(writerPath).exists())
+   sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+   // Spark 2.1 receives the path as an option; 2.2 and above use a LOCATION clause.
+   val createTableDdl =
+     if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+       Some(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """)
+     } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+       Some(
+         s"""CREATE TABLE sdkOutputTable USING carbon LOCATION
+            |'$writerPath' """.stripMargin)
+     } else {
+       None
+     }
+   createTableDdl.foreach(ddl => sql(ddl))
+
+   // NOTE(review): the original comment here quoted
+   // "org.apache.spark.SparkException: Index file not present to read the carbondata file",
+   // presumably the error this scenario used to raise - confirm.
+   val allRows = sql("select * from sdkOutputTable").collect()
+   assert(allRows.length == 100)
+   val robot0Rows = sql("select * from sdkOutputTable where name='robot0'").collect()
+   assert(robot0Rows.length == 1)
+
+   sql("DROP TABLE sdkOutputTable")
+   // Dropping the table must leave the files on disk.
+   assert(new File(writerPath).exists())
+   cleanTestData()
+ }
+ // Writes 750000 + 50000 rows and checks both the total count and a filter on from_email.
+ test("Read data having multi blocklet") {
+   buildTestDataMuliBlockLet(750000, 50000)
+   assert(new File(writerPath).exists())
+   sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+   // Spark 2.1 receives the path as an option; every other version uses a LOCATION clause.
+   val createTableDdl =
+     if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+       s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """
+     } else {
+       s"""CREATE TABLE sdkOutputTable USING carbon LOCATION
+          |'$writerPath' """.stripMargin
+     }
+   sql(createTableDdl)
+
+   checkAnswer(sql("select count(*) from sdkOutputTable"), Seq(Row(800000)))
+   val emailFilterQuery =
+     "select count(*) from sdkOutputTable where from_email='Email for testing min max for " +
+     "allowed chars'"
+   checkAnswer(sql(emailFilterQuery), Seq(Row(50000)))
+
+   sql("DROP TABLE sdkOutputTable")
+   // Dropping the table must leave the files on disk.
+   assert(new File(writerPath).exists())
+   clearDataMapCache()
+   cleanTestData()
+ }
+
+ /**
+  * Rebuilds the fixture under `writerPath` with two consecutive runs of rows that differ only in
+  * their `from_email` value.
+  *
+  * The writer is now closed in a `finally` block (the original `try` had neither `catch` nor
+  * `finally`, so a failing write leaked it), and a single `Random` is shared by all rows.
+  *
+  * @param recordsInBlocklet1 number of rows written with from_email "FromEmail"
+  * @param recordsInBlocklet2 number of rows written with from_email
+  *                           "Email for testing min max for allowed chars"
+  */
+ def buildTestDataMuliBlockLet(recordsInBlocklet1: Int, recordsInBlocklet2: Int): Unit = {
+   import scala.collection.JavaConverters._
+   FileUtils.deleteDirectory(new File(writerPath))
+   val fields = Array(
+     new Field("myid", DataTypes.INT),
+     new Field("event_id", DataTypes.STRING),
+     new Field("eve_time", DataTypes.DATE),
+     new Field("ingestion_time", DataTypes.TIMESTAMP),
+     new Field("alldate", DataTypes.createArrayType(DataTypes.DATE)),
+     new Field("subject", DataTypes.STRING),
+     new Field("from_email", DataTypes.STRING),
+     new Field("sal", DataTypes.DOUBLE))
+   val emailDataBlocklet1 = "FromEmail"
+   val emailDataBlocklet2 = "Email for testing min max for allowed chars"
+   val options = Map("bad_records_action" -> "FORCE", "complex_delimiter_level_1" -> "$").asJava
+   val writer = CarbonWriter.builder().outputPath(writerPath).withBlockletSize(16)
+     .sortBy(Array("myid", "ingestion_time", "event_id")).withLoadOptions(options)
+     .withCsvInput(new Schema(fields)).writtenBy("CreateTableUsingSparkCarbonFileFormatTestCase")
+     .build()
+   val timeF = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+   val date_F = new SimpleDateFormat("yyyy-MM-dd")
+   val random = new Random()
+
+   // Writes rows numbered 1..count that all carry the given from_email value.
+   def writeRows(count: Int, fromEmail: String): Unit = {
+     for (i <- 1 to count) {
+       val time = new Date(System.currentTimeMillis())
+       val day = date_F.format(time)
+       writer
+         .write(Array("" + i,
+           "event_" + i,
+           day,
+           timeF.format(time),
+           // "$" is the complex_delimiter_level_1 configured in the load options above.
+           day + "$" + day,
+           "Subject_0",
+           fromEmail,
+           "" + random.nextDouble()))
+     }
+   }
+
+   try {
+     writeRows(recordsInBlocklet1, emailDataBlocklet1)
+     writeRows(recordsInBlocklet2, emailDataBlocklet2)
+   } finally {
+     // Always release the writer, whether or not every row was written.
+     writer.close()
+   }
+ }
+
+ // On every Spark version except 2.1: clears the datamaps registered for writerPath and checks
+ // that the number of datamaps held by DataMapStoreManager actually went down.
+ private def clearDataMapCache(): Unit = {
+   val onSpark21 = sqlContext.sparkContext.version.startsWith("2.1")
+   if (!onSpark21) {
+     val sizeBefore = DataMapStoreManager.getInstance().getAllDataMaps.size()
+     val tableIdentifier = AbsoluteTableIdentifier.from(writerPath)
+     DataMapStoreManager.getInstance().clearDataMaps(tableIdentifier)
+     val sizeAfter = DataMapStoreManager.getInstance().getAllDataMaps.size()
+     assert(sizeBefore > sizeAfter)
+   }
+ }
+
+ // Writes three rows with two 75,000-character varchar columns and reads them back through a
+ // table declared with an explicit schema and through one declared without a schema.
+ test("Test with long string columns") {
+   FileUtils.deleteDirectory(new File(writerPath))
+   // The long string columns are declared as varchar in the SDK schema.
+   val schemaJson = new StringBuilder()
+     .append("[ \n")
+     .append(" {\"name\":\"string\"},\n")
+     .append(" {\"address\":\"varchar\"},\n")
+     .append(" {\"age\":\"int\"},\n")
+     .append(" {\"note\":\"varchar\"}\n")
+     .append("]")
+     .toString()
+   val writer = CarbonWriter.builder()
+     .outputPath(writerPath)
+     .withCsvInput(Schema.parseJson(schemaJson))
+     .writtenBy("CreateTableUsingSparkCarbonFileFormatTestCase")
+     .build()
+   val totalRecordsNum = 3
+   (0 until totalRecordsNum).foreach { i =>
+     // address and note each receive a random 75,000-character value
+     val record = Array[String](s"name_$i",
+       RandomStringUtils.randomAlphabetic(75000),
+       i.toString,
+       RandomStringUtils.randomAlphabetic(75000))
+     writer.write(record)
+   }
+   writer.close()
+
+   val onSpark21 = sqlContext.sparkContext.version.startsWith("2.1")
+
+   //--------------- data source external table with schema ---------------------------
+   sql("DROP TABLE IF EXISTS sdkOutputTable")
+   val ddlWithSchema =
+     if (onSpark21) {
+       s"""CREATE TABLE sdkOutputTable (name string, address string, age int, note string)
+          |USING carbon OPTIONS (PATH '$writerPath', "long_String_columns" "address, note") """
+         .stripMargin
+     } else {
+       s"""CREATE TABLE sdkOutputTable (name string, address string, age int, note string) USING
+          | carbon
+          |OPTIONS("long_String_columns"="address, note") LOCATION
+          |'$writerPath' """.stripMargin
+     }
+   sql(ddlWithSchema)
+   checkAnswer(sql("select count(*) from sdkOutputTable where age = 0"), Seq(Row(1)))
+   val longValuesWithSchema =
+     "SELECT COUNT(*) FROM (select address,age,note from sdkOutputTable where length(address)" +
+     "=75000 and length(note)=75000)"
+   checkAnswer(sql(longValuesWithSchema), Seq(Row(totalRecordsNum)))
+   checkAnswer(sql("select name from sdkOutputTable where age = 2"), Seq(Row("name_2")))
+   sql("DROP TABLE sdkOutputTable")
+
+   //--------------- data source external table without schema ---------------------------
+   sql("DROP TABLE IF EXISTS sdkOutputTableWithoutSchema")
+   val ddlWithoutSchema =
+     if (onSpark21) {
+       s"""CREATE TABLE sdkOutputTableWithoutSchema USING carbon OPTIONS (PATH
+          |'$writerPath', "long_String_columns" "address, note") """.stripMargin
+     } else {
+       s"""CREATE TABLE sdkOutputTableWithoutSchema USING carbon OPTIONS
+          |("long_String_columns"="address, note") LOCATION '$writerPath' """.stripMargin
+     }
+   sql(ddlWithoutSchema)
+   checkAnswer(sql("select count(*) from sdkOutputTableWithoutSchema where age = 0"), Seq(Row(1)))
+   val longValuesWithoutSchema =
+     "SELECT COUNT(*) FROM (select address,age,note from sdkOutputTableWithoutSchema where " +
+     "length(address)=75000 and length(note)=75000)"
+   checkAnswer(sql(longValuesWithoutSchema), Seq(Row(totalRecordsNum)))
+   checkAnswer(sql("select name from sdkOutputTableWithoutSchema where age = 2"),
+     Seq(Row("name_2")))
+   sql("DROP TABLE sdkOutputTableWithoutSchema")
+   clearDataMapCache()
+   cleanTestData()
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7ff3e68/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/datasource/SparkCarbonDataSourceTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/datasource/SparkCarbonDataSourceTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/datasource/SparkCarbonDataSourceTestCase.scala
new file mode 100644
index 0000000..8f41ba7
--- /dev/null
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/datasource/SparkCarbonDataSourceTestCase.scala
@@ -0,0 +1,1380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.cluster.sdv.generated.datasource
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, File, InputStream}
+
+import scala.collection.mutable
+
+import org.apache.avro
+import org.apache.avro.file.DataFileWriter
+import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
+import org.apache.avro.io.{DecoderFactory, Encoder}
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.common.util.QueryTest
+import org.apache.spark.sql.test.TestQueryExecutor
+import org.junit.Assert
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.datatype.{DataTypes, StructField}
+import org.apache.carbondata.hadoop.testutil.StoreCreator
+import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}
+
+class SparkCarbonDataSourceTestCase extends QueryTest with BeforeAndAfterAll {
+
+ // Base directory used by the tests below for carbon output folders and datamap identifiers.
+ val warehouse1 = s"${TestQueryExecutor.projectPath}/integration/spark-datasource/target/warehouse"
+
+ // Saves a ten-row DataFrame as a carbon table and checks count, filter and projection.
+ test("test write using dataframe") {
+   import sqlContext.implicits._
+   val source = sqlContext.sparkContext
+     .parallelize(1 to 10)
+     .map { n => ("a" + n % 10, "b", n) }
+     .toDF("c1", "c2", "number")
+   sql("drop table if exists testformat")
+
+   // Persist the rows in carbon format under the table name testformat.
+   source.write.format("carbon").saveAsTable("testformat")
+
+   val totalRows = sql("select * from testformat").count()
+   assert(totalRows == 10)
+   val a0Rows = sql("select * from testformat where c1='a0'").count()
+   assert(a0Rows == 1)
+   checkAnswer(sql("select c1 from testformat where number = 7"), Seq(Row("a7")))
+   sql("drop table if exists testformat")
+ }
+
+ // Creates a carbon table through DDL, fills it from a parquet table and compares a filter
+ // query on both. The test now drops carbon_table (the table it actually creates) before and
+ // after running; it used to drop the unrelated testformat table, so carbon_table leaked and a
+ // rerun failed on CREATE TABLE.
+ test("test write using ddl") {
+   import sqlContext.implicits._
+   val df = sqlContext.sparkContext.parallelize(1 to 10)
+     .map(x => ("a" + x % 10, "b", x))
+     .toDF("c1", "c2", "number")
+   sql("drop table if exists testparquet")
+   sql("drop table if exists carbon_table")
+   // Saves dataframe as a parquet table that serves as source and reference
+   df.write
+     .format("parquet").saveAsTable("testparquet")
+   sql("create table carbon_table(c1 string, c2 string, number int) using carbon")
+   sql("insert into carbon_table select * from testparquet")
+   checkAnswer(sql("select * from carbon_table where c1='a1'"),
+     sql("select * from testparquet where c1='a1'"))
+   if (!sqlContext.sparkContext.version.startsWith("2.1")) {
+     // Clearing the datamaps of carbon_table must never increase the number of cached datamaps.
+     val mapSize = DataMapStoreManager.getInstance().getAllDataMaps.size()
+     DataMapStoreManager.getInstance()
+       .clearDataMaps(AbsoluteTableIdentifier.from(warehouse1 + "/carbon_table"))
+     assert(mapSize >= DataMapStoreManager.getInstance().getAllDataMaps.size())
+   }
+   sql("drop table if exists testparquet")
+   sql("drop table if exists carbon_table")
+ }
+
+ // Writes a DataFrame to a folder in carbon format and loads it back through the reader API.
+ test("test read with df write") {
+   import sqlContext.implicits._
+   val folder = warehouse1 + "/test_folder"
+   FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(folder))
+   val source = sqlContext.sparkContext
+     .parallelize(1 to 10)
+     .map { n => ("a" + n % 10, "b", n) }
+     .toDF("c1", "c2", "number")
+
+   // Persist the rows as carbon files inside the folder.
+   source.write.format("carbon").save(warehouse1 + "/test_folder/")
+
+   val loaded = sqlContext.read.format("carbon").load(folder)
+   assert(loaded.count() == 10)
+   FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(folder))
+ }
+
+ // Skipped on Spark 2.1: writes the same DataFrame into three sub-folders and reads the parent
+ // folder, expecting every row three times.
+ test("test write using subfolder") {
+   val onSpark21 = sqlContext.sparkContext.version.startsWith("2.1")
+   if (!onSpark21) {
+     import sqlContext.implicits._
+     FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
+     val source = sqlContext.sparkContext
+       .parallelize(1 to 10)
+       .map { n => ("a" + n % 10, "b", n) }
+       .toDF("c1", "c2", "number")
+
+     // Three writes, each into a fresh sub-folder named after the current nano time.
+     (1 to 3).foreach { _ =>
+       source.write.format("carbon").save(warehouse1 + "/test_folder/" + System.nanoTime())
+     }
+
+     val loaded = sqlContext.read.format("carbon").load(warehouse1 + "/test_folder")
+     assert(loaded.where("c1='a1'").count() == 3)
+
+     // Clearing the datamaps of the folder must shrink the set of cached datamaps.
+     val sizeBefore = DataMapStoreManager.getInstance().getAllDataMaps.size()
+     val folderIdentifier = AbsoluteTableIdentifier.from(warehouse1 + "/test_folder")
+     DataMapStoreManager.getInstance().clearDataMaps(folderIdentifier)
+     assert(sizeBefore > DataMapStoreManager.getInstance().getAllDataMaps.size())
+     FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
+   }
+ }
+
+ // Fills a carbon table partitioned by c2 from a parquet table partitioned the same way.
+ test("test write using partition ddl") {
+   import sqlContext.implicits._
+   Seq("carbon_table", "testparquet").foreach(t => sql(s"drop table if exists $t"))
+   val source = sqlContext.sparkContext
+     .parallelize(1 to 10)
+     .map { n => ("a" + n % 10, "b", n) }
+     .toDF("c1", "c2", "number")
+
+   // Reference copy stored as a parquet table partitioned by c2.
+   source.write.format("parquet").partitionBy("c2").saveAsTable("testparquet")
+   val createCarbon =
+     "create table carbon_table(c1 string, c2 string, number int) using carbon PARTITIONED by " +
+     "(c2)"
+   sql(createCarbon)
+   sql("insert into carbon_table select * from testparquet")
+   // TODO fix in 2.1
+   val onSpark21 = sqlContext.sparkContext.version.startsWith("2.1")
+   if (!onSpark21) {
+     val carbonCount = sql("select * from carbon_table").count()
+     assert(carbonCount == 10)
+     checkAnswer(sql("select * from carbon_table"),
+       sql("select * from testparquet"))
+   }
+   Seq("carbon_table", "testparquet").foreach(t => sql(s"drop table if exists $t"))
+ }
+
+ // Round-trips a struct<string, string> column from parquet into carbon and compares both.
+ test("test write with struct type") {
+   import sqlContext.implicits._
+   Seq("carbon_table", "parquet_table").foreach(t => sql(s"drop table if exists $t"))
+
+   // Reference copy: ten rows stored as a parquet table.
+   sqlContext.sparkContext.parallelize(1 to 10)
+     .map { n => ("a" + n % 10, ("b", "c"), n) }
+     .toDF("c1", "c2", "number")
+     .write.format("parquet").saveAsTable("parquet_table")
+   val createCarbon =
+     "create table carbon_table(c1 string, c2 struct<a1:string, a2:string>, number int) using " +
+     "carbon"
+   sql(createCarbon)
+   sql("insert into carbon_table select * from parquet_table")
+
+   val carbonCount = sql("select * from carbon_table").count()
+   assert(carbonCount == 10)
+   checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table"))
+
+   Seq("carbon_table", "parquet_table").foreach(t => sql(s"drop table if exists $t"))
+ }
+
+ // Round-trips an array<string> column from parquet into carbon and compares both.
+ test("test write with array type") {
+   import sqlContext.implicits._
+   Seq("carbon_table", "parquet_table").foreach(t => sql(s"drop table if exists $t"))
+
+   // Reference copy: ten rows stored as a parquet table.
+   sqlContext.sparkContext.parallelize(1 to 10)
+     .map { n => ("a" + n % 10, Array("b", "c"), n) }
+     .toDF("c1", "c2", "number")
+     .write.format("parquet").saveAsTable("parquet_table")
+   val createCarbon =
+     "create table carbon_table(c1 string, c2 array<string>, number int) using carbon"
+   sql(createCarbon)
+   sql("insert into carbon_table select * from parquet_table")
+
+   val carbonCount = sql("select * from carbon_table").count()
+   assert(carbonCount == 10)
+   checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table"))
+
+   Seq("carbon_table", "parquet_table").foreach(t => sql(s"drop table if exists $t"))
+ }
+
+ // Round-trips an array<struct<string, string>> column from parquet into carbon.
+ test("test write with nested array and struct type") {
+   import sqlContext.implicits._
+   Seq("carbon_table", "parquet_table").foreach(t => sql(s"drop table if exists $t"))
+
+   // Reference copy: ten rows stored as a parquet table.
+   sqlContext.sparkContext.parallelize(1 to 10)
+     .map { n => ("a" + n % 10, Array(("1", "2"), ("3", "4")), n) }
+     .toDF("c1", "c2", "number")
+     .write.format("parquet").saveAsTable("parquet_table")
+   val createCarbon =
+     "create table carbon_table(c1 string, c2 array<struct<a1:string, a2:string>>, number int) " +
+     "using carbon"
+   sql(createCarbon)
+   sql("insert into carbon_table select * from parquet_table")
+
+   val carbonCount = sql("select * from carbon_table").count()
+   assert(carbonCount == 10)
+   checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table"))
+
+   Seq("carbon_table", "parquet_table").foreach(t => sql(s"drop table if exists $t"))
+ }
+
+ // Round-trips a struct<array<string>, struct<string, string>> column from parquet into carbon.
+ test("test write with nested struct and array type") {
+   import sqlContext.implicits._
+   Seq("carbon_table", "parquet_table").foreach(t => sql(s"drop table if exists $t"))
+
+   // Reference copy: ten rows stored as a parquet table.
+   sqlContext.sparkContext.parallelize(1 to 10)
+     .map { n => ("a" + n % 10, (Array("1", "2"), ("3", "4")), n) }
+     .toDF("c1", "c2", "number")
+     .write.format("parquet").saveAsTable("parquet_table")
+   val createCarbon =
+     "create table carbon_table(c1 string, c2 struct<a1:array<string>, a2:struct<a1:string, " +
+     "a2:string>>, number int) using carbon"
+   sql(createCarbon)
+   sql("insert into carbon_table select * from parquet_table")
+
+   val carbonCount = sql("select * from carbon_table").count()
+   assert(carbonCount == 10)
+   checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table"))
+
+   Seq("carbon_table", "parquet_table").foreach(t => sql(s"drop table if exists $t"))
+ }
+
+ // Round-trips an array<map<string, string>> column from parquet into carbon.
+ test("test write with array type with value as nested map type") {
+   import sqlContext.implicits._
+   Seq("carbon_table", "parquet_table").foreach(t => sql(s"drop table if exists $t"))
+
+   // Reference copy: ten rows stored as a parquet table.
+   sqlContext.sparkContext.parallelize(1 to 10)
+     .map { n => ("a" + n % 10, Array(Map("b" -> "c")), n) }
+     .toDF("c1", "c2", "number")
+     .write.format("parquet").saveAsTable("parquet_table")
+   val createCarbon =
+     "create table carbon_table(c1 string, c2 array<map<string,string>>, number int) using carbon"
+   sql(createCarbon)
+   sql("insert into carbon_table select * from parquet_table")
+
+   val carbonCount = sql("select * from carbon_table").count()
+   assert(carbonCount == 10)
+   checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table"))
+
+   Seq("carbon_table", "parquet_table").foreach(t => sql(s"drop table if exists $t"))
+ }
+
+ // Round-trips an array<array<map<string, string>>> column from parquet into carbon.
+ test("test write with array type with value as nested array<array<map>> type") {
+   import sqlContext.implicits._
+   Seq("carbon_table", "parquet_table").foreach(t => sql(s"drop table if exists $t"))
+
+   // Reference copy: ten rows stored as a parquet table.
+   sqlContext.sparkContext.parallelize(1 to 10)
+     .map { n => ("a" + n % 10, Array(Array(Map("b" -> "c"))), n) }
+     .toDF("c1", "c2", "number")
+     .write.format("parquet").saveAsTable("parquet_table")
+   val createCarbon =
+     "create table carbon_table(c1 string, c2 array<array<map<string,string>>>, number int) " +
+     "using carbon"
+   sql(createCarbon)
+   sql("insert into carbon_table select * from parquet_table")
+
+   val carbonCount = sql("select * from carbon_table").count()
+   assert(carbonCount == 10)
+   checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table"))
+
+   Seq("carbon_table", "parquet_table").foreach(t => sql(s"drop table if exists $t"))
+ }
+
+ // Round-trips a struct<string, map<string, string>> column from parquet into carbon.
+ test("test write with struct type with value as nested map type") {
+   import sqlContext.implicits._
+   Seq("carbon_table", "parquet_table").foreach(t => sql(s"drop table if exists $t"))
+
+   // Reference copy: ten rows stored as a parquet table.
+   sqlContext.sparkContext.parallelize(1 to 10)
+     .map { n => ("a" + n % 10, ("a", Map("b" -> "c")), n) }
+     .toDF("c1", "c2", "number")
+     .write.format("parquet").saveAsTable("parquet_table")
+   val createCarbon =
+     "create table carbon_table(c1 string, c2 struct<a1:string, a2:map<string,string>>, number " +
+     "int) using carbon"
+   sql(createCarbon)
+   sql("insert into carbon_table select * from parquet_table")
+
+   val carbonCount = sql("select * from carbon_table").count()
+   assert(carbonCount == 10)
+   checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table"))
+
+   Seq("carbon_table", "parquet_table").foreach(t => sql(s"drop table if exists $t"))
+ }
+
+ // Round-trips a struct<string, array<map<string, string>>> column from parquet into carbon.
+ test("test write with struct type with value as nested struct<array<map>> type") {
+   import sqlContext.implicits._
+   Seq("carbon_table", "parquet_table").foreach(t => sql(s"drop table if exists $t"))
+
+   // Reference copy: ten rows stored as a parquet table.
+   sqlContext.sparkContext.parallelize(1 to 10)
+     .map { n => ("a" + n % 10, ("a", Array(Map("b" -> "c"))), n) }
+     .toDF("c1", "c2", "number")
+     .write.format("parquet").saveAsTable("parquet_table")
+   val createCarbon =
+     "create table carbon_table(c1 string, c2 struct<a1:string, a2:array<map<string,string>>>, " +
+     "number int) using carbon"
+   sql(createCarbon)
+   sql("insert into carbon_table select * from parquet_table")
+
+   val carbonCount = sql("select * from carbon_table").count()
+   assert(carbonCount == 10)
+   checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table"))
+
+   Seq("carbon_table", "parquet_table").foreach(t => sql(s"drop table if exists $t"))
+ }
+
+ // Round-trips a map<string, string> column from parquet into carbon and compares both.
+ test("test write with map type") {
+   import sqlContext.implicits._
+   Seq("carbon_table", "parquet_table").foreach(t => sql(s"drop table if exists $t"))
+
+   // Reference copy: ten rows stored as a parquet table.
+   sqlContext.sparkContext.parallelize(1 to 10)
+     .map { n => ("a" + n % 10, Map("b" -> "c"), n) }
+     .toDF("c1", "c2", "number")
+     .write.format("parquet").saveAsTable("parquet_table")
+   val createCarbon =
+     "create table carbon_table(c1 string, c2 map<string, string>, number int) using carbon"
+   sql(createCarbon)
+   sql("insert into carbon_table select * from parquet_table")
+
+   val carbonCount = sql("select * from carbon_table").count()
+   assert(carbonCount == 10)
+   checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table"))
+
+   Seq("carbon_table", "parquet_table").foreach(t => sql(s"drop table if exists $t"))
+ }
+
+ test("test write with map type with Int data type as key") {
+   // Same round trip as the plain map test, but the map key is an Int (99).
+   def dropTables(): Unit = {
+     sql("drop table if exists carbon_table")
+     sql("drop table if exists parquet_table")
+   }
+
+   dropTables()
+   import sqlContext.implicits._
+   sqlContext.sparkContext.parallelize(1 to 10)
+     .map(x => ("a" + x % 10, Map(99 -> "c"), x))
+     .toDF("c1", "c2", "number")
+     .write
+     .format("parquet")
+     .saveAsTable("parquet_table")
+   sql("create table carbon_table(c1 string, c2 map<int, string>, number int) using carbon")
+   sql("insert into carbon_table select * from parquet_table")
+   assert(sql("select * from carbon_table").count() == 10)
+   val parquetRows = sql("select * from parquet_table")
+   checkAnswer(sql("select * from carbon_table"), parquetRows)
+   dropTables()
+ }
+
+ test("test write with map type with value as nested map type") {
+   // Map whose values are themselves maps: parquet is the staging/reference table and the
+   // carbon table must return the same rows after the insert.
+   Seq("carbon_table", "parquet_table").foreach(name => sql(s"drop table if exists $name"))
+   import sqlContext.implicits._
+   val nestedMaps = sqlContext.sparkContext.parallelize(1 to 10)
+     .map(x => ("a" + x % 10, Map("a" -> Map("b" -> "c")), x))
+   nestedMaps.toDF("c1", "c2", "number").write.format("parquet").saveAsTable("parquet_table")
+
+   val c2Type = "map<string, map<string, string>>"
+   sql(s"create table carbon_table(c1 string, c2 $c2Type, number int) using carbon")
+   sql("insert into carbon_table select * from parquet_table")
+   val inserted = sql("select * from carbon_table").count()
+   assert(inserted == 10)
+   checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table"))
+   Seq("carbon_table", "parquet_table").foreach(name => sql(s"drop table if exists $name"))
+ }
+
+ test("test write with map type with value as nested struct type") {
+   // Map whose values are two-field structs; rows flow parquet -> carbon and the tables are
+   // compared afterwards.
+   val tables = Seq("carbon_table", "parquet_table")
+   tables.foreach(t => sql(s"drop table if exists $t"))
+   import sqlContext.implicits._
+   val staged = sqlContext.sparkContext.parallelize(1 to 10)
+     .map(x => ("a" + x % 10, Map("a" -> ("b", "c")), x))
+     .toDF("c1", "c2", "number")
+   staged.write.format("parquet").saveAsTable("parquet_table")
+
+   val createCarbon =
+     "create table carbon_table(c1 string, c2 map<string, struct<a1:string, a2:string>>, " +
+     "number int) using carbon"
+   sql(createCarbon)
+   sql("insert into carbon_table select * from parquet_table")
+   assert(sql("select * from carbon_table").count() == 10)
+   checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table"))
+   tables.foreach(t => sql(s"drop table if exists $t"))
+ }
+
+ test("test write with map type with value as nested array type") {
+   // Map whose values are string arrays; the carbon copy must match the parquet source.
+   def cleanTables(): Unit = {
+     sql("drop table if exists carbon_table")
+     sql("drop table if exists parquet_table")
+   }
+
+   cleanTables()
+   import sqlContext.implicits._
+   val arrayValued = sqlContext.sparkContext.parallelize(1 to 10)
+     .map(x => ("a" + x % 10, Map("a" -> Array("b", "c")), x))
+   arrayValued.toDF("c1", "c2", "number").write.format("parquet").saveAsTable("parquet_table")
+
+   val columns = "c1 string, c2 map<string, array<string>>, number int"
+   sql(s"create table carbon_table($columns) using carbon")
+   sql("insert into carbon_table select * from parquet_table")
+   val rowsInCarbon = sql("select * from carbon_table").count()
+   assert(rowsInCarbon == 10)
+   checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table"))
+   cleanTables()
+ }
+
+ test("test write using ddl and options") {
+   // A carbon table created with explicit options must list them in "describe formatted"
+   // and must return the rows copied over from a parquet table.
+   val tables = Seq("carbon_table", "testparquet")
+   tables.foreach(t => sql(s"drop table if exists $t"))
+   import sqlContext.implicits._
+   val parquetSource = sqlContext.sparkContext.parallelize(1 to 10)
+     .map(x => ("a" + x % 10, "b", x))
+     .toDF("c1", "c2", "number")
+
+   // Stage the rows as a parquet table
+   parquetSource.write.format("parquet").saveAsTable("testparquet")
+   val tableOptions = "('table_blocksize'='256','inverted_index'='c1')"
+   sql("create table carbon_table(c1 string, c2 string, number int) using carbon options" +
+       tableOptions)
+   Seq("table_blocksize", "inverted_index").foreach { option =>
+     checkExistence(sql("describe formatted carbon_table"), true, option)
+   }
+   sql("insert into carbon_table select * from testparquet")
+   checkAnswer(sql("select * from carbon_table"), sql("select * from testparquet"))
+   tables.foreach(t => sql(s"drop table if exists $t"))
+ }
+
+ test("test read with nested struct and array type without creating table") {
+   // Writes rows with a struct<array, struct> column to a carbon folder (no table is
+   // created) and reads the folder back through the carbon datasource.
+   val carbonFolder = warehouse1 + "/test_carbon_folder"
+   FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(carbonFolder))
+   sql("drop table if exists parquet_table")
+   import sqlContext.implicits._
+   val df = sqlContext.sparkContext.parallelize(1 to 10)
+     .map(x => ("a" + x % 10, (Array("1", "2"), ("3", "4")), x))
+     .toDF("c1", "c2", "number")
+
+   df.write
+     .format("parquet").saveAsTable("parquet_table")
+   val frame = sql("select * from parquet_table")
+   frame.write.format("carbon").save(carbonFolder)
+   val dfread = sqlContext.read.format("carbon").load(carbonFolder)
+   // The loaded frame used to be discarded without ever being evaluated, so the read path
+   // was not verified. Compare it with the rows that were written.
+   checkAnswer(dfread, frame)
+   FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(carbonFolder))
+   sql("drop table if exists parquet_table")
+ }
+
+ test("test read and write with date datatype") {
+   // The same literal row is inserted into a carbon and a parquet table that both have a
+   // Date column; the two tables must read back identically.
+   val tables = Seq("date_table", "date_parquet_table")
+   tables.foreach(t => sql(s"drop table if exists $t"))
+   val rowValues = "11, 'ravi', '2017-11-11'"
+   sql("create table date_table(empno int, empname string, projdate Date) using carbon")
+   sql(s"insert into date_table select $rowValues")
+   sql("create table date_parquet_table(empno int, empname string, projdate Date) using parquet")
+   sql(s"insert into date_parquet_table select $rowValues")
+   val expected = sql("select * from date_parquet_table")
+   checkAnswer(sql("select * from date_table"), expected)
+   tables.foreach(t => sql(s"drop table if exists $t"))
+ }
+
+ test("test read and write with date datatype with wrong format") {
+   // Inserts a date literal written as dd-MM-yyyy into both a carbon and a parquet table
+   // and checks that the two tables return the same result for it.
+   def dropDateTables(): Unit = {
+     sql("drop table if exists date_table")
+     sql("drop table if exists date_parquet_table")
+   }
+
+   dropDateTables()
+   val columns = "empno int, empname string, projdate Date"
+   sql(s"create table date_table($columns) using carbon")
+   sql("insert into date_table select 11, 'ravi', '11-11-2017'")
+   sql(s"create table date_parquet_table($columns) using parquet")
+   sql("insert into date_parquet_table select 11, 'ravi', '11-11-2017'")
+   checkAnswer(sql("select * from date_table"), sql("select * from date_parquet_table"))
+   dropDateTables()
+ }
+
+ test("test read and write with timestamp datatype") {
+   // One row with a timestamp literal goes into a carbon table and into a parquet table;
+   // both must return the same row.
+   val tables = List("date_table", "date_parquet_table")
+   tables.foreach(t => sql("drop table if exists " + t))
+   val columns = "empno int, empname string, projdate timestamp"
+   val rowValues = "11, 'ravi', '2017-11-11 00:00:01'"
+   sql(s"create table date_table($columns) using carbon")
+   sql(s"insert into date_table select $rowValues")
+   sql(s"create table date_parquet_table($columns) using parquet")
+   sql(s"insert into date_parquet_table select $rowValues")
+   checkAnswer(sql("select * from date_table"), sql("select * from date_parquet_table"))
+   tables.foreach(t => sql("drop table if exists " + t))
+ }
+
+ test("test read and write with timestamp datatype with wrong format") {
+   // Inserts a timestamp literal written as dd-MM-yyyy HH:mm:ss into a carbon and a parquet
+   // table and checks that both tables return the same result for it.
+   def dropDateTables(): Unit = {
+     sql("drop table if exists date_table")
+     sql("drop table if exists date_parquet_table")
+   }
+
+   dropDateTables()
+   sql("create table date_table(empno int, empname string, projdate timestamp) using carbon")
+   sql("insert into date_table select 11, 'ravi', '11-11-2017 00:00:01'")
+   val parquetDdl =
+     "create table date_parquet_table(empno int, empname string, projdate timestamp) " +
+     "using parquet"
+   sql(parquetDdl)
+   sql("insert into date_parquet_table select 11, 'ravi', '11-11-2017 00:00:01'")
+   val expected = sql("select * from date_parquet_table")
+   checkAnswer(sql("select * from date_table"), expected)
+   dropDateTables()
+ }
+
+ test("test write with array type with filter") {
+   // Array<string> column: besides the full-table comparison, the same filter on c1 and on
+   // the first array element is run against the carbon and the parquet table.
+   Seq("carbon_table", "parquet_table").foreach(name => sql(s"drop table if exists $name"))
+   import sqlContext.implicits._
+   val staged = sqlContext.sparkContext.parallelize(1 to 10)
+     .map(x => ("a" + x % 10, Array("b", "c"), x))
+     .toDF("c1", "c2", "number")
+   staged.write.format("parquet").saveAsTable("parquet_table")
+
+   sql("create table carbon_table(c1 string, c2 array<string>, number int) using carbon")
+   sql("insert into carbon_table select * from parquet_table")
+   assert(sql("select * from carbon_table").count() == 10)
+   val predicate = "c1='a1' and c2[0]='b'"
+   checkAnswer(
+     sql(s"select * from carbon_table where $predicate"),
+     sql(s"select * from parquet_table where $predicate"))
+   checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table"))
+   Seq("carbon_table", "parquet_table").foreach(name => sql(s"drop table if exists $name"))
+ }
+
+ test("test write with struct type with filter") {
+   // Struct and array-of-struct columns: full-table comparison plus two filters on nested
+   // members. The parquet side addresses tuple members as _1/_2, the carbon side as a1/a2.
+   def dropBoth(): Unit = {
+     sql("drop table if exists carbon_table")
+     sql("drop table if exists parquet_table")
+   }
+
+   dropBoth()
+   import sqlContext.implicits._
+   val staged = sqlContext.sparkContext.parallelize(1 to 10)
+     .map(x => ("a" + x % 10, (Array("1", "2"), ("3", "4")), Array(("1", 1), ("2", 2)), x))
+     .toDF("c1", "c2", "c3", "number")
+   staged.write.format("parquet").saveAsTable("parquet_table")
+
+   val carbonDdl =
+     "create table carbon_table(c1 string, c2 struct<a1:array<string>, " +
+     "a2:struct<a1:string, a2:string>>, c3 array<struct<a1:string, a2:int>>, " +
+     "number int) using carbon"
+   sql(carbonDdl)
+   sql("insert into carbon_table select * from parquet_table")
+   assert(sql("select * from carbon_table").count() == 10)
+   checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table"))
+   val filterPairs = Seq(
+     "c2.a1[0]='1' and c1='a1'" -> "c2._1[0]='1' and c1='a1'",
+     "c2.a1[0]='1' and c3[0].a2=1" -> "c2._1[0]='1' and c3[0]._2=1")
+   filterPairs.foreach { case (carbonFilter, parquetFilter) =>
+     checkAnswer(
+       sql(s"select * from carbon_table where $carbonFilter"),
+       sql(s"select * from parquet_table where $parquetFilter"))
+   }
+   dropBoth()
+ }
+
+ test("test read with df write string issue") {
+   // A dataframe is saved straight to a carbon folder; on Spark versions other than 2.1 a
+   // table declared on that folder must return the same rows as loading the folder directly.
+   def cleanUp(): Unit = {
+     FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
+   }
+
+   sql("drop table if exists test123")
+   cleanUp()
+   import sqlContext.implicits._
+   val columnNames =
+     Seq("c1", "c2", "shortc", "intc", "longc", "doublec", "bigdecimalc", "arrayc", "structc")
+   val source = sqlContext.sparkContext.parallelize(1 to 10)
+     .map { x =>
+       ("a" + x % 10, "b", x.toShort, x, x.toLong, x.toDouble, BigDecimal.apply(x),
+         Array(x + 1, x), ("b", BigDecimal.apply(x)))
+     }
+     .toDF(columnNames: _*)
+
+   // Write the dataframe as carbon files
+   source.write.format("carbon").save(warehouse1 + "/test_folder/")
+   val isSpark21 = sqlContext.sparkContext.version.startsWith("2.1")
+   if (!isSpark21) {
+     val ddl =
+       s"create table test123 (c1 string, c2 string, shortc smallint,intc int, longc bigint, " +
+       s"doublec double, bigdecimalc decimal(38,18), arrayc array<int>, structc " +
+       s"struct<_1:string, _2:decimal(38,18)>) using carbon location '$warehouse1/test_folder/'"
+     sql(ddl)
+
+     val loaded = sqlContext.read.format("carbon").load(warehouse1 + "/test_folder/")
+     checkAnswer(sql("select * from test123"), loaded)
+   }
+   cleanUp()
+   sql("drop table if exists test123")
+ }
+
+ test("test read with df write with empty data") {
+   // On Spark versions other than 2.1, two tables are declared on the same (emptied) folder
+   // and their row counts are compared.
+   val tables = Seq("test123", "test123_par")
+   def emptyFolder(): Unit = {
+     FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
+   }
+
+   tables.foreach(t => sql(s"drop table if exists $t"))
+   emptyFolder()
+   if (!sqlContext.sparkContext.version.startsWith("2.1")) {
+     val columns =
+       "c1 string, c2 string, arrayc array<int>, structc struct<_1:string, " +
+       "_2:decimal(38,18)>, shortc smallint,intc int, longc bigint, doublec double, " +
+       "bigdecimalc decimal(38,18)"
+     // Both tables share the schema and the location
+     tables.foreach { t =>
+       sql(s"create table $t ($columns) using carbon location '$warehouse1/test_folder/'")
+     }
+     checkAnswer(
+       sql("select count(*) from test123"),
+       sql("select count(*) from test123_par"))
+   }
+   emptyFolder()
+   tables.foreach(t => sql(s"drop table if exists $t"))
+ }
+
+ test("test write with nosort columns") {
+   // A carbon table with an empty sort_columns option is filled from a parquet table and
+   // must return the same rows (skipped on Spark 2.1).
+   sql("drop table if exists test123")
+   sql("drop table if exists test123_par")
+   // The carbon table lives in the shared test_folder location. Start from an empty folder,
+   // as the sibling tests do, so files left by an earlier test cannot leak into the result.
+   FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
+   import sqlContext.implicits._
+   val df = sqlContext.sparkContext.parallelize(1 to 10)
+     .map(x => ("a" + x % 10, "b", x.toShort, x, x.toLong, x.toDouble, BigDecimal.apply(x),
+       Array(x + 1, x), ("b", BigDecimal.apply(x))))
+     .toDF("c1", "c2", "shortc", "intc", "longc", "doublec", "bigdecimalc", "arrayc", "structc")
+
+   // Stage the rows as a parquet table
+   df.write.format("parquet").saveAsTable("test123_par")
+   if (!sqlContext.sparkContext.version.startsWith("2.1")) {
+     sql(s"create table test123 (c1 string, c2 string, shortc smallint,intc int, longc bigint, " +
+         s"doublec double, bigdecimalc decimal(38,18), arrayc array<int>, structc " +
+         s"struct<_1:string, _2:decimal(38,18)>) using carbon options('sort_columns'='') " +
+         s"location '$warehouse1/test_folder/'")
+
+     sql("insert into test123 select * from test123_par")
+     checkAnswer(sql("select * from test123"), sql("select * from test123_par"))
+   }
+   // Remove the files written by this test so later tests find the folder empty
+   FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
+   sql("drop table if exists test123")
+   sql("drop table if exists test123_par")
+ }
+
+ test("test complex columns mismatch") {
+   // Loads Array.csv into a delimited-text table, copies it into a carbon table with the
+   // same columns and compares the first three rows ordered by CUST_ID.
+   def dropTables(): Unit = {
+     sql("drop table if exists array_com_hive")
+     sql("drop table if exists array_com")
+   }
+
+   dropTables()
+   val columns =
+     "CUST_ID string, YEAR int, MONTH int, AGE int, GENDER string, EDUCATED string, " +
+     "IS_MARRIED string, ARRAY_INT array<int>,ARRAY_STRING array<string>," +
+     "ARRAY_DATE array<timestamp>,CARD_COUNT int,DEBIT_COUNT int, CREDIT_COUNT int, " +
+     "DEPOSIT double, HQ_DEPOSIT double"
+   // Plain concatenation on purpose: the collection delimiter is a literal '$'
+   sql("create table array_com_hive (" + columns + ") row format delimited fields " +
+       "terminated by ',' collection items terminated by '$'")
+   val csvLocation =
+     s"${ resourcesPath }../../../../../spark-datasource/src/test/resources/Array.csv"
+   val sourceFile = FileFactory.getPath(csvLocation).toString
+   sql(s"load data local inpath '$sourceFile' into table array_com_hive")
+   sql("create table Array_com (" + columns + ") using carbon")
+   sql("insert into Array_com select * from array_com_hive")
+   val expected = sql("select * from array_com_hive order by CUST_ID ASC limit 3")
+   checkAnswer(sql("select * from Array_com order by CUST_ID ASC limit 3"), expected)
+   dropTables()
+ }
+
+ test("test complex columns fail while insert ") {
+   // Loads structofarray.csv into a delimited-text table, copies it into a carbon table with
+   // a struct-of-arrays column and compares both tables ordered by CUST_ID.
+   def dropTables(): Unit = {
+     sql("drop table if exists STRUCT_OF_ARRAY_com_hive")
+     sql("drop table if exists STRUCT_OF_ARRAY_com")
+   }
+
+   dropTables()
+   val hiveDdl =
+     " create table STRUCT_OF_ARRAY_com_hive (CUST_ID string, YEAR int, MONTH int, AGE int, " +
+     "GENDER string, EDUCATED string, IS_MARRIED string, STRUCT_OF_ARRAY struct<ID: int," +
+     "CHECK_DATE: timestamp ,SNo: array<int>,sal1: array<double>,state: array<string>," +
+     "date1: array<timestamp>>,CARD_COUNT int,DEBIT_COUNT int, CREDIT_COUNT int, DEPOSIT float, " +
+     "HQ_DEPOSIT double) row format delimited fields terminated by ',' collection items " +
+     "terminated by '$' map keys terminated by '&'"
+   sql(hiveDdl)
+   val csvLocation =
+     s"${ resourcesPath }../../../../../spark-datasource/src/test/resources/structofarray.csv"
+   val sourceFile = FileFactory.getPath(csvLocation).toString
+   sql(s"load data local inpath '$sourceFile' into table STRUCT_OF_ARRAY_com_hive")
+   val carbonDdl =
+     "create table STRUCT_OF_ARRAY_com (CUST_ID string, YEAR int, MONTH int, AGE int, GENDER " +
+     "string, EDUCATED string, IS_MARRIED string, STRUCT_OF_ARRAY struct<ID: int," +
+     "CHECK_DATE: timestamp,SNo: array<int>,sal1: array<double>,state: array<string>," +
+     "date1: array<timestamp>>,CARD_COUNT int,DEBIT_COUNT int, CREDIT_COUNT int, DEPOSIT double," +
+     " HQ_DEPOSIT double) using carbon"
+   sql(carbonDdl)
+   sql(" insert into STRUCT_OF_ARRAY_com select * from STRUCT_OF_ARRAY_com_hive")
+   val expected = sql("select * from STRUCT_OF_ARRAY_com_hive order by CUST_ID ASC")
+   checkAnswer(sql("select * from STRUCT_OF_ARRAY_com order by CUST_ID ASC"), expected)
+   dropTables()
+ }
+
+ test("test partition error in carbon") {
+   // A partitioned carbon table must return the same rows as a partitioned parquet table
+   // filled with the same insert.
+   sql("drop table if exists carbon_par")
+   sql("drop table if exists parquet_par")
+   sql(
+     "create table carbon_par (name string, age int, country string) using carbon partitioned by" +
+     " (country)")
+   sql("insert into carbon_par select 'b', '12', 'aa'")
+   // The reference table was also created "using carbon", so the check compared carbon with
+   // carbon. Use parquet so that the comparison has an independent baseline.
+   sql(
+     "create table parquet_par (name string, age int, country string) using parquet partitioned " +
+     "by (country)")
+   sql("insert into parquet_par select 'b', '12', 'aa'")
+   checkAnswer(sql("select * from carbon_par"), sql("select * from parquet_par"))
+   sql("drop table if exists carbon_par")
+   sql("drop table if exists parquet_par")
+ }
+
+ test("test write and create table with sort columns not allow") {
+   // After carbon files have been written to a folder, declaring a table on that folder
+   // with a sort_columns option is expected to throw UnsupportedOperationException
+   // (not checked on Spark 2.1).
+   def emptyFolder(): Unit = {
+     FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
+   }
+
+   sql("drop table if exists test123")
+   emptyFolder()
+   import sqlContext.implicits._
+   val source = sqlContext.sparkContext.parallelize(1 to 10)
+     .map { x =>
+       ("a" + x % 10, "b", "c" + x, "d" + x, x.toShort, x, x.toLong, x.toDouble,
+         BigDecimal.apply(x))
+     }
+     .toDF("c1", "c2", "c3", "c4", "shortc", "intc", "longc", "doublec", "bigdecimalc")
+
+   // Write the dataframe as carbon files
+   source.write.format("carbon").save(s"$warehouse1/test_folder/")
+   val isSpark21 = sqlContext.sparkContext.version.startsWith("2.1")
+   if (!isSpark21) {
+     val ddl = s"create table test123 using carbon options('sort_columns'='shortc,c2') " +
+               s"location '$warehouse1/test_folder/'"
+     intercept[UnsupportedOperationException] {
+       sql(ddl)
+     }
+   }
+   sql("drop table if exists test123")
+   emptyFolder()
+ }
+
+ test("valdate if path not specified during table creation") {
+   // Creating a carbon table with neither a schema nor a location must be rejected with an
+   // AnalysisException that mentions schema inference.
+   sql("drop table if exists test123")
+   val ddl = "create table test123 using carbon options('sort_columns'='shortc,c2')"
+   val failure = intercept[AnalysisException] {
+     sql(ddl)
+   }
+   val message = failure.getMessage()
+   assert(message.contains("Unable to infer schema for carbon"))
+ }
+
+ test("test double boundary") {
+   // Inserts the same row, with a very large double literal, into a parquet and a carbon
+   // table and checks that both tables return the same row.
+   val tables = Seq("par", "car")
+   tables.foreach(t => sql(s"drop table if exists $t"))
+
+   val columns = "c1 string, c2 double, n int"
+   sql(s"create table par ($columns) using parquet")
+   sql(s"create table car ($columns) using carbon")
+   val rowValues = "'a', 1.7986931348623157E308, 215565665556"
+   tables.foreach(t => sql(s"insert into $t select $rowValues"))
+
+   checkAnswer(sql("select * from car"), sql("select * from par"))
+   tables.foreach(t => sql(s"drop table if exists $t"))
+ }
+
+ test("test write using multi subfolder") {
+   // Writes the same ten rows into three sub folders and reads the parent folder as one
+   // carbon dataset; afterwards clearing the datamaps for that path must reduce the number
+   // of registered datamaps. Skipped on Spark 2.1.
+   if (!sqlContext.sparkContext.version.startsWith("2.1")) {
+     val rootFolder = warehouse1 + "/test_folder"
+     FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(rootFolder))
+     import sqlContext.implicits._
+     val source = sqlContext.sparkContext.parallelize(1 to 10)
+       .map(x => ("a" + x % 10, "b", x))
+       .toDF("c1", "c2", "number")
+
+     // One write per sub folder, each into a fresh nanoTime-named directory
+     Seq("1", "2", "3").foreach { subFolder =>
+       source.write.format("carbon")
+         .save(warehouse1 + "/test_folder/" + subFolder + "/" + System.nanoTime())
+     }
+
+     val combined = sqlContext.read.format("carbon").load(rootFolder)
+     assert(combined.count() == 30)
+     assert(combined.where("c1='a1'").count() == 3)
+     val dataMapsBefore = DataMapStoreManager.getInstance().getAllDataMaps.size()
+     DataMapStoreManager.getInstance().clearDataMaps(AbsoluteTableIdentifier.from(rootFolder))
+     val dataMapsAfter = DataMapStoreManager.getInstance().getAllDataMaps.size()
+     assert(dataMapsBefore > dataMapsAfter)
+     FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(rootFolder))
+   }
+ }
+
+ test("test read using old data") {
+   // Builds a store with StoreCreator, deletes the path Segment_0/0 beneath it and then
+   // opens the segment folder through the carbon datasource reader.
+   val storePath = new File(warehouse1).getAbsolutePath
+   val csvPath =
+     new File(warehouse1 + "../../../../../hadoop/src/test/resources/data.csv").getCanonicalPath
+   val store = new StoreCreator(storePath, csvPath, false)
+   store.createCarbonStore()
+   val segmentFolder = warehouse1 + "/testdb/testtable/Fact/Part0/Segment_0"
+   FileFactory.deleteAllFilesOfDir(new File(segmentFolder + "/0"))
+   val dfread = sqlContext.read.format("carbon").load(segmentFolder)
+   sql("drop table if exists parquet_table")
+ }
+
+ test("test write sdk and read with spark using different sort order data") {
+   // Four batches of five rows are written to one folder, each with a different pair of
+   // sort columns. After every additional batch the table is refreshed and the row counts
+   // (total and per filter) must grow accordingly.
+   val sdkFolder = warehouse1 + "/sdk"
+   def rowCount(query: String): Int = sql(query).collect().length
+
+   sql("drop table if exists sdkout")
+   FileFactory.deleteAllFilesOfDir(new File(sdkFolder))
+
+   // batch 1
+   buildTestDataOtherDataType(5, Array("age", "address"), sdkFolder)
+   sql(s"create table sdkout using carbon options(path='$warehouse1/sdk')")
+   assert(rowCount("select * from sdkout") == 5)
+
+   // batch 2
+   buildTestDataOtherDataType(5, Array("name", "salary"), sdkFolder)
+   sql("refresh table sdkout")
+   assert(rowCount("select * from sdkout where name = 'name1'") == 2)
+   assert(rowCount("select * from sdkout where salary=100") == 2)
+
+   // batch 3
+   buildTestDataOtherDataType(5, Array("name", "age"), sdkFolder)
+   sql("refresh table sdkout")
+   assert(rowCount("select * from sdkout where name='name0'") == 3)
+   assert(rowCount("select * from sdkout") == 15)
+   assert(rowCount("select * from sdkout where salary=100") == 3)
+   assert(rowCount("select * from sdkout where address='address1'") == 3)
+
+   // batch 4
+   buildTestDataOtherDataType(5, Array("name", "salary"), sdkFolder)
+   sql("refresh table sdkout")
+   assert(rowCount("select * from sdkout where name='name0'") == 4)
+   assert(rowCount("select * from sdkout") == 20)
+   assert(rowCount("select * from sdkout where salary=100") == 4)
+   assert(rowCount("select * from sdkout where address='address1'") == 4)
+
+   FileFactory.deleteAllFilesOfDir(new File(sdkFolder))
+ }
+
+ test("test Float data type by giving schema explicitly and desc formatted") {
+   // With an explicit schema, the 7th and 8th rows of "desc formatted" must report the
+   // types float and tinyint in their second column.
+   sql("drop table if exists sdkout")
+   val sdkFolder = warehouse1 + "/sdk1"
+   FileFactory.deleteAllFilesOfDir(new File(sdkFolder))
+   buildTestDataOtherDataType(5, Array("age", "address"), sdkFolder)
+   val columns = "male boolean, age int, height double, name string, address string," +
+                 "salary long, floatField float, bytefield byte"
+   sql(s"create table sdkout($columns) using carbon options (path='$warehouse1/sdk1')")
+   val floatRow = sql("desc formatted sdkout").collect().take(7).last
+   assert(floatRow.get(1).equals("float"))
+   val byteRow = sql("desc formatted sdkout").collect().take(8).last
+   assert(byteRow.get(1).equals("tinyint"))
+ }
+
+ test("test select * on table with float data type") {
+   // Compares the par_table reference table with a carbon table declared on SDK output,
+   // first for all columns and then for the float column alone.
+   sql("drop table if exists sdkout")
+   val sdkFolder = warehouse1 + "/sdk1"
+   FileFactory.deleteAllFilesOfDir(new File(sdkFolder))
+   buildTestDataOtherDataType(11, Array("age", "address"), sdkFolder)
+   val columns = "male boolean, age int, height double, name string, address string," +
+                 "salary long, floatField float, bytefield byte"
+   sql(s"create table sdkout($columns) using carbon options (path='$warehouse1/sdk1')")
+   Seq("*", "floatfield").foreach { projection =>
+     checkAnswer(
+       sql(s"select $projection from par_table"),
+       sql(s"select $projection from sdkout"))
+   }
+ }
+
+ test("test various filters on float data") {
+   // Runs a list of predicates on the float column against both the par_table reference
+   // table and the carbon table declared on SDK output; each pair of results must match.
+   sql("drop table if exists sdkout")
+   val sdkFolder = warehouse1 + "/sdk1"
+   FileFactory.deleteAllFilesOfDir(new File(sdkFolder))
+   buildTestDataOtherDataType(11, Array("age", "address"), sdkFolder)
+   val columns = "male boolean, age int, height double, name string, address string," +
+                 "salary long, floatField float, bytefield byte"
+   sql(s"create table sdkout($columns) using carbon options (path='$warehouse1/sdk1')")
+   val predicates = Seq(
+     "floatfield < 10",
+     "floatfield > 5.3",
+     "floatfield >= 4.1",
+     "floatfield != 5.5",
+     "floatfield <= 5",
+     "floatfield >= 5",
+     "floatfield IN ('5.5','6.6')",
+     "floatfield NOT IN ('5.5','6.6')",
+     "floatfield = cast('6.6' as float)")
+   predicates.foreach { predicate =>
+     checkAnswer(
+       sql(s"select * from par_table where $predicate"),
+       sql(s"select * from sdkout where $predicate"))
+   }
+ }
+
+ test("test select * on table with byte data type") {
+   // Compares the par_table reference table with a carbon table declared on SDK output,
+   // first for all columns and then for the byte column alone.
+   sql("drop table if exists sdkout")
+   val sdkFolder = warehouse1 + "/sdk1"
+   FileFactory.deleteAllFilesOfDir(new File(sdkFolder))
+   buildTestDataOtherDataType(11, Array("age", "address"), sdkFolder)
+   val columns = "male boolean, age int, height double, name string, address string," +
+                 "salary long, floatField float, bytefield byte"
+   sql(s"create table sdkout($columns) using carbon options (path='$warehouse1/sdk1')")
+   val allExpected = sql("select * from sdkout")
+   checkAnswer(sql("select * from par_table"), allExpected)
+   val byteExpected = sql("select bytefield from sdkout")
+   checkAnswer(sql("select byteField from par_table"), byteExpected)
+ }
+
+ test("test various filters on byte data") {
+   // Runs a list of predicates on the byte column against both the par_table reference
+   // table and the carbon table declared on SDK output; each pair of results must match.
+   sql("drop table if exists sdkout")
+   val sdkFolder = warehouse1 + "/sdk1"
+   FileFactory.deleteAllFilesOfDir(new File(sdkFolder))
+   buildTestDataOtherDataType(11, Array("age", "address"), sdkFolder)
+   val columns = "male boolean, age int, height double, name string, address string," +
+                 "salary long, floatField float, bytefield byte"
+   sql(s"create table sdkout($columns) using carbon options (path='$warehouse1/sdk1')")
+   val predicates = Seq(
+     "bytefield < 10",
+     "bytefield > 5",
+     "bytefield >= 4",
+     "bytefield != 5",
+     "bytefield <= 5",
+     "bytefield >= 5",
+     "bytefield IN ('5','6')",
+     "bytefield NOT IN ('5','6')")
+   predicates.foreach { predicate =>
+     checkAnswer(
+       sql(s"select * from par_table where $predicate"),
+       sql(s"select * from sdkout where $predicate"))
+   }
+ }
+
+ test("test struct of float type and byte type") {
+   // Writes eleven rows with a struct<byte, float> column through the SDK writer, declares a
+   // carbon table on the output folder and checks plain and filtered reads of that column.
+   import scala.collection.JavaConverters._
+   val path = FileFactory.getPath(warehouse1 + "/sdk1").toString
+   FileFactory.deleteAllFilesOfDir(new File(warehouse1 + "/sdk1"))
+   sql("drop table if exists complextable")
+   val fields = List(
+     new StructField("byteField", DataTypes.BYTE),
+     new StructField("floatField", DataTypes.FLOAT))
+   val structType = Array(
+     new Field("stringfield", DataTypes.STRING),
+     new Field("structField", "struct", fields.asJava))
+
+   // No try/catch around the setup: the previous handler ended in "case _ => None", which
+   // silently swallowed every non-Exception throwable, and wrapping exceptions in a
+   // RuntimeException added nothing. Any failure now fails the test directly.
+   val writer = CarbonWriter.builder()
+     .outputPath(path)
+     .uniqueIdentifier(System.nanoTime()).withBlockSize(2)
+     .withCsvInput(new Schema(structType)).writtenBy("SparkCarbonDataSourceTestCase").build()
+   (0 until 11).foreach { i =>
+     // struct members are separated by '$': byte value, then float value "<i>.<i>12"
+     writer.write(Array[String](s"name$i", s"$i" + "$" + s"$i.${ i }12"))
+   }
+   writer.close()
+   sql("create table complextable (stringfield string, structfield struct<bytefield: " +
+       "byte, floatfield: float>) " +
+       s"using carbon location '$path'")
+
+   val lastRow = Seq(Row("name10", Row(10.toByte, 10.1012.toFloat)))
+   checkAnswer(sql("select * from complextable limit 1"),
+     Seq(Row("name0", Row(0.toByte, 0.012.toFloat))))
+   checkAnswer(sql("select * from complextable where structfield.bytefield > 9"), lastRow)
+   checkAnswer(sql("select * from complextable where structfield.floatfield > 9.912"), lastRow)
+   checkAnswer(sql("select * from complextable where structfield.floatfield > 9.912 and " +
+                   "structfield.bytefield < 11"),
+     lastRow)
+ }
+
+ test("test bytefield as sort column") {
+   // Writes eleven rows through the SDK writer sorted by the byte column, declares a carbon
+   // table on the output and compares it with a parquet table holding the same rows,
+   // ordered by that column.
+   val path = FileFactory.getPath(warehouse1 + "/sdk1").toString
+   FileFactory.deleteAllFilesOfDir(new File(warehouse1 + "/sdk1"))
+   // Exactly the six columns that are written. The array used to be allocated with eight
+   // slots, which left two null entries in the schema.
+   val fields: Array[Field] = Array(
+     new Field("age", DataTypes.INT),
+     new Field("height", DataTypes.DOUBLE),
+     new Field("name", DataTypes.STRING),
+     new Field("address", DataTypes.STRING),
+     new Field("salary", DataTypes.LONG),
+     new Field("bytefield", DataTypes.BYTE))
+
+   // No try/catch: the previous handler ended in "case _ => None", which silently swallowed
+   // every non-Exception throwable. Any failure now fails the test directly.
+   val writer = CarbonWriter.builder()
+     .outputPath(path)
+     .uniqueIdentifier(System.nanoTime()).withBlockSize(2).sortBy(Array("bytefield"))
+     .withCsvInput(new Schema(fields)).writtenBy("SparkCarbonDataSourceTestCase").build()
+   (0 until 11).foreach { i =>
+     writer.write(Array[String](
+       String.valueOf(i),
+       String.valueOf(i.toDouble / 2),
+       "name" + i,
+       "address" + i,
+       (i * 100).toString,
+       // byte values run opposite to the write order: 10, 9, ..., 0
+       s"${ 10 - i }"))
+   }
+   writer.close()
+
+   sql("drop table if exists sorted_par")
+   sql("drop table if exists sort_table")
+   sql(s"create table sort_table (age int, height double, name string, address string," +
+       s" salary long, bytefield byte) using carbon location '$path'")
+   FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(s"$warehouse1/../warehouse2"))
+   sql(s"create table sorted_par(age int, height double, name string, address " +
+       s"string," +
+       s"salary long, bytefield byte) using parquet location " +
+       s"'$warehouse1/../warehouse2'")
+   (0 to 10).foreach { i =>
+     sql(s"insert into sorted_par select '$i', ${ i.toDouble / 2 }, 'name$i', " +
+         s"'address$i', ${ i * 100 }, '${ 10 - i }'")
+   }
+   checkAnswer(sql("select * from sorted_par order by bytefield"),
+     sql("select * from sort_table"))
+ }
+
+ test("test array of float type and byte type") {
+   // Writes ten rows with an array<byte> and an array<float> column through the SDK writer,
+   // declares a carbon table on the output folder and checks plain and filtered reads.
+   import scala.collection.JavaConverters._
+   val path = FileFactory.getPath(warehouse1 + "/sdk1").toString
+   FileFactory.deleteAllFilesOfDir(new File(warehouse1 + "/sdk1"))
+   sql("drop table if exists complextable")
+   val structType =
+     Array(new Field("stringfield", DataTypes.STRING),
+       new Field("bytearray", "array", List(new StructField("byteField", DataTypes.BYTE))
+         .asJava),
+       new Field("floatarray", "array", List(new StructField("floatfield", DataTypes.FLOAT))
+         .asJava))
+
+   // No try/catch: the previous handler ended in "case _ => None", which silently swallowed
+   // every non-Exception throwable. Any failure now fails the test directly.
+   val writer = CarbonWriter.builder()
+     .outputPath(path)
+     .uniqueIdentifier(System.nanoTime()).withBlockSize(2)
+     .withCsvInput(new Schema(structType)).writtenBy("SparkCarbonDataSourceTestCase").build()
+   (0 until 10).foreach { i =>
+     // array elements are separated by '$': bytes (i, 2i) and floats (i / 2, i / 3), where
+     // both divisions are integer divisions
+     writer.write(Array[String](s"name$i",
+       s"$i" + "$" + s"${ i * 2 }",
+       s"${ i / 2 }" + "$" + s"${ i / 3 }"))
+   }
+   writer.close()
+   sql(s"create table complextable (stringfield string, bytearray " +
+       s"array<byte>, floatarray array<float>) using carbon " +
+       s"location " +
+       s"'$path'")
+
+   checkAnswer(sql("select * from complextable limit 1"),
+     Seq(Row("name0",
+       mutable.WrappedArray.make(Array[Byte](0, 0)),
+       mutable.WrappedArray.make(Array[Float](0.0f, 0.0f)))))
+   checkAnswer(sql("select * from complextable where bytearray[0] = 1"),
+     Seq(Row("name1",
+       mutable.WrappedArray.make(Array[Byte](1, 2)),
+       mutable.WrappedArray.make(Array[Float](0.0f, 0.0f)))))
+   checkAnswer(sql("select * from complextable where bytearray[0] > 8"),
+     Seq(Row("name9",
+       mutable.WrappedArray.make(Array[Byte](9, 18)),
+       mutable.WrappedArray.make(Array[Float](4.0f, 3.0f)))))
+   checkAnswer(
+     sql("select * from complextable where floatarray[0] IN (4.0) and stringfield = 'name8'"),
+     Seq(Row("name8",
+       mutable.WrappedArray.make(Array[Byte](8, 16)),
+       mutable.WrappedArray.make(Array[Float](4.0f, 2.0f)))))
+ }
+
+ /**
+  * Creates the parquet table `par_table` located at `$warehouse1/../warehouse2` and fills it
+  * with eleven rows (i = 0 to 10), issuing one INSERT statement per row.
+  */
+ private def createParquetTable: Unit = {
+   val path = FileFactory.getUpdatedFilePath(s"$warehouse1/../warehouse2")
+   // clean the target location first (delegates to FileFactory.deleteAllCarbonFilesOfDir)
+   FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(path))
+   sql(s"create table par_table(male boolean, age int, height double, name string, address " +
+     s"string," +
+     s"salary long, floatField float, bytefield byte) using parquet location '$path'")
+   (0 to 10).foreach { i =>
+     // $i.$i renders as a decimal literal such as 3.3 for the float column
+     sql(s"insert into par_table select 'true','$i', ${ i.toDouble / 2 }, 'name$i', " +
+       s"'address$i', ${ i * 100 }, $i.$i, '$i'")
+   }
+ }
+
+ /**
+  * Prepares SDK writer output with a schema of eight columns: male (boolean), age (int),
+  * height (double), name (string), address (string), salary (long), floatField (float) and
+  * bytefield (byte).
+  *
+  * @param rows        number of rows to write; row i carries values derived from i
+  * @param sortColumns column names handed to the writer's sortBy
+  * @param writerPath  output path handed to the SDK writer
+  * @param colCount    when positive, only the first colCount columns (schema and values) are
+  *                    written; the default -1 writes all eight
+  * @return no meaningful value; the declared type stays Any so existing callers are unaffected
+  * @throws RuntimeException wrapping any Exception raised while building or writing
+  */
+ def buildTestDataOtherDataType(rows: Int,
+     sortColumns: Array[String],
+     writerPath: String,
+     colCount: Int = -1): Any = {
+   val allFields: Array[Field] = Array(
+     new Field("male", DataTypes.BOOLEAN),
+     new Field("age", DataTypes.INT),
+     new Field("height", DataTypes.DOUBLE),
+     new Field("name", DataTypes.STRING),
+     new Field("address", DataTypes.STRING),
+     new Field("salary", DataTypes.LONG),
+     new Field("floatField", DataTypes.FLOAT),
+     new Field("bytefield", DataTypes.BYTE))
+   // take() caps at the number of declared columns, mirroring the slice applied to each row
+   val fields = if (colCount > 0) allFields.take(colCount) else allFields
+
+   try {
+     val writer = CarbonWriter.builder().outputPath(writerPath)
+       .uniqueIdentifier(System.nanoTime()).withBlockSize(2).sortBy(sortColumns)
+       .withCsvInput(new Schema(fields)).writtenBy("SparkCarbonDataSourceTestCase").build()
+     try {
+       (0 until rows).foreach { i =>
+         val row = Array[String]("true",
+           String.valueOf(i),
+           String.valueOf(i.toDouble / 2),
+           "name" + i,
+           "address" + i,
+           (i * 100).toString,
+           s"$i.$i", s"$i")
+         if (colCount > 0) {
+           writer.write(row.slice(0, colCount))
+         } else {
+           writer.write(row)
+         }
+       }
+     } finally {
+       // close the writer even when a write fails
+       writer.close()
+     }
+   } catch {
+     // only ordinary exceptions are wrapped; Errors propagate instead of being swallowed
+     case ex: Exception => throw new RuntimeException(ex)
+   }
+ }
+
+ /**
+  * Clears `writerPath` and writes `rows` copies of one sample Avro record there through
+  * WriteFilesWithAvroWriter.
+  *
+  * The Avro schema declares a record with `name` (string), `age` (int) and `structRecord`,
+  * a nested record holding `street` (string) and `houseDetails`, an array whose items are
+  * maps with string values.
+  *
+  * @param writerPath directory that is emptied and then written to
+  * @param rows       number of times the sample record is written
+  */
+ def buildStructSchemaWithNestedArrayOfMapTypeAsValue(writerPath: String, rows: Int): Unit = {
+ FileFactory.deleteAllFilesOfDir(new File(writerPath))
+ // Avro schema, as JSON, for the record described above
+ val mySchema =
+ """
+ |{
+ | "name": "address",
+ | "type": "record",
+ | "fields": [
+ | {
+ | "name": "name",
+ | "type": "string"
+ | },
+ | {
+ | "name": "age",
+ | "type": "int"
+ | },
+ | {
+ | "name": "structRecord",
+ | "type": {
+ | "type": "record",
+ | "name": "my_address",
+ | "fields": [
+ | {
+ | "name": "street",
+ | "type": "string"
+ | },
+ | {
+ | "name": "houseDetails",
+ | "type": {
+ | "type": "array",
+ | "items": {
+ | "name": "memberDetails",
+ | "type": "map",
+ | "values": "string"
+ | }
+ | }
+ | }
+ | ]
+ | }
+ | }
+ | ]
+ |}
+ """.stripMargin
+ // the single sample record; houseDetails holds one map with two entries
+ val json =
+ """ {"name":"bob", "age":10, "structRecord": {"street":"street1",
+ |"houseDetails": [{"101": "Rahul", "102": "Pawan"}]}} """
+ .stripMargin
+ WriteFilesWithAvroWriter(writerPath, rows, mySchema, json)
+ }
+
+ // SDK-written Avro output containing a struct<array<map>> column must be readable through a
+ // carbon datasource table that is created without an explicit column list.
+ test("test external table with struct type with value as nested struct<array<map>> type") {
+   val expectedRows = 3
+   val sdkOutputDir: String = FileFactory.getUpdatedFilePath(warehouse1 + "/sdk1")
+   val dropExternal = "drop table if exists carbon_external"
+
+   buildStructSchemaWithNestedArrayOfMapTypeAsValue(sdkOutputDir, expectedRows)
+   sql(dropExternal)
+   sql(s"create table carbon_external using carbon location '$sdkOutputDir'")
+   val actualRows = sql("select * from carbon_external").count()
+   assert(actualRows == expectedRows)
+   sql(dropExternal)
+ }
+
+ // Writes 33000 rows of (string, float, byte) with the SDK CSV writer and checks that a carbon
+ // datasource table over the output returns every row.
+ // NOTE(review): per the test name the row count is presumably chosen to span more than one
+ // page; confirm against the configured page size.
+ test("test byte and float for multiple pages") {
+   val path = FileFactory.getPath(warehouse1 + "/sdk1").toString
+   FileFactory.deleteAllFilesOfDir(new File(warehouse1 + "/sdk1"))
+   sql("drop table if exists multi_page")
+   // exactly the three columns written below, so the schema array carries no null slots
+   val fields: Array[Field] = Array(
+     new Field("a", DataTypes.STRING),
+     new Field("b", DataTypes.FLOAT),
+     new Field("c", DataTypes.BYTE))
+   val rowCount = 33000
+
+   try {
+     val writer = CarbonWriter.builder().outputPath(path)
+       .uniqueIdentifier(System.nanoTime()).withBlockSize(2)
+       .withCsvInput(new Schema(fields)).writtenBy("SparkCarbonDataSourceTestCase").build()
+     try {
+       (0 until rowCount).foreach { i =>
+         writer.write(Array[String](String.valueOf(i), s"$i.3200", "32"))
+       }
+     } finally {
+       // close the writer even when a write fails
+       writer.close()
+     }
+     sql(s"create table multi_page (a string, b float, c byte) using carbon location " +
+       s"'$path'")
+     assert(sql("select * from multi_page").count() == rowCount)
+   } catch {
+     case ex: Exception => throw new RuntimeException(ex)
+   } finally {
+     // always remove the SDK output, whether the test passed or failed
+     FileFactory.deleteAllFilesOfDir(new File(warehouse1 + "/sdk1"))
+   }
+ }
+
+ // Runs the same sequence (two inserts, add a partition at an explicit location, one more
+ // insert) against a carbon and a parquet table and expects both to return the same rows.
+ test("test partition issue with add location") {
+   val carbonTable = "partitionTable_obs"
+   val parquetTable = "partitionTable_obs_par"
+
+   def dropBoth(): Unit = {
+     Seq(carbonTable, parquetTable).foreach(table => sql(s"drop table if exists $table"))
+   }
+
+   // inserts 'abc' and 'bcd', registers partition 'def' under the given folder, inserts 'def'
+   def populate(table: String, folder: String): Unit = {
+     sql(s"insert into $table select 1,'huawei','abc'")
+     sql(s"insert into $table select 1,'huawei','bcd'")
+     sql(s"alter table $table add partition (email='def') location " +
+       s"'$warehouse1/$folder/'")
+     sql(s"insert into $table select 1,'huawei','def'")
+   }
+
+   dropBoth()
+   sql(s"create table $carbonTable (id int,name String,email String) using carbon " +
+     s"partitioned by(email) ")
+   sql(s"create table $parquetTable (id int,name String,email String) using parquet " +
+     s"partitioned by(email) ")
+   populate(carbonTable, "test_folder121")
+   populate(parquetTable, "test_folder122")
+
+   checkAnswer(sql(s"select * from $carbonTable"),
+     sql(s"select * from $parquetTable"))
+   dropBoth()
+ }
+
+ // Creates a parquet and a carbon table with the same columns and the same four partition
+ // columns, applies equivalent statements to both and compares their contents.
+ test("test multiple partition select issue") {
+   val parquetTable = "t_carbn01b_hive"
+   val carbonTable = "t_carbn01b"
+   val columns =
+     "Qty_day_avg INT,Qty_total INT,Sell_price BIGINT,Sell_pricep " +
+     "DOUBLE,Profit DECIMAL(3,2),Item_code String,Item_name String,Outlet_name String," +
+     "Create_date String,Active_status String,Item_type_cd INT, Update_time TIMESTAMP, " +
+     "Discount_price DOUBLE"
+   val partitionColumns = "Active_status,Item_type_cd, Update_time, Discount_price"
+   // the row written to both tables by the insert overwrite statements
+   val rowValues =
+     "'xyz', 12, 74,3000,20000000,121.5,4.99,2.44," +
+     "'RE3423ee','dddd', 'ssss','2012-01-02 23:04:05.12', '2012-01-20'"
+
+   def dropBoth(): Unit = {
+     sql(s"drop table if exists $parquetTable")
+     sql(s"drop table if exists $carbonTable")
+   }
+
+   dropBoth()
+   sql(s"create table $parquetTable($columns) using parquet partitioned by " +
+     s"($partitionColumns)")
+   sql(s"create table $carbonTable($columns) using carbon partitioned by " +
+     s"($partitionColumns)")
+   sql(s"insert into $carbonTable " +
+     "partition(Active_status, Item_type_cd,Update_time,Discount_price) " +
+     s"select * from $parquetTable")
+   sql(s"alter table $carbonTable add partition (active_status='xyz',Item_type_cd=12," +
+     "Update_time=NULL,Discount_price='3000')")
+   sql(s"insert overwrite table $carbonTable select $rowValues")
+   sql(s"insert overwrite table $parquetTable select $rowValues")
+   checkAnswer(sql(s"select * from $parquetTable"), sql(s"select * from $carbonTable"))
+   dropBoth()
+ }
+
+ // Inserts two float literals with negative exponents into a parquet and a carbon table and
+ // expects both tables to return the same values. 1.4E-38 lies close to the smallest normal
+ // float magnitude.
+ test("Test Float value by having negative exponents") {
+   // (table name, datasource format), processed in this order throughout
+   val tables = Seq("float_p" -> "parquet", "float_c" -> "carbon")
+   val literals = Seq("1.4E-3", "1.4E-38")
+
+   tables.foreach { case (table, _) => sql("DROP TABLE IF EXISTS " + table) }
+   tables.foreach { case (table, format) =>
+     sql("CREATE TABLE " + table + "(f float) using " + format)
+   }
+   for ((table, _) <- tables; literal <- literals) {
+     // the value is passed as a double-quoted literal, exactly as typed in SQL
+     sql("INSERT INTO " + table + " select \"" + literal + "\"")
+   }
+   checkAnswer(sql("SELECT * FROM float_p"),
+     sql("SELECT * FROM float_c"))
+   tables.foreach { case (table, _) => sql("DROP TABLE " + table) }
+ }
+
+ // Loads a CSV into a hive text table, copies it into a carbon table, then drops and
+ // re-creates the carbon table under the same name with a different column order and types,
+ // and copies the data again.
+ test("test fileformat flow with drop and query on same table") {
+   val dropCarbon = "drop table if exists fileformat_drop"
+   val dropHive = "drop table if exists fileformat_drop_hive"
+   // both loads copy the same projection out of the hive staging table
+   val copyFromHive =
+     "insert into fileformat_drop select imei ,deviceInformationId ,AMSize ,channelsId ," +
+     "ActiveCountry ,Activecity ,gamePointId ,productionDate ,deliveryDate ,deliverycharge from " +
+     "fileformat_drop_hive"
+
+   // NOTE(review): count(*) without GROUP BY yields a single row, so the length of the
+   // collected result is presumably 1 whatever the data holds; confirm whether the count
+   // value itself was meant to be asserted.
+   def countQueryRows: Int = {
+     sql("select count(*) from fileformat_drop where imei='1AA10000'").collect().length
+   }
+
+   sql(dropCarbon)
+   sql(dropHive)
+   sql(
+     "create table fileformat_drop (imei string,AMSize string,channelsId string,ActiveCountry " +
+     "string, Activecity string,gamePointId double,deviceInformationId double,productionDate " +
+     "Timestamp,deliveryDate timestamp,deliverycharge double) using carbon options" +
+     "('table_blocksize'='1','LOCAL_DICTIONARY_ENABLE'='TRUE'," +
+     "'LOCAL_DICTIONARY_THRESHOLD'='1000')")
+   sql(
+     "create table fileformat_drop_hive(imei string,deviceInformationId double,AMSize string," +
+     "channelsId string,ActiveCountry string,Activecity string,gamePointId double,productionDate" +
+     " Timestamp,deliveryDate timestamp,deliverycharge double)row format delimited FIELDS " +
+     "terminated by ',' LINES terminated by '\n' stored as textfile")
+   val csvLocation =
+     s"${ resourcesPath }../../../../../spark-datasource/src/test/resources/vardhandaterestruct.csv"
+   val sourceFile = FileFactory.getPath(csvLocation).toString
+   sql(s"load data local inpath '$sourceFile' into table fileformat_drop_hive")
+   sql(copyFromHive)
+   assert(countQueryRows == 1)
+
+   // second round: same table name, different schema
+   sql(dropCarbon)
+   sql(
+     "create table fileformat_drop (imei string,deviceInformationId double,AMSize string," +
+     "channelsId string,ActiveCountry string,Activecity string,gamePointId float,productionDate " +
+     "timestamp,deliveryDate timestamp,deliverycharge decimal(10,2)) using carbon options" +
+     "('table_blocksize'='1','LOCAL_DICTIONARY_ENABLE'='true'," +
+     "'local_dictionary_threshold'='1000')")
+   sql(copyFromHive)
+   assert(countQueryRows == 1)
+   sql(dropCarbon)
+   sql(dropHive)
+ }
+
+ // Creating the table with an inverted_index option that names a missing column succeeds;
+ // the first insert is expected to fail with a message naming that column.
+ test("validate the columns not present in schema") {
+   val createWithUnknownColumn =
+     "create table validate (name string, age int, address string) using carbon options" +
+     "('inverted_index'='abc')"
+   val expectedFragment =
+     "column: abc specified in inverted index columns does not exist in schema"
+
+   sql("drop table if exists validate")
+   sql(createWithUnknownColumn)
+   val thrown = intercept[Exception] {
+     sql("insert into validate select 'abc',4,'def'")
+   }
+   assert(thrown.getMessage.contains(expectedFragment))
+ }
+
+ /**
+  * Suite setup: drops the tables listed in `drop`, then creates and fills `par_table`
+  * through `createParquetTable`.
+  */
+ override protected def beforeAll(): Unit = {
+ // drop first so that createParquetTable does not collide with an existing par_table
+ drop
+ createParquetTable
+ }
+
+ /** Suite teardown: drops the tables listed in `drop`. */
+ override def afterAll(): Unit = {
+ drop
+ }
+
+ /**
+  * Drops, if present, the tables shared across this suite: testformat, carbon_table,
+  * testparquet, par_table, sdkout and validate (in that order).
+  */
+ private def drop = {
+   Seq("testformat", "carbon_table", "testparquet", "par_table", "sdkout")
+     .foreach(table => sql(s"drop table if exists $table"))
+   // kept as the final expression so the method's inferred result is unchanged
+   sql("drop table if exists validate")
+ }
+
+ /**
+  * Decodes a JSON document into an Avro generic record.
+  *
+  * @param json       JSON text of one record conforming to `avroSchema`
+  * @param avroSchema Avro schema as a JSON string
+  * @return the record read from `json` with a reader built for the parsed schema
+  */
+ private def jsonToAvro(json: String, avroSchema: String): GenericRecord = {
+   // Parse before opening any stream, so a schema error surfaces as itself rather than being
+   // masked by a failure while cleaning up resources that were never created.
+   val schema = new org.apache.avro.Schema.Parser().parse(avroSchema)
+   val reader = new GenericDatumReader[GenericRecord](schema)
+   // explicit charset: the bytes must not depend on the platform default encoding
+   val input = new DataInputStream(new ByteArrayInputStream(json.getBytes("UTF-8")))
+   try {
+     val decoder = DecoderFactory.get().jsonDecoder(schema, input)
+     reader.read(null, decoder)
+   } finally {
+     input.close()
+   }
+ }
+
+ /**
+  * Writes `rows` copies of a single Avro record to `writerPath` with the SDK Avro writer.
+  *
+  * Any Exception raised while building, writing or closing is printed and turned into a
+  * test failure through `Assert.fail`.
+  *
+  * @param writerPath output path handed to the SDK writer
+  * @param rows       number of times the record is written
+  * @param mySchema   Avro schema as a JSON string
+  * @param json       JSON text of the record, converted through jsonToAvro
+  */
+ def WriteFilesWithAvroWriter(writerPath: String,
+     rows: Int,
+     mySchema: String,
+     json: String): Unit = {
+   // conversion to GenericData.Record
+   val avroSchema = new avro.Schema.Parser().parse(mySchema)
+   val record = jsonToAvro(json, mySchema)
+   try {
+     val writer = CarbonWriter.builder
+       .outputPath(writerPath)
+       .uniqueIdentifier(System.currentTimeMillis()).withAvroInput(avroSchema).writtenBy("DataSource")
+       .build()
+     try {
+       (0 until rows).foreach(_ => writer.write(record))
+     } finally {
+       // close the writer even when a write fails
+       writer.close()
+     }
+   } catch {
+     case e: Exception =>
+       e.printStackTrace()
+       Assert.fail(e.getMessage)
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7ff3e68/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala
index d117d69..f2ae2cb 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala
@@ -20,6 +20,7 @@ import org.apache.spark.sql.test.TestQueryExecutor
import org.scalatest.{BeforeAndAfterAll, Suites}
import org.apache.carbondata.cluster.sdv.generated._
+import org.apache.carbondata.cluster.sdv.generated.datasource.{CreateTableUsingSparkCarbonFileFormatTestCase, SparkCarbonDataSourceTestCase}
import org.apache.carbondata.cluster.sdv.register.TestRegisterCarbonTable
import org.apache.carbondata.spark.testsuite.localdictionary.LoadTableWithLocalDictionaryTestCase
@@ -174,7 +175,9 @@ class SDVSuites3 extends Suites with BeforeAndAfterAll {
*/
class SDVSuites4 extends Suites with BeforeAndAfterAll {
- val suites = new CarbonV1toV3CompatabilityTestCase :: Nil
+ val suites = new CreateTableUsingSparkCarbonFileFormatTestCase ::
+ new SparkCarbonDataSourceTestCase ::
+ new CarbonV1toV3CompatabilityTestCase :: Nil
override val nestedSuites = suites.toIndexedSeq
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7ff3e68/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
index 49aa7ff..3ee75ad 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -23,10 +23,11 @@ import java.util.{Locale, TimeZone}
import org.apache.carbondata.common.logging.LogServiceFactory
import scala.collection.JavaConversions._
+import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.command.LoadDataCommand
-import org.apache.spark.sql.hive.{CarbonSessionCatalog}
+import org.apache.spark.sql.hive.CarbonSessionCatalog
import org.apache.spark.sql.test.{ResourceRegisterAndCopier, TestQueryExecutor}
import org.apache.spark.sql.{CarbonSession, DataFrame, Row, SQLContext}
import org.scalatest.Suite
@@ -138,6 +139,8 @@ class QueryTest extends PlanTest with Suite {
val sqlContext: SQLContext = TestQueryExecutor.INSTANCE.sqlContext
+ sqlContext.sparkSession.experimental.extraOptimizations = Seq(new CarbonFileIndexReplaceRule)
+
val resourcesPath = TestQueryExecutor.resourcesPath
val hiveClient = sqlContext.sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]