You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Yunfeng Zhou (Jira)" <ji...@apache.org> on 2022/11/30 01:41:00 UTC

[jira] [Created] (FLINK-30241) Flink ML Iteration ConcurrentModificationException

Yunfeng Zhou created FLINK-30241:
------------------------------------

             Summary: Flink ML Iteration ConcurrentModificationException
                 Key: FLINK-30241
                 URL: https://issues.apache.org/jira/browse/FLINK-30241
             Project: Flink
          Issue Type: Bug
          Components: Library / Machine Learning
    Affects Versions: ml-2.1.0
            Reporter: Yunfeng Zhou


https://github.com/jiangxin369/flink-ml/actions/runs/3577811156/jobs/6017233847


{code}
___________________ LinearRegressionTest.test_get_model_data ___________________

self = <ml.lib.regression.tests.test_linearregression.LinearRegressionTest testMethod=test_get_model_data>

    def test_get_model_data(self):
        regression = LinearRegression().set_weight_col('weight')
        model = regression.fit(self.input_data_table)
        model_data = self.t_env.to_data_stream(
>           model.get_model_data()[0]).execute_and_collect().next()

pyflink/ml/lib/regression/tests/test_linearregression.py:124: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/opt/hostedtoolcache/Python/3.7.15/x64/lib/python3.7/site-packages/pyflink/datastream/data_stream.py:1760: in next
    if not self._j_closeable_iterator.hasNext():
/opt/hostedtoolcache/Python/3.7.15/x64/lib/python3.7/site-packages/py4j/java_gateway.py:1322: in __call__
    answer, self.gateway_client, self.target_id, self.name)
/opt/hostedtoolcache/Python/3.7.15/x64/lib/python3.7/site-packages/pyflink/util/exceptions.py:146: in deco
    return f(*a, **kw)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

answer = 'xro12236'
gateway_client = <py4j.java_gateway.GatewayClient object at 0x7fdb862ca190>
target_id = 'o12139', name = 'hasNext'

    def get_return_value(answer, gateway_client, target_id=None, name=None):
        """Converts an answer received from the Java gateway into a Python object.
    
        For example, string representation of integers are converted to Python
        integer, string representation of objects are converted to JavaObject
        instances, etc.
    
        :param answer: the string returned by the Java gateway
        :param gateway_client: the gateway client used to communicate with the Java
            Gateway. Only necessary if the answer is a reference (e.g., object,
            list, map)
        :param target_id: the name of the object from which the answer comes from
            (e.g., *object1* in `object1.hello()`). Optional.
        :param name: the name of the member from which the answer comes from
            (e.g., *hello* in `object1.hello()`). Optional.
        """
        if is_error(answer)[0]:
            if len(answer) > 1:
                type = answer[1]
                value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
                if answer[1] == REFERENCE_TYPE:
                    raise Py4JJavaError(
                        "An error occurred while calling {0}{1}{2}.\n".
>                       format(target_id, ".", name), value)
E                   py4j.protocol.Py4JJavaError: An error occurred while calling o12139.hasNext.
E                   : java.lang.RuntimeException: Failed to fetch next result
E                   	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
E                   	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
E                   	at sun.reflect.GeneratedMethodAccessor80.invoke(Unknown Source)
E                   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
E                   	at java.lang.reflect.Method.invoke(Method.java:498)
E                   	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
E                   	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
E                   	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
E                   	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
E                   	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
E                   	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
E                   	at java.lang.Thread.run(Thread.java:750)
E                   Caused by: java.io.IOException: Failed to fetch job execution result
E                   	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:184)
E                   	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121)
E                   	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
E                   	... 11 more
E                   Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
E                   	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
E                   	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
E                   	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:182)
E                   	... 13 more
E                   Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
E                   	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
E                   	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
E                   	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
E                   	at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
E                   	at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
E                   	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:138)
E                   	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:181)
E                   	... 13 more
E                   Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
E                   	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
E                   	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
E                   	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:301)
E                   	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:291)
E                   	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:282)
E                   	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:739)
E                   	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78)
E                   	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:443)
E                   	at sun.reflect.GeneratedMethodAccessor47.invoke(Unknown Source)
E                   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
E                   	at java.lang.reflect.Method.invoke(Method.java:498)
E                   	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:304)
E                   	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
E                   	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:302)
E                   	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
E                   	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
E                   	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
E                   	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
E                   	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
E                   	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
E                   	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
E                   	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
E                   	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
E                   	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
E                   	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
E                   	at akka.actor.Actor.aroundReceive(Actor.scala:537)
E                   	at akka.actor.Actor.aroundReceive$(Actor.scala:535)
E                   	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
E                   	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
E                   	at akka.actor.ActorCell.invoke(ActorCell.scala:548)
E                   	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
E                   	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
E                   	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
E                   	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
E                   	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
E                   	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
E                   	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
E                   Caused by: java.util.ConcurrentModificationException
E                   	at java.util.ArrayDeque$DeqIterator.next(ArrayDeque.java:648)
E                   	at java.util.Collections$UnmodifiableCollection$1.next(Collections.java:1044)
E                   	at org.apache.flink.iteration.operator.HeadOperator.parseInputChannelEvents(HeadOperator.java:463)
E                   	at org.apache.flink.iteration.operator.HeadOperator.endInput(HeadOperator.java:391)
E                   	at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:96)
E                   	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.endInput(RegularOperatorChain.java:97)
E                   	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68)
E                   	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
E                   	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
E                   	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
E                   	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
E                   	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
E                   	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
E                   	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
E                   	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
E                   	at java.lang.Thread.run(Thread.java:750)

{code}




--
This message was sent by Atlassian Jira
(v8.20.10#820010)