Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/03/03 12:43:56 UTC

[09/25] carbondata git commit: [CARBONDATA-2215][Documentation] Describe CarbonStreamParser in streaming-guide.md

[CARBONDATA-2215][Documentation] Describe CarbonStreamParser in streaming-guide.md

This closes #2016


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/87361a80
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/87361a80
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/87361a80

Branch: refs/heads/branch-1.3
Commit: 87361a8069503a6a3fa6b31e54ed9849259c81c9
Parents: 28c3701
Author: Zhang Zhichao <44...@qq.com>
Authored: Wed Feb 28 23:07:38 2018 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Sat Mar 3 17:47:42 2018 +0530

----------------------------------------------------------------------
 docs/streaming-guide.md | 74 ++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 74 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/87361a80/docs/streaming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-guide.md b/docs/streaming-guide.md
index 201f8e0..aa9eaef 100644
--- a/docs/streaming-guide.md
+++ b/docs/streaming-guide.md
@@ -152,6 +152,80 @@ property name | default | description
 --- | --- | ---
 carbon.streaming.auto.handoff.enabled | true | whether to auto trigger handoff operation
 
+## Stream data parser
+Configure the property "carbon.stream.parser" to specify the stream parser that converts InternalRow to Object[] when writing stream data.
+
+property name | default | description
+--- | --- | ---
+carbon.stream.parser | org.apache.carbondata.streaming.parser.CSVStreamParserImp | the class of the stream parser
+
+Currently CarbonData supports two parsers:
+
+**1. org.apache.carbondata.streaming.parser.CSVStreamParserImp**: This is the default stream parser. It reads one line of data (String type) from the first field of InternalRow and converts this String to Object[].
+
+**2. org.apache.carbondata.streaming.parser.RowStreamParserImp**: This stream parser automatically converts InternalRow to Object[] according to the schema of the `Dataset`, for example:
+
+```scala
+ case class FileElement(school: Array[String], age: Int)
+ case class StreamData(id: Int, name: String, city: String, salary: Float, file: FileElement)
+ ...
+
+ var qry: StreamingQuery = null
+ val readSocketDF = spark.readStream
+   .format("socket")
+   .option("host", "localhost")
+   .option("port", 9099)
+   .load()
+   .as[String]
+   .map(_.split(","))
+   .map { fields => {
+     val tmp = fields(4).split("\\$")
+     val file = FileElement(tmp(0).split(":"), tmp(1).toInt)
+     StreamData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat, file)
+   } }
+
+ // Write data from socket stream to carbondata file
+ qry = readSocketDF.writeStream
+   .format("carbondata")
+   .trigger(ProcessingTime("5 seconds"))
+   .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
+   .option("dbName", "default")
+   .option("tableName", "carbon_table")
+   .option(CarbonStreamParser.CARBON_STREAM_PARSER,
+     CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
+   .start()
+
+ ...
+```
+
+### How to implement a customized stream parser
+To implement a customized stream parser that converts a specific InternalRow to Object[], implement the `initialize` and `parserRow` methods of the `CarbonStreamParser` interface, for example:
+
+```scala
+ package org.XXX.XXX.streaming.parser
+ 
+ import org.apache.hadoop.conf.Configuration
+ import org.apache.spark.sql.catalyst.InternalRow
+ import org.apache.spark.sql.types.StructType
+ 
+ class XXXStreamParserImp extends CarbonStreamParser {
+ 
+   override def initialize(configuration: Configuration, structType: StructType): Unit = {
+     // user can get the properties from "configuration"
+   }
+   
+   override def parserRow(value: InternalRow): Array[Object] = {
+     // convert InternalRow to Object[] (Array[Object] in Scala) and return the result
+   }
+   
+   override def close(): Unit = {
+   }
+ }
+   
+```
+
+Then set the property "carbon.stream.parser" to "org.XXX.XXX.streaming.parser.XXXStreamParserImp".
+
 ## Close streaming table
 Use below command to handoff all streaming segments to columnar format segments and modify the streaming property to false, this table becomes a normal table.
 ```sql