Posted to user@flink.apache.org by Vinay Patil <vi...@gmail.com> on 2019/07/20 06:12:50 UTC

StackOverflow Error

Hi,

I am trying to run a pipeline on Flink 1.8.1 and am getting the following
exception:

java.lang.StackOverflowError
    at java.lang.Exception.<init>(Exception.java:66)
    at java.lang.ReflectiveOperationException.<init>(ReflectiveOperationException.java:56)
    at java.lang.NoSuchMethodException.<init>(NoSuchMethodException.java:51)
    at java.lang.Class.getDeclaredMethod(Class.java:2130)
    at org.apache.flink.api.java.ClosureCleaner.usesCustomSerialization(ClosureCleaner.java:153)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:78)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)

I have even tried running in legacy mode. The pipeline code is:

private void execute(String[] args) {
    ParameterTool pt = ParameterTool.fromArgs(args);

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //env.setMaxParallelism(30);
    env.setParallelism(20);

    env.enableCheckpointing(5000);
    StateBackend backend = new FsStateBackend(pt.getRequired("checkpoint_path"), true);
    env.setStateBackend(backend);

    FlinkDynamoDBStreamsConsumer<ObjectNode> flinkDynamoDBStreamsConsumer =
            new FlinkDynamoDBStreamsConsumer<>(DYNAMODB_STREAM_NAME,
                    new JsonNodeDeserializationSchema(),
                    dynamodbStreamsConsumerConfig);

    SingleOutputStreamOperator<ObjectNode> sourceStream = env
            .addSource(flinkDynamoDBStreamsConsumer)
            .name("Dynamo DB Streams");

    sourceStream
            .keyBy(new CdcKeySelector())
            .addSink(new FlinkKafkaProducer<>("dev-broker.hotstar.npe:9092", "ums-dynamo-streams",
                    new JsonSerializationSchema()))
            .name("Kafka Sink");

    try {
        env.execute();
    } catch (Exception e) {
        System.out.println("Caught exception for pipeline" + e.getMessage());
        e.printStackTrace();
    }
}

Regards,
Vinay Patil

Re: StackOverflow Error

Posted by Vinay Patil <vi...@gmail.com>.
Hi Ravi,

The uber jar was correct. Setting the ClosureCleanerLevel to TOP_LEVEL
resolved this issue. Thanks a lot.

Is there any disadvantage to setting this explicitly?


Regards,
Vinay Patil
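
For readers following the thread: the trace shows ClosureCleaner.clean calling itself repeatedly, and as far as the ExecutionConfig Javadoc describes it, TOP_LEVEL cleans only the top-level class while RECURSIVE also descends into fields. The thread does not establish the root cause, so the sketch below is only a simplified illustration and not Flink's implementation. It assumes a cyclic object graph and shows that a reflective walk that follows every field without tracking visited objects exhausts the stack, whereas a walk limited to the top-level fields does not. All class and method names (ClosureWalkSketch, Node, Holder, and so on) are hypothetical.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class ClosureWalkSketch {

    /** Hypothetical object that can point back at itself through another node. */
    static class Node {
        Node next;
    }

    /** Hypothetical user function that holds a reference into the graph. */
    static class Holder {
        Node head;
    }

    /** Builds a Holder whose two nodes reference each other in a cycle. */
    static Holder cyclicHolder() {
        Node a = new Node();
        Node b = new Node();
        a.next = b;
        b.next = a;
        Holder holder = new Holder();
        holder.head = a;
        return holder;
    }

    /** Top-level walk: counts only the instance fields declared on obj's own class. */
    static int countTopLevelFields(Object obj) {
        int count = 0;
        for (Field field : obj.getClass().getDeclaredFields()) {
            if (!Modifier.isStatic(field.getModifiers())) {
                count++;
            }
        }
        return count;
    }

    /** Recursive walk with no cycle detection: follows every non-null reference field. */
    static void walkRecursively(Object obj) throws IllegalAccessException {
        for (Field field : obj.getClass().getDeclaredFields()) {
            if (Modifier.isStatic(field.getModifiers()) || field.getType().isPrimitive()) {
                continue;
            }
            field.setAccessible(true);
            Object value = field.get(obj);
            if (value != null) {
                walkRecursively(value);
            }
        }
    }

    /** Returns true if the recursive walk exhausts the stack on this object. */
    static boolean recursiveWalkOverflows(Object obj) {
        try {
            walkRecursively(obj);
            return false;
        } catch (StackOverflowError e) {
            return true;
        } catch (IllegalAccessException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Holder holder = cyclicHolder();
        System.out.println("top-level fields: " + countTopLevelFields(holder));
        System.out.println("recursive walk overflows: " + recursiveWalkOverflows(holder));
    }
}
```

The trade-off this suggests, again only as an assumption about the general technique, is that a top-level walk is cheaper and cannot loop, but it also does not inspect objects nested inside the function's fields.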


On Sat, Jul 20, 2019 at 10:23 PM Ravi Bhushan Ratnakar <
ravibhushanratnakar@gmail.com> wrote:

> Hi Vinay,
>
> ObjectNode seems OK, as it is used by the Flink-provided
> "JsonNodeDeserializationSchema".
>
> Please verify that you are using version 1.8.1 of the Maven dependency
> "flink-connector-kinesis" (matching your Flink 1.8.1 cluster) and that you
> package this dependency as part of your application uber/fat jar. If you
> are already doing this, please also try setting the closure cleaner
> level to "TOP_LEVEL", as below.
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
> env.getConfig().setClosureCleanerLevel(ExecutionConfig.ClosureCleanerLevel.TOP_LEVEL);
>
>
> Regards,
> Ravi
>
> On Sat, Jul 20, 2019 at 1:53 PM Vinay Patil <vi...@gmail.com>
> wrote:
>
>> Hi Ravi,
>>
>> I tried both new and legacy mode. It works locally, but on the cluster I
>> get this exception. I am passing the Jackson ObjectNode class, which
>> should be serializable. What do you think?
>>
>> On Sat, 20 Jul 2019, 12:11 Ravi Bhushan Ratnakar, <
>> ravibhushanratnakar@gmail.com> wrote:
>>
>>> Hi Vinay,
>>>
>>> Please make sure that all your custom code is serializable. You can run
>>> this using new mode.
>>>
>>> Thanks,
>>> Ravi

Re: StackOverflow Error

Posted by Vinay Patil <vi...@gmail.com>.
Hi Ravi,

I tried both new and legacy mode. It works locally, but on the cluster I
get this exception. I am passing the Jackson ObjectNode class, which
should be serializable. What do you think?
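
One way to check the "should be serializable" assumption locally is to push each user function through plain Java serialization before submitting the job. The sketch below is a minimal illustration under that assumption. SerializableCheck, GoodFunction and BadFunction are hypothetical names, not classes from the pipeline above, and passing this check does not by itself explain or rule out the StackOverflowError reported here.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableCheck {

    /** Hypothetical user function whose fields are all serializable. */
    static class GoodFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        String fieldName = "id";
    }

    /** Hypothetical user function that holds a non-serializable member. */
    static class BadFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        Object helper = new Object(); // java.lang.Object does not implement Serializable
    }

    /** Returns true if obj can be written with standard Java serialization. */
    static boolean isJavaSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (IOException e) {
            // NotSerializableException is a subclass of IOException
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("GoodFunction: " + isJavaSerializable(new GoodFunction()));
        System.out.println("BadFunction: " + isJavaSerializable(new BadFunction()));
    }
}
```

In a real job, the same helper could be pointed at instances such as the key selector or the serialization schema, assuming they are constructed the same way as in the pipeline.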

On Sat, 20 Jul 2019, 12:11 Ravi Bhushan Ratnakar, <
ravibhushanratnakar@gmail.com> wrote:

> Hi Vinay,
>
> Please make sure that all your custom code is serializable. You can run
> this using new mode.
>
> Thanks,
> Ravi