Posted to user@flink.apache.org by Sandeep Vakacharla <sv...@fanatics.com> on 2016/11/02 23:00:57 UTC

Reg. custom sink for Flink streaming

Hi there,

I have the following use case-

I have data coming from Kafka which I need to stream and write each message to a database. I’m using the kafka-flink connector for streaming data from Kafka. I don’t want to use Flink sinks to write data from the stream.

I’m doing the following which doesn’t seem to work-

messageStream
        .rebalance()
        .map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                // i is a counter defined elsewhere in the original snippet
                getDbSession().execute("insert into TABLE_XXX (key, event_timeuuid, data) " +
                        "VALUES (" + i + ", null, '" + value + "');");
                return value;
            }
        })

How can I iterate over each message in the stream and do something with that message?

Thanks
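
A common shape for per-record database writes like this is a RichMapFunction that opens one session per parallel task in open() and closes it in close(), instead of grabbing a session per record. A minimal sketch, with DbSession as a hypothetical stand-in for whatever client getDbSession() returns and the key expression purely illustrative:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class DbWritingMap extends RichMapFunction<String, String> {
    // transient: the session is created on the worker in open(), not shipped with the job
    private transient DbSession session;

    @Override
    public void open(Configuration parameters) throws Exception {
        session = DbSession.connect();  // hypothetical client call
    }

    @Override
    public String map(String value) throws Exception {
        // parameterized statement; the key expression is illustrative
        session.execute("insert into TABLE_XXX (key, event_timeuuid, data) values (?, null, ?)",
                value.hashCode(), value);
        return value;
    }

    @Override
    public void close() throws Exception {
        if (session != null) {
            session.close();
        }
    }
}

A map is fine when the value is also passed downstream; if nothing consumes the result, the same body fits a RichSinkFunction instead.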


Re: Flink read avro from Kafka Connect Issue

Posted by Dayong <wi...@gmail.com>.
Confirmed. It is a magic byte ahead of each avro message. I was able to get the Flink consumer to work. Thank you, Dave :)

Thanks,
Dayong
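
For context: Kafka Connect's AvroConverter writes the Confluent wire format, which puts one magic byte (0x00) and a 4-byte schema-registry id in front of every Avro payload, so a plain Avro decoder has to skip the first 5 bytes. A minimal sketch of a Flink DeserializationSchema that does this (one possible shape for the AvroDeserializationSchema referenced in this thread; the body is illustrative, not the gist's actual code):

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class ConfluentAvroDeserializationSchema implements DeserializationSchema<GenericRecord> {
    // Avro's Schema is not Serializable, so keep the JSON and re-parse on the worker
    private final String schemaString;
    private transient GenericDatumReader<GenericRecord> reader;

    public ConfluentAvroDeserializationSchema(Schema schema) {
        this.schemaString = schema.toString();
    }

    @Override
    public GenericRecord deserialize(byte[] message) throws IOException {
        if (message.length < 5 || message[0] != 0) {
            throw new IOException("Record is not in the Confluent wire format");
        }
        if (reader == null) {
            reader = new GenericDatumReader<>(new Schema.Parser().parse(schemaString));
        }
        // skip the magic byte and the 4-byte schema id
        return reader.read(null,
                DecoderFactory.get().binaryDecoder(message, 5, message.length - 5, null));
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return TypeExtractor.getForClass(GenericRecord.class);
    }
}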

> On Nov 3, 2016, at 8:01 AM, Dayong <wi...@gmail.com> wrote:
> 
> Not quite sure, will try to find out today.
> 
> Thanks,
> Dayong
> 
>> On Nov 2, 2016, at 9:59 PM, "Tauzell, Dave" <Da...@surescripts.com> wrote:
>> 
>> Is Kafka connect adding some bytes to the beginning of the avro with the schema registry id?
>> 
>> Dave

Re: Flink read avro from Kafka Connect Issue

Posted by Dayong <wi...@gmail.com>.
Not quite sure, will try to find out today.

Thanks,
Dayong

> On Nov 2, 2016, at 9:59 PM, "Tauzell, Dave" <Da...@surescripts.com> wrote:
> 
> Is Kafka connect adding some bytes to the beginning of the avro with the schema registry id?
> 
> Dave

Re: Flink read avro from Kafka Connect Issue

Posted by "Tauzell, Dave" <Da...@surescripts.com>.
Is Kafka connect adding some bytes to the beginning of the avro with the schema registry id?

Dave
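
One quick way to check this is to read a few raw records with a byte-array consumer and dump the leading bytes. A sketch against the 0.9 consumer API (the topic name is taken from the code below; the group id is illustrative):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class HeaderProbe {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "header-probe");  // illustrative
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("myavrotopic"));
            for (ConsumerRecord<byte[], byte[]> record : consumer.poll(5000)) {
                byte[] v = record.value();
                // Confluent wire format would show: 00, then a 4-byte schema id
                for (int i = 0; i < Math.min(5, v.length); i++) {
                    System.out.printf("%02x ", v[i] & 0xff);
                }
                System.out.println();
            }
        }
    }
}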

> On Nov 2, 2016, at 18:43, Will Du <wi...@gmail.com> wrote:
>
> By using the kafka-avro-console-consumer I am able to read the full messages produced by Kafka Connect with AvroConverter, but I get no output except the schema (with empty values) from Flink.
>
> By using a producer with DefaultEncoder instead, the kafka-avro-console-consumer throws exceptions, but the Flink consumer works. My target, though, is to have Flink consume the avro data produced by Kafka Connect.

Re: Flink read avro from Kafka Connect Issue

Posted by Will Du <wi...@gmail.com>.
By using the kafka-avro-console-consumer I am able to read the full messages produced by Kafka Connect with AvroConverter, but I get no output except the schema (with empty values) from Flink.

By using a producer with DefaultEncoder instead, the kafka-avro-console-consumer throws exceptions, but the Flink consumer works. My target, though, is to have Flink consume the avro data produced by Kafka Connect.
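
The console consumer succeeds because it uses Confluent's own deserializer, which understands the wire format and fetches the writer schema from the registry. If adding the io.confluent:kafka-avro-serializer dependency is acceptable, that deserializer can be reused; a rough sketch, assuming the KafkaAvroDecoder API from that artifact (the registry URL is illustrative):

import java.util.Properties;
import io.confluent.kafka.serializers.KafkaAvroDecoder;
import kafka.utils.VerifiableProperties;
import org.apache.avro.generic.GenericRecord;

public class RegistryDecode {
    private final KafkaAvroDecoder decoder;

    public RegistryDecode() {
        Properties props = new Properties();
        props.put("schema.registry.url", "http://localhost:8081");  // illustrative
        this.decoder = new KafkaAvroDecoder(new VerifiableProperties(props));
    }

    public GenericRecord decode(byte[] message) {
        // handles the magic byte + schema id and the registry lookup
        return (GenericRecord) decoder.fromBytes(message);
    }
}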


Flink read avro from Kafka Connect Issue

Posted by Will Du <wi...@gmail.com>.
Hi folks,
I am trying to consume avro data from Kafka in Flink. The data is produced by Kafka Connect using AvroConverter. I have created an AvroDeserializationSchema.java <https://gist.github.com/datafibers/ae9d624b6db44865ae14defe8a838123> used by the Flink consumer. Then I use the following code to read it.

import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("zookeeper.connect", "localhost:2181");

    Schema schema = new Schema.Parser().parse("{"
            + "\"name\": \"test\", "
            + "\"type\": \"record\", "
            + "\"fields\": ["
            + "  { \"name\": \"name\", \"type\": \"string\" },"
            + "  { \"name\": \"symbol\", \"type\": \"string\" },"
            + "  { \"name\": \"exchange\", \"type\": \"string\" }"
            + "]}");

    AvroDeserializationSchema<GenericRecord> avroSchema = new AvroDeserializationSchema<>(schema);
    FlinkKafkaConsumer09<GenericRecord> kafkaConsumer =
            new FlinkKafkaConsumer09<>("myavrotopic", avroSchema, properties);
    DataStream<GenericRecord> messageStream = env.addSource(kafkaConsumer);
    messageStream.rebalance().print();
    env.execute("Flink AVRO KAFKA Test");
}

Once I run the code, I only get the schema fields with empty values, as follows.
{"name":"", "symbol":"", "exchange":""}
{"name":"", "symbol":"", "exchange":""}
{"name":"", "symbol":"", "exchange":""}
{"name":"", "symbol":"", "exchange":”"}

Could anyone help me find out why I cannot decode it?

From further troubleshooting, I found that if I use a kafka producer here <https://gist.github.com/datafibers/d063b255b50fa34515c0ac9e24d4485c> to send the avro data, specifically using kafka.serializer.DefaultEncoder, the code above gets the correct result. Does anybody know how to either set DefaultEncoder in Kafka Connect, or set it when writing a customized Kafka Connect connector? Or, the other way around, how should I modify the AvroDeserializationSchema.java instead?

Thanks, I’ll post this to the Kafka user group as well.
Will
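
As a side note on the schema-building boilerplate above: the same schema can be parsed from a classpath resource instead of a concatenated string. A small sketch, assuming a hypothetical stock.avsc resource containing the JSON above:

import java.io.IOException;
import java.io.InputStream;
import org.apache.avro.Schema;

public class SchemaLoader {
    public static Schema load() throws IOException {
        // stock.avsc is a hypothetical resource holding the record schema
        try (InputStream in = SchemaLoader.class.getResourceAsStream("/stock.avsc")) {
            return new Schema.Parser().parse(in);
        }
    }
}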





Re: Reg. custom sink for Flink streaming

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

A MapFunction should be the way to go for this use case.
What exactly is not working? Do you get an exception? Is the map method not called?

Best, Fabian
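
For reference, wiring a rich function like the DbWritingMap sketched under the first message into the stream is a one-liner:

messageStream
        .rebalance()
        .map(new DbWritingMap());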

2016-11-03 0:00 GMT+01:00 Sandeep Vakacharla <sv...@fanatics.com>:

> Hi there,
>
> I have data coming from Kafka which I need to stream and write each
> message to a database. I’m using the kafka-flink connector for streaming data
> from Kafka. I don’t want to use Flink sinks to write data from the stream.
>
> How can I iterate over each message in the stream and do something with
> that message?