Posted to user@flink.apache.org by aj <aj...@gmail.com> on 2020/02/24 16:47:00 UTC

Map Of DataStream getting NullPointer Exception

I am trying the piece of code below to create multiple DataStream objects and
store them in a map.

    for (EventConfig eventConfig : eventTypesList) {
        // Parameterized logging needs the "{}" placeholder, otherwise
        // the event name is silently dropped from the message.
        LOGGER.info("creating a stream for {}", eventConfig.getEvent_name());
        String key = eventConfig.getEvent_name();

        final StreamingFileSink<GenericRecord> sink = StreamingFileSink
                .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(
                        SchemaUtils.getSchema(eventConfig.getSchema_subject())))
                .withBucketAssigner(new EventTimeBucketAssigner())
                .build();

        // Keep only the records whose event name matches this config.
        DataStream<GenericRecord> stream = dataStream.filter(
                (FilterFunction<GenericRecord>) genericRecord ->
                        genericRecord.get(EVENT_NAME).toString()
                                .equals(eventConfig.getEvent_name()));

        streamMap.put(key, new Tuple2<>(stream, sink));
    }

    DataStream<GenericRecord> searchStream =
            streamMap.get(SEARCH_LIST_KEYLESS).getField(0);
    searchStream.map(new Enricher())
            .addSink(streamMap.get(SEARCH_LIST_KEYLESS).getField(1));


I am getting a NullPointerException when trying to get the stream from the
map value at:

    DataStream<GenericRecord> searchStream =
            streamMap.get(SEARCH_LIST_KEYLESS).getField(0);

As per my understanding, this is because the map is local to main and not
broadcast to the tasks.
If that is the case, how should I do this? Please help me resolve it.
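For reference, this failure mode can be reproduced without Flink at all: a plain java.util.Map returns null for a missing key, and chaining a method call onto that result throws. A minimal sketch (the keys below are made up for illustration):

```java
import java.util.HashMap;
import java.util.Map;

public class MapLookupDemo {
    public static void main(String[] args) {
        Map<String, String> streamMap = new HashMap<>();
        streamMap.put("search_list", "stream-A"); // hypothetical key

        // A lookup with a key that was never inserted returns null...
        String missing = streamMap.get("SEARCH_LIST_KEYLESS");
        System.out.println(missing); // null

        // ...and chaining any call onto it throws NullPointerException.
        try {
            missing.toUpperCase();
        } catch (NullPointerException e) {
            System.out.println("NPE on missing key");
        }
    }
}
```

So the exception fires on the lookup result itself, before any Flink code runs.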



-- 
Thanks & Regards,
Anuj Jain



<http://www.cse.iitm.ac.in/%7Eanujjain/>

Re: Map Of DataStream getting NullPointer Exception

Posted by Arvid Heise <ar...@ververica.com>.
Hi Anuj,

is that piece of code from your first mail in the same main method? If so, at
that point nothing has been executed by Flink yet.

So we are looking at a normal Java programming error that you can easily
debug and unit test.
Most likely there is no event config for *SEARCH_LIST_KEYLESS*, or there is
a spelling error.
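One way to surface such an error early (a sketch, not from the thread; the helper name is made up) is to fail fast on the lookup with a message that lists the keys that actually exist, so a spelling or casing mismatch is obvious:

```java
import java.util.HashMap;
import java.util.Map;

public class GuardedLookup {
    // Hypothetical helper: fetch a required entry or fail with a clear message.
    static <V> V getRequired(Map<String, V> map, String key) {
        V value = map.get(key);
        if (value == null) {
            throw new IllegalStateException(
                    "No stream registered for key '" + key
                            + "'; known keys: " + map.keySet());
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, String> streamMap = new HashMap<>();
        streamMap.put("search_list_keyless", "stream"); // note the casing

        try {
            // A casing mismatch now fails with the known keys in the message.
            getRequired(streamMap, "SEARCH_LIST_KEYLESS");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```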


Re: Map Of DataStream getting NullPointer Exception

Posted by aj <aj...@gmail.com>.
Hi Khachatryan,

This is the use case for creating multiple streams:

Multiple types of Avro records arrive in a single Kafka topic, as we are
using TopicRecordNameStrategy for the subject in the schema registry. I have
written a consumer to read that topic and build a DataStream of
GenericRecord. I cannot sink this stream to HDFS/S3 in Parquet format
directly, because it contains records with different schemas. So I am
filtering the records for each type, creating a separate stream per type,
and sinking each stream separately.

Can you please help me create multiple dynamic streams with the code that I
shared? How do I resolve this issue?
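The per-type demultiplexing described above is itself plain Java logic; as a minimal Flink-free sketch (using a Map<String, String> in place of GenericRecord, with made-up event names), the filtering step amounts to grouping records by their event-name field:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EventDemuxSketch {
    public static void main(String[] args) {
        // Stand-ins for GenericRecord: each record carries an "event_name" field.
        List<Map<String, String>> records = new ArrayList<>();
        records.add(Map.of("event_name", "search", "payload", "q=flink"));
        records.add(Map.of("event_name", "click", "payload", "item=42"));
        records.add(Map.of("event_name", "search", "payload", "q=avro"));

        // One bucket per event type, analogous to one filtered stream per type.
        Map<String, List<Map<String, String>>> byType = new HashMap<>();
        for (Map<String, String> record : records) {
            byType.computeIfAbsent(record.get("event_name"),
                    k -> new ArrayList<>()).add(record);
        }

        System.out.println(byType.get("search").size()); // 2
        System.out.println(byType.get("click").size());  // 1
    }
}
```

In the Flink job the buckets correspond to the filtered DataStreams keyed by event name in streamMap, which is why a lookup with an unregistered key returns null.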


-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>


<http://www.cse.iitm.ac.in/%7Eanujjain/>

Re: Map Of DataStream getting NullPointer Exception

Posted by Khachatryan Roman <kh...@gmail.com>.
As I understand from the code, streamMap is a Java map, not a Scala one, so
you can get an NPE while dereferencing the value you got from it.

Also, the approach looks a bit strange.
Can you describe what you are trying to achieve?

Regards,
Roman

