Posted to user-zh@flink.apache.org by 叶贤勋 <yx...@163.com> on 2019/12/03 09:49:35 UTC

MiniCluster startup fails with an actor initialization exception

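Hi all, I have a question:
    I am on Flink 1.9.1 and use MiniCluster to submit a job locally, but the job does not run. The errors below are logged while miniCluster.start() executes, and the program then hangs at this step: miniCluster.executeJobBlocking(getSimpleJob(numOfTMs * slotsPerTM))
    Java code: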

public static void main(String[] args) throws Exception {
    final int numOfTMs = 3;
    final int slotsPerTM = 7;

    final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
            .setNumTaskManagers(numOfTMs)
            .setNumSlotsPerTaskManager(slotsPerTM)
            .setRpcServiceSharing(RpcServiceSharing.SHARED)
            .setConfiguration(getDefaultConfiguration())
            .build();
    final MiniCluster miniCluster = new MiniCluster(cfg);
    try {
        miniCluster.start();
        JobExecutionResult result = miniCluster.executeJobBlocking(getSimpleJob(numOfTMs * slotsPerTM));
        System.out.println(result.toString());
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        miniCluster.close();
    }
}

private static Configuration getDefaultConfiguration() {
    final Configuration configuration = new Configuration();
    configuration.setString(RestOptions.BIND_PORT, "0");

    return configuration;
}

private static JobGraph getSimpleJob(int parallelism) throws IOException {
    final JobVertex task = new JobVertex("Test task");
    task.setParallelism(parallelism);
    task.setMaxParallelism(parallelism);
    task.setInvokableClass(NoOpInvokable.class);

    final JobGraph jg = new JobGraph(new JobID(), "Test Job", task);
    jg.setScheduleMode(ScheduleMode.EAGER);

    final ExecutionConfig executionConfig = new ExecutionConfig();
    executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000));
    jg.setExecutionConfig(executionConfig);

    return jg;
}
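
Because executeJobBlocking never returns in this run, a reproduction like this one can hang indefinitely. As a hedged sketch that is not part of the original code, the blocking call could be wrapped in a timeout so the program fails fast and the logs are easier to collect. The class name BlockingCallGuard and the method callWithTimeout are hypothetical helpers that use only the JDK; nothing here is a Flink API.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper (not a Flink class): bounds how long a blocking call may take.
public class BlockingCallGuard {

    /** Runs blockingCall on a daemon thread and gives up after timeoutMillis. */
    public static <T> T callWithTimeout(Callable<T> blockingCall, long timeoutMillis)
            throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "blocking-call-guard");
            t.setDaemon(true); // do not keep the JVM alive if the call never returns
            return t;
        });
        Future<T> future = executor.submit(blockingCall);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the worker thread
            throw e;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // A call that finishes in time returns its result.
        System.out.println(callWithTimeout(() -> "done", 1000L));

        // A call that takes too long surfaces as a TimeoutException.
        try {
            callWithTimeout(() -> { Thread.sleep(5000); return "late"; }, 100L);
        } catch (TimeoutException e) {
            System.out.println("timed out");
        }
    }
}
```

In the program above, the lambda passed to callWithTimeout would be the miniCluster.executeJobBlocking(...) call. The timeout value is a judgment call, and the wrapper only makes the hang visible; it does not address its cause.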
Error output:
02/12/2019 20:49:21.212  INFO [org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils] Actor system started at akka.tcp://flink-metrics@10.242.32.235:65105
02/12/2019 20:49:21.219  INFO [org.apache.flink.runtime.rpc.akka.AkkaRpcService] Starting RPC endpoint for org.apache.flink.runtime.metrics.dump.MetricQueryService at akka://flink-metrics/user/MetricQueryService .
02/12/2019 20:49:21.227 ERROR [org.apache.flink.runtime.akka.StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy] Actor failed with exception. Stopping it now.
akka.actor.ActorInitializationException: akka://flink-metrics/user/MetricQueryService: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:175)
at akka.actor.ActorCell.create(ActorCell.scala:607)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:461)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
at akka.dispatch.Mailbox.run(Mailbox.scala:223)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: akka.actor.IllegalActorStateException: Actor behavior has not been set with receive(...)
at akka.actor.IllegalActorStateException$.apply(Actor.scala:140)
at akka.actor.AbstractActor.receive(AbstractActor.scala:74)
at akka.actor.ActorCell.newActor(ActorCell.scala:568)
at akka.actor.ActorCell.create(ActorCell.scala:588)
... 7 more
02/12/2019 20:49:21.229  INFO [org.apache.flink.runtime.minicluster.MiniCluster] Starting high-availability services
02/12/2019 20:49:21.245  INFO [org.apache.flink.runtime.blob.BlobServer] Created BLOB server storage directory /var/folders/7b/x2zfnt157ls4f27syvlyjjsc0000gn/T/blobStore-575c98fc-54f2-45f8-a183-96d3880e940b
……
02/12/2019 20:49:21.671  INFO [org.apache.flink.runtime.taskexecutor.TaskManagerServices] Limiting managed memory to 524287 MB, memory will be allocated lazily.
02/12/2019 20:49:21.674 DEBUG [org.apache.flink.runtime.memory.MemoryManager] Initialized MemoryManager with total memory size 549754765312, number of slots 1, page size 32768, memory type HEAP, pre allocate memory false and number of non allocated pages 16777184.
02/12/2019 20:49:21.685  INFO [org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration] Messages have a max timeout of 10000 ms
02/12/2019 20:49:21.696  INFO [org.apache.flink.runtime.rpc.akka.AkkaRpcService] Starting RPC endpoint for org.apache.flink.runtime.taskexecutor.TaskExecutor at akka://flink/user/taskmanager_0 .
02/12/2019 20:49:21.697 ERROR [org.apache.flink.runtime.akka.StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy] Actor failed with exception. Stopping it now.
akka.actor.ActorInitializationException: akka://flink/user/taskmanager_0: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:175)
at akka.actor.ActorCell.create(ActorCell.scala:607)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:461)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
at akka.dispatch.Mailbox.run(Mailbox.scala:223)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.actor.IllegalActorStateException: Actor behavior has not been set with receive(...)
at akka.actor.IllegalActorStateException$.apply(Actor.scala:140)
at akka.actor.AbstractActor.receive(AbstractActor.scala:74)
at akka.actor.ActorCell.newActor(ActorCell.scala:568)
at akka.actor.ActorCell.create(ActorCell.scala:588)
... 9 more
02/12/2019 20:49:21.735  INFO [org.apache.flink.configuration.Configuration] Config uses fallback configuration key 'rest.port' instead of key 'rest.bind-port'
02/12/2019 20:49:21.774 DEBUG [org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory] Starting Dispatcher REST endpoint.
02/12/2019 20:49:21.774  INFO [org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint] Starting rest endpoint.
……
02/12/2019 20:49:22.245  INFO [org.apache.flink.runtime.rpc.akka.AkkaRpcService] Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at akka://flink/user/resourcemanager .
02/12/2019 20:49:22.246 ERROR [org.apache.flink.runtime.akka.StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy] Actor failed with exception. Stopping it now.
akka.actor.ActorInitializationException: akka://flink/user/resourcemanager: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:175)
at akka.actor.ActorCell.create(ActorCell.scala:607)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:461)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
at akka.dispatch.Mailbox.run(Mailbox.scala:223)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.actor.IllegalActorStateException: Actor behavior has not been set with receive(...)
at akka.actor.IllegalActorStateException$.apply(Actor.scala:140)
at akka.actor.AbstractActor.receive(AbstractActor.scala:74)
at akka.actor.ActorCell.newActor(ActorCell.scala:568)
at akka.actor.ActorCell.create(ActorCell.scala:588)
... 9 more
02/12/2019 20:49:22.257  INFO [org.apache.flink.runtime.rpc.akka.AkkaRpcService] Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/dispatcher .
02/12/2019 20:49:22.257 ERROR [org.apache.flink.runtime.akka.StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy] Actor failed with exception. Stopping it now.
akka.actor.ActorInitializationException: akka://flink/user/dispatcher: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:175)
at akka.actor.ActorCell.create(ActorCell.scala:607)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:461)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
at akka.dispatch.Mailbox.run(Mailbox.scala:223)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.actor.IllegalActorStateException: Actor behavior has not been set with receive(...)
at akka.actor.IllegalActorStateException$.apply(Actor.scala:140)
at akka.actor.AbstractActor.receive(AbstractActor.scala:74)
at akka.actor.ActorCell.newActor(ActorCell.scala:568)
at akka.actor.ActorCell.create(ActorCell.scala:588)
... 9 more
02/12/2019 20:49:22.267 DEBUG [org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory] Starting ResourceManager.
02/12/2019 20:49:22.267 DEBUG [org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory] Starting Dispatcher.
02/12/2019 20:49:22.268  INFO [org.apache.flink.runtime.minicluster.MiniCluster] Flink Mini Cluster started successfully
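
Every failure in the log above ends in akka.actor.AbstractActor.receive. One thing that may be worth checking is which jar that class is actually loaded from, in case a dependency pulls in an Akka version other than the one Flink expects. That is an assumption, not something confirmed in this thread. The following is a minimal diagnostic sketch using only JDK calls; the class name ClassOrigin and the method locate are hypothetical, and the two class names in main are taken from the log output.

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper: reports the jar or directory a class is loaded from.
public class ClassOrigin {

    /** Returns "className -> location", or a short note if the location is unknown. */
    public static String locate(String className) {
        try {
            // Load without initializing, so no static initializers run.
            Class<?> clazz = Class.forName(className, false, ClassOrigin.class.getClassLoader());
            CodeSource source = clazz.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                return className + " -> (no code source, e.g. a JDK class)";
            }
            return className + " -> " + source.getLocation();
        } catch (ClassNotFoundException | LinkageError e) {
            return className + " -> not found on classpath";
        }
    }

    public static void main(String[] args) {
        // Class names as they appear in the stack traces and log lines above.
        System.out.println(locate("akka.actor.AbstractActor"));
        System.out.println(locate("org.apache.flink.runtime.rpc.akka.AkkaRpcService"));
    }
}
```

Run with the same classpath as the failing program, this prints the jar that provides each class. If the Akka jar is not the one expected for this Flink version, inspecting the build's dependency tree would be a reasonable next step.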
叶贤勋
yxx_cmhd@163.com