You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/06/08 11:41:07 UTC

[42/50] [abbrv] carbondata git commit: [CARBONDATA-2355] Support run SQL on carbondata files directly

[CARBONDATA-2355] Support run SQL on carbondata files directly

Support run SQL on carbondata files directly

This closes #2181


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9469e6bd
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9469e6bd
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9469e6bd

Branch: refs/heads/spark-2.3
Commit: 9469e6bd4da5c75ba836fb550112cec01f666544
Parents: 4d22ddc
Author: xubo245 <60...@qq.com>
Authored: Wed Apr 18 17:34:12 2018 +0800
Committer: chenliang613 <ch...@huawei.com>
Committed: Fri Jun 1 18:01:33 2018 +0800

----------------------------------------------------------------------
 docs/sdk-guide.md                               |   7 ++
 .../carbondata/examples/DirectSQLExample.scala  | 100 +++++++++++++++++++
 .../carbondata/examples/S3UsingSDkExample.scala |   2 +-
 ...FileInputFormatWithExternalCarbonTable.scala |   2 +-
 ...tCreateTableUsingSparkCarbonFileFormat.scala |  30 +++++-
 .../TestNonTransactionalCarbonTable.scala       |   2 +-
 ...ransactionalCarbonTableWithComplexType.scala |   2 +-
 ...tSparkCarbonFileFormatWithSparkSession.scala |   2 +-
 .../datasources/SparkCarbonFileFormat.scala     |  26 ++++-
 9 files changed, 164 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/9469e6bd/docs/sdk-guide.md
----------------------------------------------------------------------
diff --git a/docs/sdk-guide.md b/docs/sdk-guide.md
index 360516a..ec70919 100644
--- a/docs/sdk-guide.md
+++ b/docs/sdk-guide.md
@@ -128,7 +128,14 @@ Each of SQL data types are mapped into data types of SDK. Following are the mapp
 | STRING | DataTypes.STRING |
 | DECIMAL | DataTypes.createDecimalType(precision, scale) |
 
+## Run SQL on files directly
+Instead of creating a table and querying it, you can also query a file directly with SQL.
 
+### Example
+```
+SELECT * FROM carbonfile.`$Path`
+```
+Find example code at [DirectSQLExample](https://github.com/apache/carbondata/blob/master/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala) in the CarbonData repo.
 ## API List
 
 ### Class org.apache.carbondata.sdk.file.CarbonWriterBuilder

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9469e6bd/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala
new file mode 100644
index 0000000..a011d80
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.examples.util.ExampleUtils
+import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}
+
+/**
+ * Running SQL on carbon files directly
+ * No need to create table first
+ * TODO: support more than one carbon file
+ */
+object DirectSQLExample {
+
+  // prepare SDK writer output
+  def buildTestData(
+      path: String,
+      num: Int = 3,
+      persistSchema: Boolean = false): Any = {
+
+    // getCanonicalPath gives path with \, but the code expects /.
+    val writerPath = path.replace("\\", "/");
+
+    val fields: Array[Field] = new Array[Field](3)
+    fields(0) = new Field("name", DataTypes.STRING)
+    fields(1) = new Field("age", DataTypes.INT)
+    fields(2) = new Field("height", DataTypes.DOUBLE)
+
+    try {
+      val builder = CarbonWriter
+        .builder()
+        .outputPath(writerPath)
+        .isTransactionalTable(true)
+        .uniqueIdentifier(System.currentTimeMillis)
+        .withBlockSize(2)
+      if (persistSchema) {
+        builder.persistSchemaFile(true)
+      }
+      val writer = builder.buildWriterForCSVInput(new Schema(fields))
+      var i = 0
+      while (i < num) {
+        writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
+        i += 1
+      }
+      writer.close()
+    } catch {
+      case e: Exception => throw e
+    }
+  }
+
+  def cleanTestData(path: String): Unit = {
+    FileUtils.deleteDirectory(new File(path))
+  }
+
+  // scalastyle:off
+  def main(args: Array[String]) {
+    val carbonSession = ExampleUtils.createCarbonSession("DirectSQLExample")
+    val rootPath = new File(this.getClass.getResource("/").getPath
+      + "../../../..").getCanonicalPath
+    val path = s"$rootPath/examples/spark2/target/carbonFile/"
+
+    import carbonSession._
+    // 1. generate data file
+    cleanTestData(path)
+    buildTestData(path, 20)
+    val readPath = path + "Fact/Part0/Segment_null"
+
+    println("Running SQL on carbon files directly")
+    try {
+      // 2. run queries directly, no need to create table first
+      sql(s"""select * FROM  carbonfile.`$readPath` limit 10""".stripMargin).show()
+    } catch {
+      case e: Exception => throw e
+    } finally {
+      // 3. delete data files
+      cleanTestData(path)
+    }
+  }
+  // scalastyle:on
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9469e6bd/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
index 022b28e..1795960 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
@@ -36,7 +36,7 @@ object S3UsingSDKExample {
       num: Int = 3,
       persistSchema: Boolean = false): Any = {
 
-    // getCanonicalPath gives path with \, so code expects /. Need to handle in code ?
+    // getCanonicalPath gives path with \, but the code expects /.
     val writerPath = path.replace("\\", "/");
 
     val fields: Array[Field] = new Array[Field](3)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9469e6bd/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
index 019b915..e6d39d3 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
@@ -38,7 +38,7 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be
                             "../." +
                             "./src/test/resources/SparkCarbonFileFormat/WriterOutput/")
     .getCanonicalPath
-  //getCanonicalPath gives path with \, so code expects /. Need to handle in code ?
+  //getCanonicalPath gives path with \, but the code expects /.
   writerPath = writerPath.replace("\\", "/");
 
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9469e6bd/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
index 66be8e4..211bc8c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
@@ -46,7 +46,7 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd
                             "../." +
                             "./src/test/resources/SparkCarbonFileFormat/WriterOutput/")
     .getCanonicalPath
-  //getCanonicalPath gives path with \, so code expects /. Need to handle in code ?
+  //getCanonicalPath gives path with \, but the code expects /.
   writerPath = writerPath.replace("\\", "/");
 
   val filePath = writerPath + "/Fact/Part0/Segment_null/"
@@ -153,6 +153,34 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd
     cleanTestData()
   }
 
+  test("Running SQL directly and read carbondata files (sdk Writer Output) using the SparkCarbonFileFormat ") {
+    buildTestData(false)
+    assert(new File(filePath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    //data source file format
+    if (sqlContext.sparkContext.version.startsWith("2.1")) {
+      //data source file format
+      sql(s"""CREATE TABLE sdkOutputTable USING carbonfile OPTIONS (PATH '$filePath') """)
+    } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
+      //data source file format
+      sql(
+        s"""CREATE TABLE sdkOutputTable USING carbonfile LOCATION
+           |'$filePath' """.stripMargin)
+    } else {
+      // TO DO
+    }
+
+    val directSQL = sql(s"""select * FROM  carbonfile.`$filePath`""".stripMargin)
+    directSQL.show(false)
+    checkAnswer(sql("select * from sdkOutputTable"), directSQL)
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(filePath).exists())
+    cleanTestData()
+  }
+
 
   test("should not allow to alter datasource carbontable ") {
     buildTestData(false)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9469e6bd/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index 61b37d5..0083733 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -55,7 +55,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
                             "../." +
                             "./target/SparkCarbonFileFormat/WriterOutput/")
     .getCanonicalPath
-  //getCanonicalPath gives path with \, so code expects /. Need to handle in code ?
+  //getCanonicalPath gives path with \, but the code expects /.
   writerPath = writerPath.replace("\\", "/")
 
   def buildTestDataSingleFile(): Any = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9469e6bd/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
index d4de428..19aaf72 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
@@ -39,7 +39,7 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo
                             "../." +
                             "./src/test/resources/SparkCarbonFileFormat/WriterOutput/")
     .getCanonicalPath
-  //getCanonicalPath gives path with \, so code expects /. Need to handle in code ?
+  //getCanonicalPath gives path with \, but the code expects /.
   writerPath = writerPath.replace("\\", "/")
 
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9469e6bd/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
index 54b23a5..79b64ae 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
@@ -36,7 +36,7 @@ object TestSparkCarbonFileFormatWithSparkSession {
                             "../." +
                             "./src/test/resources/SparkCarbonFileFormat/WriterOutput/")
     .getCanonicalPath
-  //getCanonicalPath gives path with \, so code expects /. Need to handle in code ?
+  //getCanonicalPath gives path with \, but the code expects /.
   writerPath = writerPath.replace("\\", "/");
 
   val filePath = writerPath + "/Fact/Part0/Segment_null/"

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9469e6bd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
index 934f5c7..697eec5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources
 
 import java.net.URI
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.conf.Configuration
@@ -68,8 +69,23 @@ class SparkCarbonFileFormat extends FileFormat
   override def inferSchema(sparkSession: SparkSession,
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
-    val filePaths = CarbonUtil.getFilePathExternalFilePath(
-      options.get("path").get)
+    val filePaths = if (options.isEmpty) {
+      val carbondataFiles = files.seq.filter { each =>
+        if (each.isFile) {
+          each.getPath.getName.contains(".carbondata")
+        } else {
+          false
+        }
+      }
+
+      carbondataFiles.map { each =>
+        each.getPath.toString
+      }.toList.asJava
+    } else {
+      CarbonUtil.getFilePathExternalFilePath(
+        options.get("path").get)
+    }
+
     if (filePaths.size() == 0) {
       throw new SparkException("CarbonData file is not present in the location mentioned in DDL")
     }
@@ -193,7 +209,11 @@ class SparkCarbonFileFormat extends FileFormat
         val fileSplit =
           new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty)
 
-        val path: String = options.get("path").get
+        val path: String = if (options.isEmpty) {
+          file.filePath
+        } else {
+          options.get("path").get
+        }
         val endindex: Int = path.indexOf("Fact") - 1
         val tablePath = path.substring(0, endindex)
         lazy val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(