You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/06/05 10:42:02 UTC
[17/26] carbondata git commit: [CARBONDATA-2355] Support run SQL on
carbondata files directly
[CARBONDATA-2355] Support running SQL on CarbonData files directly
Support running SQL on CarbonData files directly
This closes #2181
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/75f638e3
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/75f638e3
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/75f638e3
Branch: refs/heads/branch-1.4
Commit: 75f638e35838e58a0677ae128e437d5cdf3b5abb
Parents: d0dc822
Author: xubo245 <60...@qq.com>
Authored: Wed Apr 18 17:34:12 2018 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Tue Jun 5 16:04:20 2018 +0530
----------------------------------------------------------------------
docs/sdk-guide.md | 7 ++
.../carbondata/examples/DirectSQLExample.scala | 100 +++++++++++++++++++
.../carbondata/examples/S3UsingSDkExample.scala | 2 +-
...FileInputFormatWithExternalCarbonTable.scala | 2 +-
...tCreateTableUsingSparkCarbonFileFormat.scala | 30 +++++-
.../TestNonTransactionalCarbonTable.scala | 2 +-
...ransactionalCarbonTableWithComplexType.scala | 2 +-
...tSparkCarbonFileFormatWithSparkSession.scala | 2 +-
.../datasources/SparkCarbonFileFormat.scala | 26 ++++-
9 files changed, 164 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/75f638e3/docs/sdk-guide.md
----------------------------------------------------------------------
diff --git a/docs/sdk-guide.md b/docs/sdk-guide.md
index 360516a..ec70919 100644
--- a/docs/sdk-guide.md
+++ b/docs/sdk-guide.md
@@ -128,7 +128,14 @@ Each of SQL data types are mapped into data types of SDK. Following are the mapp
| STRING | DataTypes.STRING |
| DECIMAL | DataTypes.createDecimalType(precision, scale) |
+## Run SQL on files directly
+Instead of creating a table and querying it, you can also query a carbon file directly with SQL.
+### Example
+```
+SELECT * FROM carbonfile.`$Path`
+```
+Find example code at [DirectSQLExample](https://github.com/apache/carbondata/blob/master/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala) in the CarbonData repo.
## API List
### Class org.apache.carbondata.sdk.file.CarbonWriterBuilder
http://git-wip-us.apache.org/repos/asf/carbondata/blob/75f638e3/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala
new file mode 100644
index 0000000..a011d80
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.examples.util.ExampleUtils
+import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}
+
+/**
+ * Running SQL on carbon files directly
+ * No need to create table first
+ * TODO: support more than one carbon file
+ */
+object DirectSQLExample {
+
+ // prepare SDK writer output
+ def buildTestData(
+ path: String,
+ num: Int = 3,
+ persistSchema: Boolean = false): Any = {
+
+ // getCanonicalPath gives path with \, but the code expects /.
+ val writerPath = path.replace("\\", "/");
+
+ val fields: Array[Field] = new Array[Field](3)
+ fields(0) = new Field("name", DataTypes.STRING)
+ fields(1) = new Field("age", DataTypes.INT)
+ fields(2) = new Field("height", DataTypes.DOUBLE)
+
+ try {
+ val builder = CarbonWriter
+ .builder()
+ .outputPath(writerPath)
+ .isTransactionalTable(true)
+ .uniqueIdentifier(System.currentTimeMillis)
+ .withBlockSize(2)
+ if (persistSchema) {
+ builder.persistSchemaFile(true)
+ }
+ val writer = builder.buildWriterForCSVInput(new Schema(fields))
+ var i = 0
+ while (i < num) {
+ writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
+ i += 1
+ }
+ writer.close()
+ } catch {
+ case e: Exception => throw e
+ }
+ }
+
+ def cleanTestData(path: String): Unit = {
+ FileUtils.deleteDirectory(new File(path))
+ }
+
+ // scalastyle:off
+ def main(args: Array[String]) {
+ val carbonSession = ExampleUtils.createCarbonSession("DirectSQLExample")
+ val rootPath = new File(this.getClass.getResource("/").getPath
+ + "../../../..").getCanonicalPath
+ val path = s"$rootPath/examples/spark2/target/carbonFile/"
+
+ import carbonSession._
+ // 1. generate data file
+ cleanTestData(path)
+ buildTestData(path, 20)
+ val readPath = path + "Fact/Part0/Segment_null"
+
+ println("Running SQL on carbon files directly")
+ try {
+ // 2. run queries directly, no need to create table first
+ sql(s"""select * FROM carbonfile.`$readPath` limit 10""".stripMargin).show()
+ } catch {
+ case e: Exception => throw e
+ } finally {
+ // 3.delete data files
+ cleanTestData(path)
+ }
+ }
+ // scalastyle:on
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/75f638e3/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
index 022b28e..1795960 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
@@ -36,7 +36,7 @@ object S3UsingSDKExample {
num: Int = 3,
persistSchema: Boolean = false): Any = {
- // getCanonicalPath gives path with \, so code expects /. Need to handle in code ?
+ // getCanonicalPath gives path with \, but the code expects /.
val writerPath = path.replace("\\", "/");
val fields: Array[Field] = new Array[Field](3)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/75f638e3/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
index 019b915..e6d39d3 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
@@ -38,7 +38,7 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be
"../." +
"./src/test/resources/SparkCarbonFileFormat/WriterOutput/")
.getCanonicalPath
- //getCanonicalPath gives path with \, so code expects /. Need to handle in code ?
+ //getCanonicalPath gives path with \, but the code expects /.
writerPath = writerPath.replace("\\", "/");
http://git-wip-us.apache.org/repos/asf/carbondata/blob/75f638e3/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
index 66be8e4..211bc8c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
@@ -46,7 +46,7 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd
"../." +
"./src/test/resources/SparkCarbonFileFormat/WriterOutput/")
.getCanonicalPath
- //getCanonicalPath gives path with \, so code expects /. Need to handle in code ?
+ //getCanonicalPath gives path with \, but the code expects /.
writerPath = writerPath.replace("\\", "/");
val filePath = writerPath + "/Fact/Part0/Segment_null/"
@@ -153,6 +153,34 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd
cleanTestData()
}
+ test("Running SQL directly and read carbondata files (sdk Writer Output) using the SparkCarbonFileFormat ") {
+ buildTestData(false)
+ assert(new File(filePath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+ //data source file format
+ if (sqlContext.sparkContext.version.startsWith("2.1")) {
+ //data source file format
+ sql(s"""CREATE TABLE sdkOutputTable USING carbonfile OPTIONS (PATH '$filePath') """)
+ } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
+ //data source file format
+ sql(
+ s"""CREATE TABLE sdkOutputTable USING carbonfile LOCATION
+ |'$filePath' """.stripMargin)
+ } else {
+ // TO DO
+ }
+
+ val directSQL = sql(s"""select * FROM carbonfile.`$filePath`""".stripMargin)
+ directSQL.show(false)
+ checkAnswer(sql("select * from sdkOutputTable"), directSQL)
+
+ sql("DROP TABLE sdkOutputTable")
+ // drop table should not delete the files
+ assert(new File(filePath).exists())
+ cleanTestData()
+ }
+
test("should not allow to alter datasource carbontable ") {
buildTestData(false)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/75f638e3/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index 61b37d5..0083733 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -55,7 +55,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
"../." +
"./target/SparkCarbonFileFormat/WriterOutput/")
.getCanonicalPath
- //getCanonicalPath gives path with \, so code expects /. Need to handle in code ?
+ //getCanonicalPath gives path with \, but the code expects /.
writerPath = writerPath.replace("\\", "/")
def buildTestDataSingleFile(): Any = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/75f638e3/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
index d4de428..19aaf72 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
@@ -39,7 +39,7 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo
"../." +
"./src/test/resources/SparkCarbonFileFormat/WriterOutput/")
.getCanonicalPath
- //getCanonicalPath gives path with \, so code expects /. Need to handle in code ?
+ //getCanonicalPath gives path with \, but the code expects /.
writerPath = writerPath.replace("\\", "/")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/75f638e3/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
index 54b23a5..79b64ae 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
@@ -36,7 +36,7 @@ object TestSparkCarbonFileFormatWithSparkSession {
"../." +
"./src/test/resources/SparkCarbonFileFormat/WriterOutput/")
.getCanonicalPath
- //getCanonicalPath gives path with \, so code expects /. Need to handle in code ?
+ //getCanonicalPath gives path with \, but the code expects /.
writerPath = writerPath.replace("\\", "/");
val filePath = writerPath + "/Fact/Part0/Segment_null/"
http://git-wip-us.apache.org/repos/asf/carbondata/blob/75f638e3/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
index 934f5c7..697eec5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources
import java.net.URI
+import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.conf.Configuration
@@ -68,8 +69,23 @@ class SparkCarbonFileFormat extends FileFormat
override def inferSchema(sparkSession: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
- val filePaths = CarbonUtil.getFilePathExternalFilePath(
- options.get("path").get)
+ val filePaths = if (options.isEmpty) {
+ val carbondataFiles = files.seq.filter { each =>
+ if (each.isFile) {
+ each.getPath.getName.contains(".carbondata")
+ } else {
+ false
+ }
+ }
+
+ carbondataFiles.map { each =>
+ each.getPath.toString
+ }.toList.asJava
+ } else {
+ CarbonUtil.getFilePathExternalFilePath(
+ options.get("path").get)
+ }
+
if (filePaths.size() == 0) {
throw new SparkException("CarbonData file is not present in the location mentioned in DDL")
}
@@ -193,7 +209,11 @@ class SparkCarbonFileFormat extends FileFormat
val fileSplit =
new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty)
- val path: String = options.get("path").get
+ val path: String = if (options.isEmpty) {
+ file.filePath
+ } else {
+ options.get("path").get
+ }
val endindex: Int = path.indexOf("Fact") - 1
val tablePath = path.substring(0, endindex)
lazy val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(