Posted to dev@arrow.apache.org by Jaroslaw Nowosad <ya...@gmail.com> on 2022/06/24 13:33:39 UTC
[Datafusion] Streaming - integration with kafka - kafka_writer
Hi,
I am trying to integrate DataFusion with Kafka; the final goal is to have
end-to-end streaming. But I started from a "different side" -> step 1 is to
publish query output to Kafka, so I copied some code and created a Kafka publisher:
https://github.com/yarenty/arrow-datafusion/tree/master/datafusion/core/src/physical_plan/kafka
Test case is here:
https://github.com/yarenty/arrow-datafusion/blob/master/datafusion/core/tests/ordered_sql_to_kafka.rs
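For context, the core of such a writer is just a loop that turns each result row
into a (key, payload) Kafka message. Here is a minimal std-only sketch of that
idea; the `Row` struct, the JSON formatting, and `to_kafka_message` are
illustrative stand-ins, not the actual code in the branch above (which works on
Arrow RecordBatches and would hand payloads to a real producer such as
rdkafka's `FutureProducer`):

```rust
// Illustrative sketch only: serialize query-result rows into Kafka-style
// (key, payload) pairs. Names here are invented for the example.

#[derive(Debug)]
struct Row {
    a: String,
    bmin: i64,
}

/// Serialize one row as a JSON payload, keyed by the GROUP BY column so
/// messages for the same key would land in the same partition.
fn to_kafka_message(row: &Row) -> (String, String) {
    let payload = format!("{{\"a\":\"{}\",\"bmin\":{}}}", row.a, row.bmin);
    (row.a.clone(), payload)
}

fn main() {
    let rows = vec![
        Row { a: "x".into(), bmin: 1 },
        Row { a: "y".into(), bmin: 7 },
    ];
    for (key, payload) in rows.iter().map(to_kafka_message) {
        // A real writer would call producer.send(...) here instead.
        println!("key={key} payload={payload}");
    }
}
```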
All finished with something like this:
```rust
#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new())
        .await?;
    let df = ctx
        .sql("SELECT a, MIN(b) as bmin FROM example GROUP BY a ORDER BY a LIMIT 100")
        .await?;

    // Kafka context: topic plus librdkafka-style producer options
    let stream_ctx = KafkaContext::with_config(
        KafkaConfig::new("test_topic")
            .set("bootstrap.servers", "127.0.0.1:9092")
            .set("compression.codec", "snappy"),
    );

    // Publish the query results to the configured topic
    df.publish_to_kafka(stream_ctx).await?;
    Ok(())
}
```
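The chainable `KafkaConfig::new(...).set(...)` pattern above can be sketched
with a consuming builder; this is only a guess at how the type in my branch
could look, not its actual definition:

```rust
use std::collections::HashMap;

/// Minimal builder mirroring the call pattern in the example above
/// (hypothetical sketch, not the real KafkaConfig).
#[derive(Debug, Default)]
struct KafkaConfig {
    topic: String,
    options: HashMap<String, String>,
}

impl KafkaConfig {
    fn new(topic: &str) -> Self {
        KafkaConfig {
            topic: topic.to_string(),
            ..Default::default()
        }
    }

    /// Chainable setter: consumes and returns self so calls can be stacked,
    /// matching how librdkafka-style "key=value" options are usually passed.
    fn set(mut self, key: &str, value: &str) -> Self {
        self.options.insert(key.to_string(), value.to_string());
        self
    }
}

fn main() {
    let cfg = KafkaConfig::new("test_topic")
        .set("bootstrap.servers", "127.0.0.1:9092")
        .set("compression.codec", "snappy");
    assert_eq!(cfg.topic, "test_topic");
    assert_eq!(cfg.options["compression.codec"], "snappy");
}
```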
I'm still not sure if this is the correct way to do it, or whether I put the
code in the proper places ... still, learning something new every day.
Is there any other place where I can share code / get ideas checked?
Jaro
yarenty@gmail.com