Posted to user@flink.apache.org by Vinay Patil <vi...@gmail.com> on 2019/07/20 06:12:50 UTC
StackOverflow Error
Hi,
I am trying to run a pipeline on Flink 1.8.1 and getting the following
exception:
java.lang.StackOverflowError
    at java.lang.Exception.<init>(Exception.java:66)
    at java.lang.ReflectiveOperationException.<init>(ReflectiveOperationException.java:56)
    at java.lang.NoSuchMethodException.<init>(NoSuchMethodException.java:51)
    at java.lang.Class.getDeclaredMethod(Class.java:2130)
    at org.apache.flink.api.java.ClosureCleaner.usesCustomSerialization(ClosureCleaner.java:153)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:78)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
    (the ClosureCleaner.clean frame at line 115 repeats until the stack overflows)
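The repeated ClosureCleaner.clean frames suggest the cleaner is recursing through the closure's object graph. A minimal, self-contained sketch of how that kind of traversal can blow the stack (toy code, not Flink's actual ClosureCleaner):

```java
import java.lang.reflect.Field;

public class RecursiveCleanDemo {
    static class Node {
        Node next; // reference the "cleaner" follows
    }

    // Naive recursive traversal with no visited set, like a closure cleaner
    // that descends into every non-primitive field it finds.
    static void clean(Object obj) {
        if (obj == null) {
            return;
        }
        for (Field f : obj.getClass().getDeclaredFields()) {
            try {
                f.setAccessible(true);
                clean(f.get(obj)); // unbounded recursion on a cyclic graph
            } catch (IllegalAccessException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static void main(String[] args) {
        Node a = new Node();
        a.next = a; // cycle: the object refers to itself
        try {
            clean(a);
            System.out.println("finished without overflow");
        } catch (StackOverflowError e) {
            System.out.println("StackOverflowError, as in the trace above");
        }
    }
}
```

A deep (or, as here, cyclic) field chain makes the recursion unbounded, which is exactly the shape of the ClosureCleaner.clean(...:115) repetition in the trace.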
I have even tried running in legacy mode. The pipeline code is:
private void execute(String[] args) {
    ParameterTool pt = ParameterTool.fromArgs(args);

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //env.setMaxParallelism(30);
    env.setParallelism(20);

    env.enableCheckpointing(5000);
    StateBackend backend = new FsStateBackend(pt.getRequired("checkpoint_path"), true);
    env.setStateBackend(backend);

    FlinkDynamoDBStreamsConsumer<ObjectNode> flinkDynamoDBStreamsConsumer =
            new FlinkDynamoDBStreamsConsumer<>(DYNAMODB_STREAM_NAME,
                    new JsonNodeDeserializationSchema(),
                    dynamodbStreamsConsumerConfig);

    SingleOutputStreamOperator<ObjectNode> sourceStream = env
            .addSource(flinkDynamoDBStreamsConsumer)
            .name("Dynamo DB Streams");

    sourceStream
            .keyBy(new CdcKeySelector())
            .addSink(new FlinkKafkaProducer<>("dev-broker.hotstar.npe:9092", "ums-dynamo-streams",
                    new JsonSerializationSchema()))
            .name("Kafka Sink");

    try {
        env.execute();
    } catch (Exception e) {
        System.out.println("Caught exception for pipeline: " + e.getMessage());
        e.printStackTrace();
    }
}
Regards,
Vinay Patil
Re: StackOverflow Error
Posted by Vinay Patil <vi...@gmail.com>.
Hi Ravi,
The uber jar was correct; setting the ClosureCleanerLevel to TOP_LEVEL
resolved the issue. Thanks a lot.
Is there any disadvantage to explicitly setting this?
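For context, a toy sketch of why TOP_LEVEL avoids the deep recursion that recursive cleaning can hit (hypothetical code, not Flink's implementation; Flink's actual setting is the ExecutionConfig.ClosureCleanerLevel enum):

```java
import java.lang.reflect.Field;

public class CleanerLevelDemo {
    enum Level { TOP_LEVEL, RECURSIVE }

    static class Node {
        Node next;
    }

    static int framesVisited = 0;

    static void clean(Object obj, Level level, int depth) {
        if (obj == null || depth > 1_000) {
            return; // depth guard so this demo terminates instead of overflowing
        }
        framesVisited++;
        if (level == Level.TOP_LEVEL && depth > 0) {
            return; // TOP_LEVEL stops after inspecting the closure object itself
        }
        for (Field f : obj.getClass().getDeclaredFields()) {
            try {
                f.setAccessible(true);
                clean(f.get(obj), level, depth + 1); // RECURSIVE follows every field
            } catch (IllegalAccessException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static void main(String[] args) {
        Node cyclic = new Node();
        cyclic.next = cyclic; // cyclic object graph

        framesVisited = 0;
        clean(cyclic, Level.TOP_LEVEL, 0);
        System.out.println("TOP_LEVEL frames visited: " + framesVisited);  // 2

        framesVisited = 0;
        clean(cyclic, Level.RECURSIVE, 0);
        System.out.println("RECURSIVE frames visited: " + framesVisited); // 1001 (hit the guard)
    }
}
```

The trade-off implied by the sketch: TOP_LEVEL only cleans the outermost closure object, so non-serializable references nested deeper in the graph would not be cleaned for you.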
Regards,
Vinay Patil
On Sat, Jul 20, 2019 at 10:23 PM Ravi Bhushan Ratnakar <
ravibhushanratnakar@gmail.com> wrote:
> Hi Vinay,
>
> ObjectNode seems fine, as it is used by the Flink-provided
> "JsonNodeDeserializationSchema".
>
> Please verify that you are using the "flink-connector-kinesis" Maven
> dependency at version 1.8.1 (matching your Flink 1.8.1 cluster) and that you
> package it as part of your application uber/fat jar. If you are already
> doing this, please also try setting the closure cleaner level to
> "TOP_LEVEL" as below.
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
> env.getConfig().setClosureCleanerLevel(ExecutionConfig.ClosureCleanerLevel.TOP_LEVEL);
>
>
> Regards,
> Ravi
>
> On Sat, Jul 20, 2019 at 1:53 PM Vinay Patil <vi...@gmail.com>
> wrote:
>
>> Hi Ravi,
>>
>> Tried with both the new and legacy modes; it works locally, but on the
>> cluster I am getting this exception. I am passing Jackson's ObjectNode
>> class, which should be serializable. What do you think?
>>
>> On Sat, 20 Jul 2019, 12:11 Ravi Bhushan Ratnakar, <
>> ravibhushanratnakar@gmail.com> wrote:
>>
>>> Hi Vinay,
>>>
>>> Please make sure that all your custom code is serializable. You can also
>>> try running this using the new mode.
>>>
>>> Thanks,
>>> Ravi
>>>
>>> On Sat 20 Jul, 2019, 08:13 Vinay Patil, <vi...@gmail.com> wrote:
Re: StackOverflow Error
Posted by Vinay Patil <vi...@gmail.com>.
Hi Ravi,
Tried with both the new and legacy modes; it works locally, but on the
cluster I am getting this exception. I am passing Jackson's ObjectNode
class, which should be serializable. What do you think?
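One quick way to check the "should be serializable" assumption is a plain Java serialization round-trip (generic Java, not Flink-specific):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

public class SerializableCheck {
    // Returns true if the object survives Java serialization, which is what
    // Flink requires of user functions shipped to the cluster.
    static boolean isJavaSerializable(Object o) {
        try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(o);
            return true;
        } catch (IOException e) {
            return false; // e.g. NotSerializableException
        }
    }

    public static void main(String[] args) {
        System.out.println(isJavaSerializable("a string"));   // true
        System.out.println(isJavaSerializable(new Object())); // false: not Serializable
    }
}
```

Note that a class working locally proves little: in local execution the closure is never actually serialized across JVMs, which matches the "works locally, fails on the cluster" symptom.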
On Sat, 20 Jul 2019, 12:11 Ravi Bhushan Ratnakar, <
ravibhushanratnakar@gmail.com> wrote:
> Hi Vinay,
>
> Please make sure that all your custom code is serializable. You can also
> try running this using the new mode.
>
> Thanks,
> Ravi
>
> On Sat 20 Jul, 2019, 08:13 Vinay Patil, <vi...@gmail.com> wrote: