Posted to user@flink.apache.org by Sridhar Chellappa <fl...@gmail.com> on 2017/08/28 14:38:37 UTC

Null Pointer Exception on Trying to read a message from Kafka

Folks,

I have a KafkaConsumer that I am trying to read messages from. When I try
to create a DataStream from the KafkaConsumer (env.addSource()) I get the
following exception:

Any idea how this can happen?

java.lang.NullPointerException
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:748)

Re: Null Pointer Exception on Trying to read a message from Kafka

Posted by Sridhar Chellappa <fl...@gmail.com>.
OK. I got past the problem. Basically, I had to change my SerDe schema to
the following:

public class MyKafkaMessageSerDeSchema implements
        DeserializationSchema<MyKafkaMessage>,
        SerializationSchema<MyKafkaMessage> {

    @Override
    public MyKafkaMessage deserialize(byte[] message) throws IOException {
        try {
            return MyKafkaMessage.parseFrom(message);
        } catch (InvalidProtocolBufferException e) {
            // Rethrow rather than returning null from a finally block,
            // which would silently swallow the parse failure.
            throw new IOException("Failed to parse MyKafkaMessage", e);
        }
    }

    @Override
    public boolean isEndOfStream(MyKafkaMessage nextElement) {
        return false;
    }

    @Override
    public TypeInformation<MyKafkaMessage> getProducedType() {
        // Add type info (was: return null)
        return TypeExtractor.getForClass(MyKafkaMessage.class);
    }

    @Override
    public byte[] serialize(MyKafkaMessage element) {
        // Modify the serializer to emit the real protobuf bytes
        // (was: return new byte[0])
        return element.toByteArray();
    }
}
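Incidentally, the deserialize() in the originally posted schema returned
from a finally block, and that pattern silently discards any pending
exception: the caller just sees null. A standalone illustration of the
pitfall (plain Java, hypothetical names, no Flink or protobuf needed):

```java
public class FinallyReturnDemo {

    public static String parse(byte[] bytes) {
        String result = null;
        try {
            result = decode(bytes);      // throws on bad input
        } catch (IllegalArgumentException e) {
            e.printStackTrace();         // logged to stderr...
        } finally {
            return result;               // ...but the caller just sees null
        }
    }

    // Stand-in for MyKafkaMessage.parseFrom: fails on an empty message.
    public static String decode(byte[] bytes) {
        if (bytes == null || bytes.length == 0) {
            throw new IllegalArgumentException("empty message");
        }
        return new String(bytes);
    }

    public static void main(String[] args) {
        // A bad record comes back as null instead of an exception:
        System.out.println(parse(new byte[0]));
    }
}
```

Rethrowing as IOException, which the DeserializationSchema signature
already allows, makes bad records visible instead of turning them into
null elements downstream.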


When I run my program, I get another exception:


java.lang.NullPointerException
	at shaded.com.google.protobuf.UnmodifiableLazyStringList.size(UnmodifiableLazyStringList.java:68)
	at java.util.AbstractList.add(AbstractList.java:108)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:131)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:22)
	at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
	at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:236)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:748)
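One plausible reading of this second trace: Kryo's CollectionSerializer
copies a list by instantiating the target collection without running its
constructor, so the UnmodifiableLazyStringList's backing list field is
still null when AbstractList.add calls size(). A rough standalone model of
that failure mode (hypothetical classes, no Kryo involved):

```java
import java.util.AbstractList;
import java.util.List;

public class LazyListNpeDemo {

    // Stand-in for UnmodifiableLazyStringList: a wrapper whose backing
    // field can be null when the object is built without its constructor
    // (here simulated by passing null explicitly).
    static class WrapperList extends AbstractList<String> {
        final List<String> backing;
        WrapperList(List<String> backing) { this.backing = backing; }
        @Override public String get(int i) { return backing.get(i); }
        @Override public int size() { return backing.size(); } // NPE if backing is null
    }

    public static void main(String[] args) {
        List<String> broken = new WrapperList(null);
        try {
            // AbstractList.add(e) calls add(size(), e); size() hits the
            // null backing list first, mirroring the trace above.
            broken.add("x");
        } catch (NullPointerException e) {
            System.out.println("NPE from size(), as in the Kryo copy trace");
        }
    }
}
```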



Re: Null Pointer Exception on Trying to read a message from Kafka

Posted by Ted Yu <yu...@gmail.com>.
The NPE came from this line:

        StreamRecord<T> copy =
castRecord.copy(serializer.copy(castRecord.getValue()));

Either serializer or castRecord was null.

I wonder if this has been fixed in 1.3.2 release.
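Taking the serializer-was-null case: here is a tiny standalone model of
that copy call (hypothetical names, no Flink required) showing how a
serializer that was never materialized, e.g. because getProducedType()
returned null, surfaces as an NPE at exactly this point:

```java
import java.util.function.UnaryOperator;

public class CopyChainDemo {

    // Stand-in for the copying chain output: serializerCopy plays the
    // role of serializer::copy on the record's value.
    public static <T> T copyValue(UnaryOperator<T> serializerCopy, T value) {
        return serializerCopy.apply(value); // NPE when serializerCopy is null
    }

    public static void main(String[] args) {
        UnaryOperator<String> serializerCopy = null; // type info never provided
        try {
            copyValue(serializerCopy, "record");
        } catch (NullPointerException e) {
            System.out.println("NPE, matching the pushToOperator frame");
        }
    }
}
```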


Re: Null Pointer Exception on Trying to read a message from Kafka

Posted by Sridhar Chellappa <fl...@gmail.com>.
Kafka Version is 0.10.0


Re: Null Pointer Exception on Trying to read a message from Kafka

Posted by Sridhar Chellappa <fl...@gmail.com>.
1.3.0


Re: Null Pointer Exception on Trying to read a message from Kafka

Posted by Ted Yu <yu...@gmail.com>.
Which Flink version are you using (so that line numbers can be matched with
source code) ?


Re: Null Pointer Exception on Trying to read a message from Kafka

Posted by Sridhar Chellappa <fl...@gmail.com>.
DataStream<MyKafkaMessage> myKafkaMessageDataStream = env.addSource(
        getStreamSource(env, parameterTool)
);



        public RichParallelSourceFunction<MyKafkaMessage> getStreamSource(
                StreamExecutionEnvironment env, ParameterTool parameterTool) {

            // MyKafkaMessage is a ProtoBuf message
            env.getConfig().registerTypeWithKryoSerializer(
                    MyKafkaMessage.class, ProtobufSerializer.class);

            KafkaDataSource<MyKafkaMessage> flinkCepConsumer =
                    new KafkaDataSource<MyKafkaMessage>(
                            parameterTool, new MyKafkaMessageSerDeSchema());

            return flinkCepConsumer;
        }


public class KafkaDataSource<T> extends FlinkKafkaConsumer010<T> {

    public KafkaDataSource(ParameterTool parameterTool,
                           DeserializationSchema<T> deserializer) {
        super(
                Arrays.asList(parameterTool.getRequired("topic").split(",")),
                deserializer,
                parameterTool.getProperties()
        );
    }
}
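As a side note, the constructor above builds the topic list by splitting the required `topic` property on commas. A minimal, Flink-free sketch of that behavior (the property value here is made up):

```java
import java.util.Arrays;
import java.util.List;

public class TopicListDemo {
    public static void main(String[] args) {
        // Hypothetical value of parameterTool.getRequired("topic")
        String topicProperty = "orders,payments,refunds";

        // Same call the KafkaDataSource constructor makes:
        List<String> topics = Arrays.asList(topicProperty.split(","));

        System.out.println(topics); // [orders, payments, refunds]
    }
}
```

One caveat: `split(",")` keeps surrounding whitespace, so a property like `"orders, payments"` yields `" payments"`; trim the pieces if the property may contain spaces.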

public class MyKafkaMessageSerDeSchema implements
        DeserializationSchema<MyKafkaMessage>, SerializationSchema<MyKafkaMessage> {

    @Override
    public MyKafkaMessage deserialize(byte[] message) throws IOException {
        MyKafkaMessage myKafkaMessage = null;
        try {
            myKafkaMessage = MyKafkaMessage.parseFrom(message);
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        } finally {
            return myKafkaMessage;
        }
    }

    @Override
    public boolean isEndOfStream(MyKafkaMessage nextElement) {
        return false;
    }

    @Override
    public TypeInformation<MyKafkaMessage> getProducedType() {
        return null;
    }

    @Override
    public byte[] serialize(MyKafkaMessage element) {
        return new byte[0];
    }
}
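[Editor's note] The likely failure mode: `deserialize()` above catches the protobuf exception and then returns from the `finally` block, so a corrupt record reaches the chained operator as `null`, which matches the NPE in `CopyingChainingOutput.pushToOperator`. (`getProducedType()` returning `null` is a second red flag; returning something like `TypeExtractor.getForClass(MyKafkaMessage.class)` is the usual pattern.) Here is a self-contained demonstration of the return-in-finally trap, with no Flink or protobuf dependency; `decode()` is a hypothetical stand-in for `MyKafkaMessage.parseFrom()`:

```java
import java.nio.charset.StandardCharsets;

public class NullOnFailureDemo {

    // Mimics the deserialize() pattern above: the catch swallows the
    // error and the return in `finally` hands null to the caller.
    static String parse(byte[] message) {
        String result = null;
        try {
            result = decode(message);
        } catch (IllegalArgumentException e) {
            e.printStackTrace();
        } finally {
            return result; // null whenever decode() failed
        }
    }

    // Hypothetical stand-in for MyKafkaMessage.parseFrom(): rejects empty input.
    static String decode(byte[] message) {
        if (message.length == 0) {
            throw new IllegalArgumentException("empty message");
        }
        return new String(message, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String good = parse("hello".getBytes(StandardCharsets.UTF_8));
        String bad = parse(new byte[0]); // prints a stack trace, returns null

        System.out.println("good = " + good);
        System.out.println("bad  = " + bad);

        // Any downstream code that trusts the record then fails just like
        // the chained operator in the reported stack trace:
        // bad.length(); // -> java.lang.NullPointerException
    }
}
```

In the schema itself, letting the exception propagate (or logging and skipping the record explicitly) avoids emitting `null` downstream.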


Re: Null Pointer Exception on Trying to read a message from Kafka

Posted by Ted Yu <yu...@gmail.com>.
Which version of Flink / Kafka are you using ?

Can you show the snippet of code where you create the DataStream ?

Cheers
