Posted to dev@bahir.apache.org by "Takako Shimamoto (JIRA)" <ji...@apache.org> on 2018/05/25 05:36:00 UTC

[jira] [Created] (BAHIR-168) Kinesis support in Structured Streaming

Takako Shimamoto created BAHIR-168:
--------------------------------------

             Summary: Kinesis support in Structured Streaming
                 Key: BAHIR-168
                 URL: https://issues.apache.org/jira/browse/BAHIR-168
             Project: Bahir
          Issue Type: New Feature
            Reporter: Takako Shimamoto


Implement Kinesis-based sources and sinks for Structured Streaming.
h2. Kinesis Sources

I hope that [this|https://github.com/qubole/kinesis-sql] will be contributed to Apache Bahir, as suggested in the comments on SPARK-18165.
h2. Kinesis Sinks

I've implemented a Sink here: [https://github.com/shimamoto/bahir/tree/kinesis-writer/sql-kinesis]
 This requires Spark 2.3 and the DataSource V2 API. I plan to open a PR, but Bahir does not support Spark 2.3 yet, so BAHIR-167 needs to be handled first.

A brief overview is listed below.
h3. Description

Add a new Kinesis Sink and Kinesis Relation for writing streaming and batch queries, respectively, to AWS Kinesis.

The DataFrame written to Kinesis should have the following columns in its schema:
||Column||Type||
|partitionKey (optional)|string|
|data (required)|string or binary|

If the partitionKey column is missing, a SHA-256 digest of data, encoded as a hex string, is added automatically as the partition key.
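
To illustrate the fallback described above, here is a minimal sketch of how a SHA-256 hex digest could be derived from the data column. The object and method names (DefaultPartitionKey, sha256Hex) are hypothetical and are not taken from the linked implementation; string data is assumed to be UTF-8 encoded.

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

// Hypothetical helper, for illustration only.
object DefaultPartitionKey {

  // Hash the raw bytes and render each byte as two lowercase hex digits.
  def sha256Hex(data: Array[Byte]): String =
    MessageDigest.getInstance("SHA-256")
      .digest(data)
      .map(b => f"${b & 0xff}%02x")
      .mkString

  // Assumption: string data is encoded as UTF-8 before hashing.
  def sha256Hex(data: String): String =
    sha256Hex(data.getBytes(StandardCharsets.UTF_8))
}
```

The result is always 64 characters long, which fits within the 256-character limit that Kinesis places on partition keys.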
h4. Streaming Kinesis Sink
{code}
val df = inputStream.toDS().toDF("partitionKey", "data")

// start() returns a StreamingQuery handle for the running query
val query = df.writeStream
  .format("kinesis")
  .option("streamName", "test-stream")
  .option("region", "us-east-1")
  .option("checkpointLocation", checkpointDir.getCanonicalPath)
  .start()
{code}
h4. Batch Kinesis Sink
{code}
// toDF on a Seq requires `import spark.implicits._`
val df = Seq("partitionKey-1" -> "data1", "partitionKey-2" -> "data2")
  .toDF("partitionKey", "data")

df.write
  .format("kinesis")
  .option("streamName", "test-stream")
  .option("region", "us-east-1")
  .save()
{code}
h3. Configuration

The following options must be set for both batch and streaming queries.
||Option||Value||Default||Meaning||
|streamName|string|-|Name of the Kinesis stream that the sink writes to.|
|region|string|-|AWS region of the Kinesis stream.|

The following options are optional.
||Option||Value||Default||Meaning||
|chunksize|int|50|Maximum number of records sent per PutRecords request.|
|endpoint|string|(none)|Custom service endpoint. Set this only when using a non-standard endpoint.|
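
As a rough sketch of how the options above might be resolved and how chunksize bounds each PutRecords batch, consider the following. KinesisSinkConfig, fromOptions and Chunking are hypothetical names, not the actual implementation. Keys are matched case-sensitively here for simplicity, which may differ from how Spark passes options to a data source.

```scala
// Hypothetical config holder, for illustration only.
final case class KinesisSinkConfig(
    streamName: String,
    region: String,
    chunkSize: Int,
    endpoint: Option[String])

object KinesisSinkConfig {
  // Default taken from the options table above.
  val DefaultChunkSize = 50

  def fromOptions(options: Map[String, String]): KinesisSinkConfig = {
    // Fail fast when a required option is absent.
    def required(key: String): String =
      options.getOrElse(key,
        throw new IllegalArgumentException(s"Option '$key' is required"))

    val chunkSize =
      options.get("chunksize").map(_.toInt).getOrElse(DefaultChunkSize)
    require(chunkSize > 0, "chunksize must be positive")

    KinesisSinkConfig(
      required("streamName"),
      required("region"),
      chunkSize,
      options.get("endpoint"))
  }
}

object Chunking {
  // Split records into batches of at most chunkSize elements;
  // each batch would correspond to one PutRecords request.
  def chunks[A](records: Seq[A], chunkSize: Int): Iterator[Seq[A]] =
    records.grouped(chunkSize)
}
```

With the default of 50, a partition holding 120 records would be sent as three requests of 50, 50 and 20 records. The PutRecords API itself accepts at most 500 records per request, so larger chunksize values would be rejected by the service.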


