You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Yunfeng Zhou (Jira)" <ji...@apache.org> on 2022/11/30 01:41:00 UTC
[jira] [Created] (FLINK-30241) Flink ML Iteration ConcurrentModificationException
Yunfeng Zhou created FLINK-30241:
------------------------------------
Summary: Flink ML Iteration ConcurrentModificationException
Key: FLINK-30241
URL: https://issues.apache.org/jira/browse/FLINK-30241
Project: Flink
Issue Type: Bug
Components: Library / Machine Learning
Affects Versions: ml-2.1.0
Reporter: Yunfeng Zhou
https://github.com/jiangxin369/flink-ml/actions/runs/3577811156/jobs/6017233847
{code}
___________________ LinearRegressionTest.test_get_model_data ___________________
self = <ml.lib.regression.tests.test_linearregression.LinearRegressionTest testMethod=test_get_model_data>
def test_get_model_data(self):
regression = LinearRegression().set_weight_col('weight')
model = regression.fit(self.input_data_table)
model_data = self.t_env.to_data_stream(
> model.get_model_data()[0]).execute_and_collect().next()
pyflink/ml/lib/regression/tests/test_linearregression.py:124:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/opt/hostedtoolcache/Python/3.7.15/x64/lib/python3.7/site-packages/pyflink/datastream/data_stream.py:1760: in next
if not self._j_closeable_iterator.hasNext():
/opt/hostedtoolcache/Python/3.7.15/x64/lib/python3.7/site-packages/py4j/java_gateway.py:1322: in __call__
answer, self.gateway_client, self.target_id, self.name)
/opt/hostedtoolcache/Python/3.7.15/x64/lib/python3.7/site-packages/pyflink/util/exceptions.py:146: in deco
return f(*a, **kw)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
answer = 'xro12236'
gateway_client = <py4j.java_gateway.GatewayClient object at 0x7fdb862ca190>
target_id = 'o12139', name = 'hasNext'
def get_return_value(answer, gateway_client, target_id=None, name=None):
"""Converts an answer received from the Java gateway into a Python object.
For example, string representation of integers are converted to Python
integer, string representation of objects are converted to JavaObject
instances, etc.
:param answer: the string returned by the Java gateway
:param gateway_client: the gateway client used to communicate with the Java
Gateway. Only necessary if the answer is a reference (e.g., object,
list, map)
:param target_id: the name of the object from which the answer comes from
(e.g., *object1* in `object1.hello()`). Optional.
:param name: the name of the member from which the answer comes from
(e.g., *hello* in `object1.hello()`). Optional.
"""
if is_error(answer)[0]:
if len(answer) > 1:
type = answer[1]
value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
if answer[1] == REFERENCE_TYPE:
raise Py4JJavaError(
"An error occurred while calling {0}{1}{2}.\n".
> format(target_id, ".", name), value)
E py4j.protocol.Py4JJavaError: An error occurred while calling o12139.hasNext.
E : java.lang.RuntimeException: Failed to fetch next result
E at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
E at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
E at sun.reflect.GeneratedMethodAccessor80.invoke(Unknown Source)
E at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
E at java.lang.reflect.Method.invoke(Method.java:498)
E at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
E at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
E at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
E at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
E at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
E at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
E at java.lang.Thread.run(Thread.java:750)
E Caused by: java.io.IOException: Failed to fetch job execution result
E at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:184)
E at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121)
E at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
E ... 11 more
E Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
E at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
E at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
E at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:182)
E ... 13 more
E Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
E at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
E at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
E at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
E at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
E at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
E at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:138)
E at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:181)
E ... 13 more
E Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
E at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
E at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
E at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:301)
E at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:291)
E at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:282)
E at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:739)
E at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78)
E at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:443)
E at sun.reflect.GeneratedMethodAccessor47.invoke(Unknown Source)
E at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
E at java.lang.reflect.Method.invoke(Method.java:498)
E at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:304)
E at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
E at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:302)
E at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
E at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
E at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
E at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
E at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
E at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
E at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
E at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
E at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
E at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
E at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
E at akka.actor.Actor.aroundReceive(Actor.scala:537)
E at akka.actor.Actor.aroundReceive$(Actor.scala:535)
E at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
E at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
E at akka.actor.ActorCell.invoke(ActorCell.scala:548)
E at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
E at akka.dispatch.Mailbox.run(Mailbox.scala:231)
E at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
E at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
E at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
E at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
E at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
E Caused by: java.util.ConcurrentModificationException
E at java.util.ArrayDeque$DeqIterator.next(ArrayDeque.java:648)
E at java.util.Collections$UnmodifiableCollection$1.next(Collections.java:1044)
E at org.apache.flink.iteration.operator.HeadOperator.parseInputChannelEvents(HeadOperator.java:463)
E at org.apache.flink.iteration.operator.HeadOperator.endInput(HeadOperator.java:391)
E at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:96)
E at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.endInput(RegularOperatorChain.java:97)
E at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68)
E at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
E at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
E at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
E at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
E at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
E at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
E at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
E at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
E at java.lang.Thread.run(Thread.java:750)
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)