Posted to commits@carbondata.apache.org by qi...@apache.org on 2018/03/01 06:21:34 UTC
carbondata git commit: [CARBONDATA-2215][Documentation] Describe CarbonStreamParser in streaming-guide.md
Repository: carbondata
Updated Branches:
refs/heads/master 8da252c52 -> 3bcba4c4e
[CARBONDATA-2215][Documentation] Describe CarbonStreamParser in streaming-guide.md
This closes #2016
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3bcba4c4
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3bcba4c4
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3bcba4c4
Branch: refs/heads/master
Commit: 3bcba4c4e851ec4e7962499c5d79fd94dc83d9a1
Parents: 8da252c
Author: Zhang Zhichao <44...@qq.com>
Authored: Wed Feb 28 23:07:38 2018 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Thu Mar 1 14:15:59 2018 +0800
----------------------------------------------------------------------
docs/streaming-guide.md | 74 ++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 74 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3bcba4c4/docs/streaming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-guide.md b/docs/streaming-guide.md
index 201f8e0..aa9eaef 100644
--- a/docs/streaming-guide.md
+++ b/docs/streaming-guide.md
@@ -152,6 +152,80 @@ property name | default | description
--- | --- | ---
carbon.streaming.auto.handoff.enabled | true | whether to automatically trigger the handoff operation
+## Stream data parser
+Configure the property "carbon.stream.parser" to specify the stream parser that converts each InternalRow to an Object[] when writing stream data.
+
+property name | default | description
+--- | --- | ---
+carbon.stream.parser | org.apache.carbondata.streaming.parser.CSVStreamParserImp | the fully qualified class name of the stream parser
+
+Currently CarbonData supports two parsers:
+
+**1. org.apache.carbondata.streaming.parser.CSVStreamParserImp**: This is the default stream parser. It reads one line of data (a String) from the first field (index 0) of the InternalRow and converts that String to an Object[].
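
For intuition, the sketch below mimics what the default parser does with each incoming line: take the String in field 0 and split it into one value per column. It is a simplified illustration, not `CSVStreamParserImp` itself: a plain `IndexedSeq` stands in for `InternalRow`, the object name `CsvLineSketch` is hypothetical, the delimiter is a parameter that defaults to a comma for this example, and the quoting and escaping rules a real CSV parser handles are ignored.

```scala
import java.util.regex.Pattern

// Simplified illustration only: IndexedSeq[Any] stands in for Spark's InternalRow,
// and CSV quoting/escaping is deliberately ignored.
object CsvLineSketch {
  def parseLine(row: IndexedSeq[Any], delimiter: String = ","): Array[Object] = {
    // The whole record arrives as a single String in field 0
    val line = row(0).asInstanceOf[String]
    // Pattern.quote treats the delimiter literally; limit -1 keeps trailing empty fields
    line.split(Pattern.quote(delimiter), -1).map(field => field: Object)
  }
}
```

For example, `CsvLineSketch.parseLine(IndexedSeq("1,david,shenzhen,5000.0"))` yields `Array("1", "david", "shenzhen", "5000.0")`; type conversion to the table's column types is left to later stages in this sketch.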
+
+**2. org.apache.carbondata.streaming.parser.RowStreamParserImp**: This stream parser automatically converts the InternalRow to an Object[] according to the schema of the `Dataset`, for example:
+
+```scala
+ import org.apache.carbondata.streaming.parser.CarbonStreamParser
+ import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
+
+ case class FileElement(school: Array[String], age: Int)
+ case class StreamData(id: Int, name: String, city: String, salary: Float, file: FileElement)
+ ...
+
+ // spark is the active SparkSession; its implicits provide the encoders used by .as and .map
+ import spark.implicits._
+
+ var qry: StreamingQuery = null
+ val readSocketDF = spark.readStream
+   .format("socket")
+   .option("host", "localhost")
+   .option("port", 9099)
+   .load()
+   .as[String]
+   .map(_.split(","))
+   .map { fields =>
+     // fields(4) has the form "school1:school2$age"
+     val tmp = fields(4).split("\\$")
+     val file = FileElement(tmp(0).split(":"), tmp(1).toInt)
+     StreamData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat, file)
+   }
+
+ // Write data from the socket stream to a carbondata file,
+ // using RowStreamParserImp to convert each row according to the Dataset schema
+ qry = readSocketDF.writeStream
+   .format("carbondata")
+   .trigger(ProcessingTime("5 seconds"))
+   .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
+   .option("dbName", "default")
+   .option("tableName", "carbon_table")
+   .option(CarbonStreamParser.CARBON_STREAM_PARSER,
+     CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
+   .start()
+
+ ...
+```
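
The sketch below illustrates, in plain Scala without Spark, what "converting according to the schema" means for the `StreamData` example: each top-level field becomes one element of the `Object[]`, in schema order. It is not `RowStreamParserImp` itself. The object name `RowFlattenSketch` is hypothetical, and how values are encoded is an assumption: every value is rendered as a String, struct members are joined with `$` and array elements with `:`, mirroring the socket input format parsed in the example above. The real parser may apply different rules (for example for null, date and timestamp values).

```scala
// Simplified illustration only: plain case classes stand in for Dataset rows,
// and the "$" / ":" delimiters are assumptions mirroring the socket input format.
object RowFlattenSketch {
  final case class FileElement(school: Array[String], age: Int)
  final case class StreamData(id: Int, name: String, city: String, salary: Float, file: FileElement)

  // Depth 1 = a complex top-level field (members joined with "$"),
  // depth 2 and deeper = nested values (elements joined with ":")
  private def delimiter(depth: Int): String = if (depth == 1) "$" else ":"

  // Render one value as a String, flattening arrays and structs recursively
  private def render(value: Any, depth: Int): String = value match {
    case arr: Array[_]   => arr.iterator.map(render(_, depth + 1)).mkString(delimiter(depth))
    case struct: Product => struct.productIterator.map(render(_, depth + 1)).mkString(delimiter(depth))
    case other           => s"$other"
  }

  // One Object[] element per top-level field, in declaration (schema) order
  def toObjects(row: Product): Array[Object] =
    row.productIterator.map(field => render(field, 1): Object).toArray
}
```

With these assumptions, `StreamData(1, "david", "shenzhen", 5000.0f, FileElement(Array("school1", "school2"), 20))` becomes `Array("1", "david", "shenzhen", "5000.0", "school1:school2$20")`, i.e. the nested `file` column is re-encoded in the same form it arrived in on the socket.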
+
+### How to implement a customized stream parser
+To convert a specific InternalRow to Object[] with a customized stream parser, implement the `initialize`, `parserRow` and `close` methods of the interface `CarbonStreamParser`, for example:
+
+```scala
+ package org.XXX.XXX.streaming.parser
+
+ import org.apache.hadoop.conf.Configuration
+ import org.apache.spark.sql.catalyst.InternalRow
+ import org.apache.spark.sql.types.StructType
+
+ import org.apache.carbondata.streaming.parser.CarbonStreamParser
+
+ class XXXStreamParserImp extends CarbonStreamParser {
+
+   override def initialize(configuration: Configuration, structType: StructType): Unit = {
+     // user can get the properties from "configuration"
+   }
+
+   override def parserRow(value: InternalRow): Array[Object] = {
+     // convert InternalRow to Object[] (Array[Object] in Scala);
+     // ??? is a placeholder that throws NotImplementedError until this is filled in
+     ???
+   }
+
+   override def close(): Unit = {
+     // release any resources opened in initialize
+   }
+ }
+
+```
+
+Then set the property "carbon.stream.parser" to "org.XXX.XXX.streaming.parser.XXXStreamParserImp".
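
As a more concrete illustration of the `initialize` / `parserRow` / `close` life cycle, the following sketch parses lines such as `name=david;id=1` and emits the values in schema order. To stay runnable without Spark or Hadoop, it codes against a hypothetical stand-in trait, `StreamParserLike`, whose methods take a `Map[String, String]` instead of `Configuration`, a list of field names instead of `StructType`, and an `IndexedSeq[Any]` instead of `InternalRow`. The class name `KeyValueStreamParser` and the configuration key `hypothetical.kv.separator` are likewise made up for this example. Against the real interface, the same logic could read the field names from `structType.fieldNames` and the line from `value.getString(0)`.

```scala
import java.util.regex.Pattern

// Hypothetical stand-in for CarbonStreamParser so the sketch runs without Spark or Hadoop:
// Map[String, String] replaces Configuration, field names replace StructType,
// and IndexedSeq[Any] replaces InternalRow.
trait StreamParserLike {
  def initialize(configuration: Map[String, String], fieldNames: Seq[String]): Unit
  def parserRow(value: IndexedSeq[Any]): Array[Object]
  def close(): Unit
}

// Hypothetical parser: field 0 holds a line such as "name=david;id=1"
class KeyValueStreamParser extends StreamParserLike {
  private var fieldNames: Seq[String] = Seq.empty
  private var pairSeparator: String = ";"

  override def initialize(configuration: Map[String, String], fieldNames: Seq[String]): Unit = {
    this.fieldNames = fieldNames
    // "hypothetical.kv.separator" is an illustrative key, not a CarbonData property
    pairSeparator = configuration.getOrElse("hypothetical.kv.separator", ";")
  }

  override def parserRow(value: IndexedSeq[Any]): Array[Object] = {
    val line = value(0).asInstanceOf[String]
    // Split into key=value pairs; pairs without "=" are skipped
    val pairs: Map[String, String] = line
      .split(Pattern.quote(pairSeparator))
      .iterator
      .filter(_.contains("="))
      .map { pair =>
        val i = pair.indexOf('=')
        pair.substring(0, i).trim -> pair.substring(i + 1).trim
      }
      .toMap
    // Emit values in schema order; keys missing from the line become null
    fieldNames.map(name => pairs.getOrElse(name, null): Object).toArray
  }

  override def close(): Unit = () // nothing to release in this sketch
}
```

For example, after `initialize(Map.empty, Seq("id", "name", "city"))`, the line `name=david;id=1` yields `Array("1", "david", null)`. Keeping per-query settings in `initialize` and doing only per-row work in `parserRow` keeps the hot path free of configuration lookups.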
+
## Close streaming table
Use the command below to hand off all streaming segments to columnar format segments and set the streaming property to false; the table then becomes a normal table.
```sql