Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/10/30 09:27:40 UTC
[3/3] carbondata git commit: [CARBONDATA-1174] Streaming Ingestion - schema validation and streaming examples
[CARBONDATA-1174] Streaming Ingestion - schema validation and streaming examples
1. Schema validation of input data from a file source when a schema is specified.
2. Added streaming examples for file stream and socket stream sources.
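In a nutshell, the examples drive the following flow (a minimal sketch
distilled from the example code in this commit; the schema, paths and
option values below are placeholders taken from the examples):

    // stream CSV files into an existing carbon table
    val df = spark.readStream
      .format("csv")
      .schema(inputSchema)               // validated against the table schema
      .option("path", csvDataDir)
      .load()

    val query = df.writeStream
      .format("carbondata")
      .trigger(ProcessingTime("1 seconds"))
      .option("checkpointLocation", ckptLocation)
      .option("path", streamTablePath)   // path of the target carbon table
      .start()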
This closes #1352
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8c987392
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8c987392
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8c987392
Branch: refs/heads/streaming_ingest
Commit: 8c98739279e5ffe49b70ae6664d19a7f596cf18c
Parents: a9d951e
Author: Aniket Adnaik <an...@gmail.com>
Authored: Thu Jun 15 11:57:43 2017 -0700
Committer: Jacky Li <ja...@qq.com>
Committed: Mon Oct 30 14:56:57 2017 +0530
----------------------------------------------------------------------
.../streaming/CarbonStreamingCommitInfo.java | 6 +-
...CarbonStreamingIngestFileSourceExample.scala | 146 +++++++++++++
...rbonStreamingIngestSocketSourceExample.scala | 160 ++++++++++++++
.../examples/utils/StreamingExampleUtil.scala | 145 +++++++++++++
.../org/apache/spark/sql/CarbonSource.scala | 210 +++++++++++++++++--
.../CarbonStreamingOutpurWriteFactory.scala | 88 --------
.../CarbonStreamingOutputWriteFactory.scala | 88 ++++++++
.../CarbonSourceSchemaValidationTest.scala | 61 ++++++
8 files changed, 801 insertions(+), 103 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c987392/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java
index 6cf303a..2027566 100644
--- a/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java
@@ -36,7 +36,7 @@ public class CarbonStreamingCommitInfo {
private long batchID;
- private String fileOffset;
+ private long fileOffset;
private long transactionID; // future use
@@ -67,6 +67,8 @@ public class CarbonStreamingCommitInfo {
this.batchID = batchID;
this.transactionID = -1;
+
+ this.fileOffset = 0;
}
public String getDataBase() {
@@ -93,7 +95,7 @@ public class CarbonStreamingCommitInfo {
return batchID;
}
- public String getFileOffset() {
+ public long getFileOffset() {
return fileOffset;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c987392/examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestFileSourceExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestFileSourceExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestFileSourceExample.scala
new file mode 100644
index 0000000..ebe0a5c
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestFileSourceExample.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.File
+
+import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.apache.spark.sql.streaming.ProcessingTime
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.examples.utils.StreamingExampleUtil
+
+/**
+ * Covers the Spark Structured Streaming scenario where a user streams data
+ * from a file source (input) and writes it into a carbondata table (output sink).
+ * This example uses a CSV file as the input source and writes
+ * into a target carbon table. The target carbon table must exist.
+ */
+
+object CarbonStreamingIngestFileSourceExample {
+
+ def main(args: Array[String]) {
+
+ val rootPath = new File(this.getClass.getResource("/").getPath
+ + "../../../..").getCanonicalPath
+ val storeLocation = s"$rootPath/examples/spark2/target/store"
+ val warehouse = s"$rootPath/examples/spark2/target/warehouse"
+ val metastoredb = s"$rootPath/examples/spark2/target"
+ val csvDataDir = s"$rootPath/examples/spark2/resources/csvDataDir"
+ val streamTableName = s"_carbon_file_stream_table_"
+ val streamTablePath = s"$storeLocation/default/$streamTableName"
+ val ckptLocation = s"$rootPath/examples/spark2/resources/ckptDir"
+
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+
+ // cleanup residual files, if any
+ StreamingExampleUtil.cleanUpDir(csvDataDir, ckptLocation)
+
+ import org.apache.spark.sql.CarbonSession._
+ val spark = SparkSession
+ .builder()
+ .master("local")
+ .appName("CarbonFileStreamingExample")
+ .config("spark.sql.warehouse.dir", warehouse)
+ .getOrCreateCarbonSession(storeLocation, metastoredb)
+
+ spark.sparkContext.setLogLevel("ERROR")
+
+ // Import implicits and SQL types used below
+ import spark.implicits._
+ import org.apache.spark.sql.types._
+ // drop table if exists previously
+ spark.sql(s"DROP TABLE IF EXISTS $streamTableName")
+ // Create target carbon table
+ spark.sql(
+ s"""
+ | CREATE TABLE $streamTableName(
+ | id INT,
+ | name STRING,
+ | city STRING,
+ | salary FLOAT
+ | )
+ | STORED BY 'carbondata'""".stripMargin)
+
+ // Generate CSV data and write to CSV file
+ StreamingExampleUtil.generateCSVDataFile(spark, 1, csvDataDir, SaveMode.Overwrite)
+
+ // scalastyle:off
+ spark.sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '$csvDataDir'
+ | INTO TABLE $streamTableName
+ | OPTIONS('FILEHEADER'='id,name,city,salary'
+ | )""".stripMargin)
+ // scalastyle:on
+
+ // check initial table data
+ spark.sql(s""" SELECT * FROM $streamTableName """).show()
+
+ // define custom schema
+ val inputSchema = new StructType().
+ add("id", "integer").
+ add("name", "string").
+ add("city", "string").
+ add("salary", "float")
+
+ // setup csv file as a input streaming source
+ val csvReadDF = spark.readStream.
+ format("csv").
+ option("sep", ",").
+ schema(inputSchema).
+ option("path", csvDataDir).
+ option("header", "true").
+ load()
+
+ // Write data from csv format streaming source to carbondata target format
+ // set trigger to every 1 second
+ val qry = csvReadDF.writeStream
+ .format("carbondata")
+ .trigger(ProcessingTime("1 seconds"))
+ .option("checkpointLocation", ckptLocation)
+ .option("path", streamTablePath)
+ .start()
+
+ // In a separate thread, append data to the existing csv every 2 seconds
+ val gendataThread: Thread = new Thread() {
+ override def run(): Unit = {
+ for (i <- 1 to 5) {
+ Thread.sleep(2000)
+ StreamingExampleUtil.
+ generateCSVDataFile(spark, i * 10 + 1, csvDataDir, SaveMode.Append)
+ }
+ }
+ }
+ gendataThread.start()
+ gendataThread.join()
+
+ // stop streaming execution after 5 sec delay
+ Thread.sleep(5000)
+ qry.stop()
+
+ // verify streaming data is added into the table
+ spark.sql(s""" SELECT * FROM $streamTableName """).show()
+
+ // Cleanup residual files and table data
+ StreamingExampleUtil.cleanUpDir(csvDataDir, ckptLocation)
+ spark.sql(s"DROP TABLE IF EXISTS $streamTableName")
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c987392/examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestSocketSourceExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestSocketSourceExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestSocketSourceExample.scala
new file mode 100644
index 0000000..bbf7ef2
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestSocketSourceExample.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.File
+
+import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.apache.spark.sql.streaming.ProcessingTime
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.examples.utils.StreamingExampleUtil
+
+
+/**
+ * This example reads stream data from a socket source (input) and writes it
+ * into an existing carbon table (output).
+ *
+ * It uses localhost and port 9999 to create a socket and write to it.
+ * The example uses two threads: one writes data to the socket, while the
+ * other receives data from the socket and writes it into the carbon table.
+ */
+
+// scalastyle:off println
+object CarbonStreamingIngestSocketSourceExample {
+
+ def main(args: Array[String]) {
+
+ // setup localhost and port number
+ val host = "localhost"
+ val port = 9999
+ // setup paths
+ val rootPath = new File(this.getClass.getResource("/").getPath
+ + "../../../..").getCanonicalPath
+ val storeLocation = s"$rootPath/examples/spark2/target/store"
+ val warehouse = s"$rootPath/examples/spark2/target/warehouse"
+ val metastoredb = s"$rootPath/examples/spark2/target"
+ val csvDataDir = s"$rootPath/examples/spark2/resources/csvDataDir"
+ val streamTableName = s"_carbon_socket_stream_table_"
+ val streamTablePath = s"$storeLocation/default/$streamTableName"
+ val ckptLocation = s"$rootPath/examples/spark2/resources/ckptDir"
+
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+
+ // cleanup residual files, if any
+ StreamingExampleUtil.cleanUpDir(csvDataDir, ckptLocation)
+
+ import org.apache.spark.sql.CarbonSession._
+ val spark = SparkSession
+ .builder()
+ .master("local[2]")
+ .appName("CarbonNetworkStreamingExample")
+ .config("spark.sql.warehouse.dir", warehouse)
+ .getOrCreateCarbonSession(storeLocation, metastoredb)
+
+ spark.sparkContext.setLogLevel("ERROR")
+
+ // Import implicits used below
+ import spark.implicits._
+
+ // drop table if exists previously
+ spark.sql(s"DROP TABLE IF EXISTS ${streamTableName}")
+
+ // Create target carbon table and populate with initial data
+ spark.sql(
+ s"""
+ | CREATE TABLE ${streamTableName}(
+ | id INT,
+ | name STRING,
+ | city STRING,
+ | salary FLOAT
+ | )
+ | STORED BY 'carbondata'""".stripMargin)
+
+ // Generate CSV data and write to CSV file
+ StreamingExampleUtil.generateCSVDataFile(spark, 1, csvDataDir, SaveMode.Overwrite)
+
+ // load the table
+ // scalastyle:off
+ spark.sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '$csvDataDir'
+ | INTO TABLE ${streamTableName}
+ | OPTIONS('FILEHEADER'='id,name,city,salary'
+ | )""".stripMargin)
+ // scalastyle:on
+
+ spark.sql(s""" SELECT * FROM ${streamTableName} """).show()
+
+ // Create server socket in main thread
+ val serverSocket = StreamingExampleUtil.createServerSocket(host, port)
+
+ // Start client thread to receive streaming data and write into carbon
+ val streamWriterThread: Thread = new Thread() {
+ override def run(): Unit = {
+
+ try {
+ // Setup read stream to read input data from socket
+ val readSocketDF = spark.readStream
+ .format("socket")
+ .option("host", host)
+ .option("port", port)
+ .load()
+
+ // Write data from socket stream to carbondata file
+ val qry = readSocketDF.writeStream
+ .format("carbondata")
+ .trigger(ProcessingTime("2 seconds"))
+ .option("checkpointLocation", ckptLocation)
+ .option("path", streamTablePath)
+ .start()
+
+ qry.awaitTermination()
+ } catch {
+ case e: InterruptedException => println("Done reading and writing streaming data")
+ }
+ }
+ }
+ streamWriterThread.start()
+
+ // wait for the client connection request and accept it
+ val clientSocket = StreamingExampleUtil.waitForClientConnection(serverSocket.get)
+
+ // Write to client's connected socket every 2 seconds, for 5 times
+ StreamingExampleUtil.writeToSocket(clientSocket, 5, 2, 11)
+
+ Thread.sleep(2000)
+ // interrupt client thread to stop streaming query
+ streamWriterThread.interrupt()
+ // wait for the client thread to finish
+ streamWriterThread.join()
+
+ // Close the server socket
+ serverSocket.get.close()
+
+ // verify streaming data is added into the table
+ // spark.sql(s""" SELECT * FROM ${streamTableName} """).show()
+
+ // Cleanup residual files and table data
+ StreamingExampleUtil.cleanUpDir(csvDataDir, ckptLocation)
+ spark.sql(s"DROP TABLE IF EXISTS ${streamTableName}")
+ }
+}
+// scalastyle:on println
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c987392/examples/spark2/src/main/scala/org/apache/carbondata/examples/utils/StreamingExampleUtil.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/utils/StreamingExampleUtil.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/utils/StreamingExampleUtil.scala
new file mode 100644
index 0000000..6eab491
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/utils/StreamingExampleUtil.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples.utils
+
+import java.io.{IOException, PrintWriter}
+import java.net.{ServerSocket, Socket}
+
+import scala.tools.nsc.io.Path
+
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+
+
+
+/**
+ * Utility functions for streaming ingest examples
+ */
+
+// scalastyle:off println
+object StreamingExampleUtil {
+
+ // Clean up directories recursively, accepts variable arguments
+ def cleanUpDir(dirPaths: String*): Unit = {
+
+ if (dirPaths.size < 1) {
+ System.err.println("Usage: StreamingCleanupUtil <dirPath> [dirpath]...")
+ System.exit(1)
+ }
+
+ dirPaths.foreach { dirPath =>
+ try {
+ Path(dirPath).deleteRecursively()
+ } catch {
+ case ioe: IOException => println("IO Exception while deleting files recursively: " + ioe)
+ }
+ }
+ }
+
+ // Generates csv data and write to csv files at given path
+ def generateCSVDataFile(spark: SparkSession,
+ idStart: Int,
+ csvDirPath: String,
+ saveMode: SaveMode): Unit = {
+ // Create a csv data frame with ids starting at idStart
+ val csvRDD = spark.sparkContext.parallelize(idStart until idStart + 10)
+ .map(id => (id, "name_ABC", "city_XYZ", 10000.00 * id))
+ val csvDataDF = spark.createDataFrame(csvRDD).toDF("id", "name", "city", "salary")
+
+
+ csvDataDF.write
+ .option("header", "false")
+ .mode(saveMode)
+ .csv(csvDirPath)
+ }
+
+ // Generates csv data frame and returns to caller
+ def generateCSVDataDF(spark: SparkSession,
+ idStart: Int): DataFrame = {
+ // Create a csv data frame with ids starting at idStart
+ val csvRDD = spark.sparkContext.parallelize(idStart until idStart + 10)
+ .map(id => (id, "name_ABC", "city_XYZ", 10000.00 * id))
+ val csvDataDF = spark.createDataFrame(csvRDD).toDF("id", "name", "city", "salary")
+ csvDataDF
+ }
+
+ // Create a server socket for the socket streaming source
+ def createServerSocket(host: String, port: Int): Option[ServerSocket] = {
+ try {
+ Some(new ServerSocket(port))
+ } catch {
+ case e: IOException =>
+ println("Error creating server socket on " + host + ":" + port + " - " + e)
+ None
+ }
+ }
+
+ // Block until a client connects to the given server socket
+ def waitForClientConnection(serverSocket: ServerSocket): Socket = {
+ serverSocket.accept()
+ }
+
+ // Close the given server socket
+ def closeServerSocket(serverSocket: ServerSocket): Unit = {
+ serverSocket.close()
+ }
+
+ // write periodically on given socket
+ def writeToSocket(clientSocket: Socket,
+ iterations: Int,
+ delay: Int,
+ startID: Int): Unit = {
+
+ var nItr = 10
+ var nDelay = 5
+
+ // iterations range check (1 to 50)
+ if (iterations >= 1 && iterations <= 50) {
+ nItr = iterations
+ } else {
+ println("Number of iterations out of range. Setting to default 10 iterations")
+ }
+
+ // delay range check (1 second to 60 seconds)
+ if (delay >= 1 && delay <= 60) {
+ nDelay = delay
+ } else {
+ println("Delay out of range. Setting it to default 5 seconds")
+ }
+
+ val socketWriter = new PrintWriter(clientSocket.getOutputStream())
+
+ var j = startID
+
+ for (i <- startID until startID + nItr) {
+ // write 5 records per iteration
+ for (id <- j until j + 5) {
+ socketWriter.println(id.toString + ", name_" + i
+ + ", city_" + i + ", " + (i*10000.00).toString)
+ }
+ j = j + 5
+ socketWriter.flush()
+ Thread.sleep(nDelay*1000)
+ }
+ socketWriter.close()
+ }
+}
+// scalastyle:on println
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c987392/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index d496de2..6eacb19 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -17,14 +17,20 @@
package org.apache.spark.sql
+import java.io.{BufferedWriter, FileWriter, IOException}
+import java.util.UUID
+
import scala.collection.JavaConverters._
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.language.implicitConversions
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce.Job
+import org.apache.parquet.schema.InvalidSchemaException
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.execution.command.{TableModel, TableNewProcessor}
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory}
import org.apache.spark.sql.execution.strategy.CarbonLateDecodeStrategy
@@ -33,10 +39,12 @@ import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming.CarbonStreamingOutputWriterFactory
-import org.apache.spark.sql.types.{StringType, StructType}
+import org.apache.spark.sql.types._
+import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
import org.apache.carbondata.core.metadata.schema.table.TableInfo
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
@@ -44,7 +52,6 @@ import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.spark.CarbonOption
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-
/**
* Carbon relation provider compliant to data source api.
* Creates carbon relations
@@ -52,7 +59,11 @@ import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
class CarbonSource extends CreatableRelationProvider with RelationProvider
with SchemaRelationProvider with DataSourceRegister with FileFormat {
- override def shortName(): String = "carbondata"
+ private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ override def shortName(): String = {
+ "carbondata"
+ }
// will be called if hive supported create table command is provided
override def createRelation(sqlContext: SQLContext,
@@ -181,8 +192,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
/**
* Returns the path of the table
- *
- * @param sparkSession
+ * @param sparkSession
* @param dbName
* @param tableName
* @return
@@ -217,22 +227,196 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
* be put here. For example, user defined output committer can be configured here
* by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
*/
- def prepareWrite(
+ override def prepareWrite(
sparkSession: SparkSession,
job: Job,
options: Map[String, String],
- dataSchema: StructType): OutputWriterFactory = new CarbonStreamingOutputWriterFactory()
+ dataSchema: StructType): OutputWriterFactory = {
+
+ // Check if table with given path exists
+ // validateTable(options.get("path").get)
+ validateTable(options("path"))
+
+ /* Check if the streaming data schema matches the carbon table schema.
+ * Data from a socket source does not have a schema attached to it;
+ * the following check skips schema validation for socket sources.
+ */
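+ // Note: Spark's socket source yields a single column,
+ // StructType(Seq(StructField("value", StringType))),
+ // which is exactly the shape this check looks for.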
+ if (!(dataSchema.size == 1 &&
+ dataSchema.fields(0).dataType == StringType)) {
+ val path = options.get("path")
+ val tablePath: String = path match {
+ case Some(value) => value
+ case None => ""
+ }
+
+ val carbonTableSchema: org.apache.carbondata.format.TableSchema =
+ getTableSchema(sparkSession, tablePath)
+ val isSchemaValid = validateSchema(carbonTableSchema, dataSchema)
+
+ if (!isSchemaValid) {
+ LOGGER.error("Schema Validation Failed: streaming data schema "
+ + "does not match the carbon table schema")
+ throw new InvalidSchemaException("Schema Validation Failed: " +
+ "streaming data schema does not match the carbon table schema")
+ }
+ }
+ new CarbonStreamingOutputWriterFactory()
+ }
/**
- * When possible, this method should return the schema of the given `files`. When the format
- * does not support inference, or no valid files are given should return None. In these cases
- * Spark will require that user specify the schema manually.
+ * Read schema from existing carbon table
+ * @param sparkSession
+ * @param tablePath carbon table path
+ * @return TableSchema read from provided table path
*/
- def inferSchema(
+ private def getTableSchema(
+ sparkSession: SparkSession,
+ tablePath: String): org.apache.carbondata.format.TableSchema = {
+
+ val formattedTablePath = tablePath.replace('\\', '/')
+ val names = formattedTablePath.split("/")
+ if (names.length < 3) {
+ throw new IllegalArgumentException("invalid table path: " + tablePath)
+ }
+ val tableName : String = names(names.length - 1)
+ val dbName : String = names(names.length - 2)
+ val storePath = formattedTablePath.substring(0,
+ formattedTablePath.lastIndexOf
+ (dbName.concat(CarbonCommonConstants.FILE_SEPARATOR)
+ .concat(tableName)) - 1)
+ val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ val thriftTableInfo: org.apache.carbondata.format.TableInfo =
+ metastore.getThriftTableInfo(new CarbonTablePath(storePath, dbName, tableName))(sparkSession)
+
+ val factTable: org.apache.carbondata.format.TableSchema = thriftTableInfo.getFact_table
+ factTable
+ }
+
+ /**
+ * Validates streamed schema against existing table schema
+ * @param carbonTableSchema existing carbon table schema
+ * @param dataSchema streamed data schema
+ * @return true if schema validation is successful else false
+ */
+ private def validateSchema(
+ carbonTableSchema: org.apache.carbondata.format.TableSchema,
+ dataSchema: StructType): Boolean = {
+
+ val columnSchemaValues = carbonTableSchema.getTable_columns
+ .asScala.sortBy(_.schemaOrdinal)
+
+ val columnDataTypes = new ListBuffer[String]()
+ columnSchemaValues.foreach(columnSchema =>
+ columnDataTypes.append(columnSchema.data_type.toString))
+ val tableColumnDataTypeList = columnDataTypes.toList
+
+ val streamSchemaDataTypes = new ListBuffer[String]()
+ dataSchema.fields.foreach(item => streamSchemaDataTypes
+ .append(mapStreamingDataTypeToString(item.dataType.toString)))
+ val streamedDataTypeList = streamSchemaDataTypes.toList
+
+ tableColumnDataTypeList == streamedDataTypeList
+ }
+
+ /**
+ * Maps streamed datatype to carbon datatype
+ * @param dataType
+ * @return String
+ */
+ def mapStreamingDataTypeToString(dataType: String): String = {
+ import org.apache.carbondata.format.DataType
+ dataType match {
+ case "IntegerType" => DataType.INT.toString
+ case "StringType" => DataType.STRING.toString
+ case "DateType" => DataType.DATE.toString
+ case "DoubleType" => DataType.DOUBLE.toString
+ case "FloatType" => DataType.DOUBLE.toString
+ case "LongType" => DataType.LONG.toString
+ case "ShortType" => DataType.SHORT.toString
+ case "TimestampType" => DataType.TIMESTAMP.toString
+ }
+ }
+
+ /**
+ * Validates that a table exists at the given path, throwing an exception otherwise
+ * @param tablePath existing carbon table path
+ */
+ private def validateTable(tablePath: String): Unit = {
+
+ val formattedTablePath = tablePath.replace('\\', '/')
+ val names = formattedTablePath.split("/")
+ if (names.length < 3) {
+ throw new IllegalArgumentException("invalid table path: " + tablePath)
+ }
+ val tableName : String = names(names.length - 1)
+ val dbName : String = names(names.length - 2)
+ val storePath = formattedTablePath.substring(0,
+ formattedTablePath.lastIndexOf
+ (dbName.concat(CarbonCommonConstants.FILE_SEPARATOR)
+ .concat(tableName)) - 1)
+ val absoluteTableIdentifier: AbsoluteTableIdentifier =
+ new AbsoluteTableIdentifier(storePath,
+ new CarbonTableIdentifier(dbName, tableName,
+ UUID.randomUUID().toString))
+
+ if (!checkIfTableExists(absoluteTableIdentifier)) {
+ throw new NoSuchTableException(dbName, tableName)
+ }
+ }
+
+ /**
+ * Checks if table exists by checking its schema file
+ * @param absoluteTableIdentifier
+ * @return Boolean
+ */
+ private def checkIfTableExists(absoluteTableIdentifier: AbsoluteTableIdentifier): Boolean = {
+ val carbonTablePath: CarbonTablePath = CarbonStorePath
+ .getCarbonTablePath(absoluteTableIdentifier)
+ val schemaFilePath: String = carbonTablePath.getSchemaFilePath
+ FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.LOCAL) ||
+ FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.HDFS) ||
+ FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.VIEWFS)
+ }
+
+ /**
+ * If the user wants to stream data from a carbondata table source
+ * and the following conditions are true:
+ * 1. No schema is provided by the user in readStream()
+ * 2. spark.sql.streaming.schemaInference is set to true
+ * then carbondata can infer the table schema from a valid table path.
+ * Schema inference is not mandatory, but good to have.
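+ * For example (illustrative; assumes schema inference is enabled and the
+ * carbondata source supports streaming reads):
+ * {{{
+ * spark.conf.set("spark.sql.streaming.schemaInference", "true")
+ * val df = spark.readStream.format("carbondata").option("path", tablePath).load()
+ * }}}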
+ * When possible, this method should return the schema of the given `files`.
+ * If the format does not support schema inference, or no valid files
+ * are given, it should return None. In these cases Spark will require the
+ * user to specify the schema manually.
+ */
+ override def inferSchema(
sparkSession: SparkSession,
options: Map[String, String],
- files: Seq[FileStatus]): Option[StructType] = Some(new StructType().add("value", StringType))
+ files: Seq[FileStatus]): Option[StructType] = {
+
+ val path = options.get("path")
+ val tablePath: String = path match {
+ case Some(value) => value
+ case None => ""
+ }
+ // Check if table with given path exists
+ validateTable(tablePath)
+ val carbonTableSchema: org.apache.carbondata.format.TableSchema =
+ getTableSchema(sparkSession, tablePath)
+ val columnSchemaValues = carbonTableSchema.getTable_columns
+ .asScala.sortBy(_.schemaOrdinal)
+
+ val structFields = new ArrayBuffer[StructField]()
+ columnSchemaValues.foreach(columnSchema =>
+ structFields.append(StructField(columnSchema.column_name,
+ CatalystSqlParser.parseDataType(columnSchema.data_type.toString), nullable = true)))
+ Some(new StructType(structFields.toArray))
+ }
}
object CarbonSource {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c987392/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutpurWriteFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutpurWriteFactory.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutpurWriteFactory.scala
deleted file mode 100644
index be69885..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutpurWriteFactory.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.streaming
-
-
-import java.util.concurrent.ConcurrentHashMap
-
-import org.apache.hadoop.mapreduce.TaskAttemptContext
-import org.apache.spark.sql.execution.datasources.OutputWriterFactory
-import org.apache.spark.sql.types.StructType
-
-import org.apache.carbondata.core.util.path.CarbonTablePath
-
-
-class CarbonStreamingOutputWriterFactory extends OutputWriterFactory {
-
- /**
- * When writing to a [[org.apache.spark.sql.execution.datasources.HadoopFsRelation]],
- * this method gets called by each task on executor side
- * to instantiate new [[org.apache.spark.sql.execution.datasources.OutputWriter]]s.
- *
- * @param path Path to write the file.
- * @param dataSchema Schema of the rows to be written. Partition columns are not
- * included in the schema if the relation being written is
- * partitioned.
- * @param context The Hadoop MapReduce task context.
- */
-
- override def newInstance(
- path: String,
-
- dataSchema: StructType,
-
- context: TaskAttemptContext) : CarbonStreamingOutputWriter = {
-
- new CarbonStreamingOutputWriter(path, context)
- }
-
- override def getFileExtension(context: TaskAttemptContext): String = {
-
- CarbonTablePath.STREAM_FILE_NAME_EXT
- }
-
-}
-
-object CarbonStreamingOutpurWriterFactory {
-
- private[this] val writers = new ConcurrentHashMap[String, CarbonStreamingOutputWriter]()
-
- def addWriter(path: String, writer: CarbonStreamingOutputWriter): Unit = {
-
- if (writers.contains(path)) {
- throw new IllegalArgumentException(path + "writer already exists")
- }
-
- writers.put(path, writer)
- }
-
- def getWriter(path: String): CarbonStreamingOutputWriter = {
-
- writers.get(path)
- }
-
- def containsWriter(path: String): Boolean = {
-
- writers.containsKey(path)
- }
-
- def removeWriter(path: String): Unit = {
-
- writers.remove(path)
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c987392/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutputWriteFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutputWriteFactory.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutputWriteFactory.scala
new file mode 100644
index 0000000..c5e4226
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutputWriteFactory.scala
@@ -0,0 +1,88 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.streaming
+
+
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.spark.sql.execution.datasources.OutputWriterFactory
+import org.apache.spark.sql.types.StructType
+
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+
+class CarbonStreamingOutputWriterFactory extends OutputWriterFactory {
+
+ /**
+ * When writing to a [[org.apache.spark.sql.execution.datasources.HadoopFsRelation]],
+ * this method gets called by each task on executor side
+ * to instantiate new [[org.apache.spark.sql.execution.datasources.OutputWriter]]s.
+ *
+ * @param path Path to write the file.
+ * @param dataSchema Schema of the rows to be written. Partition columns are not
+ * included in the schema if the relation being written is
+ * partitioned.
+ * @param context The Hadoop MapReduce task context.
+ */
+
+ override def newInstance(
+ path: String,
+ dataSchema: StructType,
+ context: TaskAttemptContext): CarbonStreamingOutputWriter = {
+ new CarbonStreamingOutputWriter(path, context)
+ }
+
+ override def getFileExtension(context: TaskAttemptContext): String = {
+
+ CarbonTablePath.STREAM_FILE_NAME_EXT
+ }
+
+}
+
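+/**
+ * Registry of active [[CarbonStreamingOutputWriter]]s keyed by output path:
+ * writers can be added, looked up, checked for, and removed by path.
+ */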
+object CarbonStreamingOutputWriterFactory {
+
+ private[this] val writers = new ConcurrentHashMap[String, CarbonStreamingOutputWriter]()
+
+ def addWriter(path: String, writer: CarbonStreamingOutputWriter): Unit = {
+ if (writers.containsKey(path)) {
+ throw new IllegalArgumentException(path + ": writer already exists")
+ }
+ writers.put(path, writer)
+ }
+
+ def getWriter(path: String): CarbonStreamingOutputWriter = {
+
+ writers.get(path)
+ }
+
+ def containsWriter(path: String): Boolean = {
+
+ writers.containsKey(path)
+ }
+
+ def removeWriter(path: String): Unit = {
+
+ writers.remove(path)
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c987392/integration/spark2/src/test/scala/org/apache/spark/carbondata/streaming/CarbonSourceSchemaValidationTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/streaming/CarbonSourceSchemaValidationTest.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/streaming/CarbonSourceSchemaValidationTest.scala
new file mode 100644
index 0000000..f00eea5
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/streaming/CarbonSourceSchemaValidationTest.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.carbondata.streaming
+
+import org.apache.hadoop.mapreduce.Job
+import org.apache.spark.sql.common.util.Spark2QueryTest
+import org.apache.spark.sql.{CarbonSource, SparkSession}
+import org.apache.spark.sql.streaming.CarbonStreamingOutputWriterFactory
+import org.apache.spark.sql.test.TestQueryExecutor
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.scalatest.BeforeAndAfterAll
+
+/**
+ * Test for schema validation during streaming ingestion
+ * Validates the streamed schema (source) against the existing table (target) schema.
+ */
+
+class CarbonSourceSchemaValidationTest extends Spark2QueryTest with BeforeAndAfterAll {
+
+ override def beforeAll() {
+ sql("DROP TABLE IF EXISTS _carbon_stream_table_")
+ }
+
+ test("Testing validate schema method with correct values ") {
+
+ val spark = SparkSession.builder
+ .appName("StreamIngestSchemaValidation")
+ .master("local")
+ .getOrCreate()
+
+ val carbonSource = new CarbonSource
+ val job = new Job()
+ val warehouseLocation = TestQueryExecutor.warehouse
+
+ sql("CREATE TABLE _carbon_stream_table_(id int,name string)STORED BY 'carbondata'")
+ val tablePath: String = s"$warehouseLocation/default/_carbon_stream_table_"
+    val dataSchema = StructType(Array(
+      StructField("id", IntegerType, true),
+      StructField("name", StringType, true)))
+ val res = carbonSource.prepareWrite(spark, job, Map("path" -> tablePath), dataSchema)
+ assert(res.isInstanceOf[CarbonStreamingOutputWriterFactory])
+ }
+
+ override def afterAll() {
+ sql("DROP TABLE IF EXISTS _carbon_stream_table_")
+ }
+
+}