Posted to user@flink.apache.org by Ahmad Hassan <ah...@gmail.com> on 2018/10/09 15:09:45 UTC

getRuntimeContext(): The runtime context has not been initialized.

Hi,

We want to use MapState inside a fold function to keep a map of all the
products that we see in a 24 hour window, so that this huge state is stored
in RocksDB rather than overflowing the heap. However, I can't seem to
initialise the MapState within a FoldFunction, or in any class that extends
RichMapFunction:

private transient MapStateDescriptor<String, String> descr =
    new MapStateDescriptor<>("mymap", String.class, String.class);
this.getRuntimeContext().getMapState(descr);

I get this error:

java.lang.IllegalStateException: The runtime context has not been
initialized.
at
org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)


Any clues how to get the runtime context please?

Thanks.

Best regards

Re: getRuntimeContext(): The runtime context has not been initialized.

Posted by Ahmad Hassan <ah...@gmail.com>.
Hi,

Yes, we can replace the FoldFunction with an AggregateFunction, that is not
an issue. But the problem remains the same: how to use MapState to store and
update the state of each product, instead of keeping the whole HashMap of
products in heap memory. We are running Flink 1.6.0.

Yes, we can see up to 24 million products in a 24 hr window. The composite key
<tenant, product> would create millions of windows over 24 hrs for 24 million
products for just one tenant. That is why we chose tenant as the key and then
use a map to store product metrics for the incoming events.

Is there any known design for dealing with this in Flink, please?

Thanks.
Best regards



On Thu, 11 Oct 2018 at 12:14, Dawid Wysakowicz <dw...@apache.org>
wrote:

> Hi Ahmad,
>
> Few comments from my side:
>
>     1. FoldFunction is deprecated because of many problems, e.g. no
> possibility to merge contents of windows. Therefore you should at least use
> the AggregateFunction.
>
>     2. I am not sure if you need to store this in RocksDB, do you expect
> 24millions product per each tenant in a single window?
>
>     3. I think what you could do is first compute stats for composite key
> <tenant, product> and then aggregate them in subsequent operation(if you
> need to). This way you could distribute the workload to more parallel
> instances.
>
> Best,
>
> Dawid
>
> On 11/10/18 11:33, Ahmad Hassan wrote:
>
> Hi All,
>
> Thanks for the replies. Here is the code snippet of what we want to
> achieve:
>
> We have sliding windows of 24hrs with 5 minutes apart.
>
> inStream
>  .filter(Objects::nonNull)
>  .keyBy("tenant")
>  .window(SlidingProcessingTimeWindows.of(Time.minutes(1440),
> Time.minutes(5)))
>  .fold(new DefaultVector(), new CalculationFold(), new
> MetricCalculationApply());
>
> public class CalculationFold implements FoldFunction<Event, DefaultVector>
> {
> private final MapState<String, DefaultProductMetricVector> products;
> private transient MapStateDescriptor<String, DefaultProductMetricVector>
> descr;
>
> @Override
> public DefaultVector fold(DefaultVector stats, Event event)
> {
> if (products.contains(event.getProductId))
> {
> DefaultProductMetricVector product = products.get(event.getProductId);
> product.updatePrice(event.getPrice);
> products.put(event.getProductId, product);
> }
> else
> {
> DefaultProductMetricVector product = new DefaultProductMetricVector();
> product.updatePrice(event.getPrice);
> products.put(event.getProductId, product);
> }
> return stats;
> }
>
> *        // Fold function do not allow the open method and
> this.getRuntimeContext*
> //public void open(Configuration parameters) throws Exception
> //{
> // descr = new MapStateDescriptor<>("product", String.class,
> DefaultProductMetricVector.class);
> // products = this.getRuntimeContext().getMapState(descr);
> //}
> }
>
>
> We expect millions of unique products in 24 hour window so that is the
> reason we want to store state on rocksdb of each product class
> DefaultProductMetricVector instance. Otherwise, my understanding is that is
> that if i instantiate a java hashmap of products within DefaultVector fold
> accumulator then for each incoming event the full set of products will be
> deserialised and stored on heap which will eventually cause heap overflow
> error.
>
> Please can you tell us how to solve this problem.
>
> Thanks.
>
> Best Regards,
>
>
> On Wed, 10 Oct 2018 at 10:21, Fabian Hueske <fh...@gmail.com> wrote:
>
>> Yes, it would be good to post your code.
>> Are you using a FoldFunction in a window (if yes, what window) or as a
>> running aggregate?
>>
>> In general, collecting state in a FoldFunction is usually not something
>> that you should do. Did you consider using an AggregateFunction?
>>
>> Fabian
>>
>> Am Mi., 10. Okt. 2018 um 11:08 Uhr schrieb Chesnay Schepler <
>> chesnay@apache.org>:
>>
>>> In which method are you calling getRuntimeContext()? This method can
>>> only be used after open() has been called.
>>>
>>> On 09.10.2018 17:09, Ahmad Hassan wrote:
>>>
>>> Hi,
>>>
>>> We want to use MapState inside fold function to keep the map of all
>>> products that we see in 24 hour window to store huge state in rocksdb
>>> rather than overflowing heap. However, I don't seem to initialise mapstate
>>> within foldfunction or any class that is extending RichMapFunction
>>>
>>> private transient MapStateDescriptor<String, String> descr = new
>>> MapStateDescriptor<>("mymap", String.class, String.class);
>>> this.getRuntimeContext().getMapState(descr);
>>>
>>> I get error
>>>
>>> java.lang.IllegalStateException: The runtime context has not been
>>> initialized.
>>> at
>>> org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)
>>>
>>>
>>> Any clues how to get the runtime context please?
>>>
>>> Thanks.
>>>
>>> Best regards
>>>
>>>
>>>
>

Re: getRuntimeContext(): The runtime context has not been initialized.

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi Ahmad,

Few comments from my side:

    1. FoldFunction is deprecated because of many problems, e.g. it is not
possible to merge the contents of windows. Therefore you should at least
use an AggregateFunction.

    2. I am not sure that you need to store this in RocksDB; do you expect
24 million products per tenant in a single window?

    3. I think what you could do is first compute the stats for the composite
key <tenant, product> and then aggregate them in a subsequent operation (if
you need to). This way you would distribute the workload across more
parallel instances (a rough sketch follows below).
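
A rough sketch of that two-step idea (the "productId" key field, the
ProductStats/TenantStats types and the two aggregate classes are hypothetical
names for illustration, not from this thread):

DataStream<ProductStats> perProduct = inStream
        .filter(Objects::nonNull)
        .keyBy("tenant", "productId")   // composite key: state and work spread over many parallel keys
        .window(SlidingProcessingTimeWindows.of(Time.minutes(1440), Time.minutes(5)))
        .aggregate(new PerProductAggregate());   // small per-product accumulator, incrementally updated

DataStream<TenantStats> perTenant = perProduct
        .keyBy("tenant")                // optional second step: combine per-product results per tenant
        .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
        .aggregate(new PerTenantAggregate());

Each per-product accumulator stays small, so nothing like a full per-tenant
HashMap has to be held or deserialised per event.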

Best,

Dawid


On 11/10/18 11:33, Ahmad Hassan wrote:
> Hi All,
>
> Thanks for the replies. Here is the code snippet of what we want to
> achieve:
>
> We have sliding windows of 24hrs with 5 minutes apart.
>
> inStream
>  .filter(Objects::nonNull)
>  .keyBy("tenant")
>  .window(SlidingProcessingTimeWindows.of(Time.minutes(1440),
> Time.minutes(5)))
>  .fold(new DefaultVector(), new CalculationFold(), new
> MetricCalculationApply());
>
> public class CalculationFold implements FoldFunction<Event, DefaultVector>
> {
> private final MapState<String, DefaultProductMetricVector> products;
> private transient MapStateDescriptor<String,
> DefaultProductMetricVector> descr;
>
> @Override
> public DefaultVector fold(DefaultVector stats, Event event)
> {
> if (products.contains(event.getProductId))
> {
> DefaultProductMetricVector product = products.get(event.getProductId);
> product.updatePrice(event.getPrice);
> products.put(event.getProductId, product);
> }
> else
> {
> DefaultProductMetricVector product = new DefaultProductMetricVector();
> product.updatePrice(event.getPrice);
> products.put(event.getProductId, product);
> }
> return stats;
> }
>
> *        // Fold function do not allow the open method and
> this.getRuntimeContext*
> //public void open(Configuration parameters) throws Exception
> //{
> // descr = new MapStateDescriptor<>("product", String.class,
> DefaultProductMetricVector.class);
> // products = this.getRuntimeContext().getMapState(descr);
> //}
> }
>
>
> We expect millions of unique products in 24 hour window so that is the
> reason we want to store state on rocksdb of each product class
> DefaultProductMetricVector instance. Otherwise, my understanding is
> that is that if i instantiate a java hashmap of products within
> DefaultVector fold accumulator then for each incoming event the full
> set of products will be deserialised and stored on heap which will
> eventually cause heap overflow error.
>
> Please can you tell us how to solve this problem.
>
> Thanks.
>
> Best Regards,
>
>
> On Wed, 10 Oct 2018 at 10:21, Fabian Hueske <fhueske@gmail.com
> <ma...@gmail.com>> wrote:
>
>     Yes, it would be good to post your code.
>     Are you using a FoldFunction in a window (if yes, what window) or
>     as a running aggregate?
>
>     In general, collecting state in a FoldFunction is usually not
>     something that you should do. Did you consider using an
>     AggregateFunction?
>
>     Fabian
>
>     Am Mi., 10. Okt. 2018 um 11:08 Uhr schrieb Chesnay Schepler
>     <chesnay@apache.org <ma...@apache.org>>:
>
>         In which method are you calling getRuntimeContext()? This
>         method can only be used after open() has been called.
>
>         On 09.10.2018 17:09, Ahmad Hassan wrote:
>>         Hi,
>>
>>         We want to use MapState inside fold function to keep the map
>>         of all products that we see in 24 hour window to store huge
>>         state in rocksdb rather than overflowing heap. However, I
>>         don't seem to initialise mapstate within foldfunction or any
>>         class that is extending RichMapFunction
>>
>>         private transient MapStateDescriptor<String, String> descr =
>>         new MapStateDescriptor<>("mymap", String.class, String.class);
>>         this.getRuntimeContext().getMapState(descr);
>>
>>         I get error
>>
>>         java.lang.IllegalStateException: The runtime context has not
>>         been initialized.
>>         at
>>         org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)
>>
>>
>>         Any clues how to get the runtime context please?
>>
>>         Thanks.
>>
>>         Best regards
>
>


Re: getRuntimeContext(): The runtime context has not been initialized.

Posted by Ahmad Hassan <ah...@gmail.com>.
Any help/pointers on this, please?

Thanks.

On Thu, 11 Oct 2018 at 10:33, Ahmad Hassan <ah...@gmail.com> wrote:

> Hi All,
>
> Thanks for the replies. Here is the code snippet of what we want to
> achieve:
>
> We have sliding windows of 24hrs with 5 minutes apart.
>
> inStream
>  .filter(Objects::nonNull)
>  .keyBy("tenant")
>  .window(SlidingProcessingTimeWindows.of(Time.minutes(1440),
> Time.minutes(5)))
>  .fold(new DefaultVector(), new CalculationFold(), new
> MetricCalculationApply());
>
> public class CalculationFold implements FoldFunction<Event, DefaultVector>
> {
> private final MapState<String, DefaultProductMetricVector> products;
> private transient MapStateDescriptor<String, DefaultProductMetricVector>
> descr;
>
> @Override
> public DefaultVector fold(DefaultVector stats, Event event)
> {
> if (products.contains(event.getProductId))
> {
> DefaultProductMetricVector product = products.get(event.getProductId);
> product.updatePrice(event.getPrice);
> products.put(event.getProductId, product);
> }
> else
> {
> DefaultProductMetricVector product = new DefaultProductMetricVector();
> product.updatePrice(event.getPrice);
> products.put(event.getProductId, product);
> }
> return stats;
> }
>
> *        // Fold function do not allow the open method and
> this.getRuntimeContext*
> //public void open(Configuration parameters) throws Exception
> //{
> // descr = new MapStateDescriptor<>("product", String.class,
> DefaultProductMetricVector.class);
> // products = this.getRuntimeContext().getMapState(descr);
> //}
> }
>
>
> We expect millions of unique products in 24 hour window so that is the
> reason we want to store state on rocksdb of each product class
> DefaultProductMetricVector instance. Otherwise, my understanding is that is
> that if i instantiate a java hashmap of products within DefaultVector fold
> accumulator then for each incoming event the full set of products will be
> deserialised and stored on heap which will eventually cause heap overflow
> error.
>
> Please can you tell us how to solve this problem.
>
> Thanks.
>
> Best Regards,
>
>
> On Wed, 10 Oct 2018 at 10:21, Fabian Hueske <fh...@gmail.com> wrote:
>
>> Yes, it would be good to post your code.
>> Are you using a FoldFunction in a window (if yes, what window) or as a
>> running aggregate?
>>
>> In general, collecting state in a FoldFunction is usually not something
>> that you should do. Did you consider using an AggregateFunction?
>>
>> Fabian
>>
>> Am Mi., 10. Okt. 2018 um 11:08 Uhr schrieb Chesnay Schepler <
>> chesnay@apache.org>:
>>
>>> In which method are you calling getRuntimeContext()? This method can
>>> only be used after open() has been called.
>>>
>>> On 09.10.2018 17:09, Ahmad Hassan wrote:
>>>
>>> Hi,
>>>
>>> We want to use MapState inside fold function to keep the map of all
>>> products that we see in 24 hour window to store huge state in rocksdb
>>> rather than overflowing heap. However, I don't seem to initialise mapstate
>>> within foldfunction or any class that is extending RichMapFunction
>>>
>>> private transient MapStateDescriptor<String, String> descr = new
>>> MapStateDescriptor<>("mymap", String.class, String.class);
>>> this.getRuntimeContext().getMapState(descr);
>>>
>>> I get error
>>>
>>> java.lang.IllegalStateException: The runtime context has not been
>>> initialized.
>>> at
>>> org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)
>>>
>>>
>>> Any clues how to get the runtime context please?
>>>
>>> Thanks.
>>>
>>> Best regards
>>>
>>>
>>>

Re: getRuntimeContext(): The runtime context has not been initialized.

Posted by Ahmad Hassan <ah...@gmail.com>.
Hi All,

Thanks for the replies. Here is the code snippet of what we want to achieve:

We have sliding windows of 24 hrs that slide every 5 minutes.

inStream
    .filter(Objects::nonNull)
    .keyBy("tenant")
    .window(SlidingProcessingTimeWindows.of(Time.minutes(1440), Time.minutes(5)))
    .fold(new DefaultVector(), new CalculationFold(), new MetricCalculationApply());

public class CalculationFold implements FoldFunction<Event, DefaultVector>
{
    private MapState<String, DefaultProductMetricVector> products;
    private transient MapStateDescriptor<String, DefaultProductMetricVector> descr;

    @Override
    public DefaultVector fold(DefaultVector stats, Event event)
    {
        if (products.contains(event.getProductId()))
        {
            DefaultProductMetricVector product = products.get(event.getProductId());
            product.updatePrice(event.getPrice());
            products.put(event.getProductId(), product);
        }
        else
        {
            DefaultProductMetricVector product = new DefaultProductMetricVector();
            product.updatePrice(event.getPrice());
            products.put(event.getProductId(), product);
        }
        return stats;
    }

    // A FoldFunction does not allow an open() method or this.getRuntimeContext():
    //public void open(Configuration parameters) throws Exception
    //{
    //    descr = new MapStateDescriptor<>("product", String.class,
    //            DefaultProductMetricVector.class);
    //    products = this.getRuntimeContext().getMapState(descr);
    //}
}


We expect millions of unique products in a 24 hour window, which is why we
want to store the state of each product (a DefaultProductMetricVector
instance) in RocksDB. Otherwise, my understanding is that if I instantiate a
Java HashMap of products within the DefaultVector fold accumulator, then for
each incoming event the full set of products will be deserialised and kept on
the heap, which will eventually cause a heap overflow error.

Please can you tell us how to solve this problem?

Thanks.

Best Regards,


On Wed, 10 Oct 2018 at 10:21, Fabian Hueske <fh...@gmail.com> wrote:

> Yes, it would be good to post your code.
> Are you using a FoldFunction in a window (if yes, what window) or as a
> running aggregate?
>
> In general, collecting state in a FoldFunction is usually not something
> that you should do. Did you consider using an AggregateFunction?
>
> Fabian
>
> Am Mi., 10. Okt. 2018 um 11:08 Uhr schrieb Chesnay Schepler <
> chesnay@apache.org>:
>
>> In which method are you calling getRuntimeContext()? This method can only
>> be used after open() has been called.
>>
>> On 09.10.2018 17:09, Ahmad Hassan wrote:
>>
>> Hi,
>>
>> We want to use MapState inside fold function to keep the map of all
>> products that we see in 24 hour window to store huge state in rocksdb
>> rather than overflowing heap. However, I don't seem to initialise mapstate
>> within foldfunction or any class that is extending RichMapFunction
>>
>> private transient MapStateDescriptor<String, String> descr = new
>> MapStateDescriptor<>("mymap", String.class, String.class);
>> this.getRuntimeContext().getMapState(descr);
>>
>> I get error
>>
>> java.lang.IllegalStateException: The runtime context has not been
>> initialized.
>> at
>> org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)
>>
>>
>> Any clues how to get the runtime context please?
>>
>> Thanks.
>>
>> Best regards
>>
>>
>>

Re: getRuntimeContext(): The runtime context has not been initialized.

Posted by Fabian Hueske <fh...@gmail.com>.
Yes, it would be good to post your code.
Are you using a FoldFunction in a window (if yes, what window) or as a
running aggregate?

In general, collecting state in a FoldFunction is usually not something
that you should do. Did you consider using an AggregateFunction?
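
For reference, a rough sketch of the AggregateFunction shape (Event and
DefaultVector are the poster's classes; the updatePrice/merge helpers on
DefaultVector are assumptions, not confirmed in this thread):

import org.apache.flink.api.common.functions.AggregateFunction;

public class CalculationAggregate
        implements AggregateFunction<Event, DefaultVector, DefaultVector> {

    @Override
    public DefaultVector createAccumulator() {
        return new DefaultVector();
    }

    @Override
    public DefaultVector add(Event event, DefaultVector acc) {
        acc.updatePrice(event.getProductId(), event.getPrice()); // assumed helper
        return acc;
    }

    @Override
    public DefaultVector getResult(DefaultVector acc) {
        return acc;
    }

    @Override
    public DefaultVector merge(DefaultVector a, DefaultVector b) {
        return a.merge(b); // merging windows is what FoldFunction cannot do
    }
}

The window call would then become
.aggregate(new CalculationAggregate(), new MetricCalculationApply())
instead of .fold(...), assuming MetricCalculationApply's input type still
matches. Note that the accumulator still lives per key and window in the
state backend, which is the concern raised elsewhere in the thread.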

Fabian

On Wed, 10 Oct 2018 at 11:08, Chesnay Schepler <chesnay@apache.org> wrote:

> In which method are you calling getRuntimeContext()? This method can only
> be used after open() has been called.
>
> On 09.10.2018 17:09, Ahmad Hassan wrote:
>
> Hi,
>
> We want to use MapState inside fold function to keep the map of all
> products that we see in 24 hour window to store huge state in rocksdb
> rather than overflowing heap. However, I don't seem to initialise mapstate
> within foldfunction or any class that is extending RichMapFunction
>
> private transient MapStateDescriptor<String, String> descr = new
> MapStateDescriptor<>("mymap", String.class, String.class);
> this.getRuntimeContext().getMapState(descr);
>
> I get error
>
> java.lang.IllegalStateException: The runtime context has not been
> initialized.
> at
> org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)
>
>
> Any clues how to get the runtime context please?
>
> Thanks.
>
> Best regards
>
>
>

Re: getRuntimeContext(): The runtime context has not been initialized.

Posted by Chesnay Schepler <ch...@apache.org>.
In which method are you calling getRuntimeContext()? This method can 
only be used after open() has been called.
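
For what it's worth, a minimal sketch of that pattern, assuming a keyed
stream of the poster's Event type (the getter name and the surrounding
pipeline are assumptions, not confirmed in this thread):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class ProductTracker extends RichFlatMapFunction<Event, Event> {

    private transient MapState<String, Long> seenProducts;

    @Override
    public void open(Configuration parameters) throws Exception {
        // getRuntimeContext() is safe here: open() runs after the context has been set.
        MapStateDescriptor<String, Long> descr =
                new MapStateDescriptor<>("seen-products", String.class, Long.class);
        seenProducts = getRuntimeContext().getMapState(descr);
    }

    @Override
    public void flatMap(Event event, Collector<Event> out) throws Exception {
        // MapState is keyed state, so this function must run on a keyed stream,
        // e.g. inStream.keyBy("tenant").flatMap(new ProductTracker()).
        seenProducts.put(event.getProductId(), System.currentTimeMillis());
        out.collect(event);
    }
}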

On 09.10.2018 17:09, Ahmad Hassan wrote:
> Hi,
>
> We want to use MapState inside fold function to keep the map of all 
> products that we see in 24 hour window to store huge state in rocksdb 
> rather than overflowing heap. However, I don't seem to initialise 
> mapstate within foldfunction or any class that is extending 
> RichMapFunction
>
> private transient MapStateDescriptor<String, String> descr = new 
> MapStateDescriptor<>("mymap", String.class, String.class);
> this.getRuntimeContext().getMapState(descr);
>
> I get error
>
> java.lang.IllegalStateException: The runtime context has not been 
> initialized.
> at 
> org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)
>
>
> Any clues how to get the runtime context please?
>
> Thanks.
>
> Best regards