Posted to dev@flink.apache.org by Pradeep Anumala <pr...@gmail.com> on 2017/04/19 23:38:07 UTC

Read from and Write to Kafka through Flink

Hi,
  I am a beginner with Apache Flink. I am trying to write the contents of a
file to Kafka and then read the data back from Kafka. I see there is an API
to read from and write to Kafka.

The following writes to Kafka:
FlinkKafkaProducer08<String> myProducer = new FlinkKafkaProducer08<String>(
            "localhost:9092",            // broker list
            "my-topic",                  // target topic
            new SimpleStringSchema());   // serialization schema

Is there any API that takes a file as input and writes the file's contents
to Kafka?


My second question
-------------------------
I have run the Kafka console producer in a terminal.
I am trying to read from Kafka using the code below, but it doesn't print
any output even though I am typing input into the producer terminal.
The program executes and exits quickly. Please let me know how I can read
from Kafka.

DataStream<String> data = env.addSource(
        new FlinkKafkaConsumer010<String>("myTopic", new SimpleStringSchema(), props));
data.print();

Re: Read from and Write to Kafka through Flink

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Pradeep,

There is no single API or connector that takes a file as input and writes it to Kafka.
In Flink, this operation consists of two parts: 1) a source reading from the input, and 2) a sink producing to Kafka.
So, all you need is a job that consists of that source and that sink.

You’ve already figured out 2). For 1), you can take a look at the built-in file reading source: `StreamExecutionEnvironment.readFile`.
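For illustration, here's a minimal sketch of such a job, using the simpler `readTextFile` variant of the file source (the class name, file path, and job name are placeholders):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class FileToKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1) source: read the input file line by line
        DataStream<String> lines = env.readTextFile("/path/to/input.txt");

        // 2) sink: produce each line to Kafka, reusing your producer snippet
        lines.addSink(new FlinkKafkaProducer08<String>(
                "localhost:9092",            // broker list
                "my-topic",                  // target topic
                new SimpleStringSchema()));  // serialization schema

        // nothing runs until the job is actually submitted
        env.execute("file-to-kafka");
    }
}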

> The program executes and exits quickly.

I might need some more information here:
Do you mean that the job finished executing very fast?
If so, there should be an error of some kind. Could you find and paste it here?
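Also worth ruling out, since it produces no error at all: the snippet you posted never calls `execute()` on the environment. A streaming program only builds the job graph until that call is made, so the main method returns immediately. For example (the job name is just a placeholder):

// without this final call, main() only builds the topology and returns,
// so the program exits without ever consuming anything
env.execute("read-from-kafka");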

If the job is actually running and you're continuously writing to the Kafka topic, but the job just isn't consuming the records, there are a few things you could check:
1) are you sure the Kafka broker is the same version as the connector you are using?
2) make sure that you are using a different consumer group if the offsets were already committed back to Kafka; check out [1] to see under which conditions offsets are committed (a sketch follows below).
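For example, a sketch of consumer properties using a fresh group id (the property keys are standard Kafka consumer settings; the group id and servers are placeholders):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
// a group id that has no committed offsets yet
props.setProperty("group.id", "my-fresh-group");
// and/or fall back to the earliest offset when no committed offset exists
props.setProperty("auto.offset.reset", "earliest");

DataStream<String> data = env.addSource(
        new FlinkKafkaConsumer010<String>("myTopic", new SimpleStringSchema(), props));
data.print();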

By the way, I’m continuing this thread only on the user@ mailing list, as that’s the more suitable place for this.

Cheers,
Gordon

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration