You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Sridhar Chellappa <fl...@gmail.com> on 2017/08/28 14:38:37 UTC
Null Pointer Exception on Trying to read a message from Kafka
Folks,
I have a KafkaConsumer that I am trying to read messages from. When I try
to create a DataStream from the KafkConsumer (env.addSource()) I get the
following exception :
Any idea on how can this happen?
java.lang.NullPointerException
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)
Re: Null Pointer Exception on Trying to read a message from Kafka
Posted by Sridhar Chellappa <fl...@gmail.com>.
OK. I got past the problem. Basically, I had to change
public class MyKafkaMessageSerDeSchema implements
DeserializationSchema<MyKafkaMessage>,
SerializationSchema<MyKafkaMessage> {
@Override
public MyKafkaMessage deserialize(byte[] message) throws IOException {
MyKafkaMessage MyKafkaMessage = null;
try {
MyKafkaMessage = MyKafkaMessage.parseFrom(message);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
} finally {
return MyKafkaMessage;
}
}
@Override
public boolean isEndOfStream(MyKafkaMessage nextElement) {
return false;
}
@Override
public TypeInformation<MyKafkaMessage> getProducedType() {
return TypeExtractor.getForClass(MyKafkaMessage.class);;
-------------------------> Add Type Info
}
@Override
public byte[] serialize(MyKafkaMessage element) {
return
element.byteArray();
--------------------------> modify serializer
}
}
When I run my program, I get another exception :
java.lang.NullPointerException
at shaded.com.google.protobuf.UnmodifiableLazyStringList.size(UnmodifiableLazyStringList.java:68)
at java.util.AbstractList.add(AbstractList.java:108)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:131)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:236)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)
On Tue, Aug 29, 2017 at 8:43 AM, Ted Yu <yu...@gmail.com> wrote:
> The NPE came from this line:
>
> StreamRecord<T> copy = castRecord.copy(serializer.
> copy(castRecord.getValue()));
>
> Either serializer or castRecord was null.
>
> I wonder if this has been fixed in 1.3.2 release.
>
> On Mon, Aug 28, 2017 at 7:24 PM, Sridhar Chellappa <fl...@gmail.com>
> wrote:
>
>> Kafka Version is 0.10.0
>>
>> On Tue, Aug 29, 2017 at 6:43 AM, Sridhar Chellappa <fl...@gmail.com>
>> wrote:
>>
>>> 1.3.0
>>>
>>> On Mon, Aug 28, 2017 at 10:09 PM, Ted Yu <yu...@gmail.com> wrote:
>>>
>>>> Which Flink version are you using (so that line numbers can be matched
>>>> with source code) ?
>>>>
>>>> On Mon, Aug 28, 2017 at 9:16 AM, Sridhar Chellappa <
>>>> flinkenthu@gmail.com> wrote:
>>>>
>>>>> DataStream<MyKafkaMessage> MyKafkaMessageDataStream = env.addSource(
>>>>> getStreamSource(env, parameterTool);
>>>>> );
>>>>>
>>>>>
>>>>>
>>>>> public RichParallelSourceFunction<MyKafkaMessage>
>>>>> getStreamSource(StreamExecutionEnvironment env, ParameterTool
>>>>> parameterTool) {
>>>>>
>>>>> // MyKAfkaMessage is a ProtoBuf message
>>>>> env.getConfig().registerTypeWi
>>>>> thKryoSerializer(MyKafkaMessage.class, ProtobufSerializer.class);
>>>>>
>>>>> KafkaDataSource<MyKafkaMessage> flinkCepConsumer =
>>>>> new KafkaDataSource<MyKafkaMessage>(parameterTool,
>>>>> new MyKafkaMessageSerDeSchema());
>>>>>
>>>>> return flinkCepConsumer;
>>>>> }
>>>>>
>>>>>
>>>>> public class KafkaDataSource<T> extends FlinkKafkaConsumer010<T> {
>>>>>
>>>>> public KafkaDataSource(ParameterTool parameterTool,
>>>>> DeserializationSchema<T> deserializer) {
>>>>> super(
>>>>> Arrays.asList(parameterTool.ge
>>>>> tRequired("topic").split(",")),
>>>>> deserializer,
>>>>> parameterTool.getProperties()
>>>>> );
>>>>>
>>>>> }
>>>>>
>>>>> }
>>>>>
>>>>> public class MyKafkaMessageSerDeSchema implements
>>>>> DeserializationSchema<MyKafkaMessage>, SerializationSchema<MyKafkaMessage>
>>>>> {
>>>>>
>>>>> @Override
>>>>> public MyKafkaMessage deserialize(byte[] message) throws
>>>>> IOException {
>>>>> MyKafkaMessage MyKafkaMessage = null;
>>>>> try {
>>>>> MyKafkaMessage = MyKafkaMessage.parseFrom(message);
>>>>> } catch (InvalidProtocolBufferException e) {
>>>>> e.printStackTrace();
>>>>> } finally {
>>>>> return MyKafkaMessage;
>>>>> }
>>>>> }
>>>>>
>>>>> @Override
>>>>> public boolean isEndOfStream(MyKafkaMessage nextElement) {
>>>>> return false;
>>>>> }
>>>>>
>>>>> @Override
>>>>> public TypeInformation<MyKafkaMessage> getProducedType() {
>>>>> return null;
>>>>> }
>>>>>
>>>>> @Override
>>>>> public byte[] serialize(MyKafkaMessage element) {
>>>>> return new byte[0];
>>>>> }
>>>>> }
>>>>>
>>>>> On Mon, Aug 28, 2017 at 8:26 PM, Ted Yu <yu...@gmail.com> wrote:
>>>>>
>>>>>> Which version of Flink / Kafka are you using ?
>>>>>>
>>>>>> Can you show the snippet of code where you create the DataStream ?
>>>>>>
>>>>>> Cheers
>>>>>>
>>>>>> On Mon, Aug 28, 2017 at 7:38 AM, Sridhar Chellappa <
>>>>>> flinkenthu@gmail.com> wrote:
>>>>>>
>>>>>>> Folks,
>>>>>>>
>>>>>>> I have a KafkaConsumer that I am trying to read messages from. When
>>>>>>> I try to create a DataStream from the KafkConsumer (env.addSource()) I get
>>>>>>> the following exception :
>>>>>>>
>>>>>>> Any idea on how can this happen?
>>>>>>>
>>>>>>> java.lang.NullPointerException
>>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
>>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>>>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>>>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>>>>>>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
>>>>>>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
>>>>>>> at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264)
>>>>>>> at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
>>>>>>> at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149)
>>>>>>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
>>>>>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>>>>>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>>>>>>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
>>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
Re: Null Pointer Exception on Trying to read a message from Kafka
Posted by Ted Yu <yu...@gmail.com>.
The NPE came from this line:
StreamRecord<T> copy =
castRecord.copy(serializer.copy(castRecord.getValue()));
Either serializer or castRecord was null.
I wonder if this has been fixed in 1.3.2 release.
On Mon, Aug 28, 2017 at 7:24 PM, Sridhar Chellappa <fl...@gmail.com>
wrote:
> Kafka Version is 0.10.0
>
> On Tue, Aug 29, 2017 at 6:43 AM, Sridhar Chellappa <fl...@gmail.com>
> wrote:
>
>> 1.3.0
>>
>> On Mon, Aug 28, 2017 at 10:09 PM, Ted Yu <yu...@gmail.com> wrote:
>>
>>> Which Flink version are you using (so that line numbers can be matched
>>> with source code) ?
>>>
>>> On Mon, Aug 28, 2017 at 9:16 AM, Sridhar Chellappa <flinkenthu@gmail.com
>>> > wrote:
>>>
>>>> DataStream<MyKafkaMessage> MyKafkaMessageDataStream = env.addSource(
>>>> getStreamSource(env, parameterTool);
>>>> );
>>>>
>>>>
>>>>
>>>> public RichParallelSourceFunction<MyKafkaMessage>
>>>> getStreamSource(StreamExecutionEnvironment env, ParameterTool
>>>> parameterTool) {
>>>>
>>>> // MyKAfkaMessage is a ProtoBuf message
>>>> env.getConfig().registerTypeWi
>>>> thKryoSerializer(MyKafkaMessage.class, ProtobufSerializer.class);
>>>>
>>>> KafkaDataSource<MyKafkaMessage> flinkCepConsumer =
>>>> new KafkaDataSource<MyKafkaMessage>(parameterTool,
>>>> new MyKafkaMessageSerDeSchema());
>>>>
>>>> return flinkCepConsumer;
>>>> }
>>>>
>>>>
>>>> public class KafkaDataSource<T> extends FlinkKafkaConsumer010<T> {
>>>>
>>>> public KafkaDataSource(ParameterTool parameterTool,
>>>> DeserializationSchema<T> deserializer) {
>>>> super(
>>>> Arrays.asList(parameterTool.ge
>>>> tRequired("topic").split(",")),
>>>> deserializer,
>>>> parameterTool.getProperties()
>>>> );
>>>>
>>>> }
>>>>
>>>> }
>>>>
>>>> public class MyKafkaMessageSerDeSchema implements
>>>> DeserializationSchema<MyKafkaMessage>, SerializationSchema<MyKafkaMessage>
>>>> {
>>>>
>>>> @Override
>>>> public MyKafkaMessage deserialize(byte[] message) throws
>>>> IOException {
>>>> MyKafkaMessage MyKafkaMessage = null;
>>>> try {
>>>> MyKafkaMessage = MyKafkaMessage.parseFrom(message);
>>>> } catch (InvalidProtocolBufferException e) {
>>>> e.printStackTrace();
>>>> } finally {
>>>> return MyKafkaMessage;
>>>> }
>>>> }
>>>>
>>>> @Override
>>>> public boolean isEndOfStream(MyKafkaMessage nextElement) {
>>>> return false;
>>>> }
>>>>
>>>> @Override
>>>> public TypeInformation<MyKafkaMessage> getProducedType() {
>>>> return null;
>>>> }
>>>>
>>>> @Override
>>>> public byte[] serialize(MyKafkaMessage element) {
>>>> return new byte[0];
>>>> }
>>>> }
>>>>
>>>> On Mon, Aug 28, 2017 at 8:26 PM, Ted Yu <yu...@gmail.com> wrote:
>>>>
>>>>> Which version of Flink / Kafka are you using ?
>>>>>
>>>>> Can you show the snippet of code where you create the DataStream ?
>>>>>
>>>>> Cheers
>>>>>
>>>>> On Mon, Aug 28, 2017 at 7:38 AM, Sridhar Chellappa <
>>>>> flinkenthu@gmail.com> wrote:
>>>>>
>>>>>> Folks,
>>>>>>
>>>>>> I have a KafkaConsumer that I am trying to read messages from. When I
>>>>>> try to create a DataStream from the KafkConsumer (env.addSource()) I get
>>>>>> the following exception :
>>>>>>
>>>>>> Any idea on how can this happen?
>>>>>>
>>>>>> java.lang.NullPointerException
>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>>>>>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
>>>>>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
>>>>>> at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264)
>>>>>> at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
>>>>>> at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149)
>>>>>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
>>>>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>>>>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>>>>>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
Re: Null Pointer Exception on Trying to read a message from Kafka
Posted by Sridhar Chellappa <fl...@gmail.com>.
Kafka Version is 0.10.0
On Tue, Aug 29, 2017 at 6:43 AM, Sridhar Chellappa <fl...@gmail.com>
wrote:
> 1.3.0
>
> On Mon, Aug 28, 2017 at 10:09 PM, Ted Yu <yu...@gmail.com> wrote:
>
>> Which Flink version are you using (so that line numbers can be matched
>> with source code) ?
>>
>> On Mon, Aug 28, 2017 at 9:16 AM, Sridhar Chellappa <fl...@gmail.com>
>> wrote:
>>
>>> DataStream<MyKafkaMessage> MyKafkaMessageDataStream = env.addSource(
>>> getStreamSource(env, parameterTool);
>>> );
>>>
>>>
>>>
>>> public RichParallelSourceFunction<MyKafkaMessage>
>>> getStreamSource(StreamExecutionEnvironment env, ParameterTool
>>> parameterTool) {
>>>
>>> // MyKAfkaMessage is a ProtoBuf message
>>> env.getConfig().registerTypeWithKryoSerializer(MyKafkaMessage.class,
>>> ProtobufSerializer.class);
>>>
>>> KafkaDataSource<MyKafkaMessage> flinkCepConsumer =
>>> new KafkaDataSource<MyKafkaMessage>(parameterTool,
>>> new MyKafkaMessageSerDeSchema());
>>>
>>> return flinkCepConsumer;
>>> }
>>>
>>>
>>> public class KafkaDataSource<T> extends FlinkKafkaConsumer010<T> {
>>>
>>> public KafkaDataSource(ParameterTool parameterTool,
>>> DeserializationSchema<T> deserializer) {
>>> super(
>>> Arrays.asList(parameterTool.ge
>>> tRequired("topic").split(",")),
>>> deserializer,
>>> parameterTool.getProperties()
>>> );
>>>
>>> }
>>>
>>> }
>>>
>>> public class MyKafkaMessageSerDeSchema implements
>>> DeserializationSchema<MyKafkaMessage>, SerializationSchema<MyKafkaMessage>
>>> {
>>>
>>> @Override
>>> public MyKafkaMessage deserialize(byte[] message) throws IOException
>>> {
>>> MyKafkaMessage MyKafkaMessage = null;
>>> try {
>>> MyKafkaMessage = MyKafkaMessage.parseFrom(message);
>>> } catch (InvalidProtocolBufferException e) {
>>> e.printStackTrace();
>>> } finally {
>>> return MyKafkaMessage;
>>> }
>>> }
>>>
>>> @Override
>>> public boolean isEndOfStream(MyKafkaMessage nextElement) {
>>> return false;
>>> }
>>>
>>> @Override
>>> public TypeInformation<MyKafkaMessage> getProducedType() {
>>> return null;
>>> }
>>>
>>> @Override
>>> public byte[] serialize(MyKafkaMessage element) {
>>> return new byte[0];
>>> }
>>> }
>>>
>>> On Mon, Aug 28, 2017 at 8:26 PM, Ted Yu <yu...@gmail.com> wrote:
>>>
>>>> Which version of Flink / Kafka are you using ?
>>>>
>>>> Can you show the snippet of code where you create the DataStream ?
>>>>
>>>> Cheers
>>>>
>>>> On Mon, Aug 28, 2017 at 7:38 AM, Sridhar Chellappa <
>>>> flinkenthu@gmail.com> wrote:
>>>>
>>>>> Folks,
>>>>>
>>>>> I have a KafkaConsumer that I am trying to read messages from. When I
>>>>> try to create a DataStream from the KafkConsumer (env.addSource()) I get
>>>>> the following exception :
>>>>>
>>>>> Any idea on how can this happen?
>>>>>
>>>>> java.lang.NullPointerException
>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>>>>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
>>>>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
>>>>> at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264)
>>>>> at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
>>>>> at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149)
>>>>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
>>>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>>>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>>>>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>
>>>>>
>>>>
>>>
>>
>
Re: Null Pointer Exception on Trying to read a message from Kafka
Posted by Sridhar Chellappa <fl...@gmail.com>.
1.3.0
On Mon, Aug 28, 2017 at 10:09 PM, Ted Yu <yu...@gmail.com> wrote:
> Which Flink version are you using (so that line numbers can be matched
> with source code) ?
>
> On Mon, Aug 28, 2017 at 9:16 AM, Sridhar Chellappa <fl...@gmail.com>
> wrote:
>
>> DataStream<MyKafkaMessage> MyKafkaMessageDataStream = env.addSource(
>> getStreamSource(env, parameterTool);
>> );
>>
>>
>>
>> public RichParallelSourceFunction<MyKafkaMessage>
>> getStreamSource(StreamExecutionEnvironment env, ParameterTool
>> parameterTool) {
>>
>> // MyKAfkaMessage is a ProtoBuf message
>> env.getConfig().registerTypeWithKryoSerializer(MyKafkaMessage.class,
>> ProtobufSerializer.class);
>>
>> KafkaDataSource<MyKafkaMessage> flinkCepConsumer =
>> new KafkaDataSource<MyKafkaMessage>(parameterTool,
>> new MyKafkaMessageSerDeSchema());
>>
>> return flinkCepConsumer;
>> }
>>
>>
>> public class KafkaDataSource<T> extends FlinkKafkaConsumer010<T> {
>>
>> public KafkaDataSource(ParameterTool parameterTool,
>> DeserializationSchema<T> deserializer) {
>> super(
>> Arrays.asList(parameterTool.ge
>> tRequired("topic").split(",")),
>> deserializer,
>> parameterTool.getProperties()
>> );
>>
>> }
>>
>> }
>>
>> public class MyKafkaMessageSerDeSchema implements
>> DeserializationSchema<MyKafkaMessage>, SerializationSchema<MyKafkaMessage>
>> {
>>
>> @Override
>> public MyKafkaMessage deserialize(byte[] message) throws IOException {
>> MyKafkaMessage MyKafkaMessage = null;
>> try {
>> MyKafkaMessage = MyKafkaMessage.parseFrom(message);
>> } catch (InvalidProtocolBufferException e) {
>> e.printStackTrace();
>> } finally {
>> return MyKafkaMessage;
>> }
>> }
>>
>> @Override
>> public boolean isEndOfStream(MyKafkaMessage nextElement) {
>> return false;
>> }
>>
>> @Override
>> public TypeInformation<MyKafkaMessage> getProducedType() {
>> return null;
>> }
>>
>> @Override
>> public byte[] serialize(MyKafkaMessage element) {
>> return new byte[0];
>> }
>> }
>>
>> On Mon, Aug 28, 2017 at 8:26 PM, Ted Yu <yu...@gmail.com> wrote:
>>
>>> Which version of Flink / Kafka are you using ?
>>>
>>> Can you show the snippet of code where you create the DataStream ?
>>>
>>> Cheers
>>>
>>> On Mon, Aug 28, 2017 at 7:38 AM, Sridhar Chellappa <flinkenthu@gmail.com
>>> > wrote:
>>>
>>>> Folks,
>>>>
>>>> I have a KafkaConsumer that I am trying to read messages from. When I
>>>> try to create a DataStream from the KafkConsumer (env.addSource()) I get
>>>> the following exception :
>>>>
>>>> Any idea on how can this happen?
>>>>
>>>> java.lang.NullPointerException
>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>>>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
>>>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
>>>> at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264)
>>>> at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
>>>> at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149)
>>>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
>>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>>>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>> at java.lang.Thread.run(Thread.java:748)
>>>>
>>>>
>>>
>>
>
Re: Null Pointer Exception on Trying to read a message from Kafka
Posted by Ted Yu <yu...@gmail.com>.
Which Flink version are you using (so that line numbers can be matched with
source code) ?
On Mon, Aug 28, 2017 at 9:16 AM, Sridhar Chellappa <fl...@gmail.com>
wrote:
> DataStream<MyKafkaMessage> MyKafkaMessageDataStream = env.addSource(
> getStreamSource(env, parameterTool);
> );
>
>
>
> public RichParallelSourceFunction<MyKafkaMessage> getStreamSource(StreamExecutionEnvironment
> env, ParameterTool parameterTool) {
>
> // MyKAfkaMessage is a ProtoBuf message
> env.getConfig().registerTypeWithKryoSerializer(MyKafkaMessage.class,
> ProtobufSerializer.class);
>
> KafkaDataSource<MyKafkaMessage> flinkCepConsumer =
> new KafkaDataSource<MyKafkaMessage>(parameterTool,
> new MyKafkaMessageSerDeSchema());
>
> return flinkCepConsumer;
> }
>
>
> public class KafkaDataSource<T> extends FlinkKafkaConsumer010<T> {
>
> public KafkaDataSource(ParameterTool parameterTool,
> DeserializationSchema<T> deserializer) {
> super(
> Arrays.asList(parameterTool.getRequired("topic").split(","
> )),
> deserializer,
> parameterTool.getProperties()
> );
>
> }
>
> }
>
> public class MyKafkaMessageSerDeSchema implements DeserializationSchema<MyKafkaMessage>,
> SerializationSchema<MyKafkaMessage> {
>
> @Override
> public MyKafkaMessage deserialize(byte[] message) throws IOException {
> MyKafkaMessage MyKafkaMessage = null;
> try {
> MyKafkaMessage = MyKafkaMessage.parseFrom(message);
> } catch (InvalidProtocolBufferException e) {
> e.printStackTrace();
> } finally {
> return MyKafkaMessage;
> }
> }
>
> @Override
> public boolean isEndOfStream(MyKafkaMessage nextElement) {
> return false;
> }
>
> @Override
> public TypeInformation<MyKafkaMessage> getProducedType() {
> return null;
> }
>
> @Override
> public byte[] serialize(MyKafkaMessage element) {
> return new byte[0];
> }
> }
>
> On Mon, Aug 28, 2017 at 8:26 PM, Ted Yu <yu...@gmail.com> wrote:
>
>> Which version of Flink / Kafka are you using ?
>>
>> Can you show the snippet of code where you create the DataStream ?
>>
>> Cheers
>>
>> On Mon, Aug 28, 2017 at 7:38 AM, Sridhar Chellappa <fl...@gmail.com>
>> wrote:
>>
>>> Folks,
>>>
>>> I have a KafkaConsumer that I am trying to read messages from. When I
>>> try to create a DataStream from the KafkConsumer (env.addSource()) I get
>>> the following exception :
>>>
>>> Any idea on how can this happen?
>>>
>>> java.lang.NullPointerException
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
>>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
>>> at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264)
>>> at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
>>> at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149)
>>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>>
>>
>
Re: Null Pointer Exception on Trying to read a message from Kafka
Posted by Sridhar Chellappa <fl...@gmail.com>.
DataStream<MyKafkaMessage> MyKafkaMessageDataStream = env.addSource(
getStreamSource(env, parameterTool);
);
public RichParallelSourceFunction<MyKafkaMessage>
getStreamSource(StreamExecutionEnvironment env, ParameterTool
parameterTool) {
// MyKAfkaMessage is a ProtoBuf message
env.getConfig().registerTypeWithKryoSerializer(MyKafkaMessage.class,
ProtobufSerializer.class);
KafkaDataSource<MyKafkaMessage> flinkCepConsumer =
new KafkaDataSource<MyKafkaMessage>(parameterTool, new
MyKafkaMessageSerDeSchema());
return flinkCepConsumer;
}
public class KafkaDataSource<T> extends FlinkKafkaConsumer010<T> {
public KafkaDataSource(ParameterTool parameterTool,
DeserializationSchema<T> deserializer) {
super(
Arrays.asList(parameterTool.getRequired("topic").split(",")),
deserializer,
parameterTool.getProperties()
);
}
}
public class MyKafkaMessageSerDeSchema implements
DeserializationSchema<MyKafkaMessage>, SerializationSchema<MyKafkaMessage> {
@Override
public MyKafkaMessage deserialize(byte[] message) throws IOException {
MyKafkaMessage MyKafkaMessage = null;
try {
MyKafkaMessage = MyKafkaMessage.parseFrom(message);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
} finally {
return MyKafkaMessage;
}
}
@Override
public boolean isEndOfStream(MyKafkaMessage nextElement) {
return false;
}
@Override
public TypeInformation<MyKafkaMessage> getProducedType() {
return null;
}
@Override
public byte[] serialize(MyKafkaMessage element) {
return new byte[0];
}
}
On Mon, Aug 28, 2017 at 8:26 PM, Ted Yu <yu...@gmail.com> wrote:
> Which version of Flink / Kafka are you using ?
>
> Can you show the snippet of code where you create the DataStream ?
>
> Cheers
>
> On Mon, Aug 28, 2017 at 7:38 AM, Sridhar Chellappa <fl...@gmail.com>
> wrote:
>
>> Folks,
>>
>> I have a KafkaConsumer that I am trying to read messages from. When I try
>> to create a DataStream from the KafkConsumer (env.addSource()) I get the
>> following exception :
>>
>> Any idea on how can this happen?
>>
>> java.lang.NullPointerException
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
>> at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264)
>> at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
>> at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149)
>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>> at java.lang.Thread.run(Thread.java:748)
>>
>>
>
Re: Null Pointer Exception on Trying to read a message from Kafka
Posted by Ted Yu <yu...@gmail.com>.
Which version of Flink / Kafka are you using ?
Can you show the snippet of code where you create the DataStream ?
Cheers
On Mon, Aug 28, 2017 at 7:38 AM, Sridhar Chellappa <fl...@gmail.com>
wrote:
> Folks,
>
> I have a KafkaConsumer that I am trying to read messages from. When I try
> to create a DataStream from the KafkConsumer (env.addSource()) I get the
> following exception :
>
> Any idea on how can this happen?
>
> java.lang.NullPointerException
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
> at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264)
> at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
> at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:748)
>
>