Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/10/30 09:22:12 UTC
[22/35] carbondata git commit: [CARBONDATA-1597] Remove spark1 integration
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/examples/spark/src/main/scala/org/apache/carbondata/examples/PerfTest.scala
----------------------------------------------------------------------
diff --git a/examples/spark/src/main/scala/org/apache/carbondata/examples/PerfTest.scala b/examples/spark/src/main/scala/org/apache/carbondata/examples/PerfTest.scala
deleted file mode 100644
index 824730f..0000000
--- a/examples/spark/src/main/scala/org/apache/carbondata/examples/PerfTest.scala
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.examples
-
-import java.io.File
-
-import scala.util.Random
-
-import org.apache.spark.sql.{CarbonContext, DataFrame, Row, SaveMode, SQLContext}
-import org.apache.spark.sql.types.{DataTypes, StructType}
-
-import org.apache.carbondata.examples.PerfTest._
-import org.apache.carbondata.examples.util.ExampleUtils
-
-// scalastyle:off println
-
-/**
- * represent one query
- */
-class Query(val queryType: String, val queryNo: Int, val sqlString: String) {
-
- /**
- * run the query in a batch and calculate average time
- *
- * @param sqlContext context to run the query
- * @param runs number of times to run the query
- * @param datasource datasource to run against
- */
- def run(sqlContext: SQLContext, runs: Int, datasource: String): QueryResult = {
- // run repeatedly and calculate the average elapsed time
- require(runs >= 2) // one cold run plus at least one timed run; avoids divide-by-zero in avgTime
- val sqlToRun = makeSQLString(datasource)
-
- val firstTime = withTime {
- sqlContext.sql(sqlToRun).collect
- }
-
- var totalTime: Long = 0
- var result: Array[Row] = null
- (1 to (runs - 1)).foreach { x =>
- totalTime += withTime {
- result = sqlContext.sql(sqlToRun).collect
- }
- }
-
- val avgTime = totalTime / (runs - 1)
- QueryResult(datasource, result, avgTime, firstTime)
- }
-
- private def makeSQLString(datasource: String): String = {
- sqlString.replaceFirst("tableName", PerfTest.makeTableName(datasource))
- }
-
-}
-
-/**
- * query performance result
- */
-case class QueryResult(datasource: String, result: Array[Row], avgTime: Long, firstTime: Long)
-
-class QueryRunner(sqlContext: SQLContext, dataFrame: DataFrame, datasources: Seq[String]) {
-
- /**
- * run a query on each datasource
- */
- def run(query: Query, runs: Int): Seq[QueryResult] = {
- var results = Seq[QueryResult]()
- datasources.foreach { datasource =>
- val result = query.run(sqlContext, runs, datasource)
- results :+= result
- }
- checkResult(results)
- results
- }
-
- private def checkResult(results: Seq[QueryResult]): Unit = {
- results.foldLeft(results.head) { (last, cur) =>
- if (last.result.sortBy(_.toString()).sameElements(cur.result.sortBy(_.toString()))) cur
- else sys.error(s"result is not the same between " +
- s"${last.datasource} and " +
- s"${cur.datasource}")
- }
- }
-
- private def loadToNative(datasource: String): Unit = {
- val savePath = PerfTest.savePath(datasource)
- println(s"loading data into $datasource, path: $savePath")
- dataFrame.write
- .mode(SaveMode.Overwrite)
- .format(datasource)
- .save(savePath)
- sqlContext.read
- .format(datasource)
- .load(savePath)
- .registerTempTable(PerfTest.makeTableName(datasource))
- }
-
- /**
- * load data to each datasource
- */
- def loadData: Seq[QueryResult] = {
- // load data into all datasources
- var results = Seq[QueryResult]()
- datasources.foreach { datasource =>
- val time = withTime {
- datasource match {
- case "parquet" =>
- dataFrame.sqlContext.setConf(s"spark.sql.$datasource.compression.codec", "snappy")
- loadToNative(datasource)
- case "orc" =>
- dataFrame.sqlContext.sparkContext.hadoopConfiguration.set("orc.compress", "SNAPPY")
- loadToNative(datasource)
- case "carbon" =>
- sqlContext.sql(s"DROP TABLE IF EXISTS ${PerfTest.makeTableName(datasource)}")
- println(s"loading data into $datasource, path: " +
- s"${dataFrame.sqlContext.asInstanceOf[CarbonContext].storePath}")
- dataFrame.write
- .format("org.apache.spark.sql.CarbonSource")
- .option("tableName", PerfTest.makeTableName(datasource))
- .mode(SaveMode.Overwrite)
- .save()
- case _ => sys.error("unsupported data source")
- }
- }
- println(s"load data into $datasource completed, time taken ${time/1000000}ms")
- results :+= QueryResult(datasource, null, time, time)
- }
- results
- }
-
- def shutDown(): Unit = {
- // drop all tables and temp files
- datasources.foreach {
- case datasource @ ("parquet" | "orc") =>
- // File.delete() cannot remove a non-empty directory, so delete children first
- def deleteRec(f: File): Unit = { if (f.isDirectory) f.listFiles().foreach(deleteRec); f.delete() }
- deleteRec(new File(PerfTest.savePath(datasource)))
- case "carbon" =>
- sqlContext.sql(s"DROP TABLE IF EXISTS ${PerfTest.makeTableName("carbon")}")
- case _ => sys.error("unsupported data source")
- }
- }
-}
-
-/**
- * template for table data generation
- *
- * @param dimension number of dimension columns and their cardinality
- * @param measure number of measure columns
- */
-case class TableTemplate(dimension: Seq[(Int, Int)], measure: Int)
-
-/**
- * utility to generate random data according to template
- */
-class TableGenerator(sqlContext: SQLContext) {
-
- /**
- * generate a dataframe from random data
- */
- def genDataFrame(template: TableTemplate, rows: Int): DataFrame = {
- val measures = template.measure
- val dimensions = template.dimension.foldLeft(0) {(x, y) => x + y._1}
- val cardinality = template.dimension.foldLeft(Seq[Int]()) {(x, y) =>
- x ++ (1 to y._1).map(z => y._2)
- }
- print(s"generating data: $rows rows of $dimensions dimensions and $measures measures. ")
- println("cardinality for each dimension: " + cardinality.mkString(", "))
-
- val dimensionFields = (1 to dimensions).map { id =>
- DataTypes.createStructField(s"c$id", DataTypes.StringType, false)
- }
- val measureFields = (dimensions + 1 to dimensions + measures).map { id =>
- DataTypes.createStructField(s"c$id", DataTypes.IntegerType, false)
- }
- val schema = StructType(dimensionFields ++ measureFields)
- val data = sqlContext.sparkContext.parallelize(1 to rows).map { x =>
- val random = new Random()
- val dimSeq = (1 to dimensions).map { y =>
- s"P${y}_${random.nextInt(cardinality(y - 1))}"
- }
- val msrSeq = (1 to measures).map { y =>
- random.nextInt(10)
- }
- Row.fromSeq(dimSeq ++ msrSeq)
- }
- val df = sqlContext.createDataFrame(data, schema)
- df.write.mode(SaveMode.Overwrite).parquet(PerfTest.savePath("temp"))
- sqlContext.parquetFile(PerfTest.savePath("temp"))
- }
-}
-
-object PerfTest {
-
- private val olap: Seq[String] = Seq(
- """SELECT c3, c4, sum(c8) FROM tableName
- |WHERE c1 = 'P1_23' and c2 = 'P2_43'
- |GROUP BY c3, c4""".stripMargin,
-
- """SELECT c2, c3, sum(c9) FROM tableName
- |WHERE c1 = 'P1_432' and c4 = 'P4_3' and c5 = 'P5_2'
- |GROUP by c2, c3 """.stripMargin,
-
- """SELECT c2, count(distinct c1), sum(c8) FROM tableName
- |WHERE c3="P3_4" and c5="P5_4"
- |GROUP BY c2 """.stripMargin,
-
- """SELECT c2, c5, count(distinct c1), sum(c7) FROM tableName
- |WHERE c4="P4_4" and c5="P5_7" and c8>4
- |GROUP BY c2, c5 """.stripMargin
- )
-
- private val point: Seq[String] = Seq(
- """SELECT c4 FROM tableName
- |WHERE c1="P1_43" """.stripMargin,
-
- """SELECT c3 FROM tableName
- |WHERE c1="P1_542" and c2="P2_23" """.stripMargin,
-
- """SELECT c3, c5 FROM tableName
- |WHERE c1="P1_52" and c7=4""".stripMargin,
-
- """SELECT c4, c9 FROM tableName
- |WHERE c1="P1_43" and c8<3""".stripMargin
- )
-
- private val filter: Seq[String] = Seq(
- """SELECT * FROM tableName
- |WHERE c2="P2_43" """.stripMargin,
-
- """SELECT * FROM tableName
- |WHERE c3="P3_3" """.stripMargin,
-
- """SELECT * FROM tableName
- |WHERE c2="P2_32" and c3="P3_23" """.stripMargin,
-
- """SELECT * FROM tableName
- |WHERE c3="P3_28" and c4="P4_3" """.stripMargin
- )
-
- private val scan: Seq[String] = Seq(
- """SELECT sum(c7), sum(c8), avg(c9), max(c10) FROM tableName """.stripMargin,
-
- """SELECT sum(c7) FROM tableName
- |WHERE c2="P2_32" """.stripMargin,
-
- """SELECT sum(c7), sum(c8), sum(9), sum(c10) FROM tableName
- |WHERE c4="P4_4" """.stripMargin,
-
- """SELECT sum(c7), sum(c8), sum(9), sum(c10) FROM tableName
- |WHERE c2="P2_75" and c6<5 """.stripMargin
- )
-
- def main(args: Array[String]) {
- val cc = ExampleUtils.createCarbonContext("PerfTest")
-
- // prepare performance queries
- var workload = Seq[Query]()
- olap.zipWithIndex.foreach(x => workload :+= new Query("OLAP Query", x._2, x._1))
- point.zipWithIndex.foreach(x => workload :+= new Query("Point Query", x._2, x._1))
- filter.zipWithIndex.foreach(x => workload :+= new Query("Filter Query", x._2, x._1))
- scan.zipWithIndex.foreach(x => workload :+= new Query("Scan Query", x._2, x._1))
-
- // prepare data
- val rows = 3 * 1000 * 1000
- val dimension = Seq((1, 1 * 1000), (1, 100), (1, 50), (2, 10)) // cardinality for each column
- val measure = 5 // number of measures
- val template = TableTemplate(dimension, measure)
- val df = new TableGenerator(cc).genDataFrame(template, rows)
- println("generate data completed")
-
- // run all queries against all data sources
- val datasource = Seq("parquet", "orc", "carbon")
- val runner = new QueryRunner(cc, df, datasource)
-
- val results = runner.loadData
- println(s"load performance: ${results.map(_.avgTime / 1000000L).mkString(", ")}")
-
- var parquetTime: Double = 0
- var orcTime: Double = 0
- var carbonTime: Double = 0
-
- println(s"query id: ${datasource.mkString(", ")}, result in millisecond")
- workload.foreach { query =>
- // run each query 4 times; print the first-run time and the average of the last 3 runs
- print(s"${query.queryType} ${query.queryNo}: ")
- val results = runner.run(query, 4)
- print(s"${results.map(_.avgTime / 1000000L).mkString(", ")} ")
- println(s"[sql: ${query.sqlString.replace('\n', ' ')}]")
- parquetTime += results(0).avgTime
- orcTime += results(1).avgTime
- carbonTime += results(2).avgTime
- }
-
- println(s"Total time: ${parquetTime / 1000000}, ${orcTime / 1000000}, " +
- s"${carbonTime / 1000000} = 1 : ${parquetTime / orcTime} : ${parquetTime / carbonTime}")
- runner.shutDown()
- }
-
- def makeTableName(datasource: String): String = {
- s"${datasource}_perftest_table"
- }
-
- def savePath(datasource: String): String =
- s"${ExampleUtils.currentPath}/target/perftest/${datasource}"
-
- def withTime(body: => Unit): Long = {
- val start = System.nanoTime()
- body
- System.nanoTime() - start
- }
-
-}
-// scalastyle:on println
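
For reference outside the deleted example: the timing pattern above (one discarded cold run, then an average over the remaining runs) can be reproduced without Spark. A minimal sketch, with illustrative object and method names that are not part of the deleted file:

object TimingSketch {
  // measure elapsed nanoseconds of a block, as PerfTest.withTime did
  def withTime(body: => Unit): Long = {
    val start = System.nanoTime()
    body
    System.nanoTime() - start
  }

  // report the first (cold) run separately and average the remaining runs
  def benchmark(runs: Int)(body: => Unit): (Long, Long) = {
    require(runs >= 2, "need one cold run plus at least one timed run")
    val firstTime = withTime(body)
    val totalTime = (1 until runs).map(_ => withTime(body)).sum
    (firstTime, totalTime / (runs - 1))
  }

  def main(args: Array[String]): Unit = {
    val (cold, avg) = benchmark(4)(Thread.sleep(10))
    println(s"cold: ${cold / 1000000}ms, warm avg: ${avg / 1000000}ms")
  }
}
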
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/examples/spark/src/main/scala/org/apache/carbondata/examples/util/AllDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/examples/spark/src/main/scala/org/apache/carbondata/examples/util/AllDictionaryUtil.scala b/examples/spark/src/main/scala/org/apache/carbondata/examples/util/AllDictionaryUtil.scala
deleted file mode 100644
index 3ab61bf..0000000
--- a/examples/spark/src/main/scala/org/apache/carbondata/examples/util/AllDictionaryUtil.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.examples.util
-
-import java.io.DataOutputStream
-
-import scala.collection.mutable.{ArrayBuffer, HashSet}
-
-import org.apache.spark.SparkContext
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.datastore.impl.FileFactory
-
-object AllDictionaryUtil {
- private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- def extractDictionary(sc: SparkContext,
- srcData: String,
- outputPath: String,
- fileHeader: String,
- dictCol: String): Unit = {
- val fileHeaderArr = fileHeader.split(",")
- val isDictCol = new Array[Boolean](fileHeaderArr.length)
- for (i <- 0 until fileHeaderArr.length) {
- if (dictCol.contains("|" + fileHeaderArr(i).toLowerCase() + "|")) {
- isDictCol(i) = true
- } else {
- isDictCol(i) = false
- }
- }
- val dictionaryRdd = sc.textFile(srcData).flatMap(x => {
- val tokens = x.split(",")
- val result = new ArrayBuffer[(Int, String)]()
- for (i <- 0 until isDictCol.length) {
- if (isDictCol(i)) {
- try {
- result += ((i, tokens(i)))
- } catch {
- case ex: ArrayIndexOutOfBoundsException =>
- LOGGER.error("Read a bad record: " + x)
- }
- }
- }
- result
- }).groupByKey().flatMap(x => {
- val distinctValues = new HashSet[(Int, String)]()
- for (value <- x._2) {
- distinctValues.add((x._1, value))
- }
- distinctValues
- })
- val dictionaryValues = dictionaryRdd.map(x => x._1 + "," + x._2).collect()
- saveToFile(dictionaryValues, outputPath)
- }
-
- def cleanDictionary(outputPath: String): Unit = {
- try {
- val fileType = FileFactory.getFileType(outputPath)
- val file = FileFactory.getCarbonFile(outputPath, fileType)
- if (file.exists()) {
- file.delete()
- }
- } catch {
- case ex: Exception =>
- LOGGER.error("Clean dictionary catching exception:" + ex)
- }
- }
-
- def saveToFile(contents: Array[String], outputPath: String): Unit = {
- var writer: DataOutputStream = null
- try {
- val fileType = FileFactory.getFileType(outputPath)
- val file = FileFactory.getCarbonFile(outputPath, fileType)
- if (!file.exists()) {
- file.createNewFile()
- }
- writer = FileFactory.getDataOutputStream(outputPath, fileType)
- for (content <- contents) {
- writer.writeBytes(content + "\n")
- }
- } catch {
- case ex: Exception =>
- LOGGER.error("Save dictionary to file catching exception:" + ex)
- } finally {
- if (writer != null) {
- try {
- writer.close()
- } catch {
- case ex: Exception =>
- LOGGER.error("Close output stream catching exception:" + ex)
- }
- }
- }
- }
-}
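
The extraction above deduplicates (columnIndex, value) pairs with groupByKey plus a per-group HashSet. A sketch of the same result using RDD.distinct(), which pushes deduplication into the shuffle; this is an alternative formulation rather than what the deleted code did, and the input path and column indexes are placeholders:

import org.apache.spark.{SparkConf, SparkContext}

object DictionaryExtractSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("DictExtract").setMaster("local[2]"))
    val dictColIndexes = Set(0, 2) // columns to build dictionaries for (placeholder)
    val distinctPairs = sc.textFile("/tmp/input.csv").flatMap { line =>
      val tokens = line.split(",")
      // skip columns missing from short (bad) records instead of letting them throw
      dictColIndexes.iterator.filter(_ < tokens.length).map(i => (i, tokens(i)))
    }.distinct() // deduplicate without materializing per-key groups
    distinctPairs.map { case (i, v) => s"$i,$v" }.collect().foreach(println)
    sc.stop()
  }
}
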
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/examples/spark/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala
----------------------------------------------------------------------
diff --git a/examples/spark/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala b/examples/spark/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala
deleted file mode 100644
index f98ec3b..0000000
--- a/examples/spark/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.examples.util
-
-import java.io.File
-
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.sql.{CarbonContext, SaveMode}
-
-import org.apache.carbondata.core.util.CarbonProperties
-
-// scalastyle:off println
-
-object ExampleUtils {
-
- def currentPath: String = new File(this.getClass.getResource("/").getPath + "../../")
- .getCanonicalPath
- val storeLocation = currentPath + "/target/store"
-
- def createCarbonContext(appName: String): CarbonContext = {
- val sc = new SparkContext(new SparkConf()
- .setAppName(appName)
- .setMaster("local[2]"))
- sc.setLogLevel("ERROR")
-
- println(s"Starting $appName using spark version ${sc.version}")
-
- val cc = new CarbonContext(sc, storeLocation, currentPath + "/target/carbonmetastore")
-
- CarbonProperties.getInstance()
- .addProperty("carbon.storelocation", storeLocation)
- cc
- }
-
- /**
- * This function writes a sample CarbonData file containing the following schema:
- * c1: String, c2: String, c3: Double
- * Returns table path
- */
- def writeSampleCarbonFile(cc: CarbonContext, tableName: String, numRows: Int = 1000): String = {
- cc.sql(s"DROP TABLE IF EXISTS $tableName")
- writeDataframe(cc, tableName, numRows, SaveMode.Overwrite)
- s"$storeLocation/default/$tableName"
- }
-
- /**
- * This function appends data to the CarbonData file
- * Returns table path
- */
- def appendSampleCarbonFile(cc: CarbonContext, tableName: String, numRows: Int = 1000): String = {
- writeDataframe(cc, tableName, numRows, SaveMode.Append)
- s"$storeLocation/default/$tableName"
- }
-
- /**
- * create a new dataframe and write to CarbonData file, based on save mode
- */
- private def writeDataframe(
- cc: CarbonContext, tableName: String, numRows: Int, mode: SaveMode): Unit = {
- // use CarbonContext to write CarbonData files
- import cc.implicits._
- val sc = cc.sparkContext
- val df = sc.parallelize(1 to numRows, 2)
- .map(x => ("a", "b", x))
- .toDF("c1", "c2", "c3")
-
- // save dataframe directly to carbon file without tempCSV
- df.write
- .format("carbondata")
- .option("tableName", tableName)
- .option("compress", "true")
- .option("tempCSV", "false")
- .mode(mode)
- .save()
- }
-
- def cleanSampleCarbonFile(cc: CarbonContext, tableName: String): Unit = {
- cc.sql(s"DROP TABLE IF EXISTS $tableName")
- }
-}
-// scalastyle:on println
-
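
Taken together, the helpers above form a small lifecycle: create a CarbonContext, write a sample table, query it, drop it. A sketch of that usage against the deleted spark1 API; the table name is illustrative:

import org.apache.carbondata.examples.util.ExampleUtils

object ExampleUtilsUsageSketch {
  def main(args: Array[String]): Unit = {
    val cc = ExampleUtils.createCarbonContext("ExampleUtilsUsageSketch")
    val tablePath = ExampleUtils.writeSampleCarbonFile(cc, "sample_table", numRows = 100)
    println(s"sample table written to $tablePath")
    cc.sql("SELECT c1, c2, sum(c3) FROM sample_table GROUP BY c1, c2").show()
    ExampleUtils.cleanSampleCarbonFile(cc, "sample_table")
  }
}
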
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/examples/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/examples/spark2/pom.xml b/examples/spark2/pom.xml
index 94af8ec..af25771 100644
--- a/examples/spark2/pom.xml
+++ b/examples/spark2/pom.xml
@@ -38,20 +38,6 @@
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-spark2</artifactId>
<version>${project.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-hive-thriftserver_2.10</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-repl_2.10</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.10</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/hive/pom.xml
----------------------------------------------------------------------
diff --git a/integration/hive/pom.xml b/integration/hive/pom.xml
index 13bd581..e0ad499 100644
--- a/integration/hive/pom.xml
+++ b/integration/hive/pom.xml
@@ -67,20 +67,6 @@
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-spark2</artifactId>
<version>${project.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-hive-thriftserver_2.10</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-repl_2.10</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.10</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
@@ -116,12 +102,6 @@
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-hadoop</artifactId>
<version>${project.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.10</artifactId>
- </exclusion>
- </exclusions>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/presto/pom.xml
----------------------------------------------------------------------
diff --git a/integration/presto/pom.xml b/integration/presto/pom.xml
index e11ce4c..13d351d 100644
--- a/integration/presto/pom.xml
+++ b/integration/presto/pom.xml
@@ -244,10 +244,6 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
- <exclusion>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.10</artifactId>
- </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -371,10 +367,6 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
- <exclusion>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.10</artifactId>
- </exclusion>
</exclusions>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark-common-cluster-test/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/pom.xml b/integration/spark-common-cluster-test/pom.xml
index 9728a5c..e529035 100644
--- a/integration/spark-common-cluster-test/pom.xml
+++ b/integration/spark-common-cluster-test/pom.xml
@@ -40,12 +40,6 @@
<artifactId>carbondata-spark-common</artifactId>
<version>${project.version}</version>
<scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-hive-thriftserver_2.10</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -167,16 +161,6 @@
<artifactId>carbondata-spark</artifactId>
<version>${project.version}</version>
<scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-hive-thriftserver_2.10</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-repl_2.10</artifactId>
- </exclusion>
- </exclusions>
</dependency>
</dependencies>
</profile>
@@ -188,16 +172,6 @@
<artifactId>carbondata-spark</artifactId>
<version>${project.version}</version>
<scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-hive-thriftserver_2.10</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-repl_2.10</artifactId>
- </exclusion>
- </exclusions>
</dependency>
</dependencies>
</profile>
@@ -212,16 +186,6 @@
<artifactId>carbondata-spark2</artifactId>
<version>${project.version}</version>
<scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-hive-thriftserver_2.10</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-repl_2.10</artifactId>
- </exclusion>
- </exclusions>
</dependency>
</dependencies>
</profile>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark-common-test/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/pom.xml b/integration/spark-common-test/pom.xml
index 7c67000..b2ee316 100644
--- a/integration/spark-common-test/pom.xml
+++ b/integration/spark-common-test/pom.xml
@@ -91,12 +91,6 @@
<artifactId>carbondata-spark-common</artifactId>
<version>${project.version}</version>
<scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-hive-thriftserver_2.10</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -341,16 +335,6 @@
<artifactId>carbondata-spark</artifactId>
<version>${project.version}</version>
<scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-hive-thriftserver_2.10</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-repl_2.10</artifactId>
- </exclusion>
- </exclusions>
</dependency>
</dependencies>
</profile>
@@ -362,16 +346,6 @@
<artifactId>carbondata-spark</artifactId>
<version>${project.version}</version>
<scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-hive-thriftserver_2.10</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-repl_2.10</artifactId>
- </exclusion>
- </exclusions>
</dependency>
</dependencies>
</profile>
@@ -386,16 +360,6 @@
<artifactId>carbondata-spark2</artifactId>
<version>${project.version}</version>
<scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-hive-thriftserver_2.10</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-repl_2.10</artifactId>
- </exclusion>
- </exclusions>
</dependency>
</dependencies>
</profile>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark-common/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-common/pom.xml b/integration/spark-common/pom.xml
index 82ff7a4..d40e213 100644
--- a/integration/spark-common/pom.xml
+++ b/integration/spark-common/pom.xml
@@ -38,16 +38,6 @@
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-processing</artifactId>
<version>${project.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-hive-thriftserver_2.10</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.10</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>org.apache.carbondata</groupId>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/CARBON_SPARK_INTERFACELogResource.properties
----------------------------------------------------------------------
diff --git a/integration/spark/CARBON_SPARK_INTERFACELogResource.properties b/integration/spark/CARBON_SPARK_INTERFACELogResource.properties
deleted file mode 100644
index 61856cf..0000000
--- a/integration/spark/CARBON_SPARK_INTERFACELogResource.properties
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-carbon.spark.interface = {0}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark/pom.xml b/integration/spark/pom.xml
deleted file mode 100644
index 5060809..0000000
--- a/integration/spark/pom.xml
+++ /dev/null
@@ -1,194 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.carbondata</groupId>
- <artifactId>carbondata-parent</artifactId>
- <version>1.3.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
-
- <artifactId>carbondata-spark</artifactId>
- <name>Apache CarbonData :: Spark</name>
-
- <properties>
- <dev.path>${basedir}/../../dev</dev.path>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.carbondata</groupId>
- <artifactId>carbondata-common</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.carbondata</groupId>
- <artifactId>carbondata-core</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.carbondata</groupId>
- <artifactId>carbondata-processing</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.carbondata</groupId>
- <artifactId>carbondata-hadoop</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.carbondata</groupId>
- <artifactId>carbondata-spark-common</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-compiler</artifactId>
- </dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-reflect</artifactId>
- </dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-repl_${scala.binary.version}</artifactId>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- </dependency>
- <dependency>
- <groupId>org.scalatest</groupId>
- <artifactId>scalatest_${scala.binary.version}</artifactId>
- <version>2.2.1</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <build>
- <testSourceDirectory>src/test/scala</testSourceDirectory>
- <resources>
- <resource>
- <directory>src/resources</directory>
- </resource>
- <resource>
- <directory>.</directory>
- <includes>
- <include>CARBON_SPARK_INTERFACELogResource.properties</include>
- </includes>
- </resource>
- </resources>
- <plugins>
- <plugin>
- <groupId>org.scala-tools</groupId>
- <artifactId>maven-scala-plugin</artifactId>
- <version>2.15.2</version>
- <executions>
- <execution>
- <id>compile</id>
- <goals>
- <goal>compile</goal>
- </goals>
- <phase>compile</phase>
- </execution>
- <execution>
- <id>testCompile</id>
- <goals>
- <goal>testCompile</goal>
- </goals>
- <phase>test</phase>
- </execution>
- <execution>
- <phase>process-resources</phase>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>1.7</source>
- <target>1.7</target>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <version>2.18</version>
- <!-- Note config is repeated in scalatest config -->
- <configuration>
- <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
- <argLine>-Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine>
- <systemProperties>
- <java.awt.headless>true</java.awt.headless>
- </systemProperties>
- <failIfNoTests>false</failIfNoTests>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.scalatest</groupId>
- <artifactId>scalatest-maven-plugin</artifactId>
- <version>1.0</version>
- <!-- Note config is repeated in surefire config -->
- <configuration>
- <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
- <junitxml>.</junitxml>
- <filereports>CarbonTestSuite.txt</filereports>
- <argLine> ${argLine} -ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
- </argLine>
- <stderr />
- <environmentVariables>
- </environmentVariables>
- <systemProperties>
- <java.awt.headless>true</java.awt.headless>
- </systemProperties>
- </configuration>
- <executions>
- <execution>
- <id>test</id>
- <goals>
- <goal>test</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- <profiles>
- <profile>
- <id>sdvtest</id>
- <properties>
- <maven.test.skip>true</maven.test.skip>
- </properties>
- </profile>
- </profiles>
-
-</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java b/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
deleted file mode 100644
index 92c8402..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark.readsupport;
-
-import java.io.IOException;
-import java.sql.Date;
-import java.sql.Timestamp;
-
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.encoder.Encoding;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
-
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.expressions.GenericRow;
-import org.apache.spark.unsafe.types.UTF8String;
-
-public class SparkRowReadSupportImpl extends DictionaryDecodeReadSupport<Row> {
-
- @Override public void initialize(CarbonColumn[] carbonColumns,
- AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException {
- super.initialize(carbonColumns, absoluteTableIdentifier);
- //can initialize and generate schema here.
- }
-
- @Override public Row readRow(Object[] data) {
- for (int i = 0; i < dictionaries.length; i++) {
- if (data[i] == null) {
- continue;
- }
- if (dictionaries[i] != null) {
- data[i] = DataTypeUtil
- .getDataBasedOnDataType(dictionaries[i].getDictionaryValueForKeyInBytes((int) data[i]),
- (CarbonDimension) carbonColumns[i]);
- if (data[i] == null) {
- continue;
- }
- if (dataTypes[i] == DataTypes.STRING) {
- data[i] = UTF8String.fromString(data[i].toString());
- } else if (dataTypes[i] == DataTypes.TIMESTAMP) {
- data[i] = new Timestamp((long) data[i]);
- } else if (dataTypes[i] == DataTypes.DATE) {
- data[i] = new Date((long) data[i]);
- } else if (dataTypes[i] == DataTypes.LONG) {
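- // no conversion needed for LONG values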
- data[i] = data[i];
- }
- }
- else if (carbonColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {
- //convert the long to timestamp in case of direct dictionary column
- if (DataTypes.TIMESTAMP == carbonColumns[i].getDataType()) {
- data[i] = new Timestamp((long) data[i] / 1000L);
- } else if (DataTypes.DATE == carbonColumns[i].getDataType()) {
- data[i] = new Date((long) data[i]);
- }
- }
- }
- return new GenericRow(data);
- }
-}
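
For direct-dictionary columns, readRow above converts the stored long surrogate into java.sql types, dividing timestamps by 1000; the scaling factor is taken from the deleted code and suggests sub-millisecond storage granularity, though the source does not say so. That branch in isolation, as a runnable Scala sketch:

import java.sql.{Date, Timestamp}

object DirectDictionarySketch {
  // mirrors the direct-dictionary branch of readRow
  def convert(storedValue: Long, isTimestamp: Boolean): Any =
    if (isTimestamp) new Timestamp(storedValue / 1000L) // scale down to milliseconds
    else new Date(storedValue)

  def main(args: Array[String]): Unit = {
    println(convert(1509355332000000L, isTimestamp = true))
    println(convert(1509355332000L, isTimestamp = false))
  }
}
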
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
deleted file mode 100644
index 7881b93..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.compress.GzipCodec
-import org.apache.spark.sql._
-import org.apache.spark.sql.execution.command.LoadTable
-import org.apache.spark.sql.types._
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonType}
-
-class CarbonDataFrameWriter(val dataFrame: DataFrame) {
-
- private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
- def saveAsCarbonFile(parameters: Map[String, String] = Map()): Unit = {
- checkContext()
- val cc = CarbonContext.getInstance(dataFrame.sqlContext.sparkContext)
-
- // create a new table using dataframe's schema and write its content into the table
- cc.sql(makeCreateTableString(dataFrame.schema, new CarbonOption(parameters)))
- writeToCarbonFile(parameters)
- }
-
- def appendToCarbonFile(parameters: Map[String, String] = Map()): Unit = {
- // append the data as a new load
- checkContext()
- writeToCarbonFile(parameters)
- }
-
- private def writeToCarbonFile(parameters: Map[String, String] = Map()): Unit = {
- val options = new CarbonOption(parameters)
- val cc = CarbonContext.getInstance(dataFrame.sqlContext.sparkContext)
- if (options.tempCSV) {
- loadTempCSV(options, cc)
- } else {
- loadDataFrame(options, cc)
- }
- }
-
- /**
- * First save the DataFrame to CSV files,
- * then load those CSV files into the table.
- * @param options
- * @param cc
- */
- private def loadTempCSV(options: CarbonOption, cc: CarbonContext): Unit = {
- // temporary solution: write to csv file, then load the csv into carbon
- val storePath = CarbonEnv.get.carbonMetastore.storePath
- val tempCSVFolder = new StringBuilder(storePath).append(CarbonCommonConstants.FILE_SEPARATOR)
- .append("tempCSV")
- .append(CarbonCommonConstants.UNDERSCORE).append(options.dbName)
- .append(CarbonCommonConstants.UNDERSCORE).append(options.tableName)
- .append(CarbonCommonConstants.UNDERSCORE).append(System.nanoTime()).toString
- writeToTempCSVFile(tempCSVFolder, options)
-
- val tempCSVPath = new Path(tempCSVFolder)
- val fs = tempCSVPath.getFileSystem(dataFrame.sqlContext.sparkContext.hadoopConfiguration)
-
- def countSize(): Double = {
- var size: Double = 0
- val itor = fs.listFiles(tempCSVPath, true)
- while (itor.hasNext) {
- val f = itor.next()
- if (f.getPath.getName.startsWith("part-")) {
- size += f.getLen
- }
- }
- size
- }
-
- LOGGER.info(s"temporary CSV file size: ${countSize / 1024 / 1024} MB")
-
- try {
- cc.sql(makeLoadString(tempCSVFolder, options))
- } finally {
- fs.delete(tempCSVPath, true)
- }
- }
-
- private def checkContext(): Unit = {
- // To avoid a Derby problem, the dataframe needs to be written and read using CarbonContext
- require(dataFrame.sqlContext.isInstanceOf[CarbonContext],
- "Error in saving dataframe to carbon file, must use CarbonContext to save dataframe"
- )
- }
-
- private def writeToTempCSVFile(tempCSVFolder: String, options: CarbonOption): Unit = {
-
- val strRDD = dataFrame.rdd.mapPartitions { case iter =>
- new Iterator[String] {
- override def hasNext = iter.hasNext
-
- def convertToCSVString(seq: Seq[Any]): String = {
- val build = new java.lang.StringBuilder()
- if (seq.head != null) {
- build.append(seq.head.toString)
- }
- val itemIter = seq.tail.iterator
- while (itemIter.hasNext) {
- build.append(CarbonCommonConstants.COMMA)
- val value = itemIter.next()
- if (value != null) {
- build.append(value.toString)
- }
- }
- build.toString
- }
-
- override def next: String = {
- convertToCSVString(iter.next.toSeq)
- }
- }
- }
-
- if (options.compress) {
- strRDD.saveAsTextFile(tempCSVFolder, classOf[GzipCodec])
- } else {
- strRDD.saveAsTextFile(tempCSVFolder)
- }
- }
-
- /**
- * Load the DataFrame directly, without first saving it to CSV files.
- * @param options
- * @param cc
- */
- private def loadDataFrame(options: CarbonOption, cc: CarbonContext): Unit = {
- val header = dataFrame.columns.mkString(",")
- LoadTable(
- Some(options.dbName),
- options.tableName,
- null,
- Seq(),
- Map("fileheader" -> header) ++ options.toMap,
- isOverwriteExist = false,
- null,
- Some(dataFrame),
- None).run(cc)
- }
-
- private def convertToCarbonType(sparkType: DataType): String = {
- sparkType match {
- case StringType => CarbonType.STRING.getName
- case IntegerType => CarbonType.INT.getName
- case ShortType => "smallint"
- case LongType => "bigint"
- case FloatType => CarbonType.DOUBLE.getName
- case DoubleType => CarbonType.DOUBLE.getName
- case TimestampType => CarbonType.TIMESTAMP.getName
- case DateType => CarbonType.DATE.getName
- case decimal: DecimalType => s"${CarbonType.DECIMAL.getName} (${decimal.precision}" +
- s", ${decimal.scale})"
- case other => sys.error(s"unsupported type: $other")
- }
- }
-
- private def makeCreateTableString(schema: StructType, options: CarbonOption): String = {
- val properties = Map(
- "DICTIONARY_INCLUDE" -> options.dictionaryInclude,
- "DICTIONARY_EXCLUDE" -> options.dictionaryExclude
- ).filter(_._2.isDefined).map(p => s"'${p._1}' = '${p._2.get}'").mkString(",")
- val carbonSchema = schema.map { field =>
- s"${ field.name } ${ convertToCarbonType(field.dataType) }"
- }
- s"""
- CREATE TABLE IF NOT EXISTS ${options.dbName}.${options.tableName}
- (${ carbonSchema.mkString(", ") })
- STORED BY '${ CarbonContext.datasourceName }'
- ${ if (properties.nonEmpty) " TBLPROPERTIES (" + properties + ")" else ""}
- """
- }
-
- private def makeLoadString(csvFolder: String, options: CarbonOption): String = {
- s"""
- LOAD DATA INPATH '$csvFolder'
- INTO TABLE ${options.dbName}.${options.tableName}
- OPTIONS ('FILEHEADER' = '${dataFrame.columns.mkString(",")}',
- 'SINGLE_PASS' = '${options.singlePass}')
- """
- }
-
-
-}
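
The DDL generation at the heart of saveAsCarbonFile, mapping each Spark field through convertToCarbonType and assembling a CREATE TABLE ... STORED BY statement, can be exercised standalone. A sketch under the same type mapping; the 'carbondata' datasource string and the sample schema are illustrative:

import org.apache.spark.sql.types._

object SchemaToDdlSketch {
  // same shape as the deleted convertToCarbonType
  def toCarbonType(t: DataType): String = t match {
    case StringType => "string"
    case IntegerType => "int"
    case ShortType => "smallint"
    case LongType => "bigint"
    case FloatType | DoubleType => "double"
    case TimestampType => "timestamp"
    case DateType => "date"
    case d: DecimalType => s"decimal(${d.precision},${d.scale})"
    case other => sys.error(s"unsupported type: $other")
  }

  def makeCreateTable(db: String, table: String, schema: StructType): String = {
    val cols = schema.map(f => s"${f.name} ${toCarbonType(f.dataType)}").mkString(", ")
    s"CREATE TABLE IF NOT EXISTS $db.$table ($cols) STORED BY 'carbondata'"
  }

  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("c1", StringType),
      StructField("c2", IntegerType),
      StructField("c3", DecimalType(10, 2))))
    println(makeCreateTable("default", "sample_table", schema))
  }
}
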