Posted to user@flink.apache.org by Sridhar Chellappa <fl...@gmail.com> on 2017/07/11 07:12:36 UTC
Not able to read Kafka Messages with FlinkKafkaConsumer010
I am pretty sure I am doing something wrong here; I just do not understand
what. I wrote a small program that reads messages from Kafka and prints them
out.
public class Main {

    private static final int CHECKPOINT_INTERVAL = 100000;

    private static Properties getpropsFromEnv() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", System.getenv("KAFKA_ADDRESS"));
        props.setProperty("group.id", System.getenv("CONSUMER_GROUP_ID"));
        props.setProperty("topic", System.getenv("KAFKA_TOPIC"));
        return props;
    }

    public static void main(String[] args) throws Exception {
        Properties props = getpropsFromEnv();

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(CHECKPOINT_INTERVAL);
        env.setParallelism(1);

        FlinkKafkaConsumer010<LogMessage> flinkConsumer =
                new FlinkKafkaConsumer010<LogMessage>(
                        Arrays.asList(parameterTool.getRequired("topic").split(",")),
                        new LogDeserializationSchema(),
                        parameterTool.getProperties());

        DataStream<LogMessage> logMessageDataStream = env.addSource(flinkConsumer);
        logMessageDataStream.print();

        env.execute("SomeJob");
    }
}
public class LogDeserializationSchema implements DeserializationSchema<LogMessage> {

    @Override
    public LogMessage deserialize(byte[] message) {
        LogMessage logMessage = null;
        try {
            logMessage = LogMessage.parseFrom(message);
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        } finally {
            return logMessage;
        }
    }

    @Override
    public boolean isEndOfStream(LogMessage nextElement) {
        return false;
    }

    @Override
    public TypeInformation<LogMessage> getProducedType() {
        return TypeExtractor.getForClass(LogMessage.class);
    }
}
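A side note on the schema above, unrelated to the missing messages but worth knowing: a `return` inside `finally` silently discards any exception raised in the `try` or `catch` blocks, so even an unexpected RuntimeException from `parseFrom` would be turned into a `null` element. A self-contained sketch of that Java behavior (the `FinallyReturn` class and its `parse` method are illustrative, not part of the original code):

```java
public class FinallyReturn {
    static String parse(byte[] bytes) {
        String result = null;
        try {
            if (bytes == null) {
                // Simulate an unexpected failure inside the try block.
                throw new RuntimeException("boom");
            }
            result = new String(bytes);
        } finally {
            // This return swallows the RuntimeException above:
            // the caller sees null instead of an exception.
            return result;
        }
    }

    public static void main(String[] args) {
        System.out.println(parse(null));            // prints "null", no exception
        System.out.println(parse("ok".getBytes())); // prints "ok"
    }
}
```

Returning from the `catch` block instead, and letting `finally` do only cleanup, keeps genuine errors visible in the logs.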
When I run this program, I do not see any messages being read by the consumer.
Things to note:
1. I ran kafka-console-consumer with the same Kafka parameters and saw
continuous output.
2. My Gradle file has the following dependencies:
dependencies {
    compile group: 'ch.qos.logback', name: 'logback-classic', version: '1.1.7'
    compile 'org.aeonbits.owner:owner:1.0.9'
    compile group: 'com.mashape.unirest', name: 'unirest-java', version: '1.4.9' // For driver Suspension
    compile group: 'joda-time', name: 'joda-time', version: '2.9.4'
    compile 'com.google.protobuf:protobuf-java-util:3.1.0'

    /*
     * Flink Dependencies
     */
    compile group: 'org.apache.flink', name: 'flink-java', version: '1.3.0'
    compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.10_2.10', version: '1.3.0'
    compile group: 'org.apache.flink', name: 'flink-streaming-java_2.10', version: '1.3.0'
    compile group: 'org.apache.flink', name: 'flink-clients_2.10', version: '1.3.0'
}
Can someone please help?
Re: Not able to read Kafka Messages with FlinkKafkaConsumer010
Posted by Sridhar Chellappa <fl...@gmail.com>.
Thanks Ziyad. That was a cut-and-paste error. Anyway, I figured out a
solution to the issue: all of my Flink dependencies were pointing at 1.3.1.
Pointing them at 1.3.0 resolved the issue.
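For what it's worth, version drift like this is easy to avoid by declaring the Flink version once and reusing it for every Flink artifact. A minimal Gradle sketch (the `flinkVersion` property name is illustrative, not from the thread):

```groovy
// Declare the Flink version once so the connector, streaming, and client
// artifacts can never drift apart.
ext.flinkVersion = '1.3.0'

dependencies {
    compile group: 'org.apache.flink', name: 'flink-java', version: flinkVersion
    compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.10_2.10', version: flinkVersion
    compile group: 'org.apache.flink', name: 'flink-streaming-java_2.10', version: flinkVersion
    compile group: 'org.apache.flink', name: 'flink-clients_2.10', version: flinkVersion
}
```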
On Wed, Jul 12, 2017 at 2:17 AM, Ziyad Muhammed <mm...@gmail.com> wrote:
> Hi Sridhar
>
> Are you using *ParameterTool* to set the properties? I couldn't see it in
> your code, but you use it in the line below:
>
> FlinkKafkaConsumer010<LogMessage> flinkConsumer =
>     new FlinkKafkaConsumer010<LogMessage>(
>         Arrays.asList(parameterTool.getRequired("topic").split(",")),
>         new LogDeserializationSchema(), parameterTool.getProperties());
>
> Make sure that the correct properties are passed to FlinkKafkaConsumer.
>
> Best
> Ziyad
Re: Not able to read Kafka Messages with FlinkKafkaConsumer010
Posted by Ziyad Muhammed <mm...@gmail.com>.
Hi Sridhar
Are you using *ParameterTool* to set the properties? I couldn't see it in
your code, but you use it in the line below:
FlinkKafkaConsumer010<LogMessage> flinkConsumer =
    new FlinkKafkaConsumer010<LogMessage>(
        Arrays.asList(parameterTool.getRequired("topic").split(",")),
        new LogDeserializationSchema(), parameterTool.getProperties());
Make sure that the correct properties are passed to FlinkKafkaConsumer.
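One way to follow that advice, sketched minimally: derive the topic list from the same Properties object that getpropsFromEnv() builds, rather than from the undefined parameterTool. The Flink classes are omitted so the sketch compiles standalone; the class and method names here are illustrative, not from the original code:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class TopicListFromProps {
    // Mirror what getpropsFromEnv() builds, but from explicit values.
    static Properties props(String servers, String groupId, String topics) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", servers);
        p.setProperty("group.id", groupId);
        p.setProperty("topic", topics);
        return p;
    }

    // The consumer constructor would receive this list (and the same
    // Properties object) in place of the two parameterTool calls.
    static List<String> topicList(Properties p) {
        return Arrays.asList(p.getProperty("topic").split(","));
    }

    public static void main(String[] args) {
        Properties p = props("localhost:9092", "g1", "logs,metrics");
        System.out.println(topicList(p)); // prints [logs, metrics]
    }
}
```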
Best
Ziyad
Best Regards
Ziyad Muhammed Mohiyudheen
407, Internationales Studienzentrum Berlin
Theodor-Heuss-Platz 5
14052 Berlin
Ph: +49 176 6587 3343
Mail to: mmziyad@gmail.com