Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/16 09:33:58 UTC
[1/6] carbondata git commit: [CARBONDATA-2255] Rename the streaming examples
Repository: carbondata
Updated Branches:
refs/heads/carbonfile 6cb6f8380 -> 7a124ecd8
[CARBONDATA-2255] Rename the streaming examples
Optimize the streaming examples
This closes #2064
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d4f9003a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d4f9003a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d4f9003a
Branch: refs/heads/carbonfile
Commit: d4f9003af3593d30799b573ce729de1918b8c800
Parents: 6cb6f83
Author: QiangCai <qi...@qq.com>
Authored: Wed Mar 14 17:42:39 2018 +0800
Committer: chenliang613 <ch...@huawei.com>
Committed: Thu Mar 15 15:33:00 2018 +0800
----------------------------------------------------------------------
.../CarbonBatchSparkStreamingExample.scala | 207 ------------------
.../CarbonStreamSparkStreamingExample.scala | 213 ------------------
.../CarbonStructuredStreamingExample.scala | 200 -----------------
...CarbonStructuredStreamingWithRowParser.scala | 216 -------------------
.../examples/SparkStreamingExample.scala | 213 ++++++++++++++++++
.../StreamingUsingBatchLoadExample.scala | 207 ++++++++++++++++++
.../StreamingWithRowParserExample.scala | 216 +++++++++++++++++++
.../examples/StructuredStreamingExample.scala | 200 +++++++++++++++++
8 files changed, 836 insertions(+), 836 deletions(-)
----------------------------------------------------------------------
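All four renamed examples share the same socket-ingest skeleton, so the rename does not change behavior. The sketch below condenses that shared DStream pattern from the code in this commit; the schema, port 7071, and the carbondata write options are the examples' own, and the SparkSession is assumed to be a CarbonSession as created by ExampleUtils.

// Condensed sketch of the DStream ingest pattern shared by these examples.
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}

case class DStreamData(id: Int, name: String, city: String, salary: Float)

def startIngest(spark: SparkSession, tableName: String, checkpointPath: String): StreamingContext = {
  val ssc = new StreamingContext(spark.sparkContext, Seconds(15))
  ssc.checkpoint(checkpointPath)
  // parse CSV-like lines arriving on the socket opened by writeSocket()
  ssc.socketTextStream("localhost", 7071)
    .map(_.split(","))
    .map(f => DStreamData(f(0).toInt, f(1), f(2), f(3).toFloat))
    .foreachRDD { rdd =>
      // append each micro-batch to the carbon table as a normal batch load
      spark.createDataFrame(rdd).toDF("id", "name", "city", "salary")
        .write
        .format("carbondata")
        .option("tableName", tableName)
        .mode(SaveMode.Append)
        .save()
    }
  ssc
}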
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d4f9003a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala
deleted file mode 100644
index bcbf190..0000000
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.examples
-
-import java.io.{File, PrintWriter}
-import java.net.ServerSocket
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{CarbonEnv, SaveMode, SparkSession}
-import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
-
-/**
- * This example shows how to use CarbonData batch load to integrate
- * with Spark Streaming (the DStream API, not Spark Structured Streaming).
- */
-// scalastyle:off println
-
-case class DStreamData(id: Int, name: String, city: String, salary: Float)
-
-object CarbonBatchSparkStreamingExample {
-
- def main(args: Array[String]): Unit = {
-
- // setup paths
- val rootPath = new File(this.getClass.getResource("/").getPath
- + "../../../..").getCanonicalPath
- val checkpointPath =
- s"$rootPath/examples/spark2/target/spark_streaming_cp_" +
- System.currentTimeMillis().toString()
- val streamTableName = s"dstream_batch_table"
-
- val spark = ExampleUtils.createCarbonSession("CarbonBatchSparkStreamingExample", 4)
-
- val requireCreateTable = true
-
- if (requireCreateTable) {
- // drop the table if it exists
- spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
- // Create target carbon table and populate with initial data
- // set AUTO_LOAD_MERGE to true to compact segments automatically
- spark.sql(
- s"""
- | CREATE TABLE ${ streamTableName }(
- | id INT,
- | name STRING,
- | city STRING,
- | salary FLOAT
- | )
- | STORED BY 'carbondata'
- | TBLPROPERTIES(
- | 'sort_columns'='name',
- | 'dictionary_include'='city',
- | 'AUTO_LOAD_MERGE'='true',
- | 'COMPACTION_LEVEL_THRESHOLD'='4,10')
- | """.stripMargin)
-
- val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
- // batch load
- val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
- spark.sql(
- s"""
- | LOAD DATA LOCAL INPATH '$path'
- | INTO TABLE $streamTableName
- | OPTIONS('HEADER'='true')
- """.stripMargin)
-
- // streaming ingest
- val serverSocket = new ServerSocket(7071)
- val thread1 = writeSocket(serverSocket)
- val thread2 = showTableCount(spark, streamTableName)
- val ssc = startStreaming(spark, streamTableName, checkpointPath)
- // wait for stop signal to stop Spark Streaming App
- waitForStopSignal(ssc)
- // the Spark Streaming app needs to be started in the main thread,
- // otherwise it will hit a not-serializable exception.
- ssc.start()
- ssc.awaitTermination()
- thread1.interrupt()
- thread2.interrupt()
- serverSocket.close()
- }
-
- spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate = false)
-
- spark.sql(s"select * from ${ streamTableName }").show(100, truncate = false)
-
- // record(id = 100000001) comes from batch segment_0
- // record(id = 1) comes from stream segment_1
- spark.sql(s"select * " +
- s"from ${ streamTableName } " +
- s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
-
- // range filter query
- spark.sql(s"select * " +
- s"from ${ streamTableName } " +
- s"where id < 10 limit 100").show(100, truncate = false)
-
- // show segments
- spark.sql(s"SHOW SEGMENTS FOR TABLE ${streamTableName}").show(false)
-
- // drop table
- spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
-
- spark.stop()
- System.out.println("streaming finished")
- }
-
- def showTableCount(spark: SparkSession, tableName: String): Thread = {
- val thread = new Thread() {
- override def run(): Unit = {
- for (_ <- 0 to 1000) {
- spark.sql(s"select count(*) from $tableName").show(truncate = false)
- spark.sql(s"SHOW SEGMENTS FOR TABLE ${tableName}").show(false)
- Thread.sleep(1000 * 5)
- }
- }
- }
- thread.start()
- thread
- }
-
- def waitForStopSignal(ssc: StreamingContext): Thread = {
- val thread = new Thread() {
- override def run(): Unit = {
- // use command 'nc 127.0.0.1 7072' to stop Spark Streaming App
- new ServerSocket(7072).accept()
- // don't stop SparkContext here
- ssc.stop(false, true)
- }
- }
- thread.start()
- thread
- }
-
- def startStreaming(spark: SparkSession, tableName: String,
- checkpointPath: String): StreamingContext = {
- var ssc: StreamingContext = null
- try {
- // recommendation: use a larger batch interval, such as 30s or 1min.
- ssc = new StreamingContext(spark.sparkContext, Seconds(15))
- ssc.checkpoint(checkpointPath)
-
- val readSocketDF = ssc.socketTextStream("localhost", 7071)
-
- val batchData = readSocketDF
- .map(_.split(","))
- .map(fields => DStreamData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat))
-
- batchData.foreachRDD { (rdd: RDD[DStreamData], time: Time) => {
- val df = spark.createDataFrame(rdd).toDF("id", "name", "city", "salary")
- println("at time: " + time.toString() + " the count of received data: " + df.count())
- df.write
- .format("carbondata")
- .option("tableName", tableName)
- .mode(SaveMode.Append)
- .save()
- }}
- } catch {
- case ex: Exception =>
- ex.printStackTrace()
- println("Done reading and writing streaming data")
- }
- ssc
- }
-
- def writeSocket(serverSocket: ServerSocket): Thread = {
- val thread = new Thread() {
- override def run(): Unit = {
- // wait for a client connection request and accept it
- val clientSocket = serverSocket.accept()
- val socketWriter = new PrintWriter(clientSocket.getOutputStream())
- var index = 0
- for (_ <- 1 to 1000) {
- // write a batch of records in each iteration
- for (_ <- 0 to 1000) {
- index = index + 1
- socketWriter.println(index.toString + ",name_" + index
- + ",city_" + index + "," + (index * 10000.00).toString +
- ",school_" + index + ":school_" + index + index + "$" + index)
- }
- socketWriter.flush()
- Thread.sleep(1000)
- }
- socketWriter.close()
- System.out.println("Socket closed")
- }
- }
- thread.start()
- thread
- }
-}
-// scalastyle:on println
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d4f9003a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
deleted file mode 100644
index 856084b..0000000
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.examples
-
-import java.io.{File, PrintWriter}
-import java.net.ServerSocket
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.CarbonEnv
-import org.apache.spark.sql.CarbonSparkStreamingFactory
-import org.apache.spark.sql.SaveMode
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
-
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.streaming.CarbonSparkStreamingListener
-import org.apache.carbondata.streaming.parser.CarbonStreamParser
-
-/**
- * This example shows how to use Spark Streaming to write data
- * to a CarbonData stream table.
- *
- * NOTE: Current integration with Spark Streaming is an alpha feature.
- */
-// scalastyle:off println
-object CarbonStreamSparkStreamingExample {
-
- def main(args: Array[String]): Unit = {
-
- // setup paths
- val rootPath = new File(this.getClass.getResource("/").getPath
- + "../../../..").getCanonicalPath
- val checkpointPath =
- s"$rootPath/examples/spark2/target/spark_streaming_cp_" +
- System.currentTimeMillis().toString()
- val streamTableName = s"dstream_stream_table"
-
- val spark = ExampleUtils.createCarbonSession("CarbonStreamSparkStreamingExample", 4)
-
- val requireCreateTable = true
-
- if (requireCreateTable) {
- // drop the table if it exists
- spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
- // Create target carbon table and populate with initial data
- spark.sql(
- s"""
- | CREATE TABLE ${ streamTableName }(
- | id INT,
- | name STRING,
- | city STRING,
- | salary FLOAT
- | )
- | STORED BY 'carbondata'
- | TBLPROPERTIES(
- | 'streaming'='true',
- | 'sort_columns'='name',
- | 'dictionary_include'='city')
- | """.stripMargin)
- val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
- // batch load
- val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
- spark.sql(
- s"""
- | LOAD DATA LOCAL INPATH '$path'
- | INTO TABLE $streamTableName
- | OPTIONS('HEADER'='true')
- """.stripMargin)
-
- // streaming ingest
- val serverSocket = new ServerSocket(7071)
- val thread1 = writeSocket(serverSocket)
- val thread2 = showTableCount(spark, streamTableName)
- val ssc = startStreaming(spark, streamTableName, checkpointPath)
- // add a Spark Streaming listener to remove all locks on stream tables when the app stops
- ssc.sparkContext.addSparkListener(new CarbonSparkStreamingListener())
- // wait for stop signal to stop Spark Streaming App
- waitForStopSignal(ssc)
- // the Spark Streaming app needs to be started in the main thread,
- // otherwise it will hit a not-serializable exception.
- ssc.start()
- ssc.awaitTermination()
- thread1.interrupt()
- thread2.interrupt()
- serverSocket.close()
- }
-
- spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate = false)
-
- spark.sql(s"select * from ${ streamTableName } order by id desc").show(100, truncate = false)
-
- // record(id = 100000001) comes from batch segment_0
- // record(id = 1) comes from stream segment_1
- spark.sql(s"select * " +
- s"from ${ streamTableName } " +
- s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
-
- // range filter query
- spark.sql(s"select * " +
- s"from ${ streamTableName } " +
- s"where id < 10 limit 100").show(100, truncate = false)
-
- // show segments
- spark.sql(s"SHOW SEGMENTS FOR TABLE ${streamTableName}").show(false)
-
- spark.stop()
- System.out.println("streaming finished")
- }
-
- def showTableCount(spark: SparkSession, tableName: String): Thread = {
- val thread = new Thread() {
- override def run(): Unit = {
- for (_ <- 0 to 1000) {
- println(System.currentTimeMillis())
- spark.sql(s"select count(*) from $tableName").show(truncate = false)
- spark.sql(s"SHOW SEGMENTS FOR TABLE ${tableName}").show(false)
- Thread.sleep(1000 * 5)
- }
- }
- }
- thread.start()
- thread
- }
-
- def waitForStopSignal(ssc: StreamingContext): Thread = {
- val thread = new Thread() {
- override def run(): Unit = {
- // use command 'nc 127.0.0.1 7072' to stop Spark Streaming App
- new ServerSocket(7072).accept()
- // don't stop SparkContext here
- ssc.stop(false, true)
- }
- }
- thread.start()
- thread
- }
-
- def startStreaming(spark: SparkSession, tableName: String,
- checkpointPath: String): StreamingContext = {
- var ssc: StreamingContext = null
- try {
- // recommendation: use a larger batch interval, such as 30s or 1min.
- ssc = new StreamingContext(spark.sparkContext, Seconds(30))
- ssc.checkpoint(checkpointPath)
-
- val readSocketDF = ssc.socketTextStream("localhost", 7071)
-
- val batchData = readSocketDF
- .map(_.split(","))
- .map(fields => DStreamData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat))
-
- println("init carbon table info")
- batchData.foreachRDD { (rdd: RDD[DStreamData], time: Time) => {
- val df = spark.createDataFrame(rdd).toDF()
- println(System.currentTimeMillis().toString() +
- " at batch time: " + time.toString() +
- " the count of received data: " + df.count())
- CarbonSparkStreamingFactory.getStreamSparkStreamingWriter(spark, "default", tableName)
- .option(CarbonStreamParser.CARBON_STREAM_PARSER,
- CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
- .mode(SaveMode.Append)
- .writeStreamData(df, time)
- }}
- } catch {
- case ex: Exception =>
- ex.printStackTrace()
- println("Done reading and writing streaming data")
- }
- ssc
- }
-
- def writeSocket(serverSocket: ServerSocket): Thread = {
- val thread = new Thread() {
- override def run(): Unit = {
- // wait for a client connection request and accept it
- val clientSocket = serverSocket.accept()
- val socketWriter = new PrintWriter(clientSocket.getOutputStream())
- var index = 0
- for (_ <- 1 to 1000) {
- // write a batch of records in each iteration
- for (_ <- 0 to 100) {
- index = index + 1
- socketWriter.println(index.toString + ",name_" + index
- + ",city_" + index + "," + (index * 10000.00).toString +
- ",school_" + index + ":school_" + index + index + "$" + index)
- }
- socketWriter.flush()
- Thread.sleep(2000)
- }
- socketWriter.close()
- System.out.println("Socket closed")
- }
- }
- thread.start()
- thread
- }
-}
-// scalastyle:on println
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d4f9003a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingExample.scala
deleted file mode 100644
index bc65b2f..0000000
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingExample.scala
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.examples
-
-import java.io.{File, PrintWriter}
-import java.net.ServerSocket
-
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
-import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
-
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.path.CarbonTablePath
-
-// scalastyle:off println
-object CarbonStructuredStreamingExample {
- def main(args: Array[String]) {
-
- // setup paths
- val rootPath = new File(this.getClass.getResource("/").getPath
- + "../../../..").getCanonicalPath
-
- val spark = ExampleUtils.createCarbonSession("CarbonStructuredStreamingExample", 4)
- val streamTableName = s"stream_table"
-
- val requireCreateTable = true
- val useComplexDataType = false
-
- if (requireCreateTable) {
- // drop the table if it exists
- spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
- // Create target carbon table and populate with initial data
- if (useComplexDataType) {
- spark.sql(
- s"""
- | CREATE TABLE ${ streamTableName }(
- | id INT,
- | name STRING,
- | city STRING,
- | salary FLOAT,
- | file struct<school:array<string>, age:int>
- | )
- | STORED BY 'carbondata'
- | TBLPROPERTIES(
- | 'streaming'='true', 'sort_columns'='name', 'dictionary_include'='city')
- | """.stripMargin)
- } else {
- spark.sql(
- s"""
- | CREATE TABLE ${ streamTableName }(
- | id INT,
- | name STRING,
- | city STRING,
- | salary FLOAT
- | )
- | STORED BY 'carbondata'
- | TBLPROPERTIES(
- | 'streaming'='true', 'sort_columns'='name')
- | """.stripMargin)
- }
-
- val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
- // batch load
- val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
- spark.sql(
- s"""
- | LOAD DATA LOCAL INPATH '$path'
- | INTO TABLE $streamTableName
- | OPTIONS('HEADER'='true')
- """.stripMargin)
-
- // streaming ingest
- val serverSocket = new ServerSocket(7071)
- val thread1 = startStreaming(spark, carbonTable)
- val thread2 = writeSocket(serverSocket)
- val thread3 = showTableCount(spark, streamTableName)
-
- System.out.println("type enter to interrupt streaming")
- System.in.read()
- thread1.interrupt()
- thread2.interrupt()
- thread3.interrupt()
- serverSocket.close()
- }
-
- spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate = false)
-
- spark.sql(s"select * from ${ streamTableName }").show(100, truncate = false)
-
- // record(id = 100000001) comes from batch segment_0
- // record(id = 1) comes from stream segment_1
- spark.sql(s"select * " +
- s"from ${ streamTableName } " +
- s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
-
- // range filter query
- spark.sql(s"select * " +
- s"from ${ streamTableName } " +
- s"where id < 10 limit 100").show(100, truncate = false)
-
- if (useComplexDataType) {
- // query on the complex type column
- spark.sql(s"select file.age, file.school " +
- s"from ${ streamTableName } " +
- s"where file.age = 30").show(100, truncate = false)
- }
-
- spark.stop()
- System.out.println("streaming finished")
- }
-
- def showTableCount(spark: SparkSession, tableName: String): Thread = {
- val thread = new Thread() {
- override def run(): Unit = {
- for (_ <- 0 to 1000) {
- spark.sql(s"select count(*) from $tableName").show(truncate = false)
- Thread.sleep(1000 * 3)
- }
- }
- }
- thread.start()
- thread
- }
-
- def startStreaming(spark: SparkSession, carbonTable: CarbonTable): Thread = {
- val thread = new Thread() {
- override def run(): Unit = {
- var qry: StreamingQuery = null
- try {
- val readSocketDF = spark.readStream
- .format("socket")
- .option("host", "localhost")
- .option("port", 7071)
- .load()
-
- // Write data from socket stream to carbondata file
- qry = readSocketDF.writeStream
- .format("carbondata")
- .trigger(ProcessingTime("5 seconds"))
- .option("checkpointLocation",
- CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath))
- .option("dbName", "default")
- .option("tableName", "stream_table")
- .start()
-
- qry.awaitTermination()
- } catch {
- case ex: Exception =>
- ex.printStackTrace()
- println("Done reading and writing streaming data")
- } finally {
- qry.stop()
- }
- }
- }
- thread.start()
- thread
- }
-
- def writeSocket(serverSocket: ServerSocket): Thread = {
- val thread = new Thread() {
- override def run(): Unit = {
- // wait for a client connection request and accept it
- val clientSocket = serverSocket.accept()
- val socketWriter = new PrintWriter(clientSocket.getOutputStream())
- var index = 0
- for (_ <- 1 to 1000) {
- // write a batch of records in each iteration
- for (_ <- 0 to 1000) {
- index = index + 1
- socketWriter.println(index.toString + ",name_" + index
- + ",city_" + index + "," + (index * 10000.00).toString +
- ",school_" + index + ":school_" + index + index + "$" + index)
- }
- socketWriter.flush()
- Thread.sleep(1000)
- }
- socketWriter.close()
- System.out.println("Socket closed")
- }
- }
- thread.start()
- thread
- }
-}
-// scalastyle:on println
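For contrast with the DStream sketch above, the two structured-streaming examples reduce to the writeStream pattern below, condensed from the code in this commit; the host, port, trigger interval, and sink options are the examples' own, and ProcessingTime comes from the Spark 2.x streaming API already imported here.

// Condensed sketch of the structured streaming path: socket source -> carbondata sink.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}

def startQuery(spark: SparkSession, checkpointDir: String): StreamingQuery = {
  val socketDF = spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 7071)
    .load()
  // write each 5-second micro-batch into the streaming segment of the table
  socketDF.writeStream
    .format("carbondata")
    .trigger(ProcessingTime("5 seconds"))
    .option("checkpointLocation", checkpointDir)
    .option("dbName", "default")
    .option("tableName", "stream_table")
    .start()
}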
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d4f9003a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
deleted file mode 100644
index 9ca0e07..0000000
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.examples
-
-import java.io.{File, PrintWriter}
-import java.net.ServerSocket
-
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
-import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
-
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.streaming.parser.CarbonStreamParser
-
-case class FileElement(school: Array[String], age: Int)
-case class StreamData(id: Int, name: String, city: String, salary: Float, file: FileElement)
-
-// scalastyle:off println
-object CarbonStructuredStreamingWithRowParser {
- def main(args: Array[String]) {
-
- // setup paths
- val rootPath = new File(this.getClass.getResource("/").getPath
- + "../../../..").getCanonicalPath
-
- val spark = ExampleUtils.createCarbonSession("CarbonStructuredStreamingWithRowParser", 4)
- val streamTableName = s"stream_table_with_row_parser"
-
- val requireCreateTable = true
- val useComplexDataType = false
-
- if (requireCreateTable) {
- // drop the table if it exists
- spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
- // Create target carbon table and populate with initial data
- if (useComplexDataType) {
- spark.sql(
- s"""
- | CREATE TABLE ${ streamTableName }(
- | id INT,
- | name STRING,
- | city STRING,
- | salary FLOAT,
- | file struct<school:array<string>, age:int>
- | )
- | STORED BY 'carbondata'
- | TBLPROPERTIES(
- | 'streaming'='true', 'sort_columns'='name', 'dictionary_include'='city')
- | """.stripMargin)
- } else {
- spark.sql(
- s"""
- | CREATE TABLE ${ streamTableName }(
- | id INT,
- | name STRING,
- | city STRING,
- | salary FLOAT
- | )
- | STORED BY 'carbondata'
- | TBLPROPERTIES(
- | 'streaming'='true', 'sort_columns'='name')
- | """.stripMargin)
- }
-
- val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
- // batch load
- val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
- spark.sql(
- s"""
- | LOAD DATA LOCAL INPATH '$path'
- | INTO TABLE $streamTableName
- | OPTIONS('HEADER'='true')
- """.stripMargin)
-
- // streaming ingest
- val serverSocket = new ServerSocket(7071)
- val thread1 = startStreaming(spark, carbonTable.getTablePath)
- val thread2 = writeSocket(serverSocket)
- val thread3 = showTableCount(spark, streamTableName)
-
- System.out.println("type enter to interrupt streaming")
- System.in.read()
- thread1.interrupt()
- thread2.interrupt()
- thread3.interrupt()
- serverSocket.close()
- }
-
- spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate = false)
-
- spark.sql(s"select * from ${ streamTableName }").show(100, truncate = false)
-
- // record(id = 100000001) comes from batch segment_0
- // record(id = 1) comes from stream segment_1
- spark.sql(s"select * " +
- s"from ${ streamTableName } " +
- s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
-
- // range filter query
- spark.sql(s"select * " +
- s"from ${ streamTableName } " +
- s"where id < 10 limit 100").show(100, truncate = false)
-
- if (useComplexDataType) {
- // query on the complex type column
- spark.sql(s"select file.age, file.school " +
- s"from ${ streamTableName } " +
- s"where file.age = 30").show(100, truncate = false)
- }
-
- spark.stop()
- System.out.println("streaming finished")
- }
-
- def showTableCount(spark: SparkSession, tableName: String): Thread = {
- val thread = new Thread() {
- override def run(): Unit = {
- for (_ <- 0 to 1000) {
- spark.sql(s"select count(*) from $tableName").show(truncate = false)
- Thread.sleep(1000 * 3)
- }
- }
- }
- thread.start()
- thread
- }
-
- def startStreaming(spark: SparkSession, tablePath: String): Thread = {
- val thread = new Thread() {
- override def run(): Unit = {
- var qry: StreamingQuery = null
- try {
- import spark.implicits._
- val readSocketDF = spark.readStream
- .format("socket")
- .option("host", "localhost")
- .option("port", 7071)
- .load()
- .as[String]
- .map(_.split(","))
- .map { fields => {
- val tmp = fields(4).split("\\$")
- val file = FileElement(tmp(0).split(":"), tmp(1).toInt)
- if (fields(0).toInt % 2 == 0) {
- StreamData(fields(0).toInt, null, fields(2), fields(3).toFloat, file)
- } else {
- StreamData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat, file)
- }
- } }
-
- // Write data from socket stream to carbondata file
- qry = readSocketDF.writeStream
- .format("carbondata")
- .trigger(ProcessingTime("5 seconds"))
- .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(tablePath))
- .option("dbName", "default")
- .option("tableName", "stream_table_with_row_parser")
- .option(CarbonStreamParser.CARBON_STREAM_PARSER,
- CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
- .start()
-
- qry.awaitTermination()
- } catch {
- case ex: Exception =>
- ex.printStackTrace()
- println("Done reading and writing streaming data")
- } finally {
- qry.stop()
- }
- }
- }
- thread.start()
- thread
- }
-
- def writeSocket(serverSocket: ServerSocket): Thread = {
- val thread = new Thread() {
- override def run(): Unit = {
- // wait for a client connection request and accept it
- val clientSocket = serverSocket.accept()
- val socketWriter = new PrintWriter(clientSocket.getOutputStream())
- var index = 0
- for (_ <- 1 to 1000) {
- // write a batch of records in each iteration
- for (_ <- 0 to 1000) {
- index = index + 1
- socketWriter.println(index.toString + ",name_" + index
- + ",city_" + index + "," + (index * 10000.00).toString +
- ",school_" + index + ":school_" + index + index + "$" + index)
- }
- socketWriter.flush()
- Thread.sleep(1000)
- }
- socketWriter.close()
- System.out.println("Socket closed")
- }
- }
- thread.start()
- thread
- }
-}
-// scalastyle:on println
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d4f9003a/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkStreamingExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkStreamingExample.scala
new file mode 100644
index 0000000..d819a3f
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkStreamingExample.scala
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.{File, PrintWriter}
+import java.net.ServerSocket
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.CarbonSparkStreamingFactory
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
+
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.streaming.CarbonSparkStreamingListener
+import org.apache.carbondata.streaming.parser.CarbonStreamParser
+
+/**
+ * This example shows how to use Spark Streaming to write data
+ * to a CarbonData stream table.
+ *
+ * NOTE: Current integration with Spark Streaming is an alpha feature.
+ */
+// scalastyle:off println
+object SparkStreamingExample {
+
+ def main(args: Array[String]): Unit = {
+
+ // setup paths
+ val rootPath = new File(this.getClass.getResource("/").getPath
+ + "../../../..").getCanonicalPath
+ val checkpointPath =
+ s"$rootPath/examples/spark2/target/spark_streaming_cp_" +
+ System.currentTimeMillis().toString()
+ val streamTableName = s"dstream_stream_table"
+
+ val spark = ExampleUtils.createCarbonSession("SparkStreamingExample", 4)
+
+ val requireCreateTable = true
+
+ if (requireCreateTable) {
+ // drop the table if it exists
+ spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
+ // Create target carbon table and populate with initial data
+ spark.sql(
+ s"""
+ | CREATE TABLE ${ streamTableName }(
+ | id INT,
+ | name STRING,
+ | city STRING,
+ | salary FLOAT
+ | )
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES(
+ | 'streaming'='true',
+ | 'sort_columns'='name',
+ | 'dictionary_include'='city')
+ | """.stripMargin)
+ val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
+ // batch load
+ val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
+ spark.sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '$path'
+ | INTO TABLE $streamTableName
+ | OPTIONS('HEADER'='true')
+ """.stripMargin)
+
+ // streaming ingest
+ val serverSocket = new ServerSocket(7071)
+ val thread1 = writeSocket(serverSocket)
+ val thread2 = showTableCount(spark, streamTableName)
+ val ssc = startStreaming(spark, streamTableName, checkpointPath)
+ // add a Spark Streaming listener to remove all locks on stream tables when the app stops
+ ssc.sparkContext.addSparkListener(new CarbonSparkStreamingListener())
+ // wait for stop signal to stop Spark Streaming App
+ waitForStopSignal(ssc)
+ // the Spark Streaming app needs to be started in the main thread,
+ // otherwise it will hit a not-serializable exception.
+ ssc.start()
+ ssc.awaitTermination()
+ thread1.interrupt()
+ thread2.interrupt()
+ serverSocket.close()
+ }
+
+ spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate = false)
+
+ spark.sql(s"select * from ${ streamTableName } order by id desc").show(100, truncate = false)
+
+ // record(id = 100000001) comes from batch segment_0
+ // record(id = 1) comes from stream segment_1
+ spark.sql(s"select * " +
+ s"from ${ streamTableName } " +
+ s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
+
+ // range filter query
+ spark.sql(s"select * " +
+ s"from ${ streamTableName } " +
+ s"where id < 10 limit 100").show(100, truncate = false)
+
+ // show segments
+ spark.sql(s"SHOW SEGMENTS FOR TABLE ${streamTableName}").show(false)
+
+ spark.stop()
+ System.out.println("streaming finished")
+ }
+
+ def showTableCount(spark: SparkSession, tableName: String): Thread = {
+ val thread = new Thread() {
+ override def run(): Unit = {
+ for (_ <- 0 to 1000) {
+ println(System.currentTimeMillis())
+ spark.sql(s"select count(*) from $tableName").show(truncate = false)
+ spark.sql(s"SHOW SEGMENTS FOR TABLE ${tableName}").show(false)
+ Thread.sleep(1000 * 5)
+ }
+ }
+ }
+ thread.start()
+ thread
+ }
+
+ def waitForStopSignal(ssc: StreamingContext): Thread = {
+ val thread = new Thread() {
+ override def run(): Unit = {
+ // use command 'nc 127.0.0.1 7072' to stop Spark Streaming App
+ new ServerSocket(7072).accept()
+ // don't stop SparkContext here
+ ssc.stop(false, true)
+ }
+ }
+ thread.start()
+ thread
+ }
+
+ def startStreaming(spark: SparkSession, tableName: String,
+ checkpointPath: String): StreamingContext = {
+ var ssc: StreamingContext = null
+ try {
+ // recommendation: use a larger batch interval, such as 30s or 1min.
+ ssc = new StreamingContext(spark.sparkContext, Seconds(30))
+ ssc.checkpoint(checkpointPath)
+
+ val readSocketDF = ssc.socketTextStream("localhost", 7071)
+
+ val batchData = readSocketDF
+ .map(_.split(","))
+ .map(fields => DStreamData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat))
+
+ println("init carbon table info")
+ batchData.foreachRDD { (rdd: RDD[DStreamData], time: Time) => {
+ val df = spark.createDataFrame(rdd).toDF()
+ println(System.currentTimeMillis().toString() +
+ " at batch time: " + time.toString() +
+ " the count of received data: " + df.count())
+ CarbonSparkStreamingFactory.getStreamSparkStreamingWriter(spark, "default", tableName)
+ .option(CarbonStreamParser.CARBON_STREAM_PARSER,
+ CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
+ .mode(SaveMode.Append)
+ .writeStreamData(df, time)
+ }}
+ } catch {
+ case ex: Exception =>
+ ex.printStackTrace()
+ println("Done reading and writing streaming data")
+ }
+ ssc
+ }
+
+ def writeSocket(serverSocket: ServerSocket): Thread = {
+ val thread = new Thread() {
+ override def run(): Unit = {
+ // wait for a client connection request and accept it
+ val clientSocket = serverSocket.accept()
+ val socketWriter = new PrintWriter(clientSocket.getOutputStream())
+ var index = 0
+ for (_ <- 1 to 1000) {
+ // write a batch of records in each iteration
+ for (_ <- 0 to 100) {
+ index = index + 1
+ socketWriter.println(index.toString + ",name_" + index
+ + ",city_" + index + "," + (index * 10000.00).toString +
+ ",school_" + index + ":school_" + index + index + "$" + index)
+ }
+ socketWriter.flush()
+ Thread.sleep(2000)
+ }
+ socketWriter.close()
+ System.out.println("Socket closed")
+ }
+ }
+ thread.start()
+ thread
+ }
+}
+// scalastyle:on println
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d4f9003a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingUsingBatchLoadExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingUsingBatchLoadExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingUsingBatchLoadExample.scala
new file mode 100644
index 0000000..89883f8
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingUsingBatchLoadExample.scala
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.{File, PrintWriter}
+import java.net.ServerSocket
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{CarbonEnv, SaveMode, SparkSession}
+import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
+
+/**
+ * This example shows how to use CarbonData batch load to integrate
+ * with Spark Streaming (the DStream API, not Spark Structured Streaming).
+ */
+// scalastyle:off println
+
+case class DStreamData(id: Int, name: String, city: String, salary: Float)
+
+object StreamingUsingBatchLoadExample {
+
+ def main(args: Array[String]): Unit = {
+
+ // setup paths
+ val rootPath = new File(this.getClass.getResource("/").getPath
+ + "../../../..").getCanonicalPath
+ val checkpointPath =
+ s"$rootPath/examples/spark2/target/spark_streaming_cp_" +
+ System.currentTimeMillis().toString()
+ val streamTableName = s"dstream_batch_table"
+
+ val spark = ExampleUtils.createCarbonSession("StreamingUsingBatchLoadExample", 4)
+
+ val requireCreateTable = true
+
+ if (requireCreateTable) {
+ // drop the table if it exists
+ spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
+ // Create target carbon table and populate with initial data
+ // set AUTO_LOAD_MERGE to true to compact segments automatically
+ spark.sql(
+ s"""
+ | CREATE TABLE ${ streamTableName }(
+ | id INT,
+ | name STRING,
+ | city STRING,
+ | salary FLOAT
+ | )
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES(
+ | 'sort_columns'='name',
+ | 'dictionary_include'='city',
+ | 'AUTO_LOAD_MERGE'='true',
+ | 'COMPACTION_LEVEL_THRESHOLD'='4,10')
+ | """.stripMargin)
+
+ val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
+ // batch load
+ val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
+ spark.sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '$path'
+ | INTO TABLE $streamTableName
+ | OPTIONS('HEADER'='true')
+ """.stripMargin)
+
+ // streaming ingest
+ val serverSocket = new ServerSocket(7071)
+ val thread1 = writeSocket(serverSocket)
+ val thread2 = showTableCount(spark, streamTableName)
+ val ssc = startStreaming(spark, streamTableName, checkpointPath)
+ // wait for stop signal to stop Spark Streaming App
+ waitForStopSignal(ssc)
+ // the Spark Streaming app needs to be started in the main thread,
+ // otherwise it will hit a not-serializable exception.
+ ssc.start()
+ ssc.awaitTermination()
+ thread1.interrupt()
+ thread2.interrupt()
+ serverSocket.close()
+ }
+
+ spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate = false)
+
+ spark.sql(s"select * from ${ streamTableName }").show(100, truncate = false)
+
+ // record(id = 100000001) comes from batch segment_0
+ // record(id = 1) comes from stream segment_1
+ spark.sql(s"select * " +
+ s"from ${ streamTableName } " +
+ s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
+
+ // range filter query
+ spark.sql(s"select * " +
+ s"from ${ streamTableName } " +
+ s"where id < 10 limit 100").show(100, truncate = false)
+
+ // show segments
+ spark.sql(s"SHOW SEGMENTS FOR TABLE ${streamTableName}").show(false)
+
+ // drop table
+ spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
+
+ spark.stop()
+ System.out.println("streaming finished")
+ }
+
+ def showTableCount(spark: SparkSession, tableName: String): Thread = {
+ val thread = new Thread() {
+ override def run(): Unit = {
+ for (_ <- 0 to 1000) {
+ spark.sql(s"select count(*) from $tableName").show(truncate = false)
+ spark.sql(s"SHOW SEGMENTS FOR TABLE ${tableName}").show(false)
+ Thread.sleep(1000 * 5)
+ }
+ }
+ }
+ thread.start()
+ thread
+ }
+
+ def waitForStopSignal(ssc: StreamingContext): Thread = {
+ val thread = new Thread() {
+ override def run(): Unit = {
+ // use command 'nc 127.0.0.1 7072' to stop Spark Streaming App
+ new ServerSocket(7072).accept()
+ // don't stop SparkContext here
+ ssc.stop(false, true)
+ }
+ }
+ thread.start()
+ thread
+ }
+
+ def startStreaming(spark: SparkSession, tableName: String,
+ checkpointPath: String): StreamingContext = {
+ var ssc: StreamingContext = null
+ try {
+ // recommendation: use a larger batch interval, such as 30s or 1min.
+ ssc = new StreamingContext(spark.sparkContext, Seconds(15))
+ ssc.checkpoint(checkpointPath)
+
+ val readSocketDF = ssc.socketTextStream("localhost", 7071)
+
+ val batchData = readSocketDF
+ .map(_.split(","))
+ .map(fields => DStreamData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat))
+
+ batchData.foreachRDD { (rdd: RDD[DStreamData], time: Time) => {
+ val df = spark.createDataFrame(rdd).toDF("id", "name", "city", "salary")
+ println("at time: " + time.toString() + " the count of received data: " + df.count())
+ df.write
+ .format("carbondata")
+ .option("tableName", tableName)
+ .mode(SaveMode.Append)
+ .save()
+ }}
+ } catch {
+ case ex: Exception =>
+ ex.printStackTrace()
+ println("Done reading and writing streaming data")
+ }
+ ssc
+ }
+
+ def writeSocket(serverSocket: ServerSocket): Thread = {
+ val thread = new Thread() {
+ override def run(): Unit = {
+ // wait for a client connection request and accept it
+ val clientSocket = serverSocket.accept()
+ val socketWriter = new PrintWriter(clientSocket.getOutputStream())
+ var index = 0
+ for (_ <- 1 to 1000) {
+ // write a batch of records in each iteration
+ for (_ <- 0 to 1000) {
+ index = index + 1
+ socketWriter.println(index.toString + ",name_" + index
+ + ",city_" + index + "," + (index * 10000.00).toString +
+ ",school_" + index + ":school_" + index + index + "$" + index)
+ }
+ socketWriter.flush()
+ Thread.sleep(1000)
+ }
+ socketWriter.close()
+ System.out.println("Socket closed")
+ }
+ }
+ thread.start()
+ thread
+ }
+}
+// scalastyle:on println
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d4f9003a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingWithRowParserExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingWithRowParserExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingWithRowParserExample.scala
new file mode 100644
index 0000000..a07d504
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingWithRowParserExample.scala
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.{File, PrintWriter}
+import java.net.ServerSocket
+
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
+
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.streaming.parser.CarbonStreamParser
+
+case class FileElement(school: Array[String], age: Int)
+case class StreamData(id: Int, name: String, city: String, salary: Float, file: FileElement)
+
+// scalastyle:off println
+object StreamingWithRowParserExample {
+ def main(args: Array[String]) {
+
+ // setup paths
+ val rootPath = new File(this.getClass.getResource("/").getPath
+ + "../../../..").getCanonicalPath
+
+ val spark = ExampleUtils.createCarbonSession("StreamingWithRowParserExample", 4)
+ val streamTableName = s"stream_table_with_row_parser"
+
+ val requireCreateTable = true
+ val useComplexDataType = false
+
+ if (requireCreateTable) {
+ // drop the table if it exists
+ spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
+ // Create target carbon table and populate with initial data
+ if (useComplexDataType) {
+ spark.sql(
+ s"""
+ | CREATE TABLE ${ streamTableName }(
+ | id INT,
+ | name STRING,
+ | city STRING,
+ | salary FLOAT,
+ | file struct<school:array<string>, age:int>
+ | )
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES(
+ | 'streaming'='true', 'sort_columns'='name', 'dictionary_include'='city')
+ | """.stripMargin)
+ } else {
+ spark.sql(
+ s"""
+ | CREATE TABLE ${ streamTableName }(
+ | id INT,
+ | name STRING,
+ | city STRING,
+ | salary FLOAT
+ | )
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES(
+ | 'streaming'='true', 'sort_columns'='name')
+ | """.stripMargin)
+ }
+
+ val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
+ // batch load
+ val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
+ spark.sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '$path'
+ | INTO TABLE $streamTableName
+ | OPTIONS('HEADER'='true')
+ """.stripMargin)
+
+ // streaming ingest
+ val serverSocket = new ServerSocket(7071)
+ val thread1 = startStreaming(spark, carbonTable.getTablePath)
+ val thread2 = writeSocket(serverSocket)
+ val thread3 = showTableCount(spark, streamTableName)
+
+ System.out.println("type enter to interrupt streaming")
+ System.in.read()
+ thread1.interrupt()
+ thread2.interrupt()
+ thread3.interrupt()
+ serverSocket.close()
+ }
+
+ spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate = false)
+
+ spark.sql(s"select * from ${ streamTableName }").show(100, truncate = false)
+
+ // record(id = 100000001) comes from batch segment_0
+ // record(id = 1) comes from stream segment_1
+ spark.sql(s"select * " +
+ s"from ${ streamTableName } " +
+ s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
+
+ // range filter query
+ spark.sql(s"select * " +
+ s"from ${ streamTableName } " +
+ s"where id < 10 limit 100").show(100, truncate = false)
+
+ if (useComplexDataType) {
+ // query on the complex type column
+ spark.sql(s"select file.age, file.school " +
+ s"from ${ streamTableName } " +
+ s"where file.age = 30").show(100, truncate = false)
+ }
+
+ spark.stop()
+ System.out.println("streaming finished")
+ }
+
+ def showTableCount(spark: SparkSession, tableName: String): Thread = {
+ val thread = new Thread() {
+ override def run(): Unit = {
+ for (_ <- 0 to 1000) {
+ spark.sql(s"select count(*) from $tableName").show(truncate = false)
+ Thread.sleep(1000 * 3)
+ }
+ }
+ }
+ thread.start()
+ thread
+ }
+
+ def startStreaming(spark: SparkSession, tablePath: String): Thread = {
+ val thread = new Thread() {
+ override def run(): Unit = {
+ var qry: StreamingQuery = null
+ try {
+ import spark.implicits._
+ val readSocketDF = spark.readStream
+ .format("socket")
+ .option("host", "localhost")
+ .option("port", 7071)
+ .load()
+ .as[String]
+ .map(_.split(","))
+ .map { fields => {
+ val tmp = fields(4).split("\\$")
+ val file = FileElement(tmp(0).split(":"), tmp(1).toInt)
+ if (fields(0).toInt % 2 == 0) {
+ StreamData(fields(0).toInt, null, fields(2), fields(3).toFloat, file)
+ } else {
+ StreamData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat, file)
+ }
+ } }
+
+ // Write data from socket stream to carbondata file
+ qry = readSocketDF.writeStream
+ .format("carbondata")
+ .trigger(ProcessingTime("5 seconds"))
+ .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(tablePath))
+ .option("dbName", "default")
+ .option("tableName", "stream_table_with_row_parser")
+ .option(CarbonStreamParser.CARBON_STREAM_PARSER,
+ CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
+ .start()
+
+ qry.awaitTermination()
+ } catch {
+ case ex: Exception =>
+ ex.printStackTrace()
+ println("Done reading and writing streaming data")
+ } finally {
+ qry.stop()
+ }
+ }
+ }
+ thread.start()
+ thread
+ }
+
+ def writeSocket(serverSocket: ServerSocket): Thread = {
+ val thread = new Thread() {
+ override def run(): Unit = {
+ // wait for a client connection request and accept it
+ val clientSocket = serverSocket.accept()
+ val socketWriter = new PrintWriter(clientSocket.getOutputStream())
+ var index = 0
+ for (_ <- 1 to 1000) {
+ // write a batch of records in each iteration
+ for (_ <- 0 to 1000) {
+ index = index + 1
+ socketWriter.println(index.toString + ",name_" + index
+ + ",city_" + index + "," + (index * 10000.00).toString +
+ ",school_" + index + ":school_" + index + index + "$" + index)
+ }
+ socketWriter.flush()
+ Thread.sleep(1000)
+ }
+ socketWriter.close()
+ System.out.println("Socket closed")
+ }
+ }
+ thread.start()
+ thread
+ }
+}
+// scalastyle:on println
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d4f9003a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala
new file mode 100644
index 0000000..1d4bedf
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.{File, PrintWriter}
+import java.net.ServerSocket
+
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+// scalastyle:off println
+object StructuredStreamingExample {
+ def main(args: Array[String]) {
+
+ // setup paths
+ val rootPath = new File(this.getClass.getResource("/").getPath
+ + "../../../..").getCanonicalPath
+
+ val spark = ExampleUtils.createCarbonSession("StructuredStreamingExample", 4)
+ val streamTableName = s"stream_table"
+
+ val requireCreateTable = true
+ val useComplexDataType = false
+
+ if (requireCreateTable) {
+ // drop the table if it exists
+ spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
+ // Create target carbon table and populate with initial data
+ if (useComplexDataType) {
+ spark.sql(
+ s"""
+ | CREATE TABLE ${ streamTableName }(
+ | id INT,
+ | name STRING,
+ | city STRING,
+ | salary FLOAT,
+ | file struct<school:array<string>, age:int>
+ | )
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES(
+ | 'streaming'='true', 'sort_columns'='name', 'dictionary_include'='city')
+ | """.stripMargin)
+ } else {
+ spark.sql(
+ s"""
+ | CREATE TABLE ${ streamTableName }(
+ | id INT,
+ | name STRING,
+ | city STRING,
+ | salary FLOAT
+ | )
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES(
+ | 'streaming'='true', 'sort_columns'='name')
+ | """.stripMargin)
+ }
+
+ val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
+ // batch load
+ val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
+ spark.sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '$path'
+ | INTO TABLE $streamTableName
+ | OPTIONS('HEADER'='true')
+ """.stripMargin)
+
+ // streaming ingest
+ val serverSocket = new ServerSocket(7071)
+ val thread1 = startStreaming(spark, carbonTable)
+ val thread2 = writeSocket(serverSocket)
+ val thread3 = showTableCount(spark, streamTableName)
+
+ System.out.println("type enter to interrupt streaming")
+ System.in.read()
+ thread1.interrupt()
+ thread2.interrupt()
+ thread3.interrupt()
+ serverSocket.close()
+ }
+
+ spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate = false)
+
+ spark.sql(s"select * from ${ streamTableName }").show(100, truncate = false)
+
+ // record(id = 100000001) comes from batch segment_0
+ // record(id = 1) comes from stream segment_1
+ spark.sql(s"select * " +
+ s"from ${ streamTableName } " +
+ s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
+
+ // range filter query
+ spark.sql(s"select * " +
+ s"from ${ streamTableName } " +
+ s"where id < 10 limit 100").show(100, truncate = false)
+
+ if (useComplexDataType) {
+ // query on the complex type column
+ spark.sql(s"select file.age, file.school " +
+ s"from ${ streamTableName } " +
+ s"where file.age = 30").show(100, truncate = false)
+ }
+
+ spark.stop()
+ System.out.println("streaming finished")
+ }
+
+ def showTableCount(spark: SparkSession, tableName: String): Thread = {
+ val thread = new Thread() {
+ override def run(): Unit = {
+ for (_ <- 0 to 1000) {
+ spark.sql(s"select count(*) from $tableName").show(truncate = false)
+ Thread.sleep(1000 * 3)
+ }
+ }
+ }
+ thread.start()
+ thread
+ }
+
+ def startStreaming(spark: SparkSession, carbonTable: CarbonTable): Thread = {
+ val thread = new Thread() {
+ override def run(): Unit = {
+ var qry: StreamingQuery = null
+ try {
+ val readSocketDF = spark.readStream
+ .format("socket")
+ .option("host", "localhost")
+ .option("port", 7071)
+ .load()
+
+ // Write data from socket stream to carbondata file
+ qry = readSocketDF.writeStream
+ .format("carbondata")
+ .trigger(ProcessingTime("5 seconds"))
+ .option("checkpointLocation",
+ CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath))
+ .option("dbName", "default")
+ .option("tableName", "stream_table")
+ .start()
+
+ qry.awaitTermination()
+ } catch {
+ case ex: Exception =>
+ ex.printStackTrace()
+ println("Done reading and writing streaming data")
+ } finally {
+ if (qry != null) qry.stop()
+ }
+ }
+ }
+ thread.start()
+ thread
+ }
+
+ def writeSocket(serverSocket: ServerSocket): Thread = {
+ val thread = new Thread() {
+ override def run(): Unit = {
+ // wait for the client connection request and accept it
+ val clientSocket = serverSocket.accept()
+ val socketWriter = new PrintWriter(clientSocket.getOutputStream())
+ var index = 0
+ for (_ <- 1 to 1000) {
+ // write 1001 records per iteration
+ for (_ <- 0 to 1000) {
+ index = index + 1
+ socketWriter.println(index.toString + ",name_" + index
+ + ",city_" + index + "," + (index * 10000.00).toString +
+ ",school_" + index + ":school_" + index + index + "$" + index)
+ }
+ socketWriter.flush()
+ Thread.sleep(1000)
+ }
+ socketWriter.close()
+ System.out.println("Socket closed")
+ }
+ }
+ thread.start()
+ thread
+ }
+}
+// scalastyle:on println
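For ad-hoc testing of the query above without the built-in writeSocket thread, a standalone producer along these lines could serve the same port. This is a minimal sketch, not part of the commit; it assumes the id,name,city,salary schema of stream_table, and since Spark's socket source connects as the client, the producer owns the ServerSocket:

import java.io.PrintWriter
import java.net.ServerSocket

object SocketProducerSketch {
  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(7071)
    // the socket source in startStreaming connects here as a client
    val client = server.accept()
    val writer = new PrintWriter(client.getOutputStream)
    for (i <- 1 to 100) {
      // one CSV row per record: id,name,city,salary
      writer.println(s"$i,name_$i,city_$i,${i * 10000.0}")
      writer.flush()
      Thread.sleep(100)
    }
    writer.close()
    server.close()
  }
}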
[5/6] carbondata git commit: [CARBONDATA-2224][File Level Reader Support] External File level reader support
Posted by ja...@apache.org.
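In short, this commit lets Spark create a table directly over existing carbondata files, inferring the schema from the file headers when no schema file is present. A rough usage sketch; the table name and location below are illustrative, not taken from the commit:

// Sketch: register and query existing carbondata files via the new
// 'Carbonfile' provider; the schema is inferred by SparkCarbonFileFormat.
spark.sql(
  s"""
     | CREATE TABLE sdk_output
     | STORED BY 'Carbonfile'
     | LOCATION '/tmp/carbon_sdk_output'
   """.stripMargin)
spark.sql("SELECT count(*) FROM sdk_output").show()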
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index 0298eea..cf22569 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -150,7 +150,7 @@ case class CarbonDropTableCommand(
// delete table data only if it is not an external table
if (FileFactory.isFileExist(tablePath, fileType) &&
- !carbonTable.isExternalTable) {
+ !(carbonTable.isExternalTable || carbonTable.isFileLevelExternalTable)) {
val file = FileFactory.getCarbonFile(tablePath, fileType)
CarbonUtil.deleteFoldersAndFilesSilent(file)
}
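With this guard, DROP TABLE removes only the metastore entry for file-level external tables, just as it already did for regular external tables; the files on disk survive. A behavior sketch, with illustrative paths:

spark.sql("CREATE TABLE ext_sales STORED BY 'Carbonfile' LOCATION '/data/ext_sales'")
spark.sql("DROP TABLE ext_sales")
// /data/ext_sales still contains its carbondata and index files;
// only managed (non-external) table folders are deleted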
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
deleted file mode 100644
index 2eed988..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ /dev/null
@@ -1,443 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.datasources
-
-import java.io.File
-import java.util
-import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.atomic.AtomicLong
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.util.Random
-
-import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
-import org.apache.spark.SparkEnv
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.DataSourceRegister
-import org.apache.spark.sql.types._
-
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.indexstore
-import org.apache.carbondata.core.metadata.SegmentFileStore
-import org.apache.carbondata.core.metadata.datatype.DataTypes
-import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
-import org.apache.carbondata.core.util.{CarbonProperties, DataTypeConverterImpl, DataTypeUtil}
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat}
-import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter
-import org.apache.carbondata.hadoop.internal.ObjectArrayWritable
-import org.apache.carbondata.hadoop.util.ObjectSerializationUtil
-import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util}
-
-class CarbonFileFormat
- extends FileFormat
- with DataSourceRegister
- with Logging
-with Serializable {
-
- override def shortName(): String = "carbondata"
-
- override def inferSchema(sparkSession: SparkSession,
- options: Map[String, String],
- files: Seq[FileStatus]): Option[StructType] = {
- None
- }
-
- SparkSession.getActiveSession.get.sessionState.conf.setConfString(
- "spark.sql.sources.commitProtocolClass",
- "org.apache.spark.sql.execution.datasources.CarbonSQLHadoopMapReduceCommitProtocol")
-
- override def prepareWrite(
- sparkSession: SparkSession,
- job: Job,
- options: Map[String, String],
- dataSchema: StructType): OutputWriterFactory = {
- val conf = job.getConfiguration
- conf.setClass(
- SQLConf.OUTPUT_COMMITTER_CLASS.key,
- classOf[CarbonOutputCommitter],
- classOf[CarbonOutputCommitter])
- conf.set("carbon.commit.protocol", "carbon.commit.protocol")
- job.setOutputFormatClass(classOf[CarbonTableOutputFormat])
- val table = CarbonEnv.getCarbonTable(
- TableIdentifier(options("tableName"), options.get("dbName")))(sparkSession)
- val model = new CarbonLoadModel
- val carbonProperty = CarbonProperties.getInstance()
- val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava)
- val tableProperties = table.getTableInfo.getFactTable.getTableProperties
- optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope",
- carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
- carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
- CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
- val partitionStr =
- table.getTableInfo.getFactTable.getPartitionInfo.getColumnSchemaList.asScala.map(
- _.getColumnName.toLowerCase).mkString(",")
- optionsFinal.put(
- "fileheader",
- dataSchema.fields.map(_.name.toLowerCase).mkString(",") + "," + partitionStr)
- val optionsLocal = new mutable.HashMap[String, String]()
- optionsLocal ++= options
- optionsLocal += (("header", "false"))
- new CarbonLoadModelBuilder(table).build(
- optionsLocal.toMap.asJava,
- optionsFinal,
- model,
- conf)
- model.setUseOnePass(options.getOrElse("onepass", "false").toBoolean)
- model.setDictionaryServerHost(options.getOrElse("dicthost", null))
- model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt)
- CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean)
- model.setPartitionLoad(true)
-
- val staticPartition = options.getOrElse("staticpartition", null)
- if (staticPartition != null) {
- conf.set("carbon.staticpartition", staticPartition)
- }
- // In case of an update query older segments may be removed, so set the
- // segments to be deleted so they are marked as deleted while updating the table status
- val segmentsToBeDeleted = options.get("segmentsToBeDeleted")
- if (segmentsToBeDeleted.isDefined) {
- conf.set(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, segmentsToBeDeleted.get)
- }
-
- val currPartition = options.getOrElse("currentpartition", null)
- if (currPartition != null) {
- conf.set("carbon.currentpartition", currPartition)
- }
- // Update with the current in progress load.
- val currEntry = options.getOrElse("currentloadentry", null)
- if (currEntry != null) {
- val loadEntry =
- ObjectSerializationUtil.convertStringToObject(currEntry).asInstanceOf[LoadMetadataDetails]
- val details =
- SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(model.getTablePath))
- model.setSegmentId(loadEntry.getLoadName)
- model.setFactTimeStamp(loadEntry.getLoadStartTime)
- val list = new util.ArrayList[LoadMetadataDetails](details.toList.asJava)
- list.add(loadEntry)
- model.setLoadMetadataDetails(list)
- }
- // Set the update timestamp if the user sets it for an update query; it is
- // used as the load status update time
- val updateTimeStamp = options.get("updatetimestamp")
- if (updateTimeStamp.isDefined) {
- conf.set(CarbonTableOutputFormat.UPADTE_TIMESTAMP, updateTimeStamp.get)
- }
- CarbonTableOutputFormat.setLoadModel(conf, model)
-
- new OutputWriterFactory {
-
- /**
- * counter used for generating task numbers. This is used to generate unique partition numbers
- * in case of partitioning
- */
- val counter = new AtomicLong()
- val taskIdMap = new ConcurrentHashMap[String, java.lang.Long]()
-
- override def newInstance(
- path: String,
- dataSchema: StructType,
- context: TaskAttemptContext): OutputWriter = {
- val model = CarbonTableOutputFormat.getLoadModel(context.getConfiguration)
- val isCarbonUseMultiDir = CarbonProperties.getInstance().isUseMultiTempDir
- var storeLocation: Array[String] = Array[String]()
- val isCarbonUseLocalDir = CarbonProperties.getInstance()
- .getProperty("carbon.use.local.dir", "false").equalsIgnoreCase("true")
-
-
- val taskNumber = generateTaskNumber(path, context, model.getSegmentId)
- val tmpLocationSuffix =
- File.separator + "carbon" + System.nanoTime() + File.separator + taskNumber
- if (isCarbonUseLocalDir) {
- val yarnStoreLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
- if (!isCarbonUseMultiDir && null != yarnStoreLocations && yarnStoreLocations.nonEmpty) {
- // use single dir
- storeLocation = storeLocation :+
- (yarnStoreLocations(Random.nextInt(yarnStoreLocations.length)) + tmpLocationSuffix)
- if (storeLocation == null || storeLocation.isEmpty) {
- storeLocation = storeLocation :+
- (System.getProperty("java.io.tmpdir") + tmpLocationSuffix)
- }
- } else {
- // use all the yarn dirs
- storeLocation = yarnStoreLocations.map(_ + tmpLocationSuffix)
- }
- } else {
- storeLocation =
- storeLocation :+ (System.getProperty("java.io.tmpdir") + tmpLocationSuffix)
- }
- CarbonTableOutputFormat.setTempStoreLocations(context.getConfiguration, storeLocation)
- new CarbonOutputWriter(path, context, dataSchema.map(_.dataType), taskNumber, model)
- }
-
- /**
- * Generate taskid using the taskid of taskcontext and the path. It should be unique in case
- * of partition tables.
- */
- private def generateTaskNumber(path: String,
- context: TaskAttemptContext, segmentId: String): String = {
- var partitionNumber: java.lang.Long = taskIdMap.get(path)
- if (partitionNumber == null) {
- partitionNumber = counter.incrementAndGet()
- // Generate taskid using the combination of taskid and partition number to make it unique.
- taskIdMap.put(path, partitionNumber)
- }
- val taskID = context.getTaskAttemptID.getTaskID.getId
- CarbonScalaUtil.generateUniqueNumber(taskID, segmentId, partitionNumber)
- }
-
- override def getFileExtension(context: TaskAttemptContext): String = {
- CarbonTablePath.CARBON_DATA_EXT
- }
-
- }
- }
-}
-
-case class CarbonSQLHadoopMapReduceCommitProtocol(jobId: String, path: String, isAppend: Boolean)
- extends SQLHadoopMapReduceCommitProtocol(jobId, path, isAppend) {
- override def newTaskTempFileAbsPath(taskContext: TaskAttemptContext,
- absoluteDir: String,
- ext: String): String = {
- val carbonFlow = taskContext.getConfiguration.get("carbon.commit.protocol")
- if (carbonFlow != null) {
- super.newTaskTempFile(taskContext, Some(absoluteDir), ext)
- } else {
- super.newTaskTempFileAbsPath(taskContext, absoluteDir, ext)
- }
- }
-}
-
-/**
- * This class exists only so the code compiles against both Spark 2.1 and 2.2
- */
-private trait AbstractCarbonOutputWriter {
- def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
- def writeInternal(row: InternalRow): Unit = {
- writeCarbon(row)
- }
- def write(row: InternalRow): Unit = {
- writeCarbon(row)
- }
- def writeCarbon(row: InternalRow): Unit
-}
-
-private class CarbonOutputWriter(path: String,
- context: TaskAttemptContext,
- fieldTypes: Seq[DataType],
- taskNo : String,
- model: CarbonLoadModel)
- extends OutputWriter with AbstractCarbonOutputWriter {
-
- val converter = new DataTypeConverterImpl
-
- val partitions =
- getPartitionsFromPath(path, context, model).map(ExternalCatalogUtils.unescapePathName)
- val staticPartition: util.HashMap[String, Boolean] = {
- val staticPart = context.getConfiguration.get("carbon.staticpartition")
- if (staticPart != null) {
- ObjectSerializationUtil.convertStringToObject(
- staticPart).asInstanceOf[util.HashMap[String, Boolean]]
- } else {
- null
- }
- }
- lazy val currPartitions: util.List[indexstore.PartitionSpec] = {
- val currParts = context.getConfiguration.get("carbon.currentpartition")
- if (currParts != null) {
- ObjectSerializationUtil.convertStringToObject(
- currParts).asInstanceOf[util.List[indexstore.PartitionSpec]]
- } else {
- new util.ArrayList[indexstore.PartitionSpec]()
- }
- }
- var (updatedPartitions, partitionData) = if (partitions.nonEmpty) {
- val updatedPartitions = partitions.map(splitPartition)
- (updatedPartitions, updatePartitions(updatedPartitions.map(_._2)))
- } else {
- (Map.empty[String, String].toArray, Array.empty)
- }
-
- private def splitPartition(p: String) = {
- val value = p.substring(p.indexOf("=") + 1, p.length)
- val col = p.substring(0, p.indexOf("="))
- // Null handling case: for null values Hive creates the partition with this special name
- if (value.equals("__HIVE_DEFAULT_PARTITION__")) {
- (col, null)
- // replace the special string back with an empty value
- } else if (value.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
- (col, "")
- } else {
- (col, value)
- }
- }
-
- lazy val writePath = {
- val updatedPath = getPartitionPath(path, context, model)
- // if the user specified a partition location, search the current partitions
- // for the corresponding partition spec
- if (partitions.isEmpty) {
- val writeSpec = new indexstore.PartitionSpec(null, updatedPath)
- val index = currPartitions.indexOf(writeSpec)
- if (index > -1) {
- val spec = currPartitions.get(index)
- updatedPartitions = spec.getPartitions.asScala.map(splitPartition).toArray
- partitionData = updatePartitions(updatedPartitions.map(_._2))
- }
- }
- updatedPath
- }
-
- val writable = new ObjectArrayWritable
-
- private def updatePartitions(partitionData: Seq[String]): Array[AnyRef] = {
- model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo.getFactTable.getPartitionInfo
- .getColumnSchemaList.asScala.zipWithIndex.map { case (col, index) =>
-
- val dataType = if (col.hasEncoding(Encoding.DICTIONARY)) {
- DataTypes.INT
- } else if (col.getDataType.equals(DataTypes.TIMESTAMP) ||
- col.getDataType.equals(DataTypes.DATE)) {
- DataTypes.LONG
- } else {
- col.getDataType
- }
- if (staticPartition != null && staticPartition.get(col.getColumnName.toLowerCase)) {
- val convertedVal =
- CarbonScalaUtil.convertStaticPartitions(
- partitionData(index),
- col,
- model.getCarbonDataLoadSchema.getCarbonTable)
- if (col.hasEncoding(Encoding.DICTIONARY)) {
- convertedVal.toInt.asInstanceOf[AnyRef]
- } else {
- DataTypeUtil.getDataBasedOnDataType(
- convertedVal,
- dataType,
- converter)
- }
- } else {
- DataTypeUtil.getDataBasedOnDataType(partitionData(index), dataType, converter)
- }
- }.toArray
- }
-
- private val recordWriter: CarbonRecordWriter = {
- context.getConfiguration.set("carbon.outputformat.taskno", taskNo)
- context.getConfiguration.set("carbon.outputformat.writepath",
- writePath + "/" + model.getSegmentId + "_" + model.getFactTimeStamp + ".tmp")
- new CarbonTableOutputFormat() {
- override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
- new Path(path)
- }
- }.getRecordWriter(context).asInstanceOf[CarbonRecordWriter]
- }
-
- // TODO Implement writesupport interface to support writing Row directly to recordwriter
- def writeCarbon(row: InternalRow): Unit = {
- val data = new Array[AnyRef](fieldTypes.length + partitionData.length)
- var i = 0
- while (i < fieldTypes.length) {
- if (!row.isNullAt(i)) {
- fieldTypes(i) match {
- case StringType =>
- data(i) = row.getString(i)
- case d: DecimalType =>
- data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
- case other =>
- data(i) = row.get(i, other)
- }
- }
- i += 1
- }
- if (partitionData.length > 0) {
- System.arraycopy(partitionData, 0, data, fieldTypes.length, partitionData.length)
- }
- writable.set(data)
- recordWriter.write(NullWritable.get(), writable)
- }
-
-
- override def writeInternal(row: InternalRow): Unit = {
- writeCarbon(row)
- }
-
- override def close(): Unit = {
- recordWriter.close(context)
- // write partition info to new file.
- val partitionList = new util.ArrayList[String]()
- val formattedPartitions =
- // All dynamic partitions need to be converted to proper format
- CarbonScalaUtil.updatePartitions(
- updatedPartitions.toMap,
- model.getCarbonDataLoadSchema.getCarbonTable)
- formattedPartitions.foreach(p => partitionList.add(p._1 + "=" + p._2))
- SegmentFileStore.writeSegmentFile(
- model.getTablePath,
- taskNo,
- writePath,
- model.getSegmentId + "_" + model.getFactTimeStamp + "",
- partitionList)
- }
-
- def getPartitionPath(path: String,
- attemptContext: TaskAttemptContext,
- model: CarbonLoadModel): String = {
- if (updatedPartitions.nonEmpty) {
- val formattedPartitions =
- // All dynamic partitions need to be converted to proper format
- CarbonScalaUtil.updatePartitions(
- updatedPartitions.toMap,
- model.getCarbonDataLoadSchema.getCarbonTable)
- val partitionstr = formattedPartitions.map{p =>
- ExternalCatalogUtils.escapePathName(p._1) + "=" + ExternalCatalogUtils.escapePathName(p._2)
- }.mkString(CarbonCommonConstants.FILE_SEPARATOR)
- model.getCarbonDataLoadSchema.getCarbonTable.getTablePath +
- CarbonCommonConstants.FILE_SEPARATOR + partitionstr
- } else {
- var updatedPath = FileFactory.getUpdatedFilePath(path)
- updatedPath.substring(0, updatedPath.lastIndexOf("/"))
- }
- }
-
- def getPartitionsFromPath(
- path: String,
- attemptContext: TaskAttemptContext,
- model: CarbonLoadModel): Array[String] = {
- var attemptId = attemptContext.getTaskAttemptID.toString + "/"
- if (path.indexOf(attemptId) > -1) {
- val str = path.substring(path.indexOf(attemptId) + attemptId.length, path.lastIndexOf("/"))
- if (str.length > 0) {
- str.split("/")
- } else {
- Array.empty
- }
- } else {
- Array.empty
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
new file mode 100644
index 0000000..fa54e0d
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.net.URI
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.spark.{SparkException, TaskContext}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.text.TextOutputWriter
+import org.apache.spark.sql.optimizer.CarbonFilters
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.types.{AtomicType, StructField, StructType}
+
+import org.apache.carbondata.common.annotations.{InterfaceAudience, InterfaceStability}
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.{DataMapChooser, DataMapStoreManager, Segment}
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, ColumnarFormatVersion}
+import org.apache.carbondata.core.reader.CarbonHeaderReader
+import org.apache.carbondata.core.scan.expression.Expression
+import org.apache.carbondata.core.scan.expression.logical.AndExpression
+import org.apache.carbondata.core.scan.model.QueryModel
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection, CarbonRecordReader, InputMetricsStats}
+import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, DataMapJob}
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+@InterfaceAudience.User
+@InterfaceStability.Evolving
+class SparkCarbonFileFormat extends FileFormat
+ with DataSourceRegister
+ with Logging
+ with Serializable {
+
+ @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+ override def inferSchema(sparkSession: SparkSession,
+ options: Map[String, String],
+ files: Seq[FileStatus]): Option[StructType] = {
+ val filePaths = CarbonUtil.getFilePathExternalFilePath(
+ options.get("path").get)
+ if (filePaths.size() == 0) {
+ throw new SparkException("CarbonData file is not present in the location mentioned in DDL")
+ }
+ val carbonHeaderReader: CarbonHeaderReader = new CarbonHeaderReader(filePaths.get(0))
+ val fileHeader = carbonHeaderReader.readHeader
+ val table_columns: java.util.List[org.apache.carbondata.format.ColumnSchema] = fileHeader
+ .getColumn_schema
+ var colArray = ArrayBuffer[StructField]()
+ for (i <- 0 to table_columns.size() - 1) {
+ val col = CarbonUtil.thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i))
+ colArray += (new StructField(col.getColumnName,
+ CarbonScalaUtil.convertCarbonToSparkDataType(col.getDataType), false))
+ }
+
+ Some(StructType(colArray))
+ }
+
+ override def prepareWrite(sparkSession: SparkSession,
+ job: Job,
+ options: Map[String, String],
+ dataSchema: StructType): OutputWriterFactory = {
+
+ new OutputWriterFactory {
+ override def newInstance(
+ path: String,
+ dataSchema: StructType,
+ context: TaskAttemptContext): OutputWriter = {
+ new TextOutputWriter(path, dataSchema, context)
+ }
+
+ override def getFileExtension(context: TaskAttemptContext): String = {
+ CarbonTablePath.CARBON_DATA_EXT
+ }
+ }
+ }
+
+ override def shortName(): String = "Carbonfile"
+
+ override def toString: String = "Carbonfile"
+
+ override def hashCode(): Int = getClass.hashCode()
+
+ override def equals(other: Any): Boolean = other.isInstanceOf[SparkCarbonFileFormat]
+
+ def supportVector(sparkSession: SparkSession, schema: StructType): Boolean = {
+ val vectorizedReader = {
+ if (sparkSession.sqlContext.sparkSession.conf
+ .contains(CarbonCommonConstants.ENABLE_VECTOR_READER)) {
+ sparkSession.sqlContext.sparkSession.conf.get(CarbonCommonConstants.ENABLE_VECTOR_READER)
+ } else if (System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER) != null) {
+ System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER)
+ } else {
+ CarbonProperties.getInstance().getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
+ CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT)
+ }
+ }
+ vectorizedReader.toBoolean
+ }
+
+
+ override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
+ val conf = sparkSession.sessionState.conf
+ conf.wholeStageEnabled &&
+ schema.length <= conf.wholeStageMaxNumFields &&
+ schema.forall(_.dataType.isInstanceOf[AtomicType])
+ }
+
+
+ def createVectorizedCarbonRecordReader(queryModel: QueryModel,
+ inputMetricsStats: InputMetricsStats, enableBatch: String): RecordReader[Void, Object] = {
+ val name = "org.apache.carbondata.spark.vectorreader.VectorizedCarbonRecordReader"
+ try {
+ val cons = Class.forName(name).getDeclaredConstructors
+ cons.head.setAccessible(true)
+ cons.head.newInstance(queryModel, inputMetricsStats, enableBatch)
+ .asInstanceOf[RecordReader[Void, Object]]
+ } catch {
+ case e: Exception =>
+ LOGGER.error(e)
+ null
+ }
+ }
+
+ override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+ dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
+ filters: Seq[Filter],
+ options: Map[String, String],
+ hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
+
+ val filter : Option[Expression] = filters.flatMap { filter =>
+ CarbonFilters.createCarbonFilter(dataSchema, filter)
+ }.reduceOption(new AndExpression(_, _))
+
+ val projection = requiredSchema.map(_.name).toArray
+ val carbonProjection = new CarbonProjection
+ projection.foreach(carbonProjection.addColumn)
+
+ val conf = new Configuration()
+ val jobConf = new JobConf(conf)
+ SparkHadoopUtil.get.addCredentials(jobConf)
+ val job = Job.getInstance(jobConf)
+ var supportBatchValue: Boolean = false
+
+ val readVector = supportVector(sparkSession, dataSchema)
+ if (readVector) {
+ supportBatchValue = supportBatch(sparkSession, dataSchema)
+ }
+
+ CarbonFileInputFormat.setTableName(job.getConfiguration, "externaldummy")
+ CarbonFileInputFormat.setDatabaseName(job.getConfiguration, "default")
+ CarbonMetadata.getInstance.removeTable("default_externaldummy")
+ val dataMapJob: DataMapJob = CarbonFileInputFormat.getDataMapJob(job.getConfiguration)
+ val format: CarbonFileInputFormat[Object] = new CarbonFileInputFormat[Object]
+
+ (file: PartitionedFile) => {
+ assert(file.partitionValues.numFields == partitionSchema.size)
+
+ if (file.filePath.endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
+ val fileSplit =
+ new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty)
+
+ val path: String = options.get("path").get
+ val endindex: Int = path.indexOf("Fact") - 1
+ val tablePath = path.substring(0, endindex)
+ lazy val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
+ tablePath,
+ "default",
+ "externaldummy")
+ val split = CarbonInputSplit.from("null", "0", fileSplit, ColumnarFormatVersion.V3, null)
+
+
+ val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+ val conf1 = new Configuration()
+ conf1.set("mapreduce.input.carboninputformat.tableName", "externaldummy")
+ conf1.set("mapreduce.input.carboninputformat.databaseName", "default")
+ conf1.set("mapreduce.input.fileinputformat.inputdir", tablePath)
+ CarbonFileInputFormat.setColumnProjection(conf1, carbonProjection)
+ filter match {
+ case Some(c) => CarbonFileInputFormat.setFilterPredicates(conf1, c)
+ case None => None
+ }
+ val attemptContext = new TaskAttemptContextImpl(conf1, attemptId)
+
+ val model = format.createQueryModel(split, attemptContext)
+
+ var segments = new java.util.ArrayList[Segment]()
+ val seg = new Segment("null", null)
+ segments.add(seg)
+ var partition : java.util.List[PartitionSpec] = new java.util.ArrayList[PartitionSpec]()
+
+
+ val segmentPath = CarbonTablePath.getSegmentPath(identifier.getTablePath(), "null")
+ val indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(segmentPath)
+ if (indexFiles.size() == 0) {
+ throw new SparkException("Index file not present to read the carbondata file")
+ }
+
+ val tab = model.getTable
+ DataMapStoreManager.getInstance().clearDataMaps(identifier)
+ val dataMapExprWrapper = DataMapChooser.get
+ .choose(tab, model.getFilterExpressionResolverTree)
+
+ // TODO : handle the partition for CarbonFileLevelFormat
+ val prunedBlocklets = dataMapExprWrapper.prune(segments, null)
+
+ val detailInfo = prunedBlocklets.get(0).getDetailInfo
+ detailInfo.readColumnSchema(detailInfo.getColumnSchemaBinary)
+ split.setDetailInfo(detailInfo)
+
+ val carbonReader = if (readVector) {
+ val vectorizedReader = createVectorizedCarbonRecordReader(model,
+ null,
+ supportBatchValue.toString)
+ vectorizedReader.initialize(split, attemptContext)
+ logDebug(s"Appending $partitionSchema ${ file.partitionValues }")
+ vectorizedReader
+ } else {
+ val reader = new CarbonRecordReader(model,
+ format.getReadSupportClass(attemptContext.getConfiguration), null)
+ reader.initialize(split, attemptContext)
+ reader
+ }
+
+ val iter = new RecordReaderIterator(carbonReader)
+ Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
+
+ iter.asInstanceOf[Iterator[InternalRow]]
+ }
+ else {
+ Iterator.empty
+ }
+ }
+ }
+}
+
+
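Note that supportVector above resolves the vectorized-read flag from three sources in precedence order: the Spark session conf, a JVM system property, and finally the carbon property store with its default. A sketch of pinning the flag explicitly, assuming the key is the value of CarbonCommonConstants.ENABLE_VECTOR_READER, i.e. carbon.enable.vector.reader:

import org.apache.carbondata.core.util.CarbonProperties

spark.conf.set("carbon.enable.vector.reader", "true")       // 1. session conf
System.setProperty("carbon.enable.vector.reader", "true")   // 2. JVM system property
CarbonProperties.getInstance()
  .addProperty("carbon.enable.vector.reader", "true")       // 3. carbon property store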
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
new file mode 100644
index 0000000..d34b201
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+import java.util
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.Random
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.encoder.Encoding
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeConverterImpl, DataTypeUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat}
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter
+import org.apache.carbondata.hadoop.internal.ObjectArrayWritable
+import org.apache.carbondata.hadoop.util.ObjectSerializationUtil
+import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util}
+
+class SparkCarbonTableFormat
+ extends FileFormat
+ with DataSourceRegister
+ with Logging
+with Serializable {
+
+ override def shortName(): String = "carbondata"
+
+ override def inferSchema(sparkSession: SparkSession,
+ options: Map[String, String],
+ files: Seq[FileStatus]): Option[StructType] = {
+ None
+ }
+
+ SparkSession.getActiveSession.get.sessionState.conf.setConfString(
+ "spark.sql.sources.commitProtocolClass",
+ "org.apache.spark.sql.execution.datasources.CarbonSQLHadoopMapReduceCommitProtocol")
+
+ override def prepareWrite(
+ sparkSession: SparkSession,
+ job: Job,
+ options: Map[String, String],
+ dataSchema: StructType): OutputWriterFactory = {
+ val conf = job.getConfiguration
+ conf.setClass(
+ SQLConf.OUTPUT_COMMITTER_CLASS.key,
+ classOf[CarbonOutputCommitter],
+ classOf[CarbonOutputCommitter])
+ conf.set("carbon.commit.protocol", "carbon.commit.protocol")
+ job.setOutputFormatClass(classOf[CarbonTableOutputFormat])
+ val table = CarbonEnv.getCarbonTable(
+ TableIdentifier(options("tableName"), options.get("dbName")))(sparkSession)
+ val model = new CarbonLoadModel
+ val carbonProperty = CarbonProperties.getInstance()
+ val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava)
+ val tableProperties = table.getTableInfo.getFactTable.getTableProperties
+ optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope",
+ carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+ carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+ CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
+ val partitionStr =
+ table.getTableInfo.getFactTable.getPartitionInfo.getColumnSchemaList.asScala.map(
+ _.getColumnName.toLowerCase).mkString(",")
+ optionsFinal.put(
+ "fileheader",
+ dataSchema.fields.map(_.name.toLowerCase).mkString(",") + "," + partitionStr)
+ val optionsLocal = new mutable.HashMap[String, String]()
+ optionsLocal ++= options
+ optionsLocal += (("header", "false"))
+ new CarbonLoadModelBuilder(table).build(
+ optionsLocal.toMap.asJava,
+ optionsFinal,
+ model,
+ conf)
+ model.setUseOnePass(options.getOrElse("onepass", "false").toBoolean)
+ model.setDictionaryServerHost(options.getOrElse("dicthost", null))
+ model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt)
+ CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean)
+ model.setPartitionLoad(true)
+
+ val staticPartition = options.getOrElse("staticpartition", null)
+ if (staticPartition != null) {
+ conf.set("carbon.staticpartition", staticPartition)
+ }
+ // In case of an update query older segments may be removed, so set the
+ // segments to be deleted so they are marked as deleted while updating the table status
+ val segmentsToBeDeleted = options.get("segmentsToBeDeleted")
+ if (segmentsToBeDeleted.isDefined) {
+ conf.set(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, segmentsToBeDeleted.get)
+ }
+
+ val currPartition = options.getOrElse("currentpartition", null)
+ if (currPartition != null) {
+ conf.set("carbon.currentpartition", currPartition)
+ }
+ // Update with the current in progress load.
+ val currEntry = options.getOrElse("currentloadentry", null)
+ if (currEntry != null) {
+ val loadEntry =
+ ObjectSerializationUtil.convertStringToObject(currEntry).asInstanceOf[LoadMetadataDetails]
+ val details =
+ SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(model.getTablePath))
+ model.setSegmentId(loadEntry.getLoadName)
+ model.setFactTimeStamp(loadEntry.getLoadStartTime)
+ val list = new util.ArrayList[LoadMetadataDetails](details.toList.asJava)
+ list.add(loadEntry)
+ model.setLoadMetadataDetails(list)
+ }
+ // Set the update timestamp if the user sets it for an update query; it is
+ // used as the load status update time
+ val updateTimeStamp = options.get("updatetimestamp")
+ if (updateTimeStamp.isDefined) {
+ conf.set(CarbonTableOutputFormat.UPADTE_TIMESTAMP, updateTimeStamp.get)
+ }
+ CarbonTableOutputFormat.setLoadModel(conf, model)
+
+ new OutputWriterFactory {
+
+ /**
+ * counter used for generating task numbers. This is used to generate unique partition numbers
+ * in case of partitioning
+ */
+ val counter = new AtomicLong()
+ val taskIdMap = new ConcurrentHashMap[String, java.lang.Long]()
+
+ override def newInstance(
+ path: String,
+ dataSchema: StructType,
+ context: TaskAttemptContext): OutputWriter = {
+ val model = CarbonTableOutputFormat.getLoadModel(context.getConfiguration)
+ val isCarbonUseMultiDir = CarbonProperties.getInstance().isUseMultiTempDir
+ var storeLocation: Array[String] = Array[String]()
+ val isCarbonUseLocalDir = CarbonProperties.getInstance()
+ .getProperty("carbon.use.local.dir", "false").equalsIgnoreCase("true")
+
+
+ val taskNumber = generateTaskNumber(path, context, model.getSegmentId)
+ val tmpLocationSuffix =
+ File.separator + "carbon" + System.nanoTime() + File.separator + taskNumber
+ if (isCarbonUseLocalDir) {
+ val yarnStoreLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
+ if (!isCarbonUseMultiDir && null != yarnStoreLocations && yarnStoreLocations.nonEmpty) {
+ // use single dir
+ storeLocation = storeLocation :+
+ (yarnStoreLocations(Random.nextInt(yarnStoreLocations.length)) + tmpLocationSuffix)
+ if (storeLocation == null || storeLocation.isEmpty) {
+ storeLocation = storeLocation :+
+ (System.getProperty("java.io.tmpdir") + tmpLocationSuffix)
+ }
+ } else {
+ // use all the yarn dirs
+ storeLocation = yarnStoreLocations.map(_ + tmpLocationSuffix)
+ }
+ } else {
+ storeLocation =
+ storeLocation :+ (System.getProperty("java.io.tmpdir") + tmpLocationSuffix)
+ }
+ CarbonTableOutputFormat.setTempStoreLocations(context.getConfiguration, storeLocation)
+ new CarbonOutputWriter(path, context, dataSchema.map(_.dataType), taskNumber, model)
+ }
+
+ /**
+ * Generate taskid using the taskid of taskcontext and the path. It should be unique in case
+ * of partition tables.
+ */
+ private def generateTaskNumber(path: String,
+ context: TaskAttemptContext, segmentId: String): String = {
+ var partitionNumber: java.lang.Long = taskIdMap.get(path)
+ if (partitionNumber == null) {
+ partitionNumber = counter.incrementAndGet()
+ // Generate taskid using the combination of taskid and partition number to make it unique.
+ taskIdMap.put(path, partitionNumber)
+ }
+ val taskID = context.getTaskAttemptID.getTaskID.getId
+ CarbonScalaUtil.generateUniqueNumber(taskID, segmentId, partitionNumber)
+ }
+
+ override def getFileExtension(context: TaskAttemptContext): String = {
+ CarbonTablePath.CARBON_DATA_EXT
+ }
+
+ }
+ }
+}
+
+case class CarbonSQLHadoopMapReduceCommitProtocol(jobId: String, path: String, isAppend: Boolean)
+ extends SQLHadoopMapReduceCommitProtocol(jobId, path, isAppend) {
+ override def newTaskTempFileAbsPath(taskContext: TaskAttemptContext,
+ absoluteDir: String,
+ ext: String): String = {
+ val carbonFlow = taskContext.getConfiguration.get("carbon.commit.protocol")
+ if (carbonFlow != null) {
+ super.newTaskTempFile(taskContext, Some(absoluteDir), ext)
+ } else {
+ super.newTaskTempFileAbsPath(taskContext, absoluteDir, ext)
+ }
+ }
+}
+
+/**
+ * This class exists only so the code compiles against both Spark 2.1 and 2.2
+ */
+private trait AbstractCarbonOutputWriter {
+ def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
+ def writeInternal(row: InternalRow): Unit = {
+ writeCarbon(row)
+ }
+ def write(row: InternalRow): Unit = {
+ writeCarbon(row)
+ }
+ def writeCarbon(row: InternalRow): Unit
+}
+
+private class CarbonOutputWriter(path: String,
+ context: TaskAttemptContext,
+ fieldTypes: Seq[DataType],
+ taskNo : String,
+ model: CarbonLoadModel)
+ extends OutputWriter with AbstractCarbonOutputWriter {
+
+ val converter = new DataTypeConverterImpl
+
+ val partitions =
+ getPartitionsFromPath(path, context, model).map(ExternalCatalogUtils.unescapePathName)
+ val staticPartition: util.HashMap[String, Boolean] = {
+ val staticPart = context.getConfiguration.get("carbon.staticpartition")
+ if (staticPart != null) {
+ ObjectSerializationUtil.convertStringToObject(
+ staticPart).asInstanceOf[util.HashMap[String, Boolean]]
+ } else {
+ null
+ }
+ }
+ lazy val currPartitions: util.List[indexstore.PartitionSpec] = {
+ val currParts = context.getConfiguration.get("carbon.currentpartition")
+ if (currParts != null) {
+ ObjectSerializationUtil.convertStringToObject(
+ currParts).asInstanceOf[util.List[indexstore.PartitionSpec]]
+ } else {
+ new util.ArrayList[indexstore.PartitionSpec]()
+ }
+ }
+ var (updatedPartitions, partitionData) = if (partitions.nonEmpty) {
+ val updatedPartitions = partitions.map(splitPartition)
+ (updatedPartitions, updatePartitions(updatedPartitions.map(_._2)))
+ } else {
+ (Map.empty[String, String].toArray, Array.empty)
+ }
+
+ private def splitPartition(p: String) = {
+ val value = p.substring(p.indexOf("=") + 1, p.length)
+ val col = p.substring(0, p.indexOf("="))
+ // Null handling case: for null values Hive creates the partition with this special name
+ if (value.equals("__HIVE_DEFAULT_PARTITION__")) {
+ (col, null)
+ // replace the special string back with an empty value
+ } else if (value.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
+ (col, "")
+ } else {
+ (col, value)
+ }
+ }
+
+ lazy val writePath = {
+ val updatedPath = getPartitionPath(path, context, model)
+ // if the user specified a partition location, search the current partitions
+ // for the corresponding partition spec
+ if (partitions.isEmpty) {
+ val writeSpec = new indexstore.PartitionSpec(null, updatedPath)
+ val index = currPartitions.indexOf(writeSpec)
+ if (index > -1) {
+ val spec = currPartitions.get(index)
+ updatedPartitions = spec.getPartitions.asScala.map(splitPartition).toArray
+ partitionData = updatePartitions(updatedPartitions.map(_._2))
+ }
+ }
+ updatedPath
+ }
+
+ val writable = new ObjectArrayWritable
+
+ private def updatePartitions(partitionData: Seq[String]): Array[AnyRef] = {
+ model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo.getFactTable.getPartitionInfo
+ .getColumnSchemaList.asScala.zipWithIndex.map { case (col, index) =>
+
+ val dataType = if (col.hasEncoding(Encoding.DICTIONARY)) {
+ DataTypes.INT
+ } else if (col.getDataType.equals(DataTypes.TIMESTAMP) ||
+ col.getDataType.equals(DataTypes.DATE)) {
+ DataTypes.LONG
+ } else {
+ col.getDataType
+ }
+ if (staticPartition != null && staticPartition.get(col.getColumnName.toLowerCase)) {
+ val convertedVal =
+ CarbonScalaUtil.convertStaticPartitions(
+ partitionData(index),
+ col,
+ model.getCarbonDataLoadSchema.getCarbonTable)
+ if (col.hasEncoding(Encoding.DICTIONARY)) {
+ convertedVal.toInt.asInstanceOf[AnyRef]
+ } else {
+ DataTypeUtil.getDataBasedOnDataType(
+ convertedVal,
+ dataType,
+ converter)
+ }
+ } else {
+ DataTypeUtil.getDataBasedOnDataType(partitionData(index), dataType, converter)
+ }
+ }.toArray
+ }
+
+ private val recordWriter: CarbonRecordWriter = {
+ context.getConfiguration.set("carbon.outputformat.taskno", taskNo)
+ context.getConfiguration.set("carbon.outputformat.writepath",
+ writePath + "/" + model.getSegmentId + "_" + model.getFactTimeStamp + ".tmp")
+ new CarbonTableOutputFormat() {
+ override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
+ new Path(path)
+ }
+ }.getRecordWriter(context).asInstanceOf[CarbonRecordWriter]
+ }
+
+ // TODO Implement writesupport interface to support writing Row directly to recordwriter
+ def writeCarbon(row: InternalRow): Unit = {
+ val data = new Array[AnyRef](fieldTypes.length + partitionData.length)
+ var i = 0
+ while (i < fieldTypes.length) {
+ if (!row.isNullAt(i)) {
+ fieldTypes(i) match {
+ case StringType =>
+ data(i) = row.getString(i)
+ case d: DecimalType =>
+ data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
+ case other =>
+ data(i) = row.get(i, other)
+ }
+ }
+ i += 1
+ }
+ if (partitionData.length > 0) {
+ System.arraycopy(partitionData, 0, data, fieldTypes.length, partitionData.length)
+ }
+ writable.set(data)
+ recordWriter.write(NullWritable.get(), writable)
+ }
+
+
+ override def writeInternal(row: InternalRow): Unit = {
+ writeCarbon(row)
+ }
+
+ override def close(): Unit = {
+ recordWriter.close(context)
+ // write partition info to new file.
+ val partitionList = new util.ArrayList[String]()
+ val formattedPartitions =
+ // All dynamic partitions need to be converted to proper format
+ CarbonScalaUtil.updatePartitions(
+ updatedPartitions.toMap,
+ model.getCarbonDataLoadSchema.getCarbonTable)
+ formattedPartitions.foreach(p => partitionList.add(p._1 + "=" + p._2))
+ SegmentFileStore.writeSegmentFile(
+ model.getTablePath,
+ taskNo,
+ writePath,
+ model.getSegmentId + "_" + model.getFactTimeStamp + "",
+ partitionList)
+ }
+
+ def getPartitionPath(path: String,
+ attemptContext: TaskAttemptContext,
+ model: CarbonLoadModel): String = {
+ if (updatedPartitions.nonEmpty) {
+ val formattedPartitions =
+ // All dynamic partitions need to be converted to proper format
+ CarbonScalaUtil.updatePartitions(
+ updatedPartitions.toMap,
+ model.getCarbonDataLoadSchema.getCarbonTable)
+ val partitionstr = formattedPartitions.map{p =>
+ ExternalCatalogUtils.escapePathName(p._1) + "=" + ExternalCatalogUtils.escapePathName(p._2)
+ }.mkString(CarbonCommonConstants.FILE_SEPARATOR)
+ model.getCarbonDataLoadSchema.getCarbonTable.getTablePath +
+ CarbonCommonConstants.FILE_SEPARATOR + partitionstr
+ } else {
+ var updatedPath = FileFactory.getUpdatedFilePath(path)
+ updatedPath.substring(0, updatedPath.lastIndexOf("/"))
+ }
+ }
+
+ def getPartitionsFromPath(
+ path: String,
+ attemptContext: TaskAttemptContext,
+ model: CarbonLoadModel): Array[String] = {
+ var attemptId = attemptContext.getTaskAttemptID.toString + "/"
+ if (path.indexOf(attemptId) > -1) {
+ val str = path.substring(path.indexOf(attemptId) + attemptId.length, path.lastIndexOf("/"))
+ if (str.length > 0) {
+ str.split("/")
+ } else {
+ Array.empty
+ }
+ } else {
+ Array.empty
+ }
+ }
+}
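After the rename, two DataSourceRegister implementations ship in this module: SparkCarbonTableFormat (shortName "carbondata") and SparkCarbonFileFormat (shortName "Carbonfile"). A quick sketch, not from the commit, to verify both short names are discoverable once the service registry updated below is on the classpath:

import java.util.ServiceLoader
import scala.collection.JavaConverters._
import org.apache.spark.sql.sources.DataSourceRegister

val shortNames = ServiceLoader.load(classOf[DataSourceRegister])
  .asScala.map(_.shortName()).toList
// expected to include "carbondata" and "Carbonfile"
println(shortNames)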
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index ec20ec2..d85ef68 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -110,7 +110,14 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
.tableExists(TableIdentifier(alterTableChangeDataTypeModel.tableName,
alterTableChangeDataTypeModel.databaseName))(sparkSession)
if (isCarbonTable) {
- ExecutedCommandExec(dataTypeChange) :: Nil
+ val carbonTable = CarbonEnv.getCarbonTable(alterTableChangeDataTypeModel.databaseName,
+ alterTableChangeDataTypeModel.tableName)(sparkSession)
+ if (carbonTable != null && carbonTable.isFileLevelExternalTable) {
+ throw new MalformedCarbonCommandException(
+ "Unsupported alter operation on Carbon external fileformat table")
+ } else {
+ ExecutedCommandExec(dataTypeChange) :: Nil
+ }
} else {
throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
}
@@ -119,7 +126,14 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
.tableExists(TableIdentifier(alterTableAddColumnsModel.tableName,
alterTableAddColumnsModel.databaseName))(sparkSession)
if (isCarbonTable) {
- ExecutedCommandExec(addColumn) :: Nil
+ val carbonTable = CarbonEnv.getCarbonTable(alterTableAddColumnsModel.databaseName,
+ alterTableAddColumnsModel.tableName)(sparkSession)
+ if (carbonTable != null && carbonTable.isFileLevelExternalTable) {
+ throw new MalformedCarbonCommandException(
+ "Unsupported alter operation on Carbon external fileformat table")
+ } else {
+ ExecutedCommandExec(addColumn) :: Nil
+ }
} else {
throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
}
@@ -128,7 +142,14 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
.tableExists(TableIdentifier(alterTableDropColumnModel.tableName,
alterTableDropColumnModel.databaseName))(sparkSession)
if (isCarbonTable) {
- ExecutedCommandExec(dropColumn) :: Nil
+ val carbonTable = CarbonEnv.getCarbonTable(alterTableDropColumnModel.databaseName,
+ alterTableDropColumnModel.tableName)(sparkSession)
+ if (carbonTable != null && carbonTable.isFileLevelExternalTable) {
+ throw new MalformedCarbonCommandException(
+ "Unsupported alter operation on Carbon external fileformat table")
+ } else {
+ ExecutedCommandExec(dropColumn) :: Nil
+ }
} else {
throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
}
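The three hunks above give file-level external tables a uniform failure mode for schema-changing ALTERs. A behavior sketch, with an illustrative table name and location:

spark.sql("CREATE TABLE ext_t STORED BY 'Carbonfile' LOCATION '/data/ext_t'")
try {
  spark.sql("ALTER TABLE ext_t ADD COLUMNS (note STRING)")
} catch {
  case e: Exception =>
    // expected: "Unsupported alter operation on Carbon external fileformat table"
    println(e.getMessage)
}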
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index 4996bec..b2f4505 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.execution.command.mutation.CarbonProjectForDeleteCommand
-import org.apache.spark.sql.execution.datasources.{CarbonFileFormat, CatalogFileIndex, FileFormat, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, FileFormat, HadoopFsRelation, LogicalRelation, SparkCarbonTableFormat}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CarbonException
import org.apache.spark.util.CarbonReflectionUtils
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index e0fff08..69fd366 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -33,7 +33,9 @@ import org.apache.spark.sql.util.CarbonException
import org.apache.spark.util.CarbonReflectionUtils
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.hadoop.util.SchemaReader
import org.apache.carbondata.spark.CarbonOption
import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil}
@@ -144,19 +146,24 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
.getOrElse(Map.empty)
}
- def createCarbonTable(tableHeader: CreateTableHeaderContext,
- skewSpecContext: SkewSpecContext,
- bucketSpecContext: BucketSpecContext,
- partitionColumns: ColTypeListContext,
- columns : ColTypeListContext,
- tablePropertyList : TablePropertyListContext,
- locationSpecContext: SqlBaseParser.LocationSpecContext,
- tableComment : Option[String],
- ctas: TerminalNode,
- query: QueryContext) : LogicalPlan = {
+ def createCarbonTable(createTableTuple: (CreateTableHeaderContext, SkewSpecContext,
+ BucketSpecContext, ColTypeListContext, ColTypeListContext, TablePropertyListContext,
+ LocationSpecContext, Option[String], TerminalNode, QueryContext, String)): LogicalPlan = {
// val parser = new CarbonSpark2SqlParser
+ val (tableHeader, skewSpecContext,
+ bucketSpecContext,
+ partitionColumns,
+ columns,
+ tablePropertyList,
+ locationSpecContext,
+ tableComment,
+ ctas,
+ query,
+ provider) = createTableTuple
+
val (tableIdentifier, temp, ifNotExists, external) = visitCreateTableHeader(tableHeader)
+
// TODO: implement temporary tables
if (temp) {
throw new ParseException(
@@ -256,13 +263,27 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession),
tableIdentifier.table)
val table = try {
- SchemaReader.getTableInfo(identifier)
- } catch {
+ val schemaPath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
+ if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath)) &&
+ provider.equalsIgnoreCase("'Carbonfile'")) {
+ SchemaReader.inferSchema(identifier)
+ }
+ else {
+ SchemaReader.getTableInfo(identifier)
+ }
+ }
+ catch {
case e: Throwable =>
operationNotAllowed(s"Invalid table path provided: ${tablePath.get} ", tableHeader)
}
// set "_external" property, so that DROP TABLE will not delete the data
- table.getFactTable.getTableProperties.put("_external", "true")
+ if (provider.equalsIgnoreCase("'Carbonfile'")) {
+ table.getFactTable.getTableProperties.put("_filelevelexternal", "true")
+ table.getFactTable.getTableProperties.put("_external", "false")
+ } else {
+ table.getFactTable.getTableProperties.put("_external", "true")
+ table.getFactTable.getTableProperties.put("_filelevelexternal", "false")
+ }
table
} else {
// prepare table model of the collected tokens
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
index ba2fe947..c6bab9e 100644
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -326,18 +326,13 @@ class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSes
val fileStorage = helper.getFileStorage(ctx.createFileFormat)
if (fileStorage.equalsIgnoreCase("'carbondata'") ||
+ fileStorage.equalsIgnoreCase("'Carbonfile'") ||
fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
- helper.createCarbonTable(
- tableHeader = ctx.createTableHeader,
- skewSpecContext = ctx.skewSpec,
- bucketSpecContext = ctx.bucketSpec,
- partitionColumns = ctx.partitionColumns,
- columns = ctx.columns,
- tablePropertyList = ctx.tablePropertyList,
- locationSpecContext = ctx.locationSpec(),
- tableComment = Option(ctx.STRING()).map(string),
- ctas = ctx.AS,
- query = ctx.query)
+ val createTableTuple = (ctx.createTableHeader, ctx.skewSpec, ctx.bucketSpec,
+ ctx.partitionColumns, ctx.columns, ctx.tablePropertyList, ctx.locationSpec(),
+ Option(ctx.STRING()).map(string),
+ ctx.AS, ctx.query, fileStorage)
+ helper.createCarbonTable(createTableTuple)
} else {
super.visitCreateTable(ctx)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
index f033a8e..c28e4ba 100644
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -325,18 +325,12 @@ class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSes
val fileStorage = helper.getFileStorage(ctx.createFileFormat)
if (fileStorage.equalsIgnoreCase("'carbondata'") ||
+ fileStorage.equalsIgnoreCase("'Carbonfile'") ||
fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
- helper.createCarbonTable(
- tableHeader = ctx.createTableHeader,
- skewSpecContext = ctx.skewSpec,
- bucketSpecContext = ctx.bucketSpec,
- partitionColumns = ctx.partitionColumns,
- columns = ctx.columns,
- tablePropertyList = ctx.tablePropertyList,
- locationSpecContext = ctx.locationSpec(),
- tableComment = Option(ctx.STRING()).map(string),
- ctas = ctx.AS,
- query = ctx.query)
+ val createTableTuple = (ctx.createTableHeader, ctx.skewSpec,
+ ctx.bucketSpec, ctx.partitionColumns, ctx.columns, ctx.tablePropertyList, ctx.locationSpec(),
+ Option(ctx.STRING()).map(string), ctx.AS, ctx.query, fileStorage)
+ helper.createCarbonTable(createTableTuple)
} else {
super.visitCreateHiveTable(ctx)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
----------------------------------------------------------------------
diff --git a/integration/spark2/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/integration/spark2/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index d09c9b5..5831f3e 100644
--- a/integration/spark2/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/integration/spark2/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -14,4 +14,5 @@
## See the License for the specific language governing permissions and
## limitations under the License.
## ------------------------------------------------------------------------
-org.apache.spark.sql.CarbonSource
\ No newline at end of file
+org.apache.spark.sql.CarbonSource
+org.apache.spark.sql.SparkCarbonFileFormat
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
index 0ac6f38..b15dafd 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -113,9 +114,12 @@ public class CSVCarbonWriterSuite {
writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), String.valueOf((double) i / 2)});
}
writer.close();
- } catch (Exception e) {
+ } catch (IOException e) {
e.printStackTrace();
Assert.fail(e.getMessage());
+ } catch (InvalidLoadOptionException l) {
+ l.printStackTrace();
+ Assert.fail(l.getMessage());
}
File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null"));
[6/6] carbondata git commit: [CARBONDATA-2224][File Level Reader
Support] External File level reader support
Posted by ja...@apache.org.
[CARBONDATA-2224][File Level Reader Support] External File level reader support
The file level reader reads any CarbonData file placed at an external path; when no schema file is present, the table schema is inferred from the carbondata file footer.
This closes #2055
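For illustration, a minimal end-to-end sketch of the feature, following the new test suites in this commit. It assumes a CarbonSession-enabled SparkSession named `spark`; the output path and table name are hypothetical:

  import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}

  // write carbondata files to an external location with the SDK,
  // without persisting a schema file
  val json = """[ {"name":"string"}, {"age":"int"}, {"height":"double"} ]"""
  val writer = CarbonWriter.builder()
    .withSchema(Schema.parseJson(json))
    .outputPath("/tmp/external_carbon") // hypothetical path
    .buildWriterForCSVInput()
  (0 until 100).foreach { i =>
    writer.write(Array("robot" + i, i.toString, (i.toDouble / 2).toString))
  }
  writer.close()

  // map the files with the new 'Carbonfile' provider; the schema is
  // inferred from the carbondata file footer
  spark.sql(
    """CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile'
      |LOCATION '/tmp/external_carbon'""".stripMargin)
  spark.sql("select name, age from sdkOutputTable where age > 2 and age < 8").show()

Dropping such a table leaves the SDK output files in place, as the tests below assert.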
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7a124ecd
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7a124ecd
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7a124ecd
Branch: refs/heads/carbonfile
Commit: 7a124ecd87769c0197ae67a0726c5abb4745d3a8
Parents: a386f1f
Author: sounakr <so...@gmail.com>
Authored: Sat Feb 24 07:55:14 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Fri Mar 16 17:33:43 2018 +0800
----------------------------------------------------------------------
.../core/metadata/schema/table/CarbonTable.java | 6 +
.../apache/carbondata/core/util/CarbonUtil.java | 209 +++++-
.../hadoop/api/CarbonFileInputFormat.java | 682 +++++++++++++++++++
.../carbondata/hadoop/util/SchemaReader.java | 17 +-
integration/spark-common-test/pom.xml | 6 +
...FileInputFormatWithExternalCarbonTable.scala | 240 +++++++
...tCreateTableUsingSparkCarbonFileFormat.scala | 327 +++++++++
...tSparkCarbonFileFormatWithSparkSession.scala | 176 +++++
.../carbondata/spark/rdd/CarbonScanRDD.scala | 64 +-
.../VectorizedCarbonRecordReader.java | 22 +-
.../management/CarbonLoadDataCommand.scala | 4 +-
.../command/table/CarbonDropTableCommand.scala | 2 +-
.../datasources/CarbonFileFormat.scala | 443 ------------
.../datasources/SparkCarbonFileFormat.scala | 269 ++++++++
.../datasources/SparkCarbonTableFormat.scala | 443 ++++++++++++
.../sql/execution/strategy/DDLStrategy.scala | 27 +-
.../spark/sql/hive/CarbonAnalysisRules.scala | 2 +-
.../spark/sql/parser/CarbonSparkSqlParser.scala | 47 +-
.../spark/sql/hive/CarbonSessionState.scala | 17 +-
.../spark/sql/hive/CarbonSessionState.scala | 16 +-
....apache.spark.sql.sources.DataSourceRegister | 3 +-
.../sdk/file/CSVCarbonWriterSuite.java | 6 +-
22 files changed, 2522 insertions(+), 506 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index f14672f..278dc96 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -826,6 +826,12 @@ public class CarbonTable implements Serializable {
return external != null && external.equalsIgnoreCase("true");
}
+ public boolean isFileLevelExternalTable() {
+ String external = tableInfo.getFactTable().getTableProperties().get("_filelevelexternal");
+ return external != null && external.equalsIgnoreCase("true");
+ }
+
public long size() throws IOException {
Map<String, Long> dataIndexSize = CarbonUtil.calculateDataIndexSize(this);
Long dataSize = dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index b961b60..ff49edf 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -41,6 +41,7 @@ import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.exception.InvalidConfigurationException;
import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
@@ -52,18 +53,26 @@ import org.apache.carbondata.core.metadata.SegmentFileStore;
import org.apache.carbondata.core.metadata.ValueEncoderMeta;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
import org.apache.carbondata.core.metadata.blocklet.SegmentInfo;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypeAdapter;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.DecimalType;
import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.SchemaEvolution;
+import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry;
import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.metadata.schema.table.TableSchema;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation;
import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.reader.CarbonHeaderReader;
import org.apache.carbondata.core.reader.ThriftReader;
import org.apache.carbondata.core.reader.ThriftReader.TBaseCreator;
import org.apache.carbondata.core.scan.model.ProjectionDimension;
@@ -77,6 +86,8 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.format.BlockletHeader;
import org.apache.carbondata.format.DataChunk2;
import org.apache.carbondata.format.DataChunk3;
+import org.apache.carbondata.format.FileHeader;
+
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
@@ -1279,7 +1290,7 @@ public final class CarbonUtil {
int counter = 0;
for (int i = 0; i < wrapperColumnSchemaList.size(); i++) {
if (CarbonUtil.hasEncoding(wrapperColumnSchemaList.get(i).getEncodingList(),
- org.apache.carbondata.core.metadata.encoder.Encoding.DICTIONARY)) {
+ Encoding.DICTIONARY)) {
cardinality.add(dictionaryColumnCardinality[counter]);
counter++;
} else if (!wrapperColumnSchemaList.get(i).isDimensionColumn()) {
@@ -2068,6 +2079,202 @@ public final class CarbonUtil {
return tableInfo;
}
+ public static ColumnSchema thriftColumnSchemaToWrapperColumnSchema(
+ org.apache.carbondata.format.ColumnSchema externalColumnSchema) {
+ ColumnSchema wrapperColumnSchema = new ColumnSchema();
+ wrapperColumnSchema.setColumnUniqueId(externalColumnSchema.getColumn_id());
+ wrapperColumnSchema.setColumnName(externalColumnSchema.getColumn_name());
+ wrapperColumnSchema.setColumnar(externalColumnSchema.isColumnar());
+ DataType dataType = thriftDataTypeToWrapperDataType(externalColumnSchema.data_type);
+ if (DataTypes.isDecimal(dataType)) {
+ DecimalType decimalType = (DecimalType) dataType;
+ decimalType.setPrecision(externalColumnSchema.getPrecision());
+ decimalType.setScale(externalColumnSchema.getScale());
+ }
+ wrapperColumnSchema.setDataType(dataType);
+ wrapperColumnSchema.setDimensionColumn(externalColumnSchema.isDimension());
+ List<Encoding> encoders = new ArrayList<Encoding>();
+ for (org.apache.carbondata.format.Encoding encoder : externalColumnSchema.getEncoders()) {
+ encoders.add(fromExternalToWrapperEncoding(encoder));
+ }
+ wrapperColumnSchema.setEncodingList(encoders);
+ wrapperColumnSchema.setNumberOfChild(externalColumnSchema.getNum_child());
+ wrapperColumnSchema.setPrecision(externalColumnSchema.getPrecision());
+ wrapperColumnSchema.setColumnGroup(externalColumnSchema.getColumn_group_id());
+ wrapperColumnSchema.setScale(externalColumnSchema.getScale());
+ wrapperColumnSchema.setDefaultValue(externalColumnSchema.getDefault_value());
+ wrapperColumnSchema.setSchemaOrdinal(externalColumnSchema.getSchemaOrdinal());
+ Map<String, String> properties = externalColumnSchema.getColumnProperties();
+ if (properties != null) {
+ if (properties.get(CarbonCommonConstants.SORT_COLUMNS) != null) {
+ wrapperColumnSchema.setSortColumn(true);
+ }
+ }
+ wrapperColumnSchema.setFunction(externalColumnSchema.getAggregate_function());
+ List<org.apache.carbondata.format.ParentColumnTableRelation> parentColumnTableRelation =
+ externalColumnSchema.getParentColumnTableRelations();
+ if (null != parentColumnTableRelation) {
+ wrapperColumnSchema.setParentColumnTableRelations(
+ fromThriftToWrapperParentTableColumnRelations(parentColumnTableRelation));
+ }
+ return wrapperColumnSchema;
+ }
+
+ static List<ParentColumnTableRelation> fromThriftToWrapperParentTableColumnRelations(
+ List<org.apache.carbondata.format.ParentColumnTableRelation> thriftParentColumnRelation) {
+ List<ParentColumnTableRelation> parentColumnTableRelationList = new ArrayList<>();
+ for (org.apache.carbondata.format.ParentColumnTableRelation carbonTableRelation :
+ thriftParentColumnRelation) {
+ RelationIdentifier relationIdentifier =
+ new RelationIdentifier(carbonTableRelation.getRelationIdentifier().getDatabaseName(),
+ carbonTableRelation.getRelationIdentifier().getTableName(),
+ carbonTableRelation.getRelationIdentifier().getTableId());
+ ParentColumnTableRelation parentColumnTableRelation =
+ new ParentColumnTableRelation(relationIdentifier, carbonTableRelation.getColumnId(),
+ carbonTableRelation.getColumnName());
+ parentColumnTableRelationList.add(parentColumnTableRelation);
+ }
+ return parentColumnTableRelationList;
+ }
+
+ static Encoding fromExternalToWrapperEncoding(
+ org.apache.carbondata.format.Encoding encoderThrift) {
+ switch (encoderThrift) {
+ case DICTIONARY:
+ return Encoding.DICTIONARY;
+ case DELTA:
+ return Encoding.DELTA;
+ case RLE:
+ return Encoding.RLE;
+ case INVERTED_INDEX:
+ return Encoding.INVERTED_INDEX;
+ case BIT_PACKED:
+ return Encoding.BIT_PACKED;
+ case DIRECT_DICTIONARY:
+ return Encoding.DIRECT_DICTIONARY;
+ default:
+ throw new IllegalArgumentException(encoderThrift.toString() + " is not supported");
+ }
+ }
+
+ static DataType thriftDataTypeToWrapperDataType(
+ org.apache.carbondata.format.DataType dataTypeThrift) {
+ switch (dataTypeThrift) {
+ case BOOLEAN:
+ return DataTypes.BOOLEAN;
+ case STRING:
+ return DataTypes.STRING;
+ case SHORT:
+ return DataTypes.SHORT;
+ case INT:
+ return DataTypes.INT;
+ case LONG:
+ return DataTypes.LONG;
+ case DOUBLE:
+ return DataTypes.DOUBLE;
+ case DECIMAL:
+ return DataTypes.createDefaultDecimalType();
+ case DATE:
+ return DataTypes.DATE;
+ case TIMESTAMP:
+ return DataTypes.TIMESTAMP;
+ case ARRAY:
+ return DataTypes.createDefaultArrayType();
+ case STRUCT:
+ return DataTypes.createDefaultStructType();
+ default:
+ return DataTypes.STRING;
+ }
+ }
+
+ public static List<String> getFilePathExternalFilePath(String path) {
+
+ // return the list of carbondata files in the given path.
+ CarbonFile segment = FileFactory.getCarbonFile(path, FileFactory.getFileType(path));
+ CarbonFile[] dataFiles = segment.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return file.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
+ }
+ });
+ List<String> filePaths = new ArrayList<>(dataFiles.length);
+ for (CarbonFile dataFile : dataFiles) {
+ filePaths.add(dataFile.getAbsolutePath());
+ }
+ return filePaths;
+ }
+
+ /**
+ * This method infers the table schema for an external table: when no schema file
+ * exists it reads the footer of a carbondata file under the given path, otherwise
+ * it reads the persisted schema file.
+ *
+ * @param carbonDataFilePath table path containing the carbondata files
+ * @return the thrift TableInfo built from the inferred or persisted schema
+ */
+ public static org.apache.carbondata.format.TableInfo inferSchemaFileExternalTable(
+ String carbonDataFilePath, AbsoluteTableIdentifier absoluteTableIdentifier,
+ boolean schemaExists) throws IOException {
+ TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+ public org.apache.thrift.TBase<org.apache.carbondata.format.TableInfo,
+ org.apache.carbondata.format.TableInfo._Fields> create() {
+ return new org.apache.carbondata.format.TableInfo();
+ }
+ };
+ if (!schemaExists) {
+ List<String> filePaths =
+ getFilePathExternalFilePath(carbonDataFilePath + "/Fact/Part0/Segment_null");
+ if (filePaths.isEmpty()) {
+ LOGGER.error("CarbonData file is not present in the table location");
+ throw new IOException("CarbonData file is not present in the table location");
+ }
+ String firstFilePath = filePaths.get(0);
+ CarbonHeaderReader carbonHeaderReader = new CarbonHeaderReader(firstFilePath);
+ FileHeader fileHeader = carbonHeaderReader.readHeader();
+ List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
+ List<org.apache.carbondata.format.ColumnSchema> table_columns = fileHeader.getColumn_schema();
+ for (int i = 0; i < table_columns.size(); i++) {
+ ColumnSchema col = thriftColumnSchemaToWrapperColumnSchema(table_columns.get(i));
+ col.setColumnReferenceId(col.getColumnUniqueId());
+ columnSchemaList.add(col);
+ }
+ TableSchema tableSchema = new TableSchema();
+ tableSchema.setTableName(absoluteTableIdentifier.getTableName());
+ tableSchema.setBucketingInfo(null);
+ tableSchema.setSchemaEvalution(null);
+ tableSchema.setTableId(UUID.randomUUID().toString());
+ tableSchema.setListOfColumns(columnSchemaList);
+
+ ThriftWrapperSchemaConverterImpl thriftWrapperSchemaConverter =
+ new ThriftWrapperSchemaConverterImpl();
+ SchemaEvolutionEntry schemaEvolutionEntry = new SchemaEvolutionEntry();
+ schemaEvolutionEntry.setTimeStamp(System.currentTimeMillis());
+ SchemaEvolution schemaEvol = new SchemaEvolution();
+ List<SchemaEvolutionEntry> schEntryList = new ArrayList<>();
+ schEntryList.add(schemaEvolutionEntry);
+ schemaEvol.setSchemaEvolutionEntryList(schEntryList);
+ tableSchema.setSchemaEvalution(schemaEvol);
+
+ org.apache.carbondata.format.TableSchema thriftFactTable =
+ thriftWrapperSchemaConverter.fromWrapperToExternalTableSchema(tableSchema);
+ org.apache.carbondata.format.TableInfo tableInfo =
+ new org.apache.carbondata.format.TableInfo(thriftFactTable,
+ new ArrayList<org.apache.carbondata.format.TableSchema>());
+
+ tableInfo.setDataMapSchemas(null);
+ return tableInfo;
+ } else {
+ ThriftReader thriftReader = new ThriftReader(carbonDataFilePath, createTBase);
+ thriftReader.open();
+ org.apache.carbondata.format.TableInfo tableInfo =
+ (org.apache.carbondata.format.TableInfo) thriftReader.read();
+ thriftReader.close();
+ return tableInfo;
+ }
+ }
+
public static void dropDatabaseDirectory(String databasePath)
throws IOException, InterruptedException {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
new file mode 100644
index 0000000..b86b1cc
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -0,0 +1,682 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.api;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapChooser;
+import org.apache.carbondata.core.datamap.DataMapLevel;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.exception.InvalidConfigurationException;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.SingleTableProvider;
+import org.apache.carbondata.core.scan.filter.TableProvider;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeConverter;
+import org.apache.carbondata.core.util.DataTypeConverterImpl;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.CarbonProjection;
+import org.apache.carbondata.hadoop.CarbonRecordReader;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
+import org.apache.carbondata.hadoop.util.SchemaReader;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+
+/**
+ * Input format of CarbonData file.
+ *
+ * @param <T>
+ */
+@InterfaceAudience.User
+@InterfaceStability.Evolving
+public class CarbonFileInputFormat<T> extends FileInputFormat<Void, T> implements Serializable {
+
+ public static final String READ_SUPPORT_CLASS = "carbon.read.support.class";
+ // comma separated list of input segment numbers
+ public static final String INPUT_SEGMENT_NUMBERS =
+ "mapreduce.input.carboninputformat.segmentnumbers";
+ private static final String VALIDATE_INPUT_SEGMENT_IDs =
+ "mapreduce.input.carboninputformat.validsegments";
+ // comma separated list of input files
+ public static final String INPUT_FILES = "mapreduce.input.carboninputformat.files";
+ private static final String ALTER_PARTITION_ID = "mapreduce.input.carboninputformat.partitionid";
+ private static final Log LOG = LogFactory.getLog(CarbonFileInputFormat.class);
+ private static final String FILTER_PREDICATE =
+ "mapreduce.input.carboninputformat.filter.predicate";
+ private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection";
+ private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo";
+ private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
+ private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter";
+ private static final String DATA_MAP_DSTR = "mapreduce.input.carboninputformat.datamapdstr";
+ public static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databaseName";
+ public static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName";
+ private static final String PARTITIONS_TO_PRUNE =
+ "mapreduce.input.carboninputformat.partitions.to.prune";
+
+ // a cache for carbon table, it will be used in task side
+ private CarbonTable carbonTable;
+
+ /**
+ * Set the `tableInfo` in `configuration`
+ */
+ public static void setTableInfo(Configuration configuration, TableInfo tableInfo)
+ throws IOException {
+ if (null != tableInfo) {
+ configuration.set(TABLE_INFO, CarbonUtil.encodeToString(tableInfo.serialize()));
+ }
+ }
+
+ /**
+ * Get TableInfo object from `configuration`
+ */
+ private static TableInfo getTableInfo(Configuration configuration) throws IOException {
+ String tableInfoStr = configuration.get(TABLE_INFO);
+ if (tableInfoStr == null) {
+ return null;
+ } else {
+ TableInfo output = new TableInfo();
+ output.readFields(
+ new DataInputStream(
+ new ByteArrayInputStream(CarbonUtil.decodeStringToBytes(tableInfoStr))));
+ return output;
+ }
+ }
+
+
+ public static void setTablePath(Configuration configuration, String tablePath) {
+ configuration.set(FileInputFormat.INPUT_DIR, tablePath);
+ }
+
+ public static void setPartitionIdList(Configuration configuration, List<String> partitionIds) {
+ configuration.set(ALTER_PARTITION_ID, partitionIds.toString());
+ }
+
+
+ public static void setDataMapJob(Configuration configuration, DataMapJob dataMapJob)
+ throws IOException {
+ if (dataMapJob != null) {
+ String toString = ObjectSerializationUtil.convertObjectToString(dataMapJob);
+ configuration.set(DATA_MAP_DSTR, toString);
+ }
+ }
+
+ public static DataMapJob getDataMapJob(Configuration configuration) throws IOException {
+ String jobString = configuration.get(DATA_MAP_DSTR);
+ if (jobString != null) {
+ return (DataMapJob) ObjectSerializationUtil.convertStringToObject(jobString);
+ }
+ return null;
+ }
+
+ /**
+ * It sets unresolved filter expression.
+ *
+ * @param configuration
+ * @param filterExpression
+ */
+ public static void setFilterPredicates(Configuration configuration, Expression filterExpression) {
+ if (filterExpression == null) {
+ return;
+ }
+ try {
+ String filterString = ObjectSerializationUtil.convertObjectToString(filterExpression);
+ configuration.set(FILTER_PREDICATE, filterString);
+ } catch (Exception e) {
+ throw new RuntimeException("Error while setting filter expression to Job", e);
+ }
+ }
+
+ public static void setColumnProjection(Configuration configuration, CarbonProjection projection) {
+ if (projection == null || projection.isEmpty()) {
+ return;
+ }
+ String[] allColumns = projection.getAllColumns();
+ StringBuilder builder = new StringBuilder();
+ for (String column : allColumns) {
+ builder.append(column).append(",");
+ }
+ String columnString = builder.toString();
+ columnString = columnString.substring(0, columnString.length() - 1);
+ configuration.set(COLUMN_PROJECTION, columnString);
+ }
+
+ public static String getColumnProjection(Configuration configuration) {
+ return configuration.get(COLUMN_PROJECTION);
+ }
+
+
+ /**
+ * Set list of segments to access
+ */
+ public static void setSegmentsToAccess(Configuration configuration, List<Segment> validSegments) {
+ configuration.set(INPUT_SEGMENT_NUMBERS, CarbonUtil.convertToString(validSegments));
+ }
+
+ /**
+ * Set `CARBON_INPUT_SEGMENTS` from property to configuration
+ */
+ public static void setQuerySegment(Configuration conf, AbsoluteTableIdentifier identifier) {
+ String dbName = identifier.getCarbonTableIdentifier().getDatabaseName().toLowerCase();
+ String tbName = identifier.getCarbonTableIdentifier().getTableName().toLowerCase();
+ String segmentNumbersFromProperty = CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + dbName + "." + tbName, "*");
+ if (!segmentNumbersFromProperty.trim().equals("*")) {
+ CarbonFileInputFormat
+ .setSegmentsToAccess(conf, Segment.toSegmentList(segmentNumbersFromProperty.split(",")));
+ }
+ }
+
+ /**
+ * set whether the segments to access should be validated
+ */
+ public static void setValidateSegmentsToAccess(Configuration configuration, Boolean validate) {
+ configuration.set(CarbonFileInputFormat.VALIDATE_INPUT_SEGMENT_IDs, validate.toString());
+ }
+
+ /**
+ * get whether the segments to access should be validated
+ */
+ public static boolean getValidateSegmentsToAccess(Configuration configuration) {
+ return configuration.get(CarbonFileInputFormat.VALIDATE_INPUT_SEGMENT_IDs, "true")
+ .equalsIgnoreCase("true");
+ }
+
+ /**
+ * set list of partitions to prune
+ */
+ public static void setPartitionsToPrune(Configuration configuration,
+ List<PartitionSpec> partitions) {
+ if (partitions == null) {
+ return;
+ }
+ try {
+ String partitionString =
+ ObjectSerializationUtil.convertObjectToString(new ArrayList<>(partitions));
+ configuration.set(PARTITIONS_TO_PRUNE, partitionString);
+ } catch (Exception e) {
+ throw new RuntimeException("Error while setting patition information to Job", e);
+ }
+ }
+
+ /**
+ * get list of partitions to prune
+ */
+ private static List<PartitionSpec> getPartitionsToPrune(Configuration configuration)
+ throws IOException {
+ String partitionString = configuration.get(PARTITIONS_TO_PRUNE);
+ if (partitionString != null) {
+ return (List<PartitionSpec>) ObjectSerializationUtil.convertStringToObject(partitionString);
+ }
+ return null;
+ }
+
+ public AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
+ throws IOException {
+ String tablePath = configuration.get(INPUT_DIR, "");
+ try {
+ return AbsoluteTableIdentifier
+ .from(tablePath, getDatabaseName(configuration), getTableName(configuration));
+ } catch (InvalidConfigurationException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * The configuration FileInputFormat.INPUT_DIR is used to get the table path to read.
+ *
+ * @param job
+ * @return List<InputSplit> list of CarbonInputSplit
+ * @throws IOException
+ */
+ @Override
+ public List<InputSplit> getSplits(JobContext job) throws IOException {
+ AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
+ CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
+ if (null == carbonTable) {
+ throw new IOException("Missing/Corrupt schema file for table.");
+ }
+ // TableDataMap blockletMap = DataMapStoreManager.getInstance()
+ // .getDataMap(identifier, BlockletDataMap.NAME, BlockletDataMapFactory.class.getName());
+
+ if (getValidateSegmentsToAccess(job.getConfiguration())) {
+ // get all valid segments and set them into the configuration
+ // check for externalTable segment (Segment_null)
+ // process and resolve the expression
+ Expression filter = getFilterPredicates(job.getConfiguration());
+ TableProvider tableProvider = new SingleTableProvider(carbonTable);
+ // this will be null in case of corrupt schema file.
+ PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName());
+ CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, null, null);
+
+ FilterResolverIntf filterInterface = CarbonInputFormatUtil
+ .resolveFilter(filter, carbonTable.getAbsoluteTableIdentifier(), tableProvider);
+
+ String segmentDir = CarbonTablePath.getSegmentPath(identifier.getTablePath(), "null");
+ FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
+ if (FileFactory.isFileExist(segmentDir, fileType)) {
+ // if external table Segments are found, add it to the List
+ List<Segment> externalTableSegments = new ArrayList<Segment>();
+ Segment seg = new Segment("null", null);
+ externalTableSegments.add(seg);
+
+ Map<String, String> indexFiles =
+ new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir);
+
+ if (indexFiles.size() == 0) {
+ throw new RuntimeException("Index file not present to read the carbondata file");
+ }
+ // do block filtering and get split
+ List<InputSplit> splits =
+ getSplits(job, filterInterface, externalTableSegments, null, partitionInfo, null);
+
+ return splits;
+ }
+ }
+ return null;
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ * The configurations FileInputFormat.INPUT_DIR and CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS
+ * are used to get the table path and the segments to read.
+ *
+ * @return
+ * @throws IOException
+ */
+ private List<InputSplit> getSplits(JobContext job, FilterResolverIntf filterResolver,
+ List<Segment> validSegments, BitSet matchedPartitions, PartitionInfo partitionInfo,
+ List<Integer> oldPartitionIdList) throws IOException {
+
+ List<InputSplit> result = new LinkedList<InputSplit>();
+ UpdateVO invalidBlockVOForSegmentId = null;
+ Boolean isIUDTable = false;
+
+ AbsoluteTableIdentifier absoluteTableIdentifier =
+ getOrCreateCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier();
+ SegmentUpdateStatusManager updateStatusManager =
+ new SegmentUpdateStatusManager(absoluteTableIdentifier);
+
+ isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0);
+
+ // for each segment fetch blocks matching filter in Driver BTree
+ List<CarbonInputSplit> dataBlocksOfSegment =
+ getDataBlocksOfSegment(job, absoluteTableIdentifier, filterResolver, matchedPartitions,
+ validSegments, partitionInfo, oldPartitionIdList);
+ for (CarbonInputSplit inputSplit : dataBlocksOfSegment) {
+
+ // Get the UpdateVO for those tables on which IUD operations are being performed.
+ if (isIUDTable) {
+ invalidBlockVOForSegmentId =
+ updateStatusManager.getInvalidTimestampRange(inputSplit.getSegmentId());
+ }
+ String[] deleteDeltaFilePath = null;
+ if (isIUDTable) {
+ // skip blocks invalidated by update/delete operations; this search is
+ // only needed when IUD has been performed on the table
+ if (CarbonUtil
+ .isInvalidTableBlock(inputSplit.getSegmentId(), inputSplit.getPath().toString(),
+ invalidBlockVOForSegmentId, updateStatusManager)) {
+ continue;
+ }
+ // When iud is done then only get delete delta files for a block
+ try {
+ deleteDeltaFilePath = updateStatusManager
+ .getDeleteDeltaFilePath(inputSplit.getPath().toString(), inputSplit.getSegmentId());
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ inputSplit.setDeleteDeltaFiles(deleteDeltaFilePath);
+ result.add(inputSplit);
+ }
+ return result;
+ }
+
+ protected Expression getFilterPredicates(Configuration configuration) {
+ try {
+ String filterExprString = configuration.get(FILTER_PREDICATE);
+ if (filterExprString == null) {
+ return null;
+ }
+ Object filter = ObjectSerializationUtil.convertStringToObject(filterExprString);
+ return (Expression) filter;
+ } catch (IOException e) {
+ throw new RuntimeException("Error while reading filter expression", e);
+ }
+ }
+
+ /**
+ * get data blocks of given segment
+ */
+ private List<CarbonInputSplit> getDataBlocksOfSegment(JobContext job,
+ AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf resolver,
+ BitSet matchedPartitions, List<Segment> segmentIds, PartitionInfo partitionInfo,
+ List<Integer> oldPartitionIdList) throws IOException {
+
+ QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
+ QueryStatistic statistic = new QueryStatistic();
+
+ // get tokens for all the required FileSystem for table path
+ TokenCache.obtainTokensForNamenodes(job.getCredentials(),
+ new Path[] { new Path(absoluteTableIdentifier.getTablePath()) }, job.getConfiguration());
+ boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
+ CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT));
+ DataMapExprWrapper dataMapExprWrapper =
+ DataMapChooser.get().choose(getOrCreateCarbonTable(job.getConfiguration()), resolver);
+ DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
+ List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
+ List<ExtendedBlocklet> prunedBlocklets;
+ if (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapLevel.FG) {
+ DistributableDataMapFormat datamapDstr =
+ new DistributableDataMapFormat(absoluteTableIdentifier, dataMapExprWrapper,
+ segmentIds, partitionsToPrune,
+ BlockletDataMapFactory.class.getName());
+ prunedBlocklets = dataMapJob.execute(datamapDstr, resolver);
+ // Apply expression on the blocklets.
+ prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets);
+ } else {
+ prunedBlocklets = dataMapExprWrapper.prune(segmentIds, partitionsToPrune);
+ }
+
+ List<CarbonInputSplit> resultFilteredBlocks = new ArrayList<>();
+ int partitionIndex = 0;
+ List<Integer> partitionIdList = new ArrayList<>();
+ if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) {
+ partitionIdList = partitionInfo.getPartitionIds();
+ }
+ for (ExtendedBlocklet blocklet : prunedBlocklets) {
+ long partitionId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(
+ CarbonTablePath.DataFileUtil.getTaskNo(blocklet.getPath()));
+
+ // oldPartitionIdList is only used in the alter table partition command because it
+ // changes partition info first and then reads data.
+ // For other normal query should use newest partitionIdList
+ if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) {
+ if (oldPartitionIdList != null) {
+ partitionIndex = oldPartitionIdList.indexOf((int)partitionId);
+ } else {
+ partitionIndex = partitionIdList.indexOf((int)partitionId);
+ }
+ }
+ if (partitionIndex != -1) {
+ // matchedPartitions variable will be null in two cases as follows
+ // 1. the table is not a partition table
+ // 2. the table is a partition table, and all partitions are matched by query
+ // for a partition table, the task id of the carbondata file name is the partition id.
+ // if this partition is not required, here will skip it.
+ if (matchedPartitions == null || matchedPartitions.get(partitionIndex)) {
+ CarbonInputSplit inputSplit = convertToCarbonInputSplit(blocklet);
+ if (inputSplit != null) {
+ resultFilteredBlocks.add(inputSplit);
+ }
+ }
+ }
+ }
+ statistic
+ .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis());
+ recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id"));
+ return resultFilteredBlocks;
+ }
+
+ private CarbonInputSplit convertToCarbonInputSplit(ExtendedBlocklet blocklet) throws IOException {
+ CarbonInputSplit split =
+ CarbonInputSplit.from(blocklet.getSegmentId(),
+ blocklet.getBlockletId(), new FileSplit(new Path(blocklet.getPath()), 0,
+ blocklet.getLength(), blocklet.getLocations()),
+ ColumnarFormatVersion.valueOf((short) blocklet.getDetailInfo().getVersionNumber()),
+ blocklet.getDataMapWriterPath());
+ split.setDetailInfo(blocklet.getDetailInfo());
+ return split;
+ }
+
+ @Override
+ public RecordReader<Void, T> createRecordReader(InputSplit inputSplit,
+ TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+ Configuration configuration = taskAttemptContext.getConfiguration();
+ QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext);
+ CarbonReadSupport<T> readSupport = getReadSupportClass(configuration);
+ return new CarbonRecordReader<T>(queryModel, readSupport);
+ }
+
+ public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+ throws IOException {
+ Configuration configuration = taskAttemptContext.getConfiguration();
+ CarbonTable carbonTable = getOrCreateCarbonTable(configuration);
+ TableProvider tableProvider = new SingleTableProvider(carbonTable);
+
+ // query plan includes projection column
+ String projectionString = getColumnProjection(configuration);
+ String[] projectionColumnNames = null;
+ if (projectionString != null) {
+ projectionColumnNames = projectionString.split(",");
+ }
+ QueryModel queryModel = carbonTable.createQueryWithProjection(
+ projectionColumnNames, getDataTypeConverter(configuration));
+
+ // set the filter to the query model in order to filter blocklet before scan
+ Expression filter = getFilterPredicates(configuration);
+ boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()];
+ // getAllMeasures returns list of visible and invisible columns
+ boolean[] isFilterMeasures =
+ new boolean[carbonTable.getAllMeasures().size()];
+ CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, isFilterDimensions,
+ isFilterMeasures);
+ queryModel.setIsFilterDimensions(isFilterDimensions);
+ queryModel.setIsFilterMeasures(isFilterMeasures);
+ FilterResolverIntf filterIntf = CarbonInputFormatUtil
+ .resolveFilter(filter, carbonTable.getAbsoluteTableIdentifier(), tableProvider);
+ queryModel.setFilterExpressionResolverTree(filterIntf);
+
+ // update the file level index store if there are invalid segment
+ if (inputSplit instanceof CarbonMultiBlockSplit) {
+ CarbonMultiBlockSplit split = (CarbonMultiBlockSplit) inputSplit;
+ List<String> invalidSegments = split.getAllSplits().get(0).getInvalidSegments();
+ if (invalidSegments.size() > 0) {
+ queryModel.setInvalidSegmentIds(invalidSegments);
+ }
+ List<UpdateVO> invalidTimestampRangeList =
+ split.getAllSplits().get(0).getInvalidTimestampRange();
+ if ((null != invalidTimestampRangeList) && (invalidTimestampRangeList.size() > 0)) {
+ queryModel.setInvalidBlockForSegmentId(invalidTimestampRangeList);
+ }
+ }
+ return queryModel;
+ }
+
+ private CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
+ CarbonTable carbonTableTemp;
+ if (carbonTable == null) {
+ // carbon table should be created either from deserialized table info (schema saved in
+ // hive metastore) or by reading schema in HDFS (schema saved in HDFS)
+ TableInfo tableInfo = getTableInfo(configuration);
+ CarbonTable localCarbonTable;
+ if (tableInfo != null) {
+ localCarbonTable = CarbonTable.buildFromTableInfo(tableInfo);
+ } else {
+ String schemaPath = CarbonTablePath
+ .getSchemaFilePath(getAbsoluteTableIdentifier(configuration).getTablePath());
+ if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath))) {
+ TableInfo tableInfoInfer =
+ SchemaReader.inferSchema(getAbsoluteTableIdentifier(configuration));
+ localCarbonTable = CarbonTable.buildFromTableInfo(tableInfoInfer);
+ } else {
+ localCarbonTable =
+ SchemaReader.readCarbonTableFromStore(getAbsoluteTableIdentifier(configuration));
+ }
+ }
+ this.carbonTable = localCarbonTable;
+ return localCarbonTable;
+ } else {
+ carbonTableTemp = this.carbonTable;
+ return carbonTableTemp;
+ }
+ }
+
+
+ public CarbonReadSupport<T> getReadSupportClass(Configuration configuration) {
+ String readSupportClass = configuration.get(CARBON_READ_SUPPORT);
+ //By default it uses dictionary decoder read class
+ CarbonReadSupport<T> readSupport = null;
+ if (readSupportClass != null) {
+ try {
+ Class<?> myClass = Class.forName(readSupportClass);
+ Constructor<?> constructor = myClass.getConstructors()[0];
+ Object object = constructor.newInstance();
+ if (object instanceof CarbonReadSupport) {
+ readSupport = (CarbonReadSupport) object;
+ }
+ } catch (ClassNotFoundException ex) {
+ LOG.error("Class " + readSupportClass + "not found", ex);
+ } catch (Exception ex) {
+ LOG.error("Error while creating " + readSupportClass, ex);
+ }
+ } else {
+ readSupport = new DictionaryDecodeReadSupport<>();
+ }
+ return readSupport;
+ }
+
+ @Override
+ protected boolean isSplitable(JobContext context, Path filename) {
+ try {
+ // Don't split the file if it is local file system
+ FileSystem fileSystem = filename.getFileSystem(context.getConfiguration());
+ if (fileSystem instanceof LocalFileSystem) {
+ return false;
+ }
+ } catch (Exception e) {
+ return true;
+ }
+ return true;
+ }
+
+ /**
+ * return the list of segments to access
+ */
+ private String[] getSegmentsToAccess(JobContext job) {
+ String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, "");
+ if (segmentString.trim().isEmpty()) {
+ return new String[0];
+ }
+ return segmentString.split(",");
+ }
+
+ public static DataTypeConverter getDataTypeConverter(Configuration configuration)
+ throws IOException {
+ String converter = configuration.get(CARBON_CONVERTER);
+ if (converter == null) {
+ return new DataTypeConverterImpl();
+ }
+ return (DataTypeConverter) ObjectSerializationUtil.convertStringToObject(converter);
+ }
+
+ public static void setDatabaseName(Configuration configuration, String databaseName) {
+ if (null != databaseName) {
+ configuration.set(DATABASE_NAME, databaseName);
+ }
+ }
+
+ public static String getDatabaseName(Configuration configuration)
+ throws InvalidConfigurationException {
+ String databaseName = configuration.get(DATABASE_NAME);
+ if (null == databaseName) {
+ throw new InvalidConfigurationException("Database name is not set.");
+ }
+ return databaseName;
+ }
+
+ public static void setTableName(Configuration configuration, String tableName) {
+ if (null != tableName) {
+ configuration.set(TABLE_NAME, tableName);
+ }
+ }
+
+ public static String getTableName(Configuration configuration)
+ throws InvalidConfigurationException {
+ String tableName = configuration.get(TABLE_NAME);
+ if (tableName == null) {
+ throw new InvalidConfigurationException("Table name is not set");
+ }
+ return tableName;
+ }
+
+ public org.apache.hadoop.mapred.RecordReader<Void, T> getRecordReader(
+ org.apache.hadoop.mapred.InputSplit split, JobConf job, Reporter reporter)
+ throws IOException {
+ return null;
+ }
+}
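As a usage sketch (not part of this commit), the static setters above could be wired into a Hadoop job along these lines; the path, database, and table names are hypothetical:

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.mapreduce.Job
  import org.apache.carbondata.hadoop.CarbonProjection
  import org.apache.carbondata.hadoop.api.CarbonFileInputFormat

  val conf = new Configuration()
  // point the format at the external table location and identify the table
  CarbonFileInputFormat.setTablePath(conf, "/tmp/external_carbon") // hypothetical path
  CarbonFileInputFormat.setDatabaseName(conf, "default")
  CarbonFileInputFormat.setTableName(conf, "sdkOutputTable")
  // project only the columns the query needs
  val projection = new CarbonProjection()
  projection.addColumn("name")
  projection.addColumn("age")
  CarbonFileInputFormat.setColumnProjection(conf, projection)
  // getSplits looks for Segment_null under the table path, requires the
  // index files, and prunes blocks with the filter resolver
  val format = new CarbonFileInputFormat[Array[AnyRef]]()
  val splits = format.getSplits(Job.getInstance(conf))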
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
index dfa8dd1..ab7c333 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
@@ -28,7 +28,6 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
/**
* TODO: It should be removed after store manager implementation.
@@ -59,6 +58,7 @@ public class SchemaReader {
throw new IOException("File does not exist: " + schemaFilePath);
}
}
+
/**
* the method returns the Wrapper TableInfo
*
@@ -79,4 +79,19 @@ public class SchemaReader {
carbonTableIdentifier.getTableName(),
identifier.getTablePath());
}
+
+
+ public static TableInfo inferSchema(AbsoluteTableIdentifier identifier)
+ throws IOException {
+ // Infer the schema from the carbondata file footer and convert it
+ // from ColumnSchema -> TableSchema -> TableInfo.
+ org.apache.carbondata.format.TableInfo tableInfo =
+ CarbonUtil.inferSchemaFileExternalTable(identifier.getTablePath(), identifier, false);
+ SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+ TableInfo wrapperTableInfo = schemaConverter
+ .fromExternalToWrapperTableInfo(tableInfo, identifier.getDatabaseName(),
+ identifier.getTableName(), identifier.getTablePath());
+ return wrapperTableInfo;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark-common-test/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/pom.xml b/integration/spark-common-test/pom.xml
index b7f19fd..1c6cee9 100644
--- a/integration/spark-common-test/pom.xml
+++ b/integration/spark-common-test/pom.xml
@@ -105,6 +105,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-store-sdk</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
new file mode 100644
index 0000000..8b1f63f
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.createTable
+
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}
+
+
+class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with BeforeAndAfterAll {
+
+ var writerPath = new File(this.getClass.getResource("/").getPath +
+ ".././src/test/resources/SparkCarbonFileFormat/WriterOutput/")
+ .getCanonicalPath
+ // getCanonicalPath returns backslash separators on Windows, while the
+ // code expects forward slashes, so normalize them here
+ writerPath = writerPath.replace("\\", "/")
+
+
+ def buildTestData(persistSchema:Boolean) = {
+
+ FileUtils.deleteDirectory(new File(writerPath))
+
+ val schema = new StringBuilder()
+ .append("[ \n")
+ .append(" {\"name\":\"string\"},\n")
+ .append(" {\"age\":\"int\"},\n")
+ .append(" {\"height\":\"double\"}\n")
+ .append("]")
+ .toString()
+
+ try {
+ val builder = CarbonWriter.builder()
+ val writer =
+ if (persistSchema) {
+ builder.persistSchemaFile(true)
+ builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput()
+ } else {
+ builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput()
+ }
+
+ var i = 0
+ while (i < 100) {
+ writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
+ i += 1
+ }
+ writer.close()
+ } catch {
+ case _: Throwable => None // ignore failures while building test data
+ }
+ }
+
+ def cleanTestData() = {
+ FileUtils.deleteDirectory(new File(writerPath))
+ }
+
+ def deleteIndexFile(path: String, extension: String) : Unit = {
+ val file: CarbonFile = FileFactory
+ .getCarbonFile(path, FileFactory.getFileType(path))
+
+ for (eachDir <- file.listFiles) {
+ if (!eachDir.isDirectory) {
+ if (eachDir.getName.endsWith(extension)) {
+ CarbonUtil.deleteFoldersAndFilesSilent(eachDir)
+ }
+ } else {
+ deleteIndexFile(eachDir.getPath, extension)
+ }
+ }
+ }
+
+ override def beforeAll(): Unit = {
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+ // create carbon table and insert data
+ }
+
+ override def afterAll(): Unit = {
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+ }
+
+ // TODO: remove the segment dependency and the tableIdentifier dependency
+ test("read carbondata files (sdk Writer Output) using the Carbonfile ") {
+ buildTestData(false)
+ assert(new File(writerPath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+ //new provider Carbonfile
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION
+ |'$writerPath' """.stripMargin)
+
+ sql("Describe formatted sdkOutputTable").show(false)
+
+ sql("select * from sdkOutputTable").show(false)
+
+ sql("select * from sdkOutputTable limit 3").show(false)
+
+ sql("select name from sdkOutputTable").show(false)
+
+ sql("select age from sdkOutputTable").show(false)
+
+ sql("select * from sdkOutputTable where age > 2 and age < 8").show(200, false)
+
+ sql("select * from sdkOutputTable where name = 'robot3'").show(200, false)
+
+ sql("select * from sdkOutputTable where name like 'robo%' limit 5").show(200, false)
+
+ sql("select * from sdkOutputTable where name like '%obot%' limit 2").show(200, false)
+
+ sql("select sum(age) from sdkOutputTable where name like 'robot1%' ").show(200, false)
+
+ sql("select count(*) from sdkOutputTable where name like 'robot%' ").show(200, false)
+
+ sql("select count(*) from sdkOutputTable").show(200, false)
+
+ sql("DROP TABLE sdkOutputTable")
+ // drop table should not delete the files
+ assert(new File(writerPath).exists())
+ cleanTestData()
+ }
+
+ test("should not allow to alter datasource carbontable ") {
+ buildTestData(false)
+ assert(new File(writerPath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+ //data source file format
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION
+ |'$writerPath' """.stripMargin)
+
+ val exception = intercept[MalformedCarbonCommandException]
+ {
+ sql("Alter table sdkOutputTable change age age BIGINT")
+ }
+ assert(exception.getMessage()
+ .contains("Unsupported alter operation on Carbon external fileformat table"))
+
+ sql("DROP TABLE sdkOutputTable")
+ // drop table should not delete the files
+ assert(new File(writerPath).exists())
+ cleanTestData()
+ }
+
+ test("Read sdk writer output file without index file should fail") {
+ buildTestData(false)
+ deleteIndexFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
+ assert(new File(writerPath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+ //data source file format
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION
+ |'$writerPath' """.stripMargin)
+
+ //org.apache.spark.SparkException: Index file not present to read the carbondata file
+ val exception = intercept[java.lang.RuntimeException]
+ {
+ sql("select * from sdkOutputTable").show(false)
+ }
+ assert(exception.getMessage().contains("Index file not present to read the carbondata file"))
+
+ sql("DROP TABLE sdkOutputTable")
+ // drop table should not delete the files
+ assert(new File(writerPath).exists())
+ cleanTestData()
+ }
+
+
+ test("Read sdk writer output file without Carbondata file should fail") {
+ buildTestData(false)
+ deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
+ assert(new File(writerPath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+ val exception = intercept[Exception] {
+ // data source file format
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION
+ |'$writerPath' """.stripMargin)
+ }
+ assert(exception.getMessage()
+ .contains("Operation not allowed: Invalid table path provided:"))
+
+
+ // drop table should not delete the files
+ assert(new File(writerPath).exists())
+ cleanTestData()
+ }
+
+
+ test("Read sdk writer output file without any file should fail") {
+ buildTestData(false)
+ deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
+ deleteIndexFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
+ assert(new File(writerPath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+ val exception = intercept[Exception] {
+ //data source file format
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION
+ |'$writerPath' """.stripMargin)
+
+ sql("select * from sdkOutputTable").show(false)
+ }
+ assert(exception.getMessage()
+ .contains("Operation not allowed: Invalid table path provided:"))
+
+ // drop table should not delete the files
+ assert(new File(writerPath).exists())
+ cleanTestData()
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
new file mode 100644
index 0000000..d284e50
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.createTable
+
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}
+
+class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAndAfterAll {
+
+
+ override def beforeAll(): Unit = {
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+ }
+
+ override def afterAll(): Unit = {
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+ }
+
+ var writerPath = new File(this.getClass.getResource("/").getPath
+ +
+ "../." +
+ "./src/test/resources/SparkCarbonFileFormat/WriterOutput/")
+ .getCanonicalPath
+ // getCanonicalPath returns a path with '\' on Windows, while the code expects '/'. Should this be handled in the code itself?
+ writerPath = writerPath.replace("\\", "/")
+
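+ // The SDK writer lays its output out under Fact/Part0/Segment_null;
+ // the datasource tables below point at that folder directly.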
+ val filePath = writerPath + "/Fact/Part0/Segment_null/"
+
+ def buildTestData(persistSchema: Boolean) = {
+
+ FileUtils.deleteDirectory(new File(writerPath))
+
+ val schema = new StringBuilder()
+ .append("[ \n")
+ .append(" {\"name\":\"string\"},\n")
+ .append(" {\"age\":\"int\"},\n")
+ .append(" {\"height\":\"double\"}\n")
+ .append("]")
+ .toString()
+
+ try {
+ val builder = CarbonWriter.builder()
+ val writer =
+ if (persistSchema) {
+ builder.persistSchemaFile(true)
+ builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput()
+ } else {
+ builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput()
+ }
+
+ var i = 0
+ while (i < 100) {
+ writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
+ i += 1
+ }
+ writer.close()
+ } catch {
+ // ignore failures while writing test data; the tests assert on the produced files
+ case _: Throwable => None
+ }
+ }
+
+ def cleanTestData() = {
+ FileUtils.deleteDirectory(new File(writerPath))
+ }
+
+ def deleteIndexFile(path: String, extension: String) : Unit = {
+ val file: CarbonFile = FileFactory
+ .getCarbonFile(path, FileFactory.getFileType(path))
+
+ for (eachDir <- file.listFiles) {
+ if (!eachDir.isDirectory) {
+ if (eachDir.getName.endsWith(extension)) {
+ CarbonUtil.deleteFoldersAndFilesSilent(eachDir)
+ }
+ } else {
+ deleteIndexFile(eachDir.getPath, extension)
+ }
+ }
+ }
+
+ // TODO: remove the segment dependency and the tableIdentifier dependency
+ test("read carbondata files (sdk Writer Output) using the SparkCarbonFileFormat ") {
+ buildTestData(false)
+ assert(new File(filePath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+ //data source file format
+ if (sqlContext.sparkContext.version.startsWith("2.1")) {
+ //data source file format
+ sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+ } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
+ //data source file format
+ sql(
+ s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+ |'$filePath' """.stripMargin)
+ } else {
+ // TO DO
+ }
+
+ sql("Describe formatted sdkOutputTable").show(false)
+
+ sql("select * from sdkOutputTable").show(false)
+
+ sql("select * from sdkOutputTable limit 3").show(false)
+
+ sql("select name from sdkOutputTable").show(false)
+
+ sql("select age from sdkOutputTable").show(false)
+
+ sql("select * from sdkOutputTable where age > 2 and age < 8").show(200,false)
+
+ sql("select * from sdkOutputTable where name = 'robot3'").show(200,false)
+
+ sql("select * from sdkOutputTable where name like 'robo%' limit 5").show(200,false)
+
+ sql("select * from sdkOutputTable where name like '%obot%' limit 2").show(200,false)
+
+ sql("select sum(age) from sdkOutputTable where name like 'robot1%' ").show(200,false)
+
+ sql("select count(*) from sdkOutputTable where name like 'robot%' ").show(200,false)
+
+ sql("select count(*) from sdkOutputTable").show(200,false)
+
+ sql("DROP TABLE sdkOutputTable")
+ // drop table should not delete the files
+ assert(new File(filePath).exists())
+ cleanTestData()
+ }
+
+
+ test("should not allow to alter datasource carbontable ") {
+ buildTestData(false)
+ assert(new File(filePath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+
+ if (sqlContext.sparkContext.version.startsWith("2.1")) {
+ //data source file format
+ sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+ } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
+ //data source file format
+ sql(
+ s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+ |'$filePath' """.stripMargin)
+ } else {
+ // TO DO
+ }
+
+ val exception = intercept[MalformedCarbonCommandException]
+ {
+ sql("Alter table sdkOutputTable change age age BIGINT")
+ }
+ assert(exception.getMessage().contains("Unsupported alter operation on hive table"))
+
+ sql("DROP TABLE sdkOutputTable")
+ // drop table should not delete the files
+ assert(new File(filePath).exists())
+ cleanTestData()
+ }
+
+ test("Read sdk writer output file without Carbondata file should fail") {
+ buildTestData(false)
+ deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
+ assert(new File(filePath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+ val exception = intercept[org.apache.spark.SparkException] {
+ // data source file format
+ if (sqlContext.sparkContext.version.startsWith("2.1")) {
+ //data source file format
+ sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+ } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
+ //data source file format
+ sql(
+ s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+ |'$filePath' """.stripMargin)
+ } else {
+ // TO DO
+ }
+ }
+ assert(exception.getMessage()
+ .contains("CarbonData file is not present in the location mentioned in DDL"))
+
+ // drop table should not delete the files
+ assert(new File(filePath).exists())
+ cleanTestData()
+ }
+
+
+ test("Read sdk writer output file without any file should fail") {
+ buildTestData(false)
+ deleteIndexFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
+ deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
+ assert(new File(filePath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+ val exception = intercept[org.apache.spark.SparkException] {
+ //data source file format
+ if (sqlContext.sparkContext.version.startsWith("2.1")) {
+ //data source file format
+ sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+ } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
+ //data source file format
+ sql(
+ s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+ |'$filePath' """.stripMargin)
+ } else {
+ // TO DO
+ }
+
+ sql("select * from sdkOutputTable").show(false)
+ }
+ assert(exception.getMessage()
+ .contains("CarbonData file is not present in the location mentioned in DDL"))
+
+ // drop table should not delete the files
+ assert(new File(filePath).exists())
+ cleanTestData()
+ }
+
+ test("Read sdk writer output file withSchema") {
+ buildTestData(true)
+ assert(new File(filePath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+ //data source file format
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+ if (sqlContext.sparkContext.version.startsWith("2.1")) {
+ //data source file format
+ sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+ } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
+ //data source file format
+ sql(
+ s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+ |'$filePath' """.stripMargin)
+ } else {
+ // TO DO
+ }
+
+ sql("Describe formatted sdkOutputTable").show(false)
+
+ sql("select * from sdkOutputTable").show(false)
+
+ sql("select * from sdkOutputTable limit 3").show(false)
+
+ sql("select name from sdkOutputTable").show(false)
+
+ sql("select age from sdkOutputTable").show(false)
+
+ sql("select * from sdkOutputTable where age > 2 and age < 8").show(200, false)
+
+ sql("select * from sdkOutputTable where name = 'robot3'").show(200, false)
+
+ sql("select * from sdkOutputTable where name like 'robo%' limit 5").show(200, false)
+
+ sql("select * from sdkOutputTable where name like '%obot%' limit 2").show(200, false)
+
+ sql("select sum(age) from sdkOutputTable where name like 'robot1%' ").show(200, false)
+
+ sql("select count(*) from sdkOutputTable where name like 'robot%' ").show(200, false)
+
+ sql("select count(*) from sdkOutputTable").show(200, false)
+
+ sql("DROP TABLE sdkOutputTable")
+
+ // drop table should not delete the files
+ assert(new File(filePath).exists())
+ cleanTestData()
+ }
+
+ test("Read sdk writer output file without index file should fail") {
+ buildTestData(false)
+ deleteIndexFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
+ assert(new File(filePath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+ if (sqlContext.sparkContext.version.startsWith("2.1")) {
+ //data source file format
+ sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+ } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
+ //data source file format
+ sql(
+ s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+ |'$filePath' """.stripMargin)
+ } else {
+ // TO DO
+ }
+ //org.apache.spark.SparkException: Index file not present to read the carbondata file
+ val exception = intercept[org.apache.spark.SparkException]
+ {
+ sql("select * from sdkOutputTable").show(false)
+ }
+ assert(exception.getMessage().contains("Index file not present to read the carbondata file"))
+
+ sql("DROP TABLE sdkOutputTable")
+ // drop table should not delete the files
+ assert(new File(filePath).exists())
+ cleanTestData()
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
new file mode 100644
index 0000000..9a46676
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.createTable
+
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}
+
+
+object TestSparkCarbonFileFormatWithSparkSession {
+
+ var writerPath = new File(this.getClass.getResource("/").getPath
+ +
+ "../." +
+ "./src/test/resources/SparkCarbonFileFormat/WriterOutput/")
+ .getCanonicalPath
+ // getCanonicalPath returns a path with '\' on Windows, while the code expects '/'. Should this be handled in the code itself?
+ writerPath = writerPath.replace("\\", "/")
+
+ val filePath = writerPath + "/Fact/Part0/Segment_null/"
+
+ def buildTestData(persistSchema: Boolean) = {
+
+ FileUtils.deleteDirectory(new File(writerPath))
+
+ val schema = new StringBuilder()
+ .append("[ \n")
+ .append(" {\"name\":\"string\"},\n")
+ .append(" {\"age\":\"int\"},\n")
+ .append(" {\"height\":\"double\"}\n")
+ .append("]")
+ .toString()
+
+ try {
+ val builder = CarbonWriter.builder()
+ val writer =
+ if (persistSchema) {
+ builder.persistSchemaFile(true)
+ builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput()
+ } else {
+ builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput()
+ }
+
+ var i = 0
+ while (i < 100) {
+ writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
+ i += 1
+ }
+ writer.close()
+ } catch {
+ // ignore failures while writing test data; the test asserts on the produced files
+ case _: Throwable => None
+ }
+ }
+
+ def cleanTestData() = {
+ FileUtils.deleteDirectory(new File(writerPath))
+ }
+
+ def deleteIndexFile(path: String, extension: String) : Unit = {
+ val file: CarbonFile = FileFactory
+ .getCarbonFile(path, FileFactory.getFileType(path))
+
+ for (eachDir <- file.listFiles) {
+ if (!eachDir.isDirectory) {
+ if (eachDir.getName.endsWith(extension)) {
+ CarbonUtil.deleteFoldersAndFilesSilent(eachDir)
+ }
+ } else {
+ deleteIndexFile(eachDir.getPath, extension)
+ }
+ }
+ }
+
+ def main(args: Array[String]): Unit = {
+ val rootPath = new File(this.getClass.getResource("/").getPath
+ + "../../../..").getCanonicalPath
+ val storeLocation = s"$rootPath/examples/spark2/target/store"
+ val warehouse = s"$rootPath/examples/spark2/target/warehouse"
+ val metastoredb = s"$rootPath/examples/spark2/target/metastore_db"
+
+ // always clean the data folders left over from a previous run
+ if (true) {
+ val clean = (path: String) => FileUtils.deleteDirectory(new File(path))
+ clean(storeLocation)
+ clean(warehouse)
+ clean(metastoredb)
+ }
+
+ val spark = SparkSession
+ .builder()
+ .master("local")
+ .appName("TestSparkCarbonFileFormatWithSparkSession")
+ .enableHiveSupport()
+ .config("spark.sql.warehouse.dir", warehouse)
+ .config("javax.jdo.option.ConnectionURL",
+ s"jdbc:derby:;databaseName=$metastoredb;create=true")
+ .getOrCreate()
+
+ CarbonProperties.getInstance()
+ .addProperty("carbon.storelocation", storeLocation)
+
+ spark.sparkContext.setLogLevel("WARN")
+
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd HH:mm:ss")
+ .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd")
+ buildTestData(false)
+ assert(new File(filePath).exists())
+ //data source file format
+ if (spark.sparkContext.version.startsWith("2.1")) {
+ //data source file format
+ spark.sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+ } else if (spark.sparkContext.version.startsWith("2.2")) {
+ //data source file format
+ spark.sql(
+ s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+ |'$filePath' """.stripMargin)
+ } else {
+ // TO DO
+ }
+
+ spark.sql("Describe formatted sdkOutputTable").show(false)
+
+ spark.sql("select * from sdkOutputTable").show(false)
+
+ spark.sql("select * from sdkOutputTable limit 3").show(false)
+
+ spark.sql("select name from sdkOutputTable").show(false)
+
+ spark.sql("select age from sdkOutputTable").show(false)
+
+ spark.sql("select * from sdkOutputTable where age > 2 and age < 8").show(200,false)
+
+ spark.sql("select * from sdkOutputTable where name = 'robot3'").show(200,false)
+
+ spark.sql("select * from sdkOutputTable where name like 'robo%' limit 5").show(200,false)
+
+ spark.sql("select * from sdkOutputTable where name like '%obot%' limit 2").show(200,false)
+
+ spark.sql("select sum(age) from sdkOutputTable where name like 'robot1%' ").show(200,false)
+
+ spark.sql("select count(*) from sdkOutputTable where name like 'robot%' ").show(200,false)
+
+ spark.sql("select count(*) from sdkOutputTable").show(200,false)
+
+ spark.sql("DROP TABLE sdkOutputTable")
+ // drop table should not delete the files
+ assert(new File(filePath).exists())
+ cleanTestData()
+
+ spark.stop()
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 49a8023..6afd2c0 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -42,7 +42,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.block.Distributable
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.schema.table.TableInfo
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
import org.apache.carbondata.core.scan.expression.Expression
import org.apache.carbondata.core.scan.filter.FilterUtil
import org.apache.carbondata.core.scan.model.QueryModel
@@ -50,7 +50,7 @@ import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstant
import org.apache.carbondata.core.statusmanager.FileFormat
import org.apache.carbondata.core.util._
import org.apache.carbondata.hadoop._
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
+import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonTableInputFormat}
import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
import org.apache.carbondata.processing.util.CarbonLoaderUtil
import org.apache.carbondata.spark.InitInputMetrics
@@ -90,13 +90,21 @@ class CarbonScanRDD(
val jobConf = new JobConf(conf)
SparkHadoopUtil.get.addCredentials(jobConf)
val job = Job.getInstance(jobConf)
- val format = prepareInputFormatForDriver(job.getConfiguration)
-
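+ // tables carrying the "_filelevelexternal" property are read through
+ // CarbonFileInputFormat (SDK writer output) instead of CarbonTableInputFormat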
+ val fileLevelExternal = tableInfo.getFactTable().getTableProperties().get("_filelevelexternal")
+ val format = if (fileLevelExternal != null && fileLevelExternal.equalsIgnoreCase("true")) {
+ prepareFileInputFormatForDriver(job.getConfiguration)
+ } else {
+ prepareInputFormatForDriver(job.getConfiguration)
+ }
// initialise query_id for job
job.getConfiguration.set("query.id", queryId)
// get splits
val splits = format.getSplits(job)
+ if ((splits == null) && format.isInstanceOf[CarbonFileInputFormat[Object]]) {
+ throw new SparkException(
+ "CarbonData file not exist in the segment_null (SDK writer Output) path")
+ }
// separate split
// 1. for batch splits, invoke distributeSplits method to create partitions
@@ -113,7 +121,7 @@ class CarbonScanRDD(
}
val batchPartitions = distributeColumnarSplits(columnarSplits)
// check and remove InExpression from filterExpression
- checkAndRemoveInExpressinFromFilterExpression(format, batchPartitions)
+ checkAndRemoveInExpressinFromFilterExpression(batchPartitions)
if (streamSplits.isEmpty) {
batchPartitions.toArray
} else {
@@ -354,7 +362,9 @@ class CarbonScanRDD(
case _ =>
// create record reader for CarbonData file format
if (vectorReader) {
- val carbonRecordReader = createVectorizedCarbonRecordReader(model, inputMetricsStats)
+ val carbonRecordReader = createVectorizedCarbonRecordReader(model,
+ inputMetricsStats,
+ "true")
if (carbonRecordReader == null) {
new CarbonRecordReader(model,
format.getReadSupportClass(attemptContext.getConfiguration), inputMetricsStats)
@@ -431,6 +441,16 @@ class CarbonScanRDD(
createInputFormat(conf)
}
+ def prepareFileInputFormatForDriver(conf: Configuration): CarbonFileInputFormat[Object] = {
+ CarbonFileInputFormat.setTableInfo(conf, tableInfo)
+ CarbonFileInputFormat.setDatabaseName(conf, tableInfo.getDatabaseName)
+ CarbonFileInputFormat.setTableName(conf, tableInfo.getFactTable.getTableName)
+ if (partitionNames != null) {
+ CarbonFileInputFormat.setPartitionsToPrune(conf, partitionNames.asJava)
+ }
+ createFileInputFormat(conf)
+ }
+
private def prepareInputFormatForExecutor(conf: Configuration): CarbonTableInputFormat[Object] = {
CarbonTableInputFormat.setCarbonReadSupport(conf, readSupport)
val tableInfo1 = getTableInfo
@@ -441,6 +461,32 @@ class CarbonScanRDD(
createInputFormat(conf)
}
+ private def createFileInputFormat(conf: Configuration): CarbonFileInputFormat[Object] = {
+ val format = new CarbonFileInputFormat[Object]
+ CarbonFileInputFormat.setTablePath(conf,
+ identifier.appendWithLocalPrefix(identifier.getTablePath))
+ CarbonFileInputFormat.setQuerySegment(conf, identifier)
+ CarbonFileInputFormat.setFilterPredicates(conf, filterExpression)
+ CarbonFileInputFormat.setColumnProjection(conf, columnProjection)
+ CarbonFileInputFormat.setDataMapJob(conf, new SparkDataMapJob)
+ if (CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
+ CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT).toBoolean) {
+ CarbonTableInputFormat.setDataMapJob(conf, new SparkDataMapJob)
+ }
+
+ // when validate segments is disabled in thread local update it to CarbonTableInputFormat
+ val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+ if (carbonSessionInfo != null) {
+ CarbonTableInputFormat.setValidateSegmentsToAccess(conf, carbonSessionInfo.getSessionParams
+ .getProperty(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
+ identifier.getCarbonTableIdentifier.getDatabaseName + "." +
+ identifier.getCarbonTableIdentifier.getTableName, "true").toBoolean)
+ }
+ format
+ }
+
+
private def createInputFormat(conf: Configuration): CarbonTableInputFormat[Object] = {
val format = new CarbonTableInputFormat[Object]
CarbonTableInputFormat.setTablePath(conf,
@@ -485,7 +531,6 @@ class CarbonScanRDD(
* @param identifiedPartitions
*/
private def checkAndRemoveInExpressinFromFilterExpression(
- format: CarbonTableInputFormat[Object],
identifiedPartitions: mutable.Buffer[Partition]) = {
if (null != filterExpression) {
if (identifiedPartitions.nonEmpty &&
@@ -533,12 +578,13 @@ class CarbonScanRDD(
}
def createVectorizedCarbonRecordReader(queryModel: QueryModel,
- inputMetricsStats: InputMetricsStats): RecordReader[Void, Object] = {
+ inputMetricsStats: InputMetricsStats, enableBatch: String): RecordReader[Void, Object] = {
val name = "org.apache.carbondata.spark.vectorreader.VectorizedCarbonRecordReader"
try {
val cons = Class.forName(name).getDeclaredConstructors
cons.head.setAccessible(true)
- cons.head.newInstance(queryModel, inputMetricsStats).asInstanceOf[RecordReader[Void, Object]]
+ cons.head.newInstance(queryModel, inputMetricsStats, enableBatch)
+ .asInstanceOf[RecordReader[Void, Object]]
} catch {
case e: Exception =>
LOGGER.error(e)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index 73da878..903bf44 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -91,10 +91,21 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
private InputMetricsStats inputMetricsStats;
- public VectorizedCarbonRecordReader(QueryModel queryModel, InputMetricsStats inputMetricsStats) {
+ public VectorizedCarbonRecordReader(QueryModel queryModel, InputMetricsStats inputMetricsStats,
+ String enableBatch) {
this.queryModel = queryModel;
this.inputMetricsStats = inputMetricsStats;
- enableReturningBatches();
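+ // columnar batch reads are now opt-in, controlled by the enableBatch flag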
+ if (enableBatch.equals("true")) {
+ enableReturningBatches();
+ }
+ }
+
+
+ /*
+ * Can be called before any rows are returned to enable returning columnar batches directly.
+ */
+ public void enableReturningBatches() {
+ returnColumnarBatch = true;
}
/**
@@ -273,12 +284,7 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
if (columnarBatch == null) initBatch();
}
- /*
- * Can be called before any rows are returned to enable returning columnar batches directly.
- */
- private void enableReturningBatches() {
- returnColumnarBatch = true;
- }
+
/**
* Advances to the next batch of rows. Returns false if there are no more.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index eb00ebf..5fd9639 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort}
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping, UpdateTableModel}
-import org.apache.spark.sql.execution.datasources.{CarbonFileFormat, CatalogFileIndex, FindDataSourceTable, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, FindDataSourceTable, HadoopFsRelation, LogicalRelation, SparkCarbonTableFormat}
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.spark.sql.types._
@@ -1015,7 +1015,7 @@ case class CarbonLoadDataCommand(
partitionSchema = partitionSchema,
dataSchema = dataSchema,
bucketSpec = catalogTable.bucketSpec,
- fileFormat = new CarbonFileFormat,
+ fileFormat = new SparkCarbonTableFormat,
options = options.toMap)(sparkSession = sparkSession)
CarbonReflectionUtils.getLogicalRelation(hdfsRelation,
[4/6] carbondata git commit: [CARBONDATA-2244] Fix creating
pre-aggregate table bug when there are invisible
INSERT_IN_PROGRESS/INSERT_OVERWRITE_IN_PROGRESS segments on main table
Posted by ja...@apache.org.
[CARBONDATA-2244] Fix pre-aggregate table creation bug when there are invisible INSERT_IN_PROGRESS/INSERT_OVERWRITE_IN_PROGRESS segments on the main table
When the main table has invisible INSERT_IN_PROGRESS/INSERT_OVERWRITE_IN_PROGRESS segments, a pre-aggregate table cannot be created on it.
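For context, the check being removed (reconstructed from the hunk below) derived the in-progress state from the raw load metadata, where stale invisible *_IN_PROGRESS segments still appear, so it reported a load in progress even when none was running:

val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetadataPath)
// invisible leftover segments can still carry an *_IN_PROGRESS status here
val loadInProgress = loadAvailable.exists { load =>
  load.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS ||
  load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
}

The patch swaps this for SegmentStatusManager.isLoadInProgressInTable(parentTable), which presumably determines the state without being misled by invisible segments (an assumption; that method's body is not part of this diff).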
This closes #2050
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a386f1f4
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a386f1f4
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a386f1f4
Branch: refs/heads/carbonfile
Commit: a386f1f4e220210e031f10a73c26a7f56f57a603
Parents: 31011fc
Author: Zhang Zhichao <44...@qq.com>
Authored: Fri Mar 9 23:36:08 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Fri Mar 16 14:43:06 2018 +0800
----------------------------------------------------------------------
.../command/preaaggregate/PreAggregateTableHelper.scala | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a386f1f4/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
index 1f1e1e6..b64c91e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
@@ -165,12 +165,12 @@ case class PreAggregateTableHelper(
// need to fire load for pre-aggregate table. Therefore reading the load details for PARENT
// table.
SegmentStatusManager.deleteLoadsAndUpdateMetadata(parentTable, false)
- val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetadataPath)
- if (loadAvailable.exists(load => load.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS ||
- load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)) {
+ if (SegmentStatusManager.isLoadInProgressInTable(parentTable)) {
throw new UnsupportedOperationException(
"Cannot create pre-aggregate table when insert is in progress on main table")
- } else if (loadAvailable.nonEmpty) {
+ }
+ val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetadataPath)
+ if (loadAvailable.nonEmpty) {
// Passing segmentToLoad as * because we want to load all the segments into the
// pre-aggregate table even if the user has set some segments on the parent table.
loadCommand.dataFrame = Some(PreAggregateUtil
[2/6] carbondata git commit: [CARBONDATA-2248] Fixed memory leak in
parser/CarbonSparkSqlParser.scala
Posted by ja...@apache.org.
[CARBONDATA-2248] Fixed memory leak in parser/CarbonSparkSqlParser.scala
In scenarios where many sessions are created, parser failure objects accumulate in memory inside thread locals, causing a memory leak in the long run.
Solution: remove the parser object from the thread local after the query has been parsed.
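As a general illustration of the pattern (the actual CarbonScalaUtil.cleanParserThreadLocals implementation is not shown in this diff, so the names below are hypothetical), clearing a ThreadLocal after use lets per-thread parser state be garbage collected:

// hypothetical sketch of the cleanup pattern, not CarbonData's own code
object ParserThreadLocalSketch {
  private val parserState = new ThreadLocal[AnyRef]()

  def parseWithCleanup[T](parse: => T): T = {
    try {
      parse
    } finally {
      // remove() drops this thread's entry, so the parser object and any
      // failure state it accumulated become eligible for garbage collection
      parserState.remove()
    }
  }
}

The patch itself cleans up on the success path and on MalformedCarbonCommandException rather than in a finally block, because the generic failure path (visible at the end of the hunk below) appears to retry the parse.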
This closes #2057
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5b48e70a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5b48e70a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5b48e70a
Branch: refs/heads/carbonfile
Commit: 5b48e70a8ab7bd662d85790aca8e3a8975e82bfe
Parents: d4f9003
Author: kumarvishal <ku...@gmail.com>
Authored: Mon Mar 12 21:00:22 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Thu Mar 15 17:40:47 2018 +0800
----------------------------------------------------------------------
.../org/apache/spark/sql/parser/CarbonSparkSqlParser.scala | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b48e70a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index ef4836e..e0fff08 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -36,7 +36,7 @@ import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandExcepti
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.hadoop.util.SchemaReader
import org.apache.carbondata.spark.CarbonOption
-import org.apache.carbondata.spark.util.CommonUtil
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil}
/**
* Concrete parser for Spark SQL statements and carbon specific
@@ -52,9 +52,12 @@ class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends Ab
override def parsePlan(sqlText: String): LogicalPlan = {
CarbonSession.updateSessionInfoToCurrentThread(sparkSession)
try {
- super.parsePlan(sqlText)
+ val parsedPlan = super.parsePlan(sqlText)
+ CarbonScalaUtil.cleanParserThreadLocals
+ parsedPlan
} catch {
case ce: MalformedCarbonCommandException =>
+ CarbonScalaUtil.cleanParserThreadLocals
throw ce
case ex =>
try {
[3/6] carbondata git commit: [CARBONDATA-2250][DataLoad] Reduce
massive object generation in global sort
Posted by ja...@apache.org.
[CARBONDATA-2250][DataLoad] Reduce massive object generation in global sort
Generate the comparator outside the compare function; otherwise a new comparator object is created for every row comparison.
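The same idea in miniature (a toy comparator, not CarbonData's NewRowComparator): construct the comparator once when the Ordering is built, instead of inside compare, which runs for every row-to-row comparison during the sort:

import java.util.Comparator

object RowOrderingSketch {
  // constructed once; before the patch the equivalent object was
  // allocated inside compare, i.e. once per comparison
  private val rowComparator: Comparator[Array[AnyRef]] =
    new Comparator[Array[AnyRef]] {
      override def compare(a: Array[AnyRef], b: Array[AnyRef]): Int =
        a(0).asInstanceOf[Integer].compareTo(b(0).asInstanceOf[Integer])
    }

  object RowOrdering extends Ordering[Array[AnyRef]] {
    def compare(rowA: Array[AnyRef], rowB: Array[AnyRef]): Int =
      rowComparator.compare(rowA, rowB)
  }
}

Making the real comparators Serializable (the other half of this patch) is what lets the single hoisted instance ship inside the Spark closure instead of being rebuilt per row.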
This closes #2059
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/31011fc2
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/31011fc2
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/31011fc2
Branch: refs/heads/carbonfile
Commit: 31011fc29f85d949bdabaa3551b7513eb1183ee5
Parents: 5b48e70
Author: xuchuanyin <xu...@hust.edu.cn>
Authored: Wed Mar 14 14:45:53 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Thu Mar 15 17:43:39 2018 +0800
----------------------------------------------------------------------
.../spark/load/DataLoadProcessBuilderOnSpark.scala | 13 ++++++-------
.../processing/sort/sortdata/NewRowComparator.java | 4 +++-
.../sort/sortdata/NewRowComparatorForNormalDims.java | 5 ++++-
3 files changed, 13 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/31011fc2/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index 1062cd7..dc238fb 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -80,15 +80,14 @@ object DataLoadProcessBuilderOnSpark {
// 3. Sort
val configuration = DataLoadProcessBuilder.createConfiguration(model)
val sortParameters = SortParameters.createSortParameters(configuration)
+ val rowComparator: Comparator[Array[AnyRef]] =
+ if (sortParameters.getNoDictionaryCount > 0) {
+ new NewRowComparator(sortParameters.getNoDictionaryDimnesionColumn)
+ } else {
+ new NewRowComparatorForNormalDims(sortParameters.getDimColCount)
+ }
object RowOrdering extends Ordering[Array[AnyRef]] {
def compare(rowA: Array[AnyRef], rowB: Array[AnyRef]): Int = {
- val rowComparator: Comparator[Array[AnyRef]] =
- if (sortParameters.getNoDictionaryCount > 0) {
- new NewRowComparator(sortParameters.getNoDictionaryDimnesionColumn)
- } else {
- new NewRowComparatorForNormalDims(sortParameters.getDimColCount)
- }
-
rowComparator.compare(rowA, rowB)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/31011fc2/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
index 3f94533..f47ecc7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
@@ -17,11 +17,13 @@
package org.apache.carbondata.processing.sort.sortdata;
+import java.io.Serializable;
import java.util.Comparator;
import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
-public class NewRowComparator implements Comparator<Object[]> {
+public class NewRowComparator implements Comparator<Object[]>, Serializable {
+ private static final long serialVersionUID = -1739874611112709436L;
/**
* mapping of dictionary dimensions and no dictionary of sort_column.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/31011fc2/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java
index 7538c92..aea83ba 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java
@@ -16,13 +16,16 @@
*/
package org.apache.carbondata.processing.sort.sortdata;
+import java.io.Serializable;
import java.util.Comparator;
/**
* This class is used as comparator for comparing dims which are non high cardinality dims.
* Here the dims will be in form of int[] (surrogates) so directly comparing the integers.
*/
-public class NewRowComparatorForNormalDims implements Comparator<Object[]> {
+public class NewRowComparatorForNormalDims implements Comparator<Object[]>, Serializable {
+ private static final long serialVersionUID = -1749874611112709432L;
+
/**
* dimension count
*/