You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/06/14 16:07:54 UTC

[1/5] flink git commit: [FLINK-9494] Fix race condition in Dispatcher with granting and revoking leadership

Repository: flink
Updated Branches:
  refs/heads/master 09fbf23f5 -> 7e51b90d9


[FLINK-9494] Fix race condition in Dispatcher with granting and revoking leadership

The race condition was caused by the job recovery being executed outside of the
main thread. Only after the recovery finishes does the Dispatcher set the new
fencing token and start the recovered jobs. The problem arose if the Dispatcher's
leadership was revoked after the recovery had started but before it finished. In
that case, the Dispatcher could try to run the recovered jobs even though it no
longer held the leadership.

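The race condition is solved by first checking, in the main thread, whether the
Dispatcher still holds the leadership identified by the given leader session id.
Only if it does will the Dispatcher set the new fencing token, start the recovered
jobs and confirm the leader session id.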

This closes #6155.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7e51b90d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7e51b90d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7e51b90d

Branch: refs/heads/master
Commit: 7e51b90d909c6feaac6ed48140df00372e95a45c
Parents: ef0a84e
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jun 12 14:27:30 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jun 14 15:28:03 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java    |  51 ++--
 .../runtime/dispatcher/DispatcherHATest.java    | 253 +++++++++++++++++++
 2 files changed, 286 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7e51b90d/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index f5fdd27..6ced316 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -740,30 +740,20 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	 */
 	@Override
 	public void grantLeadership(final UUID newLeaderSessionID) {
-		final DispatcherId dispatcherId = DispatcherId.fromUuid(newLeaderSessionID);
-		log.info("Dispatcher {} was granted leadership with fencing token {}", getAddress(), dispatcherId);
+		log.info("Dispatcher {} was granted leadership with fencing token {}", getAddress(), newLeaderSessionID);
 
 		final CompletableFuture<Collection<JobGraph>> recoveredJobsFuture = recoverJobs();
 
-		final CompletableFuture<Void> fencingTokenFuture = recoveredJobsFuture.thenAcceptAsync(
-			(Collection<JobGraph> recoveredJobs) -> {
-				setNewFencingToken(dispatcherId);
+		final CompletableFuture<Boolean> fencingTokenFuture = recoveredJobsFuture.thenApplyAsync(
+			(Collection<JobGraph> recoveredJobs) -> tryAcceptLeadershipAndRunJobs(newLeaderSessionID, recoveredJobs),
+			getUnfencedMainThreadExecutor());
 
-				for (JobGraph recoveredJob : recoveredJobs) {
-					try {
-						runJob(recoveredJob);
-					} catch (Exception e) {
-						throw new CompletionException(
-							new FlinkException(
-								String.format("Failed to recover job %s.", recoveredJob.getJobID()),
-								e));
-					}
+		final CompletableFuture<Void> confirmationFuture = fencingTokenFuture.thenAcceptAsync(
+			(Boolean confirmLeadership) -> {
+				if (confirmLeadership) {
+					leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
 				}
 			},
-			getUnfencedMainThreadExecutor());
-
-		final CompletableFuture<Void> confirmationFuture = fencingTokenFuture.thenRunAsync(
-			() -> leaderElectionService.confirmLeaderSessionID(newLeaderSessionID),
 			getRpcService().getExecutor());
 
 		confirmationFuture.whenComplete(
@@ -774,6 +764,31 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 			});
 	}
 
+	private boolean tryAcceptLeadershipAndRunJobs(UUID newLeaderSessionID, Collection<JobGraph> recoveredJobs) {
+		final DispatcherId dispatcherId = DispatcherId.fromUuid(newLeaderSessionID);
+
+		if (leaderElectionService.hasLeadership(newLeaderSessionID)) {
+			log.debug("Dispatcher {} accepted leadership with fencing token {}. Start recovered jobs.", getAddress(), dispatcherId);
+			setNewFencingToken(dispatcherId);
+
+			for (JobGraph recoveredJob : recoveredJobs) {
+				try {
+					runJob(recoveredJob);
+				} catch (Exception e) {
+					throw new CompletionException(
+						new FlinkException(
+							String.format("Failed to recover job %s.", recoveredJob.getJobID()),
+							e));
+				}
+			}
+
+			return true;
+		} else {
+			log.debug("Dispatcher {} lost leadership before accepting it. Stop recovering jobs for fencing token {}.", getAddress(), dispatcherId);
+			return false;
+		}
+	}
+
 	private void setNewFencingToken(@Nullable DispatcherId dispatcherId) {
 		// clear the state if we've been the leader before
 		if (getFencingToken() != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7e51b90d/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
new file mode 100644
index 0000000..2c030d2
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests the HA behaviour of the {@link Dispatcher}.
+ */
+public class DispatcherHATest extends TestLogger {
+
+	private static final DispatcherId NULL_FENCING_TOKEN = DispatcherId.fromUuid(new UUID(0L, 0L));
+
+	private static final Time timeout = Time.seconds(10L);
+
+	private static TestingRpcService rpcService;
+
+	private TestingFatalErrorHandler testingFatalErrorHandler;
+
+	@BeforeClass
+	public static void setupClass() {
+		rpcService = new TestingRpcService();
+	}
+
+	@Before
+	public void setup() {
+		testingFatalErrorHandler = new TestingFatalErrorHandler();
+	}
+
+	@After
+	public void teardown() throws Exception {
+		if (testingFatalErrorHandler != null) {
+			testingFatalErrorHandler.rethrowError();
+		}
+	}
+
+	@AfterClass
+	public static void teardownClass() throws ExecutionException, InterruptedException {
+		if (rpcService != null) {
+			rpcService.stopService().get();
+			rpcService = null;
+		}
+	}
+
+	/**
+	 * Tests that interleaved granting and revoking of the leadership won't interfere
+	 * with the job recovery and the resulting internal state of the Dispatcher.
+	 */
+	@Test
+	public void testGrantingRevokingLeadership() throws Exception {
+
+		final Configuration configuration = new Configuration();
+		final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
+		final JobGraph nonEmptyJobGraph = createNonEmptyJobGraph();
+		final SubmittedJobGraph submittedJobGraph = new SubmittedJobGraph(nonEmptyJobGraph, null);
+
+		final OneShotLatch enterGetJobIdsLatch = new OneShotLatch();
+		final OneShotLatch proceedGetJobIdsLatch = new OneShotLatch();
+		highAvailabilityServices.setSubmittedJobGraphStore(new BlockingSubmittedJobGraphStore(submittedJobGraph, enterGetJobIdsLatch, proceedGetJobIdsLatch));
+		final TestingLeaderElectionService dispatcherLeaderElectionService = new TestingLeaderElectionService();
+		highAvailabilityServices.setDispatcherLeaderElectionService(dispatcherLeaderElectionService);
+
+		final BlockingQueue<DispatcherId> fencingTokens = new ArrayBlockingQueue<>(2);
+
+		final HATestingDispatcher dispatcher = new HATestingDispatcher(
+			rpcService,
+			UUID.randomUUID().toString(),
+			configuration,
+			highAvailabilityServices,
+			new TestingResourceManagerGateway(),
+			new BlobServer(configuration, new VoidBlobStore()),
+			new HeartbeatServices(1000L, 1000L),
+			UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
+			null,
+			new MemoryArchivedExecutionGraphStore(),
+			new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>()),
+			testingFatalErrorHandler,
+			fencingTokens);
+
+		dispatcher.start();
+
+		try {
+			final UUID leaderId = UUID.randomUUID();
+			dispatcherLeaderElectionService.isLeader(leaderId);
+
+			dispatcherLeaderElectionService.notLeader();
+
+			final DispatcherId firstFencingToken = fencingTokens.take();
+
+			assertThat(firstFencingToken, equalTo(NULL_FENCING_TOKEN));
+
+			enterGetJobIdsLatch.await();
+			proceedGetJobIdsLatch.trigger();
+
+			assertThat(dispatcher.getNumberJobs(timeout).get(), is(0));
+
+		} finally {
+			RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
+		}
+	}
+
+	@Nonnull
+	private JobGraph createNonEmptyJobGraph() {
+		final JobVertex noOpVertex = new JobVertex("NoOp vertex");
+		return new JobGraph(noOpVertex);
+	}
+
+	private static class HATestingDispatcher extends TestingDispatcher {
+
+		@Nonnull
+		private final BlockingQueue<DispatcherId> fencingTokens;
+
+		HATestingDispatcher(RpcService rpcService, String endpointId, Configuration configuration, HighAvailabilityServices highAvailabilityServices, ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, JobManagerMetricGroup jobManagerMetricGroup, @Nullable String metricQueryServicePath, ArchivedExecutionGraphStore archivedExecutionGraphStore, JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler fatalErrorHandler, @Nonnull BlockingQueue<DispatcherId> fencingTokens) throws Exception {
+			super(rpcService, endpointId, configuration, highAvailabilityServices, resourceManagerGateway, blobServer, heartbeatServices, jobManagerMetricGroup, metricQueryServicePath, archivedExecutionGraphStore, jobManagerRunnerFactory, fatalErrorHandler);
+			this.fencingTokens = fencingTokens;
+		}
+
+		@VisibleForTesting
+		CompletableFuture<Integer> getNumberJobs(Time timeout) {
+			return callAsyncWithoutFencing(
+				() -> listJobs(timeout).get().size(),
+				timeout);
+		}
+
+		@Override
+		protected void setFencingToken(@Nullable DispatcherId newFencingToken) {
+			super.setFencingToken(newFencingToken);
+
+			if (newFencingToken == null) {
+				fencingTokens.offer(NULL_FENCING_TOKEN);
+			} else {
+				fencingTokens.offer(newFencingToken);
+			}
+		}
+	}
+
+	private static class BlockingSubmittedJobGraphStore implements SubmittedJobGraphStore {
+
+		@Nonnull
+		private final SubmittedJobGraph submittedJobGraph;
+
+		@Nonnull
+		private final OneShotLatch enterGetJobIdsLatch;
+
+		@Nonnull
+		private final OneShotLatch proceedGetJobIdsLatch;
+
+		private boolean isStarted = false;
+
+		private BlockingSubmittedJobGraphStore(@Nonnull SubmittedJobGraph submittedJobGraph, @Nonnull OneShotLatch enterGetJobIdsLatch, @Nonnull OneShotLatch proceedGetJobIdsLatch) {
+			this.submittedJobGraph = submittedJobGraph;
+			this.enterGetJobIdsLatch = enterGetJobIdsLatch;
+			this.proceedGetJobIdsLatch = proceedGetJobIdsLatch;
+		}
+
+		@Override
+		public void start(SubmittedJobGraphListener jobGraphListener) throws Exception {
+			isStarted = true;
+		}
+
+		@Override
+		public void stop() throws Exception {
+			isStarted = false;
+		}
+
+		@Nullable
+		@Override
+		public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
+			Preconditions.checkArgument(jobId.equals(submittedJobGraph.getJobId()));
+
+			return submittedJobGraph;
+		}
+
+		@Override
+		public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
+			throw new UnsupportedOperationException("Should not be called.");
+		}
+
+		@Override
+		public void removeJobGraph(JobID jobId) throws Exception {
+			throw new UnsupportedOperationException("Should not be called.");
+		}
+
+		@Override
+		public Collection<JobID> getJobIds() throws Exception {
+			enterGetJobIdsLatch.trigger();
+			proceedGetJobIdsLatch.await();
+			return Collections.singleton(submittedJobGraph.getJobId());
+		}
+	}
+}


[4/5] flink git commit: [hotfix] Make TestingDispatcher and TestingJobManagerRunnerFactory standalone testing classes

Posted by tr...@apache.org.
[hotfix] Make TestingDispatcher and TestingJobManagerRunnerFactory standalone testing classes

Refactors the Dispatcher tests by moving TestingDispatcher and TestingJobManagerRunnerFactory
into standalone top-level classes, which makes them easier to reuse.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ef0a84e8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ef0a84e8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ef0a84e8

Branch: refs/heads/master
Commit: ef0a84e84ddedde4cdfdc726dd0e21b5e947fc94
Parents: cacdb68
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jun 12 14:26:15 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jun 14 15:28:03 2018 +0200

----------------------------------------------------------------------
 .../DispatcherResourceCleanupTest.java          | 26 +++++--
 .../runtime/dispatcher/DispatcherTest.java      | 44 ------------
 .../runtime/dispatcher/MiniDispatcherTest.java  | 41 -----------
 .../runtime/dispatcher/TestingDispatcher.java   | 75 ++++++++++++++++++++
 .../TestingJobManagerRunnerFactory.java         | 73 +++++++++++++++++++
 5 files changed, 168 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ef0a84e8/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index f768251..71125e4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -150,6 +150,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 		clearedJobLatch = new OneShotLatch();
 		runningJobsRegistry = new SingleRunningJobsRegistry(jobId, clearedJobLatch);
 		highAvailabilityServices.setRunningJobsRegistry(runningJobsRegistry);
+		highAvailabilityServices.setSubmittedJobGraphStore(new InMemorySubmittedJobGraphStore());
 
 		storedBlobFuture = new CompletableFuture<>();
 		deleteAllFuture = new CompletableFuture<>();
@@ -177,7 +178,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 			Dispatcher.DISPATCHER_NAME + UUID.randomUUID(),
 			configuration,
 			highAvailabilityServices,
-			new InMemorySubmittedJobGraphStore(),
+			highAvailabilityServices.getSubmittedJobGraphStore(),
 			new TestingResourceManagerGateway(),
 			blobServer,
 			new HeartbeatServices(1000L, 1000L),
@@ -185,9 +186,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 			null,
 			new MemoryArchivedExecutionGraphStore(),
 			new TestingJobManagerRunnerFactory(resultFuture, CompletableFuture.completedFuture(null)),
-			fatalErrorHandler,
-			null,
-			VoidHistoryServerArchivist.INSTANCE);
+			fatalErrorHandler);
 
 		dispatcher.start();
 
@@ -358,8 +357,23 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 	}
 
 	private static final class TestingDispatcher extends Dispatcher {
-		public TestingDispatcher(RpcService rpcService, String endpointId, Configuration configuration, HighAvailabilityServices highAvailabilityServices, SubmittedJobGraphStore submittedJobGraphStore, ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, JobManagerMetricGroup jobManagerMetricGroup, @Nullable String metricServiceQueryPath, ArchivedExecutionGraphStore archivedExecutionGraphStore, JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler fatalErrorHandler, @Nullable String restAddress, HistoryServerArchivist historyServerArchivist) throws Exception {
-			super(rpcService, endpointId, configuration, highAvailabilityServices, submittedJobGraphStore, resourceManagerGateway, blobServer, heartbeatServices, jobManagerMetricGroup, metricServiceQueryPath, archivedExecutionGraphStore, jobManagerRunnerFactory, fatalErrorHandler, restAddress, historyServerArchivist);
+		TestingDispatcher(RpcService rpcService, String endpointId, Configuration configuration, HighAvailabilityServices highAvailabilityServices, SubmittedJobGraphStore submittedJobGraphStore, ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, JobManagerMetricGroup jobManagerMetricGroup, @Nullable String metricServiceQueryPath, ArchivedExecutionGraphStore archivedExecutionGraphStore, JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler fatalErrorHandler) throws Exception {
+			super(
+				rpcService,
+				endpointId,
+				configuration,
+				highAvailabilityServices,
+				submittedJobGraphStore,
+				resourceManagerGateway,
+				blobServer,
+				heartbeatServices,
+				jobManagerMetricGroup,
+				metricServiceQueryPath,
+				archivedExecutionGraphStore,
+				jobManagerRunnerFactory,
+				fatalErrorHandler,
+				null,
+				VoidHistoryServerArchivist.INSTANCE);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ef0a84e8/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index c068020..a25fe7a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.dispatcher;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.BlobServerOptions;
@@ -48,9 +47,7 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
-import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -79,7 +76,6 @@ import org.junit.rules.TemporaryFolder;
 import org.junit.rules.TestName;
 
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.IOException;
@@ -535,46 +531,6 @@ public class DispatcherTest extends TestLogger {
 		}
 	}
 
-	private static class TestingDispatcher extends Dispatcher {
-
-		private TestingDispatcher(
-				RpcService rpcService,
-				String endpointId,
-				Configuration configuration,
-				HighAvailabilityServices highAvailabilityServices,
-				ResourceManagerGateway resourceManagerGateway,
-				BlobServer blobServer,
-				HeartbeatServices heartbeatServices,
-				JobManagerMetricGroup jobManagerMetricGroup,
-				@Nullable String metricQueryServicePath,
-				ArchivedExecutionGraphStore archivedExecutionGraphStore,
-				JobManagerRunnerFactory jobManagerRunnerFactory,
-				FatalErrorHandler fatalErrorHandler) throws Exception {
-			super(
-				rpcService,
-				endpointId,
-				configuration,
-				highAvailabilityServices,
-				highAvailabilityServices.getSubmittedJobGraphStore(),
-				resourceManagerGateway,
-				blobServer,
-				heartbeatServices,
-				jobManagerMetricGroup,
-				metricQueryServicePath,
-				archivedExecutionGraphStore,
-				jobManagerRunnerFactory,
-				fatalErrorHandler,
-				null,
-				VoidHistoryServerArchivist.INSTANCE);
-		}
-
-		@VisibleForTesting
-		void completeJobExecution(ArchivedExecutionGraph archivedExecutionGraph) {
-			runAsync(
-				() -> jobReachedGloballyTerminalState(archivedExecutionGraph));
-		}
-	}
-
 	private static final class ExpectedJobIdJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFactory {
 
 		private final JobID expectedJobId;

http://git-wip-us.apache.org/repos/asf/flink/blob/ef0a84e8/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index bfe6527..39c06f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -23,24 +23,17 @@ import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.VoidBlobStore;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobmaster.JobManagerRunner;
-import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
 import org.apache.flink.runtime.jobmaster.JobResult;
-import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
@@ -65,8 +58,6 @@ import java.util.concurrent.TimeoutException;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * Tests for the {@link MiniDispatcher}.
@@ -260,36 +251,4 @@ public class MiniDispatcherTest extends TestLogger {
 			executionMode);
 	}
 
-	private static final class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFactory {
-
-		private final CompletableFuture<JobGraph> jobGraphFuture;
-		private final CompletableFuture<ArchivedExecutionGraph> resultFuture;
-
-		private TestingJobManagerRunnerFactory(CompletableFuture<JobGraph> jobGraphFuture, CompletableFuture<ArchivedExecutionGraph> resultFuture) {
-			this.jobGraphFuture = jobGraphFuture;
-			this.resultFuture = resultFuture;
-		}
-
-		@Override
-		public JobManagerRunner createJobManagerRunner(
-				ResourceID resourceId,
-				JobGraph jobGraph,
-				Configuration configuration,
-				RpcService rpcService,
-				HighAvailabilityServices highAvailabilityServices,
-				HeartbeatServices heartbeatServices,
-				BlobServer blobServer,
-				JobManagerSharedServices jobManagerSharedServices,
-				JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
-				FatalErrorHandler fatalErrorHandler) throws Exception {
-			jobGraphFuture.complete(jobGraph);
-
-			final JobManagerRunner mock = mock(JobManagerRunner.class);
-			when(mock.getResultFuture()).thenReturn(resultFuture);
-			when(mock.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
-
-			return mock;
-		}
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ef0a84e8/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
new file mode 100644
index 0000000..f5091ea
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link Dispatcher} implementation used for testing purposes.
+ */
+class TestingDispatcher extends Dispatcher {
+
+	TestingDispatcher(
+		RpcService rpcService,
+		String endpointId,
+		Configuration configuration,
+		HighAvailabilityServices highAvailabilityServices,
+		ResourceManagerGateway resourceManagerGateway,
+		BlobServer blobServer,
+		HeartbeatServices heartbeatServices,
+		JobManagerMetricGroup jobManagerMetricGroup,
+		@Nullable String metricQueryServicePath,
+		ArchivedExecutionGraphStore archivedExecutionGraphStore,
+		JobManagerRunnerFactory jobManagerRunnerFactory,
+		FatalErrorHandler fatalErrorHandler) throws Exception {
+		super(
+			rpcService,
+			endpointId,
+			configuration,
+			highAvailabilityServices,
+			highAvailabilityServices.getSubmittedJobGraphStore(),
+			resourceManagerGateway,
+			blobServer,
+			heartbeatServices,
+			jobManagerMetricGroup,
+			metricQueryServicePath,
+			archivedExecutionGraphStore,
+			jobManagerRunnerFactory,
+			fatalErrorHandler,
+			null,
+			VoidHistoryServerArchivist.INSTANCE);
+	}
+
+	@VisibleForTesting
+	void completeJobExecution(ArchivedExecutionGraph archivedExecutionGraph) {
+		runAsync(
+			() -> jobReachedGloballyTerminalState(archivedExecutionGraph));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ef0a84e8/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
new file mode 100644
index 0000000..f9be888
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
+import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * {@link org.apache.flink.runtime.dispatcher.Dispatcher.JobManagerRunnerFactory} implementation for
+ * testing purposes.
+ */
+final class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFactory {
+
+	private final CompletableFuture<JobGraph> jobGraphFuture;
+	private final CompletableFuture<ArchivedExecutionGraph> resultFuture;
+
+	TestingJobManagerRunnerFactory(CompletableFuture<JobGraph> jobGraphFuture, CompletableFuture<ArchivedExecutionGraph> resultFuture) {
+		this.jobGraphFuture = jobGraphFuture;
+		this.resultFuture = resultFuture;
+	}
+
+	@Override
+	public JobManagerRunner createJobManagerRunner(
+			ResourceID resourceId,
+			JobGraph jobGraph,
+			Configuration configuration,
+			RpcService rpcService,
+			HighAvailabilityServices highAvailabilityServices,
+			HeartbeatServices heartbeatServices,
+			BlobServer blobServer,
+			JobManagerSharedServices jobManagerSharedServices,
+			JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
+			FatalErrorHandler fatalErrorHandler) throws Exception {
+		jobGraphFuture.complete(jobGraph);
+
+		final JobManagerRunner mock = mock(JobManagerRunner.class);
+		when(mock.getResultFuture()).thenReturn(resultFuture);
+		when(mock.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+
+		return mock;
+	}
+}


[3/5] flink git commit: [hotfix] Remove Scala dependency from ZooKeeperLeaderElectionTest

Posted by tr...@apache.org.
[hotfix] Remove Scala dependency from ZooKeeperLeaderElectionTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/467ad788
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/467ad788
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/467ad788

Branch: refs/heads/master
Commit: 467ad788fbcdd4b5ba319ffe3d64b25a282dd7ec
Parents: 09fbf23
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jun 12 15:22:50 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jun 14 15:28:03 2018 +0200

----------------------------------------------------------------------
 .../ZooKeeperLeaderElectionTest.java            | 82 +++++++++++---------
 1 file changed, 47 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/467ad788/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
index e815a74..b476896 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
@@ -18,17 +18,19 @@
 
 package org.apache.flink.runtime.leaderelection;
 
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.util.TestLogger;
+
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.CreateBuilder;
 import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.NodeCache;
 import org.apache.curator.framework.recipes.cache.NodeCacheListener;
 import org.apache.curator.test.TestingServer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
-import org.apache.flink.runtime.util.ZooKeeperUtils;
-import org.apache.flink.util.TestLogger;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.junit.After;
@@ -40,23 +42,33 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectOutputStream;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import static org.mockito.Mockito.*;
-import static org.junit.Assert.*;
-
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link ZooKeeperLeaderElectionService} and the {@link ZooKeeperLeaderRetrievalService}.
+ */
 public class ZooKeeperLeaderElectionTest extends TestLogger {
 	private TestingServer testingServer;
 
@@ -65,7 +77,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 	private CuratorFramework client;
 
 	private static final String TEST_URL = "akka//user/jobmanager";
-	private static final FiniteDuration timeout = new FiniteDuration(200, TimeUnit.SECONDS);
+	private static final long timeout = 200L * 1000L;
 
 	private static Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderElectionTest.class);
 
@@ -116,12 +128,12 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 			leaderElectionService.start(contender);
 			leaderRetrievalService.start(listener);
 
-			contender.waitForLeader(timeout.toMillis());
+			contender.waitForLeader(timeout);
 
 			assertTrue(contender.isLeader());
 			assertEquals(leaderElectionService.getLeaderSessionID(), contender.getLeaderSessionID());
 
-			listener.waitForNewLeader(timeout.toMillis());
+			listener.waitForNewLeader(timeout);
 
 			assertEquals(TEST_URL, listener.getAddress());
 			assertEquals(leaderElectionService.getLeaderSessionID(), listener.getLeaderSessionID());
@@ -144,7 +156,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 	 */
 	@Test
 	public void testZooKeeperReelection() throws Exception {
-		Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
+		Deadline deadline = Deadline.fromNow(Duration.ofMinutes(5L));
 
 		int num = 10;
 
@@ -248,7 +260,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 			Pattern regex = Pattern.compile(pattern);
 
 			for (int i = 0; i < numTries; i++) {
-				listener.waitForNewLeader(timeout.toMillis());
+				listener.waitForNewLeader(timeout);
 
 				String address = listener.getAddress();
 
@@ -316,7 +328,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 			leaderElectionService.start(contender);
 			leaderRetrievalService.start(listener);
 
-			listener.waitForNewLeader(timeout.toMillis());
+			listener.waitForNewLeader(timeout);
 
 			assertEquals(listener.getLeaderSessionID(), contender.getLeaderSessionID());
 			assertEquals(TEST_URL, listener.getAddress());
@@ -348,10 +360,10 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 
 			leaderRetrievalService2.start(listener2);
 
-			listener2.waitForNewLeader(timeout.toMillis());
+			listener2.waitForNewLeader(timeout);
 
 			if (FAULTY_CONTENDER_URL.equals(listener2.getAddress())) {
-				listener2.waitForNewLeader(timeout.toMillis());
+				listener2.waitForNewLeader(timeout);
 			}
 
 			assertEquals(listener2.getLeaderSessionID(), contender.getLeaderSessionID());
@@ -422,7 +434,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 			leaderElectionService.start(testingContender);
 			leaderRetrievalService.start(listener);
 
-			testingContender.waitForError(timeout.toMillis());
+			testingContender.waitForError(timeout);
 
 			assertNotNull(testingContender.getError());
 			assertEquals(testException, testingContender.getError().getCause());
@@ -473,11 +485,11 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 
 			leaderElectionService.start(testingContender);
 
-			testingContender.waitForLeader(timeout.toMillis());
+			testingContender.waitForLeader(timeout);
 
 			Future<Boolean> existsFuture = existsListener.nodeExists();
 
-			Await.result(existsFuture, timeout);
+			existsFuture.get(timeout, TimeUnit.MILLISECONDS);
 
 			cache.getListenable().addListener(deletedCacheListener);
 
@@ -489,7 +501,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 			Future<Boolean> deletedFuture = deletedCacheListener.nodeDeleted();
 
 			// make sure that the leader node has been deleted
-			Await.result(deletedFuture, timeout);
+			deletedFuture.get(timeout, TimeUnit.MILLISECONDS);
 
 			leaderRetrievalService.start(listener);
 
@@ -516,9 +528,9 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 		}
 	}
 
-	public static class ExistsCacheListener implements NodeCacheListener {
+	private static class ExistsCacheListener implements NodeCacheListener {
 
-		final Promise<Boolean> existsPromise = new scala.concurrent.impl.Promise.DefaultPromise<>();
+		final CompletableFuture<Boolean> existsPromise = new CompletableFuture<>();
 
 		final NodeCache cache;
 
@@ -527,23 +539,23 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 		}
 
 		public Future<Boolean> nodeExists() {
-			return existsPromise.future();
+			return existsPromise;
 		}
 
 		@Override
 		public void nodeChanged() throws Exception {
 			ChildData data = cache.getCurrentData();
 
-			if (data != null && !existsPromise.isCompleted()) {
-				existsPromise.success(true);
+			if (data != null && !existsPromise.isDone()) {
+				existsPromise.complete(true);
 				cache.getListenable().removeListener(this);
 			}
 		}
 	}
 
-	public static class DeletedCacheListener implements NodeCacheListener {
+	private static class DeletedCacheListener implements NodeCacheListener {
 
-		final Promise<Boolean> deletedPromise = new scala.concurrent.impl.Promise.DefaultPromise<>();
+		final CompletableFuture<Boolean> deletedPromise = new CompletableFuture<>();
 
 		final NodeCache cache;
 
@@ -552,15 +564,15 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 		}
 
 		public Future<Boolean> nodeDeleted() {
-			return deletedPromise.future();
+			return deletedPromise;
 		}
 
 		@Override
 		public void nodeChanged() throws Exception {
 			ChildData data = cache.getCurrentData();
 
-			if (data == null && !deletedPromise.isCompleted()) {
-				deletedPromise.success(true);
+			if (data == null && !deletedPromise.isDone()) {
+				deletedPromise.complete(true);
 				cache.getListenable().removeListener(this);
 			}
 		}


[5/5] flink git commit: [FLINK-9573] Extend LeaderElectionService#hasLeadership to take leader session id

Posted by tr...@apache.org.
[FLINK-9573] Extend LeaderElectionService#hasLeadership to take leader session id

The new LeaderElectionService#hasLeadership also takes a leader session id and only returns true if
the service currently holds the leadership under exactly this leader session id.

This closes #6154.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/363de6b6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/363de6b6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/363de6b6

Branch: refs/heads/master
Commit: 363de6b643689f64564270857f1daf7f6c59257f
Parents: 467ad78
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jun 12 14:24:59 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jun 14 15:28:03 2018 +0200

----------------------------------------------------------------------
 .../flink/docs/rest/RestAPIDocGenerator.java    |   4 +-
 .../nonha/embedded/EmbeddedLeaderService.java   |   7 +-
 .../SingleLeaderElectionService.java            |   5 +-
 .../runtime/jobmaster/JobManagerRunner.java     |   2 +-
 .../leaderelection/LeaderElectionService.java   |   9 +-
 .../StandaloneLeaderElectionService.java        |   6 +-
 .../ZooKeeperLeaderElectionService.java         |   8 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   6 +-
 .../leaderelection/LeaderElectionTest.java      | 245 +++++++++++++++++++
 .../TestingLeaderElectionService.java           |   8 +-
 .../testingUtils/TestingJobManagerLike.scala    |   2 +-
 11 files changed, 282 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/363de6b6/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
----------------------------------------------------------------------
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
index e69cc7e..cebde56 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
@@ -54,6 +54,8 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
@@ -369,7 +371,7 @@ public class RestAPIDocGenerator {
 			}
 
 			@Override
-			public boolean hasLeadership() {
+			public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
 				return false;
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/363de6b6/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java
index f89cd2c..dafefcc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 
@@ -62,7 +63,7 @@ public class EmbeddedLeaderService {
 	private EmbeddedLeaderElectionService currentLeaderConfirmed;
 
 	/** fencing UID for the current leader (or proposed leader). */
-	private UUID currentLeaderSessionId;
+	private volatile UUID currentLeaderSessionId;
 
 	/** the cached address of the current leader. */
 	private String currentLeaderAddress;
@@ -356,8 +357,8 @@ public class EmbeddedLeaderService {
 		}
 
 		@Override
-		public boolean hasLeadership() {
-			return isLeader;
+		public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
+			return isLeader && leaderSessionId.equals(currentLeaderSessionId);
 		}
 
 		void shutdown(Exception cause) {

http://git-wip-us.apache.org/repos/asf/flink/blob/363de6b6/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/leaderelection/SingleLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/leaderelection/SingleLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/leaderelection/SingleLeaderElectionService.java
index a56b077..bb7f44b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/leaderelection/SingleLeaderElectionService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/leaderelection/SingleLeaderElectionService.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 import java.util.HashSet;
@@ -162,9 +163,9 @@ public class SingleLeaderElectionService implements LeaderElectionService {
 	}
 
 	@Override
-	public boolean hasLeadership() {
+	public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
 		synchronized (lock) {
-			return leader != null;
+			return proposedLeader != null && leaderSessionId.equals(leaderId);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/363de6b6/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index f04e4af..d867ab3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -340,7 +340,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 	}
 
 	private void confirmLeaderSessionIdIfStillLeader(UUID leaderSessionId, CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture) {
-		if (leaderElectionService.hasLeadership()) {
+		if (leaderElectionService.hasLeadership(leaderSessionId)) {
 			currentLeaderGatewayFuture.complete(jobMaster.getSelfGateway(JobMasterGateway.class));
 			leaderElectionService.confirmLeaderSessionID(leaderSessionId);
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/363de6b6/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
index 6cba141..10f2f68 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.leaderelection;
 
+import javax.annotation.Nonnull;
+
 import java.util.UUID;
 
 /**
@@ -62,10 +64,11 @@ public interface LeaderElectionService {
 
 	/**
 	 * Returns true if the {@link LeaderContender} with which the service has been started owns
-	 * currently the leadership.
+	 * currently the leadership under the given leader session id.
+	 *
+	 * @param leaderSessionId identifying the current leader
 	 *
 	 * @return true if the associated {@link LeaderContender} is the leader, otherwise false
 	 */
-	boolean hasLeadership();
-
+	boolean hasLeadership(@Nonnull UUID leaderSessionId);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/363de6b6/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java
index a956a5e..ec997a3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.leaderelection;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nonnull;
+
 import java.util.UUID;
 
 /**
@@ -58,7 +60,7 @@ public class StandaloneLeaderElectionService implements LeaderElectionService {
 	public void confirmLeaderSessionID(UUID leaderSessionID) {}
 
 	@Override
-	public boolean hasLeadership() {
-		return true;
+	public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
+		return (contender != null && HighAvailabilityServices.DEFAULT_LEADER_ID.equals(leaderSessionId));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/363de6b6/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
index dc0f3ae..87684c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
@@ -37,6 +37,8 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.ObjectInputStream;
@@ -66,7 +68,7 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
 	/** ZooKeeper path of the node which stores the current leader information. */
 	private final String leaderPath;
 
-	private UUID issuedLeaderSessionID;
+	private volatile UUID issuedLeaderSessionID;
 
 	private volatile UUID confirmedLeaderSessionID;
 
@@ -205,8 +207,8 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
 	}
 
 	@Override
-	public boolean hasLeadership() {
-		return leaderLatch.hasLeadership();
+	public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
+		return leaderLatch.hasLeadership() && leaderSessionId.equals(issuedLeaderSessionID);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/363de6b6/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 4e4eb4c..cebff58 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -490,7 +490,8 @@ class JobManager(
 
             Option(submittedJobGraphOption) match {
               case Some(submittedJobGraph) =>
-                if (!leaderElectionService.hasLeadership()) {
+                if (leaderSessionID.isEmpty ||
+                  !leaderElectionService.hasLeadership(leaderSessionID.get)) {
                   // we've lost leadership. mission: abort.
                   log.warn(s"Lost leadership during recovery. Aborting recovery of $jobId.")
                 } else {
@@ -1381,7 +1382,8 @@ class JobManager(
           jobInfo.notifyClients(
             decorateMessage(JobSubmitSuccess(jobGraph.getJobID)))
 
-          if (leaderElectionService.hasLeadership) {
+          if (leaderSessionID.isDefined &&
+            leaderElectionService.hasLeadership(leaderSessionID.get)) {
             // There is a small chance that multiple job managers schedule the same job after if
             // they try to recover at the same time. This will eventually be noticed, but can not be
             // ruled out from the beginning.

http://git-wip-us.apache.org/repos/asf/flink/blob/363de6b6/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
new file mode 100644
index 0000000..44016f8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.test.TestingServer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for leader election.
+ */
+@RunWith(Parameterized.class)
+public class LeaderElectionTest extends TestLogger {
+
+	enum LeaderElectionType {
+		ZooKeeper,
+		Embedded,
+		Standalone
+	}
+
+	@Parameterized.Parameters(name = "Leader election: {0}")
+	public static Collection<LeaderElectionType> parameters () {
+		return Arrays.asList(LeaderElectionType.values());
+	}
+
+	private final ServiceClass serviceClass;
+
+	public LeaderElectionTest(LeaderElectionType leaderElectionType) {
+		switch(leaderElectionType) {
+			case ZooKeeper:
+				serviceClass = new ZooKeeperServiceClass();
+				break;
+			case Embedded:
+				serviceClass = new EmbeddedServiceClass();
+				break;
+			case Standalone:
+				serviceClass = new StandaloneServiceClass();
+				break;
+			default:
+				throw new IllegalArgumentException(String.format("Unknown leader election type: %s.", leaderElectionType));
+		}
+	}
+
+	@Before
+	public void setup() throws Exception {
+		serviceClass.setup();
+	}
+
+	@After
+	public void teardown() throws Exception {
+		serviceClass.teardown();
+	}
+
+	@Test
+	public void testHasLeadership() throws Exception {
+		final LeaderElectionService leaderElectionService = serviceClass.createLeaderElectionService();
+		final ManualLeaderContender manualLeaderContender = new ManualLeaderContender();
+
+		try {
+			assertThat(leaderElectionService.hasLeadership(UUID.randomUUID()), is(false));
+
+			leaderElectionService.start(manualLeaderContender);
+
+			final UUID leaderSessionId = manualLeaderContender.waitForLeaderSessionId();
+
+			assertThat(leaderElectionService.hasLeadership(leaderSessionId), is(true));
+			assertThat(leaderElectionService.hasLeadership(UUID.randomUUID()), is(false));
+
+			leaderElectionService.confirmLeaderSessionID(leaderSessionId);
+
+			assertThat(leaderElectionService.hasLeadership(leaderSessionId), is(true));
+
+			leaderElectionService.stop();
+
+			assertThat(leaderElectionService.hasLeadership(leaderSessionId), is(false));
+		} finally {
+			manualLeaderContender.rethrowError();
+		}
+	}
+
+	private static final class ManualLeaderContender implements LeaderContender {
+
+		private static final UUID NULL_LEADER_SESSION_ID = new UUID(0L, 0L);
+
+		private final ArrayBlockingQueue<UUID> leaderSessionIds = new ArrayBlockingQueue<>(10);
+
+		private volatile Exception exception;
+
+		@Override
+		public void grantLeadership(UUID leaderSessionID) {
+			leaderSessionIds.offer(leaderSessionID);
+		}
+
+		@Override
+		public void revokeLeadership() {
+			leaderSessionIds.offer(NULL_LEADER_SESSION_ID);
+		}
+
+		@Override
+		public String getAddress() {
+			return "foobar";
+		}
+
+		@Override
+		public void handleError(Exception exception) {
+			this.exception = exception;
+		}
+
+		void rethrowError() throws Exception {
+			if (exception != null) {
+				throw exception;
+			}
+		}
+
+		UUID waitForLeaderSessionId() throws InterruptedException {
+			return leaderSessionIds.take();
+		}
+	}
+
+	private interface ServiceClass {
+		void setup() throws Exception;
+
+		void teardown() throws Exception;
+
+		LeaderElectionService createLeaderElectionService() throws Exception;
+	}
+
+	private static final class ZooKeeperServiceClass implements ServiceClass {
+
+		private TestingServer testingServer;
+
+		private CuratorFramework client;
+
+		private Configuration configuration;
+
+		@Override
+		public void setup() throws Exception {
+			try {
+				testingServer = new TestingServer();
+			} catch (Exception e) {
+				throw new RuntimeException("Could not start ZooKeeper testing cluster.", e);
+			}
+
+			configuration = new Configuration();
+
+			configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
+			configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+
+			client = ZooKeeperUtils.startCuratorFramework(configuration);
+		}
+
+		@Override
+		public void teardown() throws Exception {
+			if (client != null) {
+				client.close();
+				client = null;
+			}
+
+			if (testingServer != null) {
+				testingServer.stop();
+				testingServer = null;
+			}
+		}
+
+		@Override
+		public LeaderElectionService createLeaderElectionService() throws Exception {
+			return ZooKeeperUtils.createLeaderElectionService(client, configuration);
+		}
+	}
+
+	private static final class EmbeddedServiceClass implements ServiceClass {
+		private EmbeddedLeaderService embeddedLeaderService;
+
+		@Override
+		public void setup() {
+			embeddedLeaderService = new EmbeddedLeaderService(TestingUtils.defaultExecutionContext());
+		}
+
+		@Override
+		public void teardown() {
+			if (embeddedLeaderService != null) {
+				embeddedLeaderService.shutdown();
+				embeddedLeaderService = null;
+			}
+		}
+
+		@Override
+		public LeaderElectionService createLeaderElectionService() throws Exception {
+			return embeddedLeaderService.createLeaderElectionService();
+		}
+	}
+
+	private static final class StandaloneServiceClass implements ServiceClass {
+
+		@Override
+		public void setup() throws Exception {
+			// noop
+		}
+
+		@Override
+		public void teardown() throws Exception {
+			// noop
+		}
+
+		@Override
+		public LeaderElectionService createLeaderElectionService() throws Exception {
+			return new StandaloneLeaderElectionService();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/363de6b6/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
index 4ecb9b61..25f97c0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.leaderelection;
 
+import javax.annotation.Nonnull;
+
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
@@ -30,6 +32,7 @@ public class TestingLeaderElectionService implements LeaderElectionService {
 	private LeaderContender contender;
 	private boolean hasLeadership = false;
 	private CompletableFuture<UUID> confirmationFuture = null;
+	private UUID issuedLeaderSessionId = null;
 
 	/**
 	 * Gets a future that completes when leadership is confirmed.
@@ -58,8 +61,8 @@ public class TestingLeaderElectionService implements LeaderElectionService {
 	}
 
 	@Override
-	public synchronized boolean hasLeadership() {
-		return hasLeadership;
+	public synchronized boolean hasLeadership(@Nonnull UUID leaderSessionId) {
+		return hasLeadership && leaderSessionId.equals(issuedLeaderSessionId); // only the session id most recently passed to isLeader() counts
 	}
 
 	public synchronized CompletableFuture<UUID> isLeader(UUID leaderSessionID) {
@@ -68,6 +71,7 @@ public class TestingLeaderElectionService implements LeaderElectionService {
 		}
 		confirmationFuture = new CompletableFuture<>();
 		hasLeadership = true;
+		issuedLeaderSessionId = leaderSessionID;
 		contender.grantLeadership(leaderSessionID);
 
 		return confirmationFuture;

http://git-wip-us.apache.org/repos/asf/flink/blob/363de6b6/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index 2c49d0f..0640f39 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -409,7 +409,7 @@ trait TestingJobManagerLike extends FlinkActor {
       }
 
     case NotifyWhenLeader =>
-      if (leaderElectionService.hasLeadership) {
+      if (leaderSessionID.isDefined && leaderElectionService.hasLeadership(leaderSessionID.get)) {
         sender() ! true
       } else {
         waitForLeader += sender()


[2/5] flink git commit: [hotfix] Fix checkstyle violations in SingleLeaderElectionService

Posted by tr...@apache.org.
[hotfix] Fix checkstyle violations in SingleLeaderElectionService


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cacdb683
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cacdb683
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cacdb683

Branch: refs/heads/master
Commit: cacdb68310db70ffc1a5b22dcd9b62b81855a544
Parents: 363de6b
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jun 12 14:40:13 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jun 14 15:28:03 2018 +0200

----------------------------------------------------------------------
 .../SingleLeaderElectionService.java            | 37 +++++++++++---------
 1 file changed, 20 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cacdb683/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/leaderelection/SingleLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/leaderelection/SingleLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/leaderelection/SingleLeaderElectionService.java
index bb7f44b..ef06b16 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/leaderelection/SingleLeaderElectionService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/leaderelection/SingleLeaderElectionService.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
+
 import java.util.HashSet;
 import java.util.UUID;
 import java.util.concurrent.Executor;
@@ -40,10 +41,10 @@ import static org.apache.flink.util.Preconditions.checkState;
 /**
  * An implementation of the {@link LeaderElectionService} interface that handles a single
  * leader contender. When started, this service immediately grants the contender the leadership.
- * 
+ *
  * <p>The implementation accepts a single static leader session ID and is hence compatible with
  * pre-configured single leader (no leader failover) setups.
- * 
+ *
  * <p>This implementation supports a series of leader listeners that receive notifications about
  * the leader contender.
  */
@@ -53,31 +54,31 @@ public class SingleLeaderElectionService implements LeaderElectionService {
 
 	// ------------------------------------------------------------------------
 
-	/** lock for all operations on this instance */
+	/** lock for all operations on this instance. */
 	private final Object lock = new Object();
 
-	/** The executor service that dispatches notifications */
+	/** The executor service that dispatches notifications. */
 	private final Executor notificationExecutor;
 
-	/** The leader ID assigned to the immediate leader */
+	/** The leader ID assigned to the immediate leader. */
 	private final UUID leaderId;
 
 	@GuardedBy("lock")
 	private final HashSet<EmbeddedLeaderRetrievalService> listeners;
 
-	/** The currently proposed leader */
+	/** The currently proposed leader. */
 	@GuardedBy("lock")
 	private volatile LeaderContender proposedLeader;
 
-	/** The confirmed leader */
+	/** The confirmed leader. */
 	@GuardedBy("lock")
 	private volatile LeaderContender leader;
 
-	/** The address of the confirmed leader */
+	/** The address of the confirmed leader. */
 	@GuardedBy("lock")
 	private volatile String leaderAddress;
 
-	/** Flag marking this service as shutdown, meaning it cannot be started again */
+	/** Flag marking this service as shutdown, meaning it cannot be started again. */
 	@GuardedBy("lock")
 	private volatile boolean shutdown;
 
@@ -86,7 +87,7 @@ public class SingleLeaderElectionService implements LeaderElectionService {
 	/**
 	 * Creates a new leader election service. The service assigns the given leader ID
 	 * to the leader contender.
-	 * 
+	 *
 	 * @param leaderId The constant leader ID assigned to the leader.
 	 */
 	public SingleLeaderElectionService(Executor notificationsDispatcher, UUID leaderId) {
@@ -172,7 +173,7 @@ public class SingleLeaderElectionService implements LeaderElectionService {
 	void errorOnGrantLeadership(LeaderContender contender, Throwable error) {
 		LOG.warn("Error notifying leader listener about new leader", error);
 		contender.handleError(error instanceof Exception ? (Exception) error : new Exception(error));
-		
+
 		synchronized (lock) {
 			if (proposedLeader == contender) {
 				proposedLeader = null;
@@ -186,7 +187,9 @@ public class SingleLeaderElectionService implements LeaderElectionService {
 	// ------------------------------------------------------------------------
 
 	public boolean isShutdown() {
-		return shutdown;
+		synchronized (lock) { // read under the lock to honour @GuardedBy("lock") on the shutdown field
+			return shutdown;
+		}
 	}
 
 	public void shutdown() {
@@ -232,8 +235,10 @@ public class SingleLeaderElectionService implements LeaderElectionService {
 	// ------------------------------------------------------------------------
 
 	public LeaderRetrievalService createLeaderRetrievalService() {
-		checkState(!shutdown, "leader election service is shut down");
-		return new EmbeddedLeaderRetrievalService();
+		synchronized (lock) { // check the @GuardedBy("lock") shutdown flag while holding the lock
+			checkState(!shutdown, "leader election service is shut down");
+			return new EmbeddedLeaderRetrievalService();
+		}
 	}
 
 	void addListener(EmbeddedLeaderRetrievalService service, LeaderRetrievalListener listener) {
@@ -347,7 +352,7 @@ public class SingleLeaderElectionService implements LeaderElectionService {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * This runnable informs a leader listener of a new leader
+	 * This runnable informs a leader listener of a new leader.
 	 */
 	private static class NotifyOfLeaderCall implements Runnable {
 
@@ -382,6 +387,4 @@ public class SingleLeaderElectionService implements LeaderElectionService {
 			}
 		}
 	}
-
-
 }