You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Oleksandr (Jira)" <ji...@apache.org> on 2022/05/29 11:17:00 UTC

[jira] (FLINK-27813) java.lang.IllegalStateException: after migration from statefun-3.1.1 to 3.2.0

    [ https://issues.apache.org/jira/browse/FLINK-27813 ]


    Oleksandr deleted comment on FLINK-27813:
    -----------------------------------

was (Author: JIRAUSER290114):
We have job manager running that is configured via module.yaml(egress and ingress for kafka topics).
Our stateful functions are on a separate remote service built on top of Spring boot with controller:
{code:java}
@RequestMapping("/v1/functions")
@RequiredArgsConstructor
public class FunctionRouteController {

    private final RequestReplyHandler handler;

    @PostMapping(consumes = MediaType.APPLICATION_OCTET_STREAM_VALUE)
    public CompletableFuture<byte[]> onRequest(@RequestBody byte[] request) {
        return handler
                .handle(Slices.wrap(request))
                .thenApply(Slice::toByteArray);
    }
} {code}
where actually used 'org.apache.flink.statefun.sdk.java.handlerConcurrentRequestReplyHandler'  and we just define beans:
{code:java}
@Bean
public StatefulFunctions statefulFunctions(StatefulFunctions autoRegisteredStatefulFunctions) {
    // obtain a request-reply handler based on the spec above
    StatefulFunctions statefulFunctions = new StatefulFunctions()
            .withStatefulFunction(fallbackFn())
            .withStatefulFunction(transactionFn());
    autoRegisteredStatefulFunctions.functionSpecs().values().forEach(statefulFunctions::withStatefulFunction);
    return statefulFunctions;
}

@Bean
public RequestReplyHandler requestReplyHandler(StatefulFunctions statefulFunctions) {
    return statefulFunctions.requestReplyHandler();
} {code}
 

statefulFunctions.requestReplyHandler() is :
{code:java}
public RequestReplyHandler requestReplyHandler() {
    return new ConcurrentRequestReplyHandler(this.specs);
} 

{code}
 

flink-conf.yaml

 
{code:java}
{
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
taskmanager.numberOfTaskSlots: 2
execution.checkpointing.interval: 1min
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.min-pause: 60000
execution.checkpointing.timeout: 1200000
execution.checkpointing.externalized-checkpoint-retention:RETAIN_ON_CANCELLATION
state.backend: rocksdb
state.backend.async: true
state.checkpoints.num-retained: 3
state.savepoints.dir: s3://savepoints-dev/savepoints
state.checkpoints.dir: s3://checkpoints-dev/checkpoints
s3.endpoint: https://minio.test
s3.path.style.access: true
s3.access-key: ***
s3.secret-key: ***
state.backend.incremental: true
jobmanager.memory.process.size: 1g
taskmanager.memory.process.size: 4g
}{code}
 

> java.lang.IllegalStateException: after migration from statefun-3.1.1 to 3.2.0
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-27813
>                 URL: https://issues.apache.org/jira/browse/FLINK-27813
>             Project: Flink
>          Issue Type: Bug
>          Components: API / State Processor
>    Affects Versions: statefun-3.2.0
>            Reporter: Oleksandr
>            Priority: Blocker
>         Attachments: screenshot-1.png
>
>
> Issue was met after migration from 
> flink-statefun:3.1.1-java11
> to
> flink-statefun:3.2.0-java11
>  
> {code:java}
> ts.execution-paymentManualValidationRequestEgress-egress, Sink: ua.aval.payments.execution-norkomExecutionRequested-egress, Sink: ua.aval.payments.execution-shareStatusToManualValidation-egress) (1/1)#15863 (98a1f6bef9a435ef9d0292fefeb64022) switched from RUNNING to FAILED with failure cause: java.lang.IllegalStateException: Unable to parse Netty transport spec.\n\tat org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplyClientFactory.parseTransportSpec(NettyRequestReplyClientFactory.java:56)\n\tat org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplyClientFactory.createTransportClient(NettyRequestReplyClientFactory.java:39)\n\tat org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider.functionOfType(HttpFunctionProvider.java:49)\n\tat org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:70)\n\tat org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:47)\n\tat org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:71)\n\tat org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:59)\n\tat org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:48)\n\tat org.apache.flink.statefun.flink.core.functions.Reductions.enqueue(Reductions.java:153)\n\tat org.apache.flink.statefun.flink.core.functions.Reductions.apply(Reductions.java:148)\n\tat org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:90)\n\tat org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)\n\tat org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)\n\tat org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)\n\tat org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)\n\tat org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)\n\tat org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:180)\n\tat org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:86)\n\tat org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)\n\tat org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)\n\tat org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)\n\tat org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)\n\tat org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)\n\tat org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)\n\tat org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)\n\tat org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)\n\tat org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)\n\tat org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)\n\tat org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)\n\tat org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)\n\tat java.base/java.lang.Thread.run(Unknown Source)\nCaused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException: Time interval unit label 'm' does not match any of the recognized units: DAYS: (d | day | days), HOURS: (h | hour | hours), MINUTES: (min | minute | minutes), SECONDS: (s | sec | secs | second | seconds), MILLISECONDS: (ms | milli | millis | millisecond | milliseconds), MICROSECONDS: (µs | micro | micros | microsecond | microseconds), NANOSECONDS: (ns | nano | nanos | nanosecond | nanoseconds) (through reference chain: org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplySpec[\"timeouts\"]->org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplySpec$Timeouts[\"call\"])\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:390)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:349)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.wrapAndThrow(BeanDeserializerBase.java:1822)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:326)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:187)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:542)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:565)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:449)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1405)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:362)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:195)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:4569)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2798)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.treeToValue(ObjectMapper.java:3261)\n\tat org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplyClientFactory.parseTransportSpec(NettyRequestReplyClientFactory.java:54)\n\t... 30 more\nCaused by: java.lang.IllegalArgumentException: Time interval unit label 'm' does not match any of the recognized units: DAYS: (d | day | days), HOURS: (h | hour | hours), MINUTES: (min | minute | minutes), SECONDS: (s | sec | secs | second | seconds), MILLISECONDS: (ms | milli | millis | millisecond | milliseconds), MICROSECONDS: (µs | micro | micros | microsecond | microseconds), NANOSECONDS: (ns | nano | nanos | nanosecond | nanoseconds)\n\tat org.apache.flink.util.TimeUtils.parseDuration(TimeUtils.java:105)\n\tat org.apache.flink.statefun.flink.common.json.StateFunObjectMapper$DurationJsonDeserializer.deserialize(StateFunObjectMapper.java:49)\n\tat org.apache.flink.statefun.flink.common.json.StateFunObjectMapper$DurationJsonDeserializer.deserialize(StateFunObjectMapper.java:45)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.impl.MethodProperty.deserializeAndSet(MethodProperty.java:129)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:324)\n\t... 42 more\n”}{code}
> {code:java}
> 25T09:36:12.963Z","level":"INFO","logger":"org.apache.flink.runtime.executiongraph.ExecutionGraph","class&lt;span class="code-quote">":"org.apache.flink.runtime.executiongraph.Execution","method":"transitionState","file":"Execution.java","line":1438,"thread":"flink-akka.actor.default-dispatcher-23","stack":"java.lang.IllegalStateException: Unable to parse Netty transport spec.\n\tat org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplyClientFactory.parseTransportSpec(NettyRequestReplyClientFactory.java:56)\n\tat org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplyClientFactory.createTransportClient(NettyRequestReplyClientFactory.java:39)\n\tat org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider.functionOfType(HttpFunctionProvider.java:49)\n\tat org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:70)\n\tat org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:47)\n\tat org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:71)\n\tat org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:59)\n\tat org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:48)\n\tat org.apache.flink.statefun.flink.core.functions.Reductions.enqueue(Reductions.java:153)\n\tat org.apache.flink.statefun.flink.core.functions.Reductions.apply(Reductions.java:148)\n\tat org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:90)\n\tat org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)\n\tat org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)\n\tat org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)\n\tat org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)\n\tat org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)\n\tat org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:180)\n\tat org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:86)\n\tat org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)\n\tat org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)\n\tat org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)\n\tat org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)\n\tat org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)\n\tat org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)\n\tat org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)\n\tat org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)\n\tat org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)\n\tat org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)\n\tat org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)\n\tat org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)\n\tat java.base/java.lang.Thread.run(Unknown Source)\nCaused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException: Time interval unit label 'm' does not match any of the recognized units: DAYS: (d | day | days), HOURS: (h | hour | hours), MINUTES: (min | minute | minutes), SECONDS: (s | sec | secs | second | seconds), MILLISECONDS: (ms | milli | millis | millisecond | milliseconds), MICROSECONDS: (µs | micro | micros | microsecond | microseconds), NANOSECONDS: (ns | nano | nanos | nanosecond | nanoseconds) (through reference chain: org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplySpec[\"timeouts\"]->org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplySpec$Timeouts[\"call\"])\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:390)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:349)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.wrapAndThrow(BeanDeserializerBase.java:1822)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:326)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:187)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:542)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:565)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:449)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1405)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:362)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:195)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:4569)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2798)\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.treeToValue(ObjectMapper.java:3261)\n\tat org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplyClientFactory.parseTransportSpec(NettyRequestReplyClientFactory.java:54)\n\t... 30 common frames omitted\nCaused by: java.lang.IllegalArgumentException: Time interval unit label 'm' does not match any of the recognized units: DAYS: (d | day | days), HOURS: (h | hour | hours), MINUTES: (min | minute | minutes), SECONDS: (s | sec | secs | second | seconds), MILLISECONDS: (ms | milli | millis | millisecond | milliseconds), MICROSECONDS: (µs | micro | micros | microsecond | microseconds), NANOSECONDS: (ns | nano | nanos | nanosecond | nanoseconds)\n\tat org.apache.flink.util.TimeUtils.parseDuration(TimeUtils.java:105)\n\tat org.apache.flink.statefun.flink.common.json.StateFunObjectMapper$DurationJsonDeserializer.deserialize(StateFunObjectMapper.java:49)\n\tat org.apache.flink.statefun.flink.common.json.StateFunObjectMapper$DurationJsonDeserializer.des{code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)