Posted to dev@bahir.apache.org by "Luciano Resende (JIRA)" <ji...@apache.org> on 2018/06/16 10:00:00 UTC

[jira] [Commented] (BAHIR-168) Kinesis support in Structured Streaming

    [ https://issues.apache.org/jira/browse/BAHIR-168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16514738#comment-16514738 ] 

Luciano Resende commented on BAHIR-168:
---------------------------------------

Sorry for the delay in responding; I have been traveling.

Detailed steps for contributing a new extension are documented at
[http://bahir.apache.org/contributing-extensions/]

I would recommend creating a PR against the Bahir repository. However, we are in the process of migrating the extensions to Data Source V2, so it would be good to have this already based on Data Source V2; otherwise it cannot be released when we do the Bahir release supporting Spark 2.3.

> Kinesis support in Structured Streaming
> ---------------------------------------
>
>                 Key: BAHIR-168
>                 URL: https://issues.apache.org/jira/browse/BAHIR-168
>             Project: Bahir
>          Issue Type: New Feature
>          Components: Spark Structured Streaming Connectors
>    Affects Versions: Spark-2.3
>            Reporter: Takako Shimamoto
>            Priority: Major
>
> Implement Kinesis based sources and sinks for Structured Streaming
> h2. Kinesis Sources
> I hope that [this|https://github.com/qubole/kinesis-sql] will be contributed to Apache Bahir, as mentioned in SPARK-18165.
> h2. Kinesis Sinks
> I've implemented a Sink here: [https://github.com/shimamoto/bahir/tree/kinesis-writer/sql-kinesis]
> This requires Spark 2.3 and the Data Source V2 API. I plan on opening a PR, but Bahir does not support Spark 2.3 yet, so BAHIR-167 needs to be handled first.
> A brief overview is listed below.
> h3. Description
> Add a new Kinesis Sink and Kinesis Relation for writing streaming and batch queries, respectively, to AWS Kinesis.
> The DataFrame being written to Kinesis should have the following columns in its schema:
> ||Column||Type||
> |partitionKey (optional)|string|
> |data (required)|string or binary|
> If the partition key column is missing, a SHA-256 digest of the data, encoded as a hex string, is automatically added as the partition key.
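> The fallback key described above can be sketched as follows. This is only an illustration: PartitionKeys and sha256Hex are hypothetical names, and the linked implementation may compute the digest differently.
> {code}
> import java.nio.charset.StandardCharsets
> import java.security.MessageDigest
>
> object PartitionKeys {
>   // Hypothetical helper: lowercase hex encoding of the SHA-256 digest of the payload.
>   def sha256Hex(data: Array[Byte]): String =
>     MessageDigest.getInstance("SHA-256")
>       .digest(data)
>       .map(b => f"${b & 0xff}%02x") // mask to an unsigned value, then two hex digits per byte
>       .mkString
>
>   def main(args: Array[String]): Unit = {
>     // Derive a partition key for a record that has no partitionKey column.
>     val key = sha256Hex("data1".getBytes(StandardCharsets.UTF_8))
>     println(key)
>   }
> }
> {code}
> Because the digest depends only on the payload, identical payloads always receive the same partition key under this scheme.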
> h4. Streaming Kinesis Sink
> {code}
> val df = inputStream.toDS().toDF("partitionKey", "data")
> // start() returns a StreamingQuery handle for the running query.
> val query = df.writeStream
>   .format("kinesis")
>   .option("streamName", "test-stream")
>   .option("region", "us-east-1")
>   .option("checkpointLocation", checkpointDir.getCanonicalPath)
>   .start()
> {code}
> h4. Batch Kinesis Sink
> {code}
> val df = Seq("partitionKey-1" -> "data1", "partitionKey-2" -> "data2")
>   .toDF("partitionKey", "data")
> df.write
>   .format("kinesis")
>   .option("streamName", "test-stream")
>   .option("region", "us-east-1")
>   .save()
> {code}
> h3. Configuration
> The following options must be set for both batch and streaming queries.
> ||Option||Value||Default||Meaning||
> |streamName|string|-|The name of the Kinesis stream associated with the Sink.|
> |region|string|-|The AWS region of the Kinesis stream.|
> The following configurations are optional.
> ||Option||Value||Default||Meaning||
> |chunksize|int|50|Maximum number of records sent per PutRecords request.|
> |endpoint|string|(none)|Set this only when using a non-standard service endpoint.|
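> To illustrate the chunksize option, here is a minimal sketch of splitting outgoing records into batches of at most chunksize records. ChunkingSketch, Record and chunk are hypothetical names, the actual PutRecords call is omitted, and the linked sink may batch records differently.
> {code}
> object ChunkingSketch {
>   // Hypothetical record type mirroring the two columns of the sink schema.
>   final case class Record(partitionKey: String, data: Array[Byte])
>
>   // Split records into batches of at most chunkSize elements;
>   // each batch would correspond to one PutRecords request.
>   def chunk(records: Iterator[Record], chunkSize: Int = 50): Iterator[Seq[Record]] = {
>     require(chunkSize > 0, "chunksize must be positive")
>     records.grouped(chunkSize)
>   }
>
>   def main(args: Array[String]): Unit = {
>     val records = Iterator.tabulate(120)(i => Record(s"key-$i", s"data-$i".getBytes("UTF-8")))
>     // 120 records with the default chunksize of 50 yield batches of 50, 50 and 20.
>     chunk(records).foreach(batch => println(batch.size))
>   }
> }
> {code}
> Note that Kinesis itself accepts at most 500 records in a single PutRecords request, so larger chunksize values would be rejected by the service.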



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)