Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/09/08 10:53:00 UTC

[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover

    [ https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16608014#comment-16608014 ] 

ASF GitHub Bot commented on FLINK-10011:
----------------------------------------

tillrohrmann commented on a change in pull request #6587: [FLINK-10011] Release JobGraph from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6587#discussion_r216127558
 
 

 ##########
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java
 ##########
 @@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.akka.ActorUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.dispatcher.DispatcherHATest;
+import org.apache.flink.runtime.dispatcher.NoOpSubmittedJobGraphListener;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.testingUtils.TestingJobManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.ExtendedActorSystem;
+import akka.actor.Identify;
+import akka.actor.Terminated;
+import akka.pattern.Patterns;
+import org.apache.curator.framework.CuratorFramework;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the ZooKeeper HA service and {@link JobManager} interaction.
+ */
+public class ZooKeeperHAJobManagerTest extends TestLogger {
+
+	@ClassRule
+	public static final ZooKeeperResource ZOO_KEEPER_RESOURCE = new ZooKeeperResource();
+
+	@ClassRule
+	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+	private static final FiniteDuration TIMEOUT = FiniteDuration.apply(10L, TimeUnit.SECONDS);
+
+	private static ActorSystem system;
+
+	@BeforeClass
+	public static void setup() {
+		system = AkkaUtils.createLocalActorSystem(new Configuration());
+	}
+
+	@AfterClass
+	public static void teardown() throws Exception {
+		final Future<Terminated> terminationFuture = system.terminate();
+		Await.ready(terminationFuture, TIMEOUT);
+	}
+
+	/**
+	 * Tests that the {@link JobManager} releases all locked {@link JobGraph} if it loses
+	 * leadership.
+	 */
+	@Test
+	public void testJobGraphReleaseWhenLosingLeadership() throws Exception {
+		final Configuration configuration = new Configuration();
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, ZOO_KEEPER_RESOURCE.getConnectString());
+		configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+
+		final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
 
 Review comment:
   Good point. Will refactor it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Old job resurrected during HA failover
> --------------------------------------
>
>                 Key: FLINK-10011
>                 URL: https://issues.apache.org/jira/browse/FLINK-10011
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>    Affects Versions: 1.4.2, 1.5.2, 1.6.0
>            Reporter: Elias Levy
>            Assignee: Till Rohrmann
>            Priority: Blocker
>              Labels: pull-request-available
>
> For the second time we've observed Flink resurrect an old job during JobManager high-availability failover.
> h4. Configuration
>  * AWS environment
>  * Flink 1.4.2 standalone cluster in HA mode
>  * 2 JMs, 3 TMs
>  * 3 node ZK ensemble
>  * 1 job consuming from and producing to Kafka
>  * Checkpoints in S3 using the Presto file system adaptor
> h4. Timeline 
>  * 15:18:10 JM 2 completes checkpoint 69256.
>  * 15:19:10 JM 2 completes checkpoint 69257.
>  * 15:19:57 ZK 1 (follower) loses connectivity to the leader as a result of a SocketTimeoutException
>  * 15:19:57 ZK 1 closes connection to JM 2 (leader)
>  * 15:19:57 ZK 2 (leader) reports a network error and closes connection to ZK 1
>  * 15:19:57 JM 2 reports it can't read data from ZK
>  ** {{Unable to read additional data from server sessionid 0x30000003f4a0003, likely server has closed socket, closing socket connection and attempting reconnect)}}
>  ** {{org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn}}
>  * 15:19:57 JM 2 ZK Curator changes connection state to SUSPENDED
>  ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.}}
>  ** {{ZooKeeper connection SUSPENDED. }}{{Changes to the submitted job graphs are not monitored (temporarily).}}
>  ** {{Connection to ZooKeeper suspended. The contender akka.tcp://flink@flink-jm-2:6123/user/jobmanager no longer participates in the leader election}}{{ }}
>  ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.}}
>  * 15:19:57 JM 2 gives up leadership
>  ** {{JobManager akka://flink/user/jobmanager#33755521 was revoked leadership.}}
>  * 15:19:57 JM 2 changes job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} status to SUSPENDED
>  ** {{Stopping checkpoint coordinator for job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}}}
>  * 15:19:57 TMs start disassociating from JM 2, but JM 2 discards the messages because there is no leader
>  ** {{Discard message LeaderSessionMessage(d29e9f38-a16d-4c87-b34f-5212caab0473,Disconnect(b97363d53ad22aedfebdc8e5ba3c672f,java.lang.Exception: TaskManager akka://flink/user/taskmanager is disassociating)) because there is currently no valid leader id known.}}
>  * 15:19:57 JM 2 connects to ZK 2 and renews its session
>  ** {{Opening socket connection to server ip-10-210-43-221.ec2.internal/10.210.43.221:2181}}
>  ** {{Socket connection established to ip-10-210-43-221.ec2.internal/10.210.43.221:2181, initiating session}}
>  ** {{Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.}}
>  ** {{Session establishment complete on server ip-10-210-43-221.ec2.internal/10.210.43.221:2181, sessionid = 0x30000003f4a0003, negotiated timeout = 40000}}
>  ** {{Connection to ZooKeeper was reconnected. Leader election can be restarted.}}
>  ** {{ZooKeeper connection RECONNECTED. Changes to the submitted job graphs are monitored again.}}
>  ** {{State change: RECONNECTED}}
>  * 15:19:57 JM 1 reports it has been granted leadership:
>  ** {{JobManager akka.tcp://flink@flink-jm-1:6123/user/jobmanager was granted leadership with leader session ID Some(ae0a1a17-eccc-40b4-985d-93bc59f5b936).}}
>  * 15:19:57 JM 2 reports the job has been suspended
>  ** {{org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter Shutting down.}}
>  ** {{Job 2a4eff355aef849c5ca37dbac04f2ff1 has been suspended.}}
>  * 15:19:57 JM 2 reports it has lost leadership:
>  ** {{Associated JobManager Actor[akka://flink/user/jobmanager#33755521|#33755521] lost leader status}}
>  ** {{Received leader address but not running in leader ActorSystem. Cancelling registration.}}
>  * 15:19:57 TMs register with JM 1
>  * 15:20:07 JM 1 attempts to recover jobs and finds there are two jobs:
>  ** {{Attempting to recover all jobs.}}
>  ** {{There are 2 jobs to recover. Starting the job recovery.}}
>  ** {{Attempting to recover job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}.}}
>  ** {{Attempting to recover job {color:#d04437}61bca496065cd05e4263070a5e923a05{color}.}}
>  * 15:20:08 – 15:32:27 ZK 2 reports a large number of errors of the form:
>  ** {{Got user-level KeeperException when processing sessionid:0x2000001d2330001 type:create cxid:0x4211 zxid:0x60009dc70 txntype:-1 reqpath:n/a Error Path:/flink/cluster_a/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1 Error:KeeperErrorCode = NodeExists for /flink/cluster_a/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1}}
>  ** {{Got user-level KeeperException when processing sessionid:0x2000001d2330001 type:create cxid:0x4230 zxid:0x60009dc78 txntype:-1 reqpath:n/a Error Path:/flink/cluster_a/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1/0000000000000069255/37d25086-374f-4969-b763-4261e87022a2 Error:KeeperErrorCode = NodeExists for /flink/cluster_a/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1/0000000000000069255/37d25086-374f-4969-b763-4261e87022a2}}
>  * 15:29:08 JM 1 starts to recover job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}
>  ** {{Recovered SubmittedJobGraph(2a4eff355aef849c5ca37dbac04f2ff1, JobInfo(clients: Set((Actor[akka.tcp://flink@ip-10-210-42-62.ec2.internal:37564/temp/$c],DETACHED)), start: 1528833686265)).}}
>  ** {{Submitting recovered job 2a4eff355aef849c5ca37dbac04f2ff1.}}
>  ** {{Submitting job 2a4eff355aef849c5ca37dbac04f2ff1 (Some Job) (Recovery).}}
>  ** {{Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647, delayBetweenRestartAttempts=30000) for 2a4eff355aef849c5ca37dbac04f2ff1.}}
>  ** {{Successfully ran initialization on master in 0 ms.}}
>  ** {{Job recovers via failover strategy: full graph restart}}
>  ** {{Running initialization on master for job Some Job (2a4eff355aef849c5ca37dbac04f2ff1).}}
>  ** {{Initialized in '/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1'.}}
>  ** {{Using application-defined state backend for checkpoint/savepoint metadata: File State Backend @ s3://bucket/flink/cluster_1/checkpoints.}}
>  ** {{Persisting periodic checkpoints externally at s3://bucket/flink/cluster_1/checkpoints-external.}}
>  ** {{Recovering checkpoints from ZooKeeper.}}
>  ** {{Found 3 checkpoints in ZooKeeper.}}
>  ** {{Trying to retrieve checkpoint 69255.}}
>  ** {{Trying to fetch 3 checkpoints from storage.}}
>  ** {{Trying to retrieve checkpoint 69256.}}
>  ** {{Trying to retrieve checkpoint 69257.}}
>  ** {{Restoring from latest valid checkpoint: Checkpoint 69257 @ 1532989148882 for 2a4eff355aef849c5ca37dbac04f2ff1.}}
>  ** {{Scheduling job 2a4eff355aef849c5ca37dbac04f2ff1 (Some Job).}}
>  ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state CREATED to RUNNING.}}
>  ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state RUNNING to FAILING.}}{{ org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration.}}
>  * 15:20:09 JM 1 starts to recover {color:#d04437}61bca496065cd05e4263070a5e923a05{color}
>  ** {{Recovered SubmittedJobGraph(61bca496065cd05e4263070a5e923a05, JobInfo(clients: Set((Actor[akka.tcp://flink@ip-10-210-22-167.ec2.internal:41001/temp/$c],DETACHED)), start: 1525728377900)).}}
>  ** {{Submitting recovered job 61bca496065cd05e4263070a5e923a05.}}
>  ** {{Submitting job 61bca496065cd05e4263070a5e923a05 (Some Job) (Recovery).}}
>  ** {{Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647, delayBetweenRestartAttempts=30000) for 61bca496065cd05e4263070a5e923a05.}}
>  ** {{Job recovers via failover strategy: full graph restart}}
>  ** {{Successfully ran initialization on master in 0 ms.}}
>  ** {{Running initialization on master for job Some Job (61bca496065cd05e4263070a5e923a05).}}
>  ** {{Initialized in '/checkpoints/61bca496065cd05e4263070a5e923a05'.}}
>  ** {{Using application-defined state backend for checkpoint/savepoint metadata: File State Backend @ s3://bucket/flink/cluster_1/checkpoints.}}
>  ** {{Persisting periodic checkpoints externally at s3://bucket/flink/cluster_1/checkpoints-external.}}
>  ** {{Recovering checkpoints from ZooKeeper.}}
>  ** {{Scheduling job 61bca496065cd05e4263070a5e923a05 (Some Job).}}
>  ** {{Job Some Job (61bca496065cd05e4263070a5e923a05) switched from state CREATED to RUNNING.}}
>  ** {{Trying to fetch 0 checkpoints from storage}}
>  ** {{Found 0 checkpoints in ZooKeeper.}}
>  * 15:20:09 JM 1 reports a bunch of metric collisions because of the two jobs:
>  ** {{Name collision: Group already contains a Metric with the name 'lastCheckpointSize'. Metric will not be reported.[jobmanager, job]}}
>  ** {{Name collision: Group already contains a Metric with the name 'lastCheckpointAlignmentBuffered'. Metric will not be reported.[jobmanager, job]}}
>  ** etc
>  * 15:20:39 JM 1 begins attempting to restart the {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} job repeatedly
>  ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state FAILING to RESTARTING.}}
>  ** {{Restarting the job Some Job (2a4eff355aef849c5ca37dbac04f2ff1).}}
>  ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state RESTARTING to CREATED.}}
>  ** {{Recovering checkpoints from ZooKeeper.}}
>  ** {{Found 3 checkpoints in ZooKeeper.}}
>  ** {{Trying to fetch 3 checkpoints from storage.}}
>  ** {{Trying to retrieve checkpoint 69255.}}
>  ** {{Trying to retrieve checkpoint 69256.}}
>  ** {{Trying to retrieve checkpoint 69257.}}
>  ** {{Restoring from latest valid checkpoint: Checkpoint 69257 @ 1532989148882 for 2a4eff355aef849c5ca37dbac04f2ff1.}}
>  ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state CREATED to RUNNING.}}
>  ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state RUNNING to FAILING.}}
>  * 15:35:39 ZK 1 reestablishes connection with ZK 2 and 3 and becomes a follower
>  
> h4. Analysis
>  
> The cluster was running job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}.  The JM HA leader was JM 2, which was connected to ZK 1.  ZK 1 was a follower in the ZK ensemble.  The ZK leader was ZK 2.  
> ZK 1 lost network connectivity for about 16 minutes.  Upon loss of connectivity to ZK 1, JM 2 gave up leadership and shut down the {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} job.  JM 2 then immediately connected to ZK 2, without its ZK session having expired.  Nonetheless, because it had given up leadership, JM 1 was elected the new JM leader.
> JM 1 then erroneously decided there were two jobs to restore: the previously running job, {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}, and {color:#d04437}61bca496065cd05e4263070a5e923a05{color}.  JM 1 decided there were no checkpoints for {color:#d04437}61bca496065cd05e4263070a5e923a05{color}, which started right away.  JM 1 attempted to restore {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} from the latest checkpoint, but it failed because there weren't enough task slots in the cluster as a result of the other job starting.  Thus, {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} entered a restart loop.
> We manually stopped both jobs and started a new job based on the last known checkpoint for {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}.
>  
> Job {color:#d04437}61bca496065cd05e4263070a5e923a05{color}  is an old job that we ran for a month between May 7th and June 5th.
> After our manual intervention, the HA state directory in S3 looks like this:
> {{s3cmd ls s3://bucket/flink/cluster_1/recovery/}}
> {{ DIR s3://bucket/flink/cluster_1/recovery/some_job/}}
> {{2018-07-31 17:33 35553 s3://bucket/flink/cluster_1/recovery/completedCheckpoint12e06bef01c5}}
> {{2018-07-31 17:34 35553 s3://bucket/flink/cluster_1/recovery/completedCheckpoint187e0d2ae7cb}}
> {{2018-07-31 17:32 35553 s3://bucket/flink/cluster_1/recovery/completedCheckpoint22fc8ca46f02}}
> {{2018-06-12 20:01 284626 s3://bucket/flink/cluster_1/recovery/submittedJobGraph7f627a661cec}}
> {{2018-07-30 23:01 285257 s3://bucket/flink/cluster_1/recovery/submittedJobGraphf3767780c00c}}
> submittedJobGraph7f627a661cec appears to be job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}, the long running job that failed during the ZK failover
> submittedJobGraphf3767780c00c appears to be job {color:#205081}d77948df92813a68ea6dfd6783f40e7e{color}, the job we started restoring from a checkpoint after shutting down the duplicate jobs
>  
> A few questions come to mind.
> h5. Why does the JM terminate running jobs when it can immediately connect to another ZK node and its ZK session has not expired?
> This seems to be a result of using the LeaderLatch recipe in Curator.  Its [docs|https://github.com/Netflix/curator/wiki/Leader-Latch] state: 
> {quote}LeaderLatch instances add a ConnectionStateListener to watch for connection problems. If SUSPENDED or LOST is reported, the LeaderLatch that is the *leader will report that it is no longer the leader* (i.e. there will not be a leader until the connection is re-established). If a LOST connection is RECONNECTED, the LeaderLatch *will delete its previous ZNode and create a new one*.
> Users of LeaderLatch must take account that connection issues can cause leadership to be lost. i.e. hasLeadership() returns true but some time later the connection is SUSPENDED or LOST. At that point hasLeadership() will return false. It is highly recommended that LeaderLatch users register a ConnectionStateListener.
> {quote}
> So not only is leadership lost while disconnected, but it will likely stay lost when reconnecting as a result of the znode deletion and recreation.
> This can probably be solved by using the Curator LeaderElection recipe instead, which [states|https://github.com/Netflix/curator/wiki/Leader-Election]:
> {quote}The {{LeaderSelectorListener}} class extends {{ConnectionStateListener}}. When the LeaderSelector is started, it adds the listener to the Curator instance. Users of the {{LeaderSelector}} must pay attention to any connection state changes. If an instance becomes the leader, it should respond to notification of being SUSPENDED or LOST.
> If the SUSPENDED state is reported, the instance must assume that it might no longer be the leader until it receives a RECONNECTED state. If the LOST state is reported, the instance is no longer the leader and its {{takeLeadership}} method should exit.
> {quote}
> So with LeaderElection it seems that what to do during the SUSPENDED state is left up to the application, which may choose to continue being leader until the state becomes LOST.
> Obviously there are dangers with doing so, but at the very least you would expect the JM not to give up leadership until it tried to connect to other ZK members within the remaining ZK session timeout.
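
The policy suggested above (stay leader through SUSPENDED, give up only on LOST or once the session timeout has passed) can be sketched roughly as follows. This is a minimal model under stated assumptions, not Curator or Flink code: `ConnState` and `GracefulLeadershipPolicy` are hypothetical names, timestamps are passed in explicitly, and the local clock is only an approximation of the server-side session expiry.

```java
// Hypothetical model for illustration; these are NOT Curator or Flink classes.
enum ConnState { CONNECTED, SUSPENDED, RECONNECTED, LOST }

/** Keeps leadership through SUSPENDED until the session timeout elapses or LOST arrives. */
final class GracefulLeadershipPolicy {
    private final long sessionTimeoutMs;
    private boolean leader;
    private long suspendedAtMs = -1L; // -1 means "not currently suspended"

    GracefulLeadershipPolicy(long sessionTimeoutMs, boolean initiallyLeader) {
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.leader = initiallyLeader;
    }

    /** Feeds a connection state change observed at time nowMs. */
    void onStateChange(ConnState state, long nowMs) {
        switch (state) {
            case SUSPENDED:
                // Remember when the suspension began, but do not revoke leadership yet.
                if (suspendedAtMs < 0) {
                    suspendedAtMs = nowMs;
                }
                break;
            case CONNECTED:
            case RECONNECTED:
                // Leadership survives only if we reconnected within the session timeout.
                expireIfOverdue(nowMs);
                suspendedAtMs = -1L;
                break;
            case LOST:
                // The session is gone, so leadership is given up unconditionally.
                leader = false;
                suspendedAtMs = -1L;
                break;
        }
    }

    /** Returns whether this contender still considers itself leader at nowMs. */
    boolean hasLeadership(long nowMs) {
        expireIfOverdue(nowMs);
        return leader;
    }

    private void expireIfOverdue(long nowMs) {
        if (suspendedAtMs >= 0 && nowMs - suspendedAtMs >= sessionTimeoutMs) {
            leader = false;
        }
    }
}
```

With the negotiated timeout of 40000 ms from the log above, a contender that is suspended and reconnects within the same second would keep leadership under this policy, whereas one that stays suspended for 40 seconds or longer would lose it.
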
> This problem has been [previously discussed|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Zookeeper-failure-handling-td19611.html] on the mailing list, which led to FLINK-6174 and this [PR|https://github.com/apache/flink/pull/3599], which appears to be a modification of the Curator LeaderLatch recipe.  It also led to FLINK-5703, which seems like an attempt to keep jobs running during JM failover.  While that is a valuable addition, I argue that it is not required to avoid this failure, as the previous leader can continue being leader so long as it connects to a new ZK before its ZK session expires.
>  
> h5. Why did JM 1 resurrect the old job?
> Something must have been off with the state stored in the S3 HA recovery directory.  The job must not have been fully removed.  
> In fact, right now the recovery directory has the file submittedJobGraph7f627a661cec which appears to be job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}. Is that expected?  That job is no longer running.  Shouldn't that file no longer exist in the recovery directory?
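
One plausible mechanism, consistent with the pull request title above ("Release JobGraph from SubmittedJobGraphStore"), is that a former leader keeps holding a lock on a job graph after losing leadership, so a later removal by the new leader cannot complete and the entry survives to be recovered again. The sketch below is a simplified in-memory model of that assumption; `LockingJobGraphStore` and its methods are hypothetical and do not mirror Flink's actual store API.

```java
// Hypothetical, simplified model; NOT Flink's SubmittedJobGraphStore API.
final class LockingJobGraphStore {
    // jobId -> owners (e.g. JobManagers) currently holding a lock on that job graph
    private final java.util.Map<String, java.util.Set<String>> locks = new java.util.HashMap<>();

    /** Adds or recovers a job graph; the caller takes a lock on it. */
    void lock(String jobId, String owner) {
        locks.computeIfAbsent(jobId, k -> new java.util.HashSet<>()).add(owner);
    }

    /** Drops the caller's lock but leaves the job graph in the store. */
    void release(String jobId, String owner) {
        java.util.Set<String> owners = locks.get(jobId);
        if (owners != null) {
            owners.remove(owner);
        }
    }

    /** Removes the job graph only if no other owner still holds a lock on it. */
    boolean remove(String jobId, String owner) {
        java.util.Set<String> owners = locks.get(jobId);
        if (owners == null) {
            return false; // unknown job graph
        }
        owners.remove(owner);
        if (!owners.isEmpty()) {
            return false; // another owner still locks it, so the entry stays behind
        }
        locks.remove(jobId);
        return true;
    }

    /** Returns whether the job graph would still be found during recovery. */
    boolean contains(String jobId) {
        return locks.containsKey(jobId);
    }
}
```

In this model, if "jm2" locks a job and loses leadership without calling `release`, a later `remove` by "jm1" returns false and the job graph remains recoverable; if "jm2" releases its lock when leadership is revoked, the removal succeeds.
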
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)