Posted to user@flink.apache.org by Kenny Gorman <ke...@eventador.io> on 2017/10/17 23:28:09 UTC

Stumped writing to KafkaJSONSink

I am hoping you guys can help me. I am stumped on how to actually write to Kafka with Kafka09JsonTableSink through the Table API. My code is below; I am hoping you can shed some light on how this should be done. I don’t see any method that performs the actual write to Kafka. I am probably doing something stupid. TIA.

Thanks!
Kenny

// run some SQL to filter results where a key is not null
String sql = "SELECT icao FROM flights WHERE icao is not null";
tableEnv.registerTableSource("flights", kafkaTableSource);
Table result = tableEnv.sql(sql);

// create a partition for the data going into kafka
FlinkFixedPartitioner partition = new FlinkFixedPartitioner();

// create new tablesink of JSON to kafka
KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
        params.getRequired("write-topic"),
        params.getProperties(),
        partition);

result.writeToSink(tableEnv, kafkaTableSink);  // Logically, I want to do this, but no such method..

Re: Stumped writing to KafkaJSONSink

Posted by Fabian Hueske <fh...@gmail.com>.
No worries :-) Thanks for the notice.

2017-10-18 15:07 GMT+02:00 Kenny Gorman <ke...@eventador.io>:

> Yep we hung out and got it working. I should have replied sooner! Thx for
> the reply.
>
> -kg
>
> On Oct 18, 2017, at 7:06 AM, Fabian Hueske <fh...@gmail.com> wrote:
>
> Hi Kenny,
>
> this looks almost correct.
> The Table class has a method writeToSink(TableSink) that should address
> your use case (so the same as yours but without the TableEnvironment
> argument).
>
> Does that work for you?
> If not, what kind of error and error message do you get?
>
> Best, Fabian

Re: Stumped writing to KafkaJSONSink

Posted by Kenny Gorman <ke...@eventador.io>.
Yep we hung out and got it working. I should have replied sooner! Thx for the reply.

-kg


Re: Stumped writing to KafkaJSONSink

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Kenny,

this looks almost correct.
The Table class has a method writeToSink(TableSink) that should address
your use case (so the same as yours but without the TableEnvironment
argument).

Does that work for you?
If not, what kind of error and error message do you get?

Best, Fabian
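
To make the suggestion above concrete: in Kenny's snippet the last line would become result.writeToSink(kafkaTableSink), with the TableEnvironment argument dropped. The sketch below only models that call shape so it can run without Flink or Kafka on the classpath. Every type in it (TableSink, TableEnvironment, Table, fromRows, write) is a simplified, hypothetical stand-in and not Flink's actual class, and the idea that a table keeps a reference to the environment that created it is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified stand-ins for illustration only; these are NOT Flink's classes.
class WriteToSinkSketch {

    /** Stand-in for a table sink: it just receives rows. */
    interface TableSink {
        void emit(String row);
    }

    /** Stand-in for the table environment that performs the write. */
    static class TableEnvironment {
        // Hypothetical helper: builds a table that remembers this environment.
        Table fromRows(List<String> rows) {
            return new Table(this, rows);
        }

        void write(Table table, TableSink sink) {
            for (String row : table.rows) {
                sink.emit(row);
            }
        }
    }

    /** Stand-in for Table (assumption: it holds the environment that created it). */
    static class Table {
        private final TableEnvironment env;
        private final List<String> rows;

        Table(TableEnvironment env, List<String> rows) {
            this.env = env;
            this.rows = rows;
        }

        // Same shape as the method named in the reply: the sink is the only argument.
        void writeToSink(TableSink sink) {
            env.write(this, sink);
        }
    }

    /** Writes two sample rows through the sink-only method and returns what the sink received. */
    static List<String> run() {
        TableEnvironment tableEnv = new TableEnvironment();
        Table result = tableEnv.fromRows(Arrays.asList("A1B2C3", "D4E5F6"));

        List<String> written = new ArrayList<>();
        TableSink sink = written::add;   // collects rows instead of sending them to Kafka

        result.writeToSink(sink);        // not result.writeToSink(tableEnv, sink)
        return written;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In the real job the sink would be the Kafka09JsonTableSink from the original snippet; the list-backed sink here just makes the effect of the call visible.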
