Posted to user@flink.apache.org by "Colletta, Edward" <Ed...@FMR.COM> on 2020/10/08 10:12:54 UTC

state access causing segmentation fault

Using Flink 1.9.2, Java, FsStateBackend. Running a session cluster on EC2 instances.

I have a KeyedProcessFunction that is causing a segmentation fault, crashing the Flink TaskManager. This seems to be caused by using three state variables in the operator. The crash happens consistently after some load has been processed.
This is the second time I have encountered this. The first time I had three ValueState variables; this time I had two ValueState variables and a MapState variable. Both times the error was alleviated by removing one of the state variables.
This time I replaced the two ValueState variables with a single Tuple2 combining the types of the individual variables. I can try to put together a minimal example, but I was wondering whether anyone has encountered this problem.

Are there any documented limits on the number of state variables one operator can use?

For background, I use multiple state variables because the operator processes two types of input, Left and Right. When a Left is received it is put into a PriorityQueue. When a Right is received I put it into a ring buffer.
I replaced the PriorityQueue with a queue of IDs and a MapState holding the elements. So Left is stored in a queue ValueState variable plus a MapState variable, and Right is stored in the ring buffer ValueState variable.
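
A plain-Java sketch of the two structures described above may help readers follow the rest of the thread. It is not Edward's code: `LeftBuffer`, `RightRingBuffer` and their methods are hypothetical, priority is assumed to be ascending id (the post does not say what the ordering key is), and a `HashMap` stands in for Flink's `MapState`. Both classes are declared `public static` with public no-argument constructors, which turns out to matter in this thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

public class LeftRightBuffers {

    // Hypothetical Left-side state: a priority queue of ids plus a map from id to element.
    // Assumption: priority is ascending id. In the job, the id queue would be kept in a
    // ValueState and the elements in a MapState; a HashMap stands in for MapState here.
    public static class LeftBuffer {
        public PriorityQueue<Long> ids = new PriorityQueue<>();
        public Map<Long, String> elements = new HashMap<>();

        public LeftBuffer() {}

        public void add(long id, String element) {
            ids.add(id);
            elements.put(id, element);
        }

        // Removes and returns the element with the smallest id, or null if empty.
        public String poll() {
            Long id = ids.poll();
            return id == null ? null : elements.remove(id);
        }
    }

    // Hypothetical Right-side state: a fixed-capacity ring buffer that overwrites the
    // oldest entry when full. The size/count names mirror the reproducer's RingBufferExec.
    public static class RightRingBuffer {
        public String[] slots;
        public int size;   // capacity
        public int count;  // number of stored entries
        public int head;   // index of the oldest entry

        public RightRingBuffer() {}

        public RightRingBuffer(int size) {
            this.size = size;
            this.slots = new String[size];
        }

        public void add(String value) {
            slots[(head + count) % size] = value;
            if (count < size) {
                count++;
            } else {
                head = (head + 1) % size; // full: the slot just written held the oldest entry
            }
        }

        // Returns the i-th oldest entry (0 = oldest).
        public String get(int i) {
            if (i < 0 || i >= count) {
                throw new IndexOutOfBoundsException("index " + i);
            }
            return slots[(head + i) % size];
        }
    }

    public static void main(String[] args) {
        LeftBuffer left = new LeftBuffer();
        left.add(7L, "L7");
        left.add(3L, "L3");
        System.out.println(left.poll()); // L3

        RightRingBuffer right = new RightRingBuffer(2);
        right.add("R1");
        right.add("R2");
        right.add("R3");
        System.out.println(right.get(0) + "," + right.get(1)); // R2,R3
    }
}
```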



RE: state access causing segmentation fault

Posted by "Colletta, Edward" <Ed...@FMR.COM>.
Thanks Arvid,

I added static to ExecQueue and that did fix the problem. I tested without static on RingBufferExec because, once ExecQueue is a static nested class, there should be no reference to the MyKeyedProcessFunction object: RingBufferExec is an inner class of ExecQueue, so it is bound only to its ExecQueue.
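
The reasoning above can be checked with reflection in plain Java. In this sketch, `ProcessFunctionStandIn` is a hypothetical stand-in for MyKeyedProcessFunction, `ExecQueue` is static nested, and `RingBufferExec` stays an inner class of it, as in Edward's test. The constructor parameter types show that `RingBufferExec` is bound to an `ExecQueue` through a hidden first parameter, while constructing an `ExecQueue` involves no enclosing object at all.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

public class EnclosingChainDemo {

    // Hypothetical stand-in for MyKeyedProcessFunction.
    public static class ProcessFunctionStandIn {

        // Static nested, as after the fix: constructing it involves no outer instance.
        public static class ExecQueue {
            public RingBufferExec queue;

            public ExecQueue(int initSize) {
                // Implicitly passes this ExecQueue as the enclosing instance.
                queue = new RingBufferExec(initSize);
            }

            // Still non-static, as in Edward's test: bound to an ExecQueue only.
            public class RingBufferExec {
                public Integer size;

                public RingBufferExec(int sizeIn) {
                    size = sizeIn;
                }
            }
        }
    }

    // Lists the parameter types of a class's single declared constructor.
    public static String constructorParams(Class<?> type) {
        return Arrays.stream(type.getDeclaredConstructors()[0].getParameterTypes())
                .map(Class::getSimpleName)
                .collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        System.out.println(constructorParams(ProcessFunctionStandIn.ExecQueue.class));                // int
        System.out.println(constructorParams(ProcessFunctionStandIn.ExecQueue.RingBufferExec.class)); // ExecQueue, int
    }
}
```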

However, I did that just for the test. For my production code, going forward, I am following Flink's rules for POJO types, adding static to every nested class, and checking the logs for any POJO warnings.
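
A rough reflective check for the POJO rules mentioned above is sketched below. `PojoCheck.violations` is a hypothetical helper that approximates only part of the rules in Flink's type serialization documentation (public class, static if nested, public no-argument constructor, fields public or exposed through a getter and setter). Flink's own type extraction applies further checks, so an empty result here does not guarantee POJO treatment; the POJO warnings in the logs remain the authoritative signal.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class PojoCheck {

    // Approximates a subset of Flink's documented POJO rules; returns the reasons a class
    // may fail them (an empty list means this check found no problem).
    public static List<String> violations(Class<?> type) {
        List<String> problems = new ArrayList<>();
        int mods = type.getModifiers();
        if (!Modifier.isPublic(mods)) {
            problems.add("class is not public");
        }
        if (type.isMemberClass() && !Modifier.isStatic(mods)) {
            problems.add("nested class is not static");
        }
        boolean publicNoArg = false;
        for (Constructor<?> c : type.getDeclaredConstructors()) {
            // For a non-static inner class the enclosing instance counts as a parameter.
            if (c.getParameterCount() == 0 && Modifier.isPublic(c.getModifiers())) {
                publicNoArg = true;
            }
        }
        if (!publicNoArg) {
            problems.add("no public no-argument constructor");
        }
        for (Field f : type.getDeclaredFields()) {
            int fm = f.getModifiers();
            if (Modifier.isStatic(fm) || Modifier.isTransient(fm) || f.isSynthetic()) {
                continue; // skipped: static, transient, or compiler-generated (e.g. this$0)
            }
            if (!Modifier.isPublic(fm) && !hasGetterAndSetter(type, f.getName())) {
                problems.add("field '" + f.getName() + "' is not public and lacks getter/setter");
            }
        }
        return problems;
    }

    private static boolean hasGetterAndSetter(Class<?> type, String field) {
        String suffix = Character.toUpperCase(field.charAt(0)) + field.substring(1);
        boolean getter = false;
        boolean setter = false;
        for (Method m : type.getMethods()) {
            if ((m.getName().equals("get" + suffix) || m.getName().equals("is" + suffix))
                    && m.getParameterCount() == 0) {
                getter = true;
            }
            if (m.getName().equals("set" + suffix) && m.getParameterCount() == 1) {
                setter = true;
            }
        }
        return getter && setter;
    }

    // Example types.
    public class InnerQueue {                // non-static: fails two checks
        public Integer size;
        public InnerQueue() {}
    }

    public static class HiddenFieldQueue {   // private field without accessors
        private Integer size;
        public HiddenFieldQueue() {}
    }

    public static class GoodQueue {          // static, no-arg constructor, accessors
        private Integer size;
        public GoodQueue() {}
        public Integer getSize() { return size; }
        public void setSize(Integer size) { this.size = size; }
    }

    public static void main(String[] args) {
        System.out.println(violations(InnerQueue.class));
        System.out.println(violations(HiddenFieldQueue.class));
        System.out.println(violations(GoodQueue.class)); // []
    }
}
```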


From: Arvid Heise <ar...@ververica.com>
Sent: Sunday, October 11, 2020 3:46 PM
To: Colletta, Edward <Ed...@FMR.COM>
Cc: Dawid Wysakowicz <dw...@apache.org>; user@flink.apache.org
Subject: Re: state access causing segmentation fault

Hi Edward,

Could you try adding the static keyword to ExecQueue and RingBufferExec? As is, they hold a reference to the MyKeyedProcessFunction, which has unforeseen consequences.

--

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng

Re: state access causing segmentation fault

Posted by Arvid Heise <ar...@ververica.com>.
Hi Edward,

Could you try adding the static keyword to ExecQueue and RingBufferExec?
As is, they hold a reference to the MyKeyedProcessFunction, which has
unforeseen consequences.
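
Arvid's point can be illustrated in plain Java. In the sketch below (all names hypothetical), the non-static `InnerQueue` can only be constructed from an instance of the enclosing class, it reaches that instance's state, and it has no real no-argument constructor, so code that instantiates classes reflectively, as serializers generally do, cannot create it on its own. The static `StaticQueue` has none of these ties. The sketch does not show why the JVM crashed; that mechanism is not established in this thread.

```java
public class OuterReferenceDemo {

    // Hypothetical stand-in for MyKeyedProcessFunction and the state it holds.
    private final String name;

    public OuterReferenceDemo(String name) {
        this.name = name;
    }

    // Non-static inner class, like ExecQueue in the reproducer.
    public class InnerQueue {
        public Integer size;

        public InnerQueue() {}

        // Reads a field of the enclosing object, so a reference to it must be kept.
        public String owner() {
            return name;
        }
    }

    // Static nested class, like ExecQueue after the fix.
    public static class StaticQueue {
        public Integer size;

        public StaticQueue() {}
    }

    // True if the class declares a constructor with no parameters at all, which is
    // what reflective instantiation via getDeclaredConstructor().newInstance() needs.
    public static boolean hasNoArgConstructor(Class<?> type) {
        try {
            type.getDeclaredConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        OuterReferenceDemo outer = new OuterReferenceDemo("outer-1");
        InnerQueue inner = outer.new InnerQueue(); // an outer instance is mandatory
        System.out.println(inner.owner());         // outer-1

        System.out.println(hasNoArgConstructor(InnerQueue.class));  // false
        System.out.println(hasNoArgConstructor(StaticQueue.class)); // true
    }
}
```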

On Sun, Oct 11, 2020 at 5:38 AM Colletta, Edward <Ed...@fmr.com>
wrote:

> The crash has nothing to do with the number of state variables.  But it
> does seem to be caused by using a type for the state variable that is a
> class nested in the KeyedProcessFunction.
>
> Reduced to a single state variable.  The type of the state variable was a
> class (ExecQueue) defined in the class implementing KeyedProcessFunction.
> Moving the ExecQueue definition to its own file fixed the problem.

RE: state access causing segmentation fault

Posted by "Colletta, Edward" <Ed...@FMR.COM>.
Tried to attach a tar file, but it got blocked. Resending with the files attached individually.


RE: state access causing segmentation fault

Posted by "Colletta, Edward" <Ed...@FMR.COM>.
OK, I have a minimal reproducible example. Attaching a tar file of the job that crashed.

The crash has nothing to do with the number of state variables. But it does seem to be caused by using, as the state variable's type, a class nested in the KeyedProcessFunction.

I reduced it to a single state variable. The type of the state variable was a class (ExecQueue) defined inside the class implementing KeyedProcessFunction. Moving the ExecQueue definition to its own file fixed the problem.

The attached example always crashes the TaskManager within 30 seconds to 5 minutes.

MyKeyedProcessFunction is in the tar file and is also pasted here:



package crash;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction.Context;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction.OnTimerContext;
import org.apache.flink.util.Collector;

public class MyKeyedProcessFunction extends KeyedProcessFunction<String, Exec, Exec> {

    private static final Logger LOG = LoggerFactory.getLogger(MyKeyedProcessFunction.class);

    public TypeInformation<ExecQueue> leftTypeInfo;
    public transient ValueState<ExecQueue> leftState;

    public int initQueueSize;
    public long emitFrequencyMs;

    public MyKeyedProcessFunction() {
        initQueueSize = 10;
        emitFrequencyMs = 1;
    }

    @Override
    public void open(Configuration conf) {
        leftTypeInfo = TypeInformation.of(new TypeHint<ExecQueue>(){});
        leftState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("left", leftTypeInfo, null));
    }

    @Override
    public void processElement(Exec leftIn, Context ctx, Collector<Exec> out) {
        try {
            ExecQueue eq = leftState.value();
            if (eq == null) {
                // First element for this key: create the state and start the timer loop.
                eq = new ExecQueue(10);
                ctx.timerService().registerProcessingTimeTimer(
                        ctx.timerService().currentProcessingTime() + emitFrequencyMs);
            }
            leftState.update(eq);
        }
        catch (Exception e) {
            LOG.error("Exception in processElement1. Key: " + ctx.getCurrentKey() + ". " + e + ". trace = ");
            for (java.lang.StackTraceElement s : e.getStackTrace())
                LOG.error(s.toString());
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Exec> out) {
        try {
            // Read the state on every timer and re-register the timer emitFrequencyMs later.
            ExecQueue eq = leftState.value();
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + emitFrequencyMs);
        }
        catch (Exception e) {
            LOG.error("Exception in onTimer. Key: " + ctx.getCurrentKey() + ". " + e + ". trace = ");
            for (java.lang.StackTraceElement s : e.getStackTrace())
                LOG.error(s.toString());
        }
    }

    // State type declared as a non-static inner class: instances are created with (and
    // typically keep) a reference to the enclosing MyKeyedProcessFunction.
    // Moving this class to its own file avoided the crash.
    public class ExecQueue {
        public RingBufferExec queue;

        public ExecQueue() {}

        public ExecQueue(int initSize) {
            queue = new RingBufferExec(initSize);
        }

        // Also non-static: each instance is bound to its enclosing ExecQueue.
        public class RingBufferExec {
            public Integer size;
            public Integer count;

            public RingBufferExec() {}

            public RingBufferExec(int sizeIn) {
                size = sizeIn;
                count = 0;
            }
        }
    }
}
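
For comparison, here is a sketch of the state types with the fix discussed in this thread applied: both classes are static nested classes with public no-argument constructors and public fields (making RingBufferExec static too follows Edward's production rule rather than his test). The wrapper class `FixedStateTypes` and the `copy` helper are hypothetical; `copy` is a reflective deep copy that only illustrates what a field-based serializer needs, namely creating instances through the no-argument constructor without any enclosing object. It is not Flink's serializer.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class FixedStateTypes {

    // State type with the fix applied: static, public no-arg constructor, public fields.
    public static class ExecQueue {
        public RingBufferExec queue;

        public ExecQueue() {}

        public ExecQueue(int initSize) {
            queue = new RingBufferExec(initSize);
        }
    }

    // Also static, following the "static on every nested class" rule.
    public static class RingBufferExec {
        public Integer size;
        public Integer count;

        public RingBufferExec() {}

        public RingBufferExec(int sizeIn) {
            size = sizeIn;
            count = 0;
        }
    }

    // Hypothetical reflective deep copy: instantiates each type declared in this class via
    // its no-arg constructor and copies public instance fields recursively. Other values
    // (Integer here) are treated as immutable and shared.
    @SuppressWarnings("unchecked")
    public static <T> T copy(T source) throws ReflectiveOperationException {
        if (source == null) {
            return null;
        }
        Class<T> type = (Class<T>) source.getClass();
        if (type.getEnclosingClass() != FixedStateTypes.class) {
            return source;
        }
        T target = type.getDeclaredConstructor().newInstance();
        for (Field f : type.getFields()) {
            if (!Modifier.isStatic(f.getModifiers())) {
                f.set(target, copy(f.get(source)));
            }
        }
        return target;
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        ExecQueue original = new ExecQueue(10);
        ExecQueue copied = copy(original);
        System.out.println(copied != original && copied.queue != original.queue); // true
        System.out.println(copied.queue.size + "/" + copied.queue.count);         // 10/0
    }
}
```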


From: Dawid Wysakowicz <dw...@apache.org>
Sent: Thursday, October 8, 2020 6:26 AM
To: Colletta, Edward <Ed...@FMR.COM>; user@flink.apache.org
Subject: Re: state access causing segmentation fault


Hi,

It should be absolutely fine to use multiple state objects. I am not aware of any limits to that. A minimal, reproducible example would definitely be helpful. For that kind of exception, I'd look into the serializers you use. Other than that, I cannot think of an obvious reason for that kind of exception.

Best,

Dawid



Re: state access causing segmentation fault

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi,

It should be absolutely fine to use multiple state objects. I am not
aware of any limits to that. A minimal, reproducible example would
definitely be helpful. For that kind of exception, I'd look into the
serializers you use. Other than that, I cannot think of an obvious reason
for that kind of exception.

Best,

Dawid
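
One concrete way to follow the advice to look into the serializers is a round-trip test: write a value, read it back, and compare the fields. The sketch below uses only `java.io` and a hypothetical hand-written serializer for a static, RingBufferExec-shaped class (a null flag followed by the value for each `Integer` field); it does not use Flink's `TypeSerializer` API. In a job, the same check would be run against whatever serializer Flink actually selects for the state type.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Objects;

public class SerializerRoundTrip {

    // Static class with the same fields as the reproducer's RingBufferExec.
    public static class RingBufferExec {
        public Integer size;
        public Integer count;

        public RingBufferExec() {}

        public RingBufferExec(int sizeIn) {
            size = sizeIn;
            count = 0;
        }
    }

    // Hypothetical hand-written serializer: for each Integer field, a null flag then the value.
    static void write(RingBufferExec value, DataOutputStream out) throws IOException {
        writeNullableInt(value.size, out);
        writeNullableInt(value.count, out);
    }

    static RingBufferExec read(DataInputStream in) throws IOException {
        RingBufferExec value = new RingBufferExec();
        value.size = readNullableInt(in);
        value.count = readNullableInt(in);
        return value;
    }

    private static void writeNullableInt(Integer v, DataOutputStream out) throws IOException {
        out.writeBoolean(v != null);
        if (v != null) {
            out.writeInt(v);
        }
    }

    private static Integer readNullableInt(DataInputStream in) throws IOException {
        return in.readBoolean() ? Integer.valueOf(in.readInt()) : null;
    }

    // Serializes, deserializes and compares field by field.
    public static boolean roundTrips(RingBufferExec value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            write(value, out);
        }
        RingBufferExec copy;
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            copy = read(in);
        }
        return Objects.equals(value.size, copy.size) && Objects.equals(value.count, copy.count);
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrips(new RingBufferExec(10))); // true
        System.out.println(roundTrips(new RingBufferExec()));   // true (both fields null)
    }
}
```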
