Posted to dev@arrow.apache.org by Jaroslaw Nowosad <ya...@gmail.com> on 2022/06/24 13:33:39 UTC

[Datafusion] Streaming - integration with kafka - kafka_writer

Hi,
I am trying to integrate DataFusion with Kafka; the final goal is to have
end-to-end streaming. But I started from a "different side" -> step 1 is to
publish query output to Kafka, so I copied code and created a Kafka publisher:
https://github.com/yarenty/arrow-datafusion/tree/master/datafusion/core/src/physical_plan/kafka

Test case is here:
https://github.com/yarenty/arrow-datafusion/blob/master/datafusion/core/tests/ordered_sql_to_kafka.rs


It all ends up looking something like this:
```rust
#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new())
        .await?;

    let df = ctx
        .sql("SELECT a, MIN(b) as bmin FROM example GROUP BY a ORDER BY a LIMIT 100")
        .await?;

    // kafka context
    let stream_ctx = KafkaContext::with_config(
        KafkaConfig::new("test_topic")
            .set("bootstrap.servers", "127.0.0.1:9092")
            .set("compression.codec", "snappy"),
    );

    df.publish_to_kafka(stream_ctx).await?;

    Ok(())
}
```
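For anyone curious about the shape of the publisher itself, here is a minimal, hypothetical sketch of the row-to-message step: each result row becomes one Kafka message, keyed by the GROUP BY column so rows for the same `a` land in the same partition. This is NOT the actual kafka_writer code from the branch above; the `Row` struct, `Producer` trait, and `MockProducer` are stand-ins invented for illustration (a real implementation would wrap a Kafka client such as rdkafka's `FutureProducer` and operate on Arrow RecordBatches).

```rust
/// Hypothetical stand-in for one result row of the example query:
/// SELECT a, MIN(b) as bmin FROM example GROUP BY a ...
#[derive(Debug, Clone)]
struct Row {
    a: String,
    bmin: i64,
}

/// Minimal producer abstraction; a real implementation would wrap a
/// Kafka client and send to a configured topic.
trait Producer {
    fn send(&mut self, key: &[u8], payload: &[u8]);
}

/// In-memory producer so the sketch stays self-contained and testable.
#[derive(Default)]
struct MockProducer {
    messages: Vec<(Vec<u8>, Vec<u8>)>,
}

impl Producer for MockProducer {
    fn send(&mut self, key: &[u8], payload: &[u8]) {
        self.messages.push((key.to_vec(), payload.to_vec()));
    }
}

/// Publish each row as one message: the GROUP BY column as the key
/// (same key -> same partition, preserving per-key order) and a small
/// JSON payload built by hand to keep the sketch dependency-free.
fn publish_rows<P: Producer>(rows: &[Row], producer: &mut P) {
    for row in rows {
        let payload = format!("{{\"a\":\"{}\",\"bmin\":{}}}", row.a, row.bmin);
        producer.send(row.a.as_bytes(), payload.as_bytes());
    }
}

fn main() {
    let rows = vec![
        Row { a: "x".into(), bmin: 1 },
        Row { a: "y".into(), bmin: 2 },
    ];
    let mut producer = MockProducer::default();
    publish_rows(&rows, &mut producer);
    for (key, payload) in &producer.messages {
        println!(
            "{} -> {}",
            String::from_utf8_lossy(key),
            String::from_utf8_lossy(payload)
        );
    }
}
```

Keying by the grouping column is only one choice; a round-robin or null key would spread load more evenly at the cost of per-key ordering.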

I'm still not sure whether this is the correct way to do it, or whether I put
the code in the proper places ... still: learning something new every day.

Is there a good place to share code like this and get feedback on the ideas?

Jaro
yarenty@gmail.com