Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2018/06/12 14:23:09 UTC

[GitHub] flink pull request #6155: [FLINK-9494] Fix race condition in Dispatcher with...

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/6155

    [FLINK-9494] Fix race condition in Dispatcher with granting and revoking leadership

    ## What is the purpose of the change
    
    The race condition arose because the job recovery is executed outside of the main
    thread. Only after the recovery finishes does the Dispatcher set the new fencing
    token and start the recovered jobs. If the Dispatcher's leadership is revoked between
    these two operations, the Dispatcher could try to run the recovered jobs even though
    it no longer holds the leadership.
    
    The race condition is solved by first checking whether we still hold the leadership,
    which is identified by the given leader session id.
    
    This PR is based on #6154.
    
    cc @StefanRRichter 
    
    ## Brief change log
    
    - After recovering the jobs, check whether the leader session is still valid before assigning the fencing token and starting the recovered jobs
    
    ## Verifying this change
    
    - Added `DispatcherHATest`
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)
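
As an illustration of the fix described in the pull request text above, here is a minimal sketch of the pattern: job recovery completes off the main thread, and the continuation re-checks leadership for the given session id before it assigns the fencing token and starts jobs. The classes `ElectionStub` and `DispatcherStub` and the method `onGrantLeadership` are hypothetical stand-ins invented for this sketch. They are not Flink classes and do not reproduce the actual patch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of the race; none of these types are Flink classes.
final class LeadershipRaceSketch {

    /** Stand-in for a leader election service that tracks the current session id. */
    static final class ElectionStub {
        private final AtomicReference<UUID> currentSession = new AtomicReference<>();

        void grant(UUID sessionId) { currentSession.set(sessionId); }

        void revoke() { currentSession.set(null); }

        boolean hasLeadership(UUID sessionId) {
            return sessionId != null && sessionId.equals(currentSession.get());
        }
    }

    /** Stand-in for the Dispatcher: state is only mutated on its single main thread. */
    static final class DispatcherStub {
        private final ElectionStub election;
        private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
        private volatile UUID fencingToken;
        private volatile int runningJobs;

        DispatcherStub(ElectionStub election) { this.election = election; }

        /** Recovery completes elsewhere; the continuation runs on the main thread. */
        CompletableFuture<Boolean> onGrantLeadership(
                UUID sessionId, CompletableFuture<List<String>> recoveredJobs) {
            return recoveredJobs.thenApplyAsync(jobs -> {
                // The check: confirm leadership for this session before touching state.
                if (!election.hasLeadership(sessionId)) {
                    return false;
                }
                fencingToken = sessionId;
                runningJobs = jobs.size();
                return true;
            }, mainThread);
        }

        int runningJobs() { return runningJobs; }

        void shutdown() { mainThread.shutdown(); }
    }

    /** Returns the number of jobs started after one grant/recover cycle. */
    static int simulate(boolean revokeDuringRecovery) {
        ElectionStub election = new ElectionStub();
        DispatcherStub dispatcher = new DispatcherStub(election);
        try {
            UUID sessionId = UUID.randomUUID();
            election.grant(sessionId);
            CompletableFuture<List<String>> recovery = new CompletableFuture<>();
            CompletableFuture<Boolean> started =
                    dispatcher.onGrantLeadership(sessionId, recovery);
            if (revokeDuringRecovery) {
                election.revoke(); // leadership lost while recovery is still pending
            }
            recovery.complete(Arrays.asList("job-1"));
            started.join();
            return dispatcher.runningJobs();
        } finally {
            dispatcher.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("revoked during recovery: " + simulate(true) + " job(s) started");
        System.out.println("leadership kept: " + simulate(false) + " job(s) started");
    }
}
```

In this sketch the recovery future is completed by hand so that the revocation deterministically lands between granting leadership and finishing recovery. With the check in place, the revoked run starts no jobs, while the run that keeps leadership starts the one recovered job.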


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink fixDispatcherRaceCondition

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6155.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6155
    
----
commit 54831c6a07dfcd5691fd732148a7c559514362ec
Author: Till Rohrmann <tr...@...>
Date:   2018-06-12T13:22:50Z

    [hotfix] Remove Scala dependency from ZooKeeperLeaderElectionTest

commit e50034088f4d85fe457e7015f162a6f86b1de9e7
Author: Till Rohrmann <tr...@...>
Date:   2018-06-12T12:24:59Z

    [FLINK-9573] Extend LeaderElectionService#hasLeadership to take leader session id
    
    The new LeaderElectionService#hasLeadership also takes the leader session id and verifies whether
    this is the correct leader session id associated with the leadership.

commit 104b46bd7848cd431afc564f6d3bb364a5257cf9
Author: Till Rohrmann <tr...@...>
Date:   2018-06-12T12:40:13Z

    [hotfix] Fix checkstyle violations in SingleLeaderElectionService

commit b5f9ee50dc208b767d56bc74a1705b853b10aa7c
Author: Till Rohrmann <tr...@...>
Date:   2018-06-12T12:26:15Z

    [hotfix] Make TestingDispatcher and TestingJobManagerRunnerFactory standalone testing classes
    
    Refactors the DispatcherTests and moves the TestingDispatcher and the TestingJobManagerRunnerFactory
    to be top level classes. This makes it easier to reuse them.

commit 190de76b21c3710d0fcbe5d66018c5853707cc33
Author: Till Rohrmann <tr...@...>
Date:   2018-06-12T12:27:30Z

    [FLINK-9494] Fix race condition in Dispatcher with granting and revoking leadership
    
    The race condition was caused by the fact that the job recovery is executed outside
    of the main thread. Only after the recovery finishes, the Dispatcher will set the new
    fencing token and start the recovered jobs. The problem arose if in between these two
    operations the Dispatcher gets its leadership revoked. Then it could happen that the
    Dispatcher tries to run the recovered jobs even though it no longer holds the leadership.
    
    The race condition is solved by checking first whether we still hold the leadership which
    is identified by the given leader session id.

----
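
The [FLINK-9573] commit listed above extends `LeaderElectionService#hasLeadership` to take the leader session id. The following sketch shows why that matters, under the assumption that leadership can be revoked and then granted again with a fresh session id: a check that ignores the session id cannot tell a stale session from the current one. `ElectionStub` and its methods are hypothetical names for this example and are not the Flink interface.

```java
import java.util.UUID;

// Hypothetical sketch; the names below are illustrative and not Flink's API.
final class SessionAwareLeadershipSketch {

    static final class ElectionStub {
        private UUID currentSessionId; // null while not leader

        synchronized UUID grant() {
            currentSessionId = UUID.randomUUID();
            return currentSessionId;
        }

        synchronized void revoke() { currentSessionId = null; }

        /** Session-unaware check: true whenever some leadership is held. */
        synchronized boolean hasLeadership() { return currentSessionId != null; }

        /** Session-aware check: true only for the session that is currently valid. */
        synchronized boolean hasLeadership(UUID leaderSessionId) {
            return currentSessionId != null && currentSessionId.equals(leaderSessionId);
        }
    }

    /** Returns {unaware check, aware check for stale id, aware check for current id}. */
    static boolean[] demo() {
        ElectionStub election = new ElectionStub();
        UUID stale = election.grant();   // first leadership session
        election.revoke();               // leadership lost
        UUID current = election.grant(); // leadership regained with a new session id
        return new boolean[] {
            election.hasLeadership(),
            election.hasLeadership(stale),
            election.hasLeadership(current)
        };
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("any leadership: " + r[0]);
        System.out.println("stale session: " + r[1]);
        System.out.println("current session: " + r[2]);
    }
}
```

A caller that started work under the first session would pass the session-unaware check after the second grant, but it is rejected by the session-aware one.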


---

[GitHub] flink issue #6155: [FLINK-9494] Fix race condition in Dispatcher with granti...

Posted by StefanRRichter <gi...@git.apache.org>.
GitHub user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/6155
  
    LGTM 👍 


---

[GitHub] flink issue #6155: [FLINK-9494] Fix race condition in Dispatcher with granti...

Posted by tillrohrmann <gi...@git.apache.org>.
GitHub user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/6155
  
    Thanks for the review @StefanRRichter. Merging this PR.


---

[GitHub] flink pull request #6155: [FLINK-9494] Fix race condition in Dispatcher with...

Posted by tillrohrmann <gi...@git.apache.org>.
GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6155#discussion_r195347814
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java ---
    @@ -0,0 +1,253 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.dispatcher;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.testutils.OneShotLatch;
    +import org.apache.flink.runtime.blob.BlobServer;
    +import org.apache.flink.runtime.blob.VoidBlobStore;
    +import org.apache.flink.runtime.heartbeat.HeartbeatServices;
    +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.JobVertex;
    +import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
    +import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
    +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
    +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
    +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
    +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
    +import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.apache.flink.runtime.rpc.RpcService;
    +import org.apache.flink.runtime.rpc.RpcUtils;
    +import org.apache.flink.runtime.rpc.TestingRpcService;
    +import org.apache.flink.runtime.util.TestingFatalErrorHandler;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.junit.After;
    +import org.junit.AfterClass;
    +import org.junit.Before;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.UUID;
    +import java.util.concurrent.ArrayBlockingQueue;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.ExecutionException;
    +
    +import static org.hamcrest.Matchers.equalTo;
    +import static org.hamcrest.Matchers.is;
    +import static org.junit.Assert.assertThat;
    +
    +/**
    + * Tests the HA behaviour of the {@link Dispatcher}.
    + */
    +public class DispatcherHATest extends TestLogger {
    +
    +	private static final DispatcherId NULL_FENCING_TOKEN = DispatcherId.fromUuid(new UUID(0L, 0L));
    +
    +	private static final Time timeout = Time.seconds(10L);
    +
    +	private static TestingRpcService rpcService;
    +
    +	private TestingFatalErrorHandler testingFatalErrorHandler;
    +
    +	@BeforeClass
    +	public static void setupClass() {
    +		rpcService = new TestingRpcService();
    +	}
    +
    +	@Before
    +	public void setup() {
    +		testingFatalErrorHandler = new TestingFatalErrorHandler();
    +	}
    +
    +	@After
    +	public void teardown() throws Exception {
    +		if (testingFatalErrorHandler != null) {
    +			testingFatalErrorHandler.rethrowError();
    +		}
    +	}
    +
    +	@AfterClass
    +	public static void teardownClass() throws ExecutionException, InterruptedException {
    +		if (rpcService != null) {
    +			rpcService.stopService().get();
    +			rpcService = null;
    +		}
    +	}
    +
    +	/**
    +	 * Tests that interleaved granting and revoking of the leadership won't interfere
    +	 * with the job recovery and the resulting internal state of the Dispatcher.
    +	 */
    +	@Test
    +	public void testGrantingRevokingLeadership() throws Exception {
    +
    +		final Configuration configuration = new Configuration();
    +		final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
    +		final JobGraph nonEmptyJobGraph = createNonEmptyJobGraph();
    +		final SubmittedJobGraph submittedJobGraph = new SubmittedJobGraph(nonEmptyJobGraph, null);
    +
    +		final OneShotLatch enterGetJobIdsLatch = new OneShotLatch();
    +		final OneShotLatch proceedGetJobIdsLatch = new OneShotLatch();
    +		highAvailabilityServices.setSubmittedJobGraphStore(new BlockingSubmittedJobGraphStore(submittedJobGraph, enterGetJobIdsLatch, proceedGetJobIdsLatch));
    +		final TestingLeaderElectionService dispatcherLeaderElectionService = new TestingLeaderElectionService();
    +		highAvailabilityServices.setDispatcherLeaderElectionService(dispatcherLeaderElectionService);
    +
    +		final BlockingQueue<DispatcherId> fencingTokens = new ArrayBlockingQueue<>(2);
    +
    +		final HATestingDispatcher dispatcher = new HATestingDispatcher(
    +			rpcService,
    +			UUID.randomUUID().toString(),
    +			configuration,
    +			highAvailabilityServices,
    +			new TestingResourceManagerGateway(),
    +			new BlobServer(configuration, new VoidBlobStore()),
    +			new HeartbeatServices(1000L, 1000L),
    +			UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
    +			null,
    +			new MemoryArchivedExecutionGraphStore(),
    +			new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>()),
    +			testingFatalErrorHandler,
    +			fencingTokens);
    +
    +		dispatcher.start();
    +
    +		try {
    +			final UUID leaderId = UUID.randomUUID();
    +			dispatcherLeaderElectionService.isLeader(leaderId);
    +
    +			dispatcherLeaderElectionService.notLeader();
    +
    +			final DispatcherId firstFencingToken = fencingTokens.take();
    +
    +			assertThat(firstFencingToken, equalTo(NULL_FENCING_TOKEN));
    +
    +			enterGetJobIdsLatch.await();
    +			proceedGetJobIdsLatch.trigger();
    +
    +			assertThat(dispatcher.getNumberJobs(timeout).get(), is(0));
    +
    +		} finally {
    +			RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
    +		}
    +	}
    +
    +	@Nonnull
    +	private JobGraph createNonEmptyJobGraph() {
    +		final JobVertex noOpVertex = new JobVertex("NoOp vertex");
    +		return new JobGraph(noOpVertex);
    +	}
    +
    +	private static class HATestingDispatcher extends TestingDispatcher {
    +
    +		@Nonnull
    +		private final BlockingQueue<DispatcherId> fencingTokens;
    +
    +		HATestingDispatcher(RpcService rpcService, String endpointId, Configuration configuration, HighAvailabilityServices highAvailabilityServices, ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, JobManagerMetricGroup jobManagerMetricGroup, @Nullable String metricQueryServicePath, ArchivedExecutionGraphStore archivedExecutionGraphStore, JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler fatalErrorHandler, @Nonnull BlockingQueue<DispatcherId> fencingTokens) throws Exception {
    --- End diff --
    
    Sure, will fix it before merging.


---

[GitHub] flink pull request #6155: [FLINK-9494] Fix race condition in Dispatcher with...

Posted by asfgit <gi...@git.apache.org>.
GitHub user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6155


---

[GitHub] flink pull request #6155: [FLINK-9494] Fix race condition in Dispatcher with...

Posted by StefanRRichter <gi...@git.apache.org>.
GitHub user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6155#discussion_r195337102
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java ---
    +		HATestingDispatcher(RpcService rpcService, String endpointId, Configuration configuration, HighAvailabilityServices highAvailabilityServices, ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, JobManagerMetricGroup jobManagerMetricGroup, @Nullable String metricQueryServicePath, ArchivedExecutionGraphStore archivedExecutionGraphStore, JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler fatalErrorHandler, @Nonnull BlockingQueue<DispatcherId> fencingTokens) throws Exception {
    --- End diff --
    
    I would suggest introducing some newlines here.


---