Posted to issues@flink.apache.org by "Nico Kruber (JIRA)" <ji...@apache.org> on 2017/05/24 14:27:04 UTC
[jira] [Updated] (FLINK-6689) Remote StreamExecutionEnvironment fails to submit jobs against LocalFlinkMiniCluster
[ https://issues.apache.org/jira/browse/FLINK-6689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Nico Kruber updated FLINK-6689:
-------------------------------
Description:
The following Flink program fails to execute on the current 1.3 branch (it works on 1.2) because the wrong leader session ID is used:
{code:java}
final String jobManagerAddress = "localhost";
final int jobManagerPort = ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT;
final Configuration config = new Configuration();
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerAddress);
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
final LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(config, false);
cluster.start(true);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(jobManagerAddress, jobManagerPort);
env.fromElements(1L).addSink(new DiscardingSink<Long>());
// fails due to leader session id being wrong:
env.execute("test");
{code}
The log output contains:
{code}
...
16:24:57,551 INFO org.apache.flink.runtime.webmonitor.JobManagerRetriever - New leader reachable under akka.tcp://flink@localhost:6123/user/jobmanager:ff0d56cf-6205-4dd4-a266-03847f4d6944.
16:24:57,894 INFO org.apache.flink.streaming.api.environment.RemoteStreamEnvironment - Running remotely at localhost:6123
16:24:58,121 INFO org.apache.flink.client.program.StandaloneClusterClient - Starting client actor system.
16:24:58,123 INFO org.apache.flink.runtime.util.LeaderRetrievalUtils - Trying to select the network interface and address to use by connecting to the leading JobManager.
16:24:58,128 INFO org.apache.flink.runtime.util.LeaderRetrievalUtils - TaskManager will try to connect for 10000 milliseconds before falling back to heuristics
16:24:58,132 INFO org.apache.flink.runtime.net.ConnectionUtils - Retrieved new target address localhost/127.0.0.1:6123.
16:24:58,258 INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
16:24:58,262 INFO Remoting - Starting remoting
16:24:58,375 INFO Remoting - Remoting started; listening on addresses :[akka.tcp://flink@nico-work.fritz.box:43413]
16:24:58,376 INFO org.apache.flink.client.program.StandaloneClusterClient - Submitting job with JobID: 9bef4793a4b7f4caaad96bd28211cbb9. Waiting for job completion.
Submitting job with JobID: 9bef4793a4b7f4caaad96bd28211cbb9. Waiting for job completion.
16:24:58,382 INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Disconnect from JobManager null.
16:24:58,398 INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Received SubmitJobAndWait(JobGraph(jobId: 9bef4793a4b7f4caaad96bd28211cbb9)) but there is no connection to a JobManager yet.
16:24:58,398 INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Received job test (9bef4793a4b7f4caaad96bd28211cbb9).
16:24:58,429 INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Connect to JobManager Actor[akka.tcp://flink@localhost:6123/user/jobmanager#1881889998].
16:24:58,432 INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#1881889998] with leader session id 00000000-0000-0000-0000-000000000000.
Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#1881889998] with leader session id 00000000-0000-0000-0000-000000000000.
16:24:58,432 INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Sending message to JobManager akka.tcp://flink@localhost:6123/user/jobmanager to submit job test (9bef4793a4b7f4caaad96bd28211cbb9) and wait for progress
16:24:58,433 INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Upload jar files to job manager akka.tcp://flink@localhost:6123/user/jobmanager.
16:24:58,440 INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Submit job to the job manager akka.tcp://flink@localhost:6123/user/jobmanager.
16:24:58,522 WARN org.apache.flink.runtime.jobmanager.JobManager - Discard message LeaderSessionMessage(00000000-0000-0000-0000-000000000000,SubmitJob(JobGraph(jobId: 9bef4793a4b7f4caaad96bd28211cbb9),EXECUTION_RESULT_AND_STATE_CHANGES)) because the expected leader session ID ff0d56cf-6205-4dd4-a266-03847f4d6944 did not equal the received leader session ID 00000000-0000-0000-0000-000000000000.
{code}
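The final WARN line shows the JobManager discarding the SubmitJob message because the client sent the all-zero leader session ID instead of ff0d56cf-6205-4dd4-a266-03847f4d6944. The check can be illustrated with a minimal, self-contained sketch. This is not Flink's implementation: the class LeaderSessionFilterSketch, the SessionMessage envelope, and the accepts and handle methods are hypothetical names introduced only for this example, and only the two UUID values are taken from the log above.

```java
import java.util.UUID;

// Hypothetical illustration only; none of these types are Flink classes.
class LeaderSessionFilterSketch {

    /** Envelope pairing a payload with the leader session ID the sender believes is current. */
    static final class SessionMessage {
        final UUID leaderSessionId;
        final String payload;

        SessionMessage(UUID leaderSessionId, String payload) {
            this.leaderSessionId = leaderSessionId;
            this.payload = payload;
        }
    }

    /** Returns true only if the received session ID equals the one the receiver expects. */
    static boolean accepts(UUID expected, UUID received) {
        return expected.equals(received);
    }

    /** Handles the message if its session ID matches, otherwise reports that it is discarded. */
    static String handle(UUID expected, SessionMessage msg) {
        if (!accepts(expected, msg.leaderSessionId)) {
            return "Discard message " + msg.payload
                    + ": expected leader session ID " + expected
                    + " did not equal the received leader session ID " + msg.leaderSessionId;
        }
        return "Handle message " + msg.payload;
    }

    public static void main(String[] args) {
        // Session ID announced by the leader in the log above.
        UUID expected = UUID.fromString("ff0d56cf-6205-4dd4-a266-03847f4d6944");
        // The all-zero ID that the client used in the log above.
        UUID allZero = new UUID(0L, 0L);

        System.out.println(handle(expected, new SessionMessage(allZero, "SubmitJob")));
        System.out.println(handle(expected, new SessionMessage(expected, "SubmitJob")));
    }
}
```

In this sketch the first call is discarded and the second is handled, which mirrors the behaviour in the log: a submission can only be accepted if the client has retrieved the leader's actual session ID before sending.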
was:
The following Flink program fails to execute on the current 1.3 branch (it works on 1.2):
{code:java}
final String jobManagerAddress = "localhost";
final int jobManagerPort = ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT;
final Configuration config = new Configuration();
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerAddress);
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
final LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(config, false);
cluster.start(true);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(jobManagerAddress, jobManagerPort);
env.fromElements(1L).addSink(new DiscardingSink<Long>());
// fails due to leader session id being wrong:
env.execute("test");
{code}
> Remote StreamExecutionEnvironment fails to submit jobs against LocalFlinkMiniCluster
> ------------------------------------------------------------------------------------
>
> Key: FLINK-6689
> URL: https://issues.apache.org/jira/browse/FLINK-6689
> Project: Flink
> Issue Type: Bug
> Components: Job-Submission
> Affects Versions: 1.3.0
> Reporter: Nico Kruber
> Fix For: 1.3.0