You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/12/04 14:18:47 UTC

[GitHub] [flink] tillrohrmann commented on a change in pull request #14301: [FLINK-20468][minicluster] Enable leadership control in MiniCluster to test JM failover

tillrohrmann commented on a change in pull request #14301:
URL: https://github.com/apache/flink/pull/14301#discussion_r536125712



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java
##########
@@ -29,13 +30,13 @@
  * Simple {@link CheckpointRecoveryFactory} which creates a
  * {@link CompletedCheckpointStore} and a {@link CheckpointIDCounter} per {@link JobID}.
  */
-public class TestingCheckpointRecoveryFactory implements CheckpointRecoveryFactory {
+public class PerJobCheckpointRecoveryFactory implements CheckpointRecoveryFactory {

Review comment:
       Why did you call it `PerJobCheckpointRecoveryFactory`? Is it because it stores the services for different jobs? To me, the name sounds like this factory only works for a single job.

##########
File path: flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
##########
@@ -76,46 +68,15 @@
 	@ClassRule
 	public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
 
-	private static TestingMiniCluster miniCluster;
-
-	private static EmbeddedHaServicesWithLeadershipControl highAvailabilityServices;
-
-	@BeforeClass
-	public static void setupMiniCluster() throws Exception  {
-		highAvailabilityServices =
-			new EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor());
-
-		final Configuration configuration = createConfiguration();
-
-		miniCluster = new TestingMiniCluster(
-			new TestingMiniClusterConfiguration.Builder()
-				.setConfiguration(configuration)
-				.setNumTaskManagers(1)
-				.setNumSlotsPerTaskManager(PARALLELISM)
-				.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
-				.build(),
-			() -> highAvailabilityServices);
-
-		miniCluster.start();
-	}
-
-	private static Configuration createConfiguration() throws IOException {
-		final Configuration configuration = new Configuration();
-		final String checkPointDir = Path.fromLocalFile(TMP_FOLDER.newFolder()).toUri().toString();
-		configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkPointDir);
-		return configuration;
-	}
-
-	@AfterClass
-	public static void shutdownMiniCluster() throws Exception {
-		if (miniCluster != null) {
-			miniCluster.close();
-		}
-		if (highAvailabilityServices != null) {
-			highAvailabilityServices.closeAndCleanupAllData();
-			highAvailabilityServices = null;
-		}
-	}
+	@ClassRule
+	public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(
+		new MiniClusterResourceConfiguration
+			.Builder()
+			.setNumberTaskManagers(1)
+			.setNumberSlotsPerTaskManager(PARALLELISM)
+			.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+			.enableEmbeddedHaLeadershipControl()

Review comment:
       Maybe call it `withHaLeadershipControl()`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
##########
@@ -938,6 +962,13 @@ private void terminateMiniClusterServices() throws Exception {
 		}
 	}
 
+	@Nullable
+	private static BiFunction<Configuration, Executor, HighAvailabilityServices> createHighAvailabilityServicesFactory(
+			boolean enableEmbeddedHaLeadershipControl) {
+		return enableEmbeddedHaLeadershipControl ?
+			(conf, executor) -> new EmbeddedHaServicesWithLeadershipControl(executor) : null;
+	}

Review comment:
       What do we need this method for here?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
##########
@@ -176,6 +184,11 @@ public Builder setCommonBindAddress(String commonBindAddress) {
 			return this;
 		}
 
+		public Builder enableEmbeddedHaLeadershipControl(boolean enableEmbeddedHaLeadershipControl) {

Review comment:
       Maybe rename it to `withHaLeadershipControl` and document that enabling it overrides the HA config option.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
##########
@@ -20,27 +20,47 @@
 
 import org.apache.flink.api.common.JobID;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
 /**
- * Simple {@link CheckpointRecoveryFactory} which is initialized with a
- * {@link CompletedCheckpointStore} and a {@link CheckpointIDCounter}.
+ * Simple {@link CheckpointRecoveryFactory} which creates a
+ * {@link CompletedCheckpointStore} and a {@link CheckpointIDCounter} per {@link JobID}.
  */
 public class TestingCheckpointRecoveryFactory implements CheckpointRecoveryFactory {
+	private final Function<Integer, CompletedCheckpointStore> completedCheckpointStorePerJobFactory;
+	private final Supplier<CheckpointIDCounter> checkpointIDCounterPerJobFactory;
+	private final Map<JobID, CompletedCheckpointStore> store;
+	private final Map<JobID, CheckpointIDCounter> counter;
 
-	private final CompletedCheckpointStore store;
-	private final CheckpointIDCounter counter;
-
-	public TestingCheckpointRecoveryFactory(CompletedCheckpointStore store, CheckpointIDCounter counter) {
-		this.store = store;
-		this.counter = counter;
+	public TestingCheckpointRecoveryFactory(
+			Function<Integer, CompletedCheckpointStore> completedCheckpointStorePerJobFactory,
+			Supplier<CheckpointIDCounter> checkpointIDCounterPerJobFactory) {
+		this.completedCheckpointStorePerJobFactory = completedCheckpointStorePerJobFactory;
+		this.checkpointIDCounterPerJobFactory = checkpointIDCounterPerJobFactory;
+		this.store = new HashMap<>();
+		this.counter = new HashMap<>();
 	}
 
 	@Override
-	public CompletedCheckpointStore createCheckpointStore(JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) throws Exception {
-		return store;
+	public CompletedCheckpointStore createCheckpointStore(
+			JobID jobId,
+			int maxNumberOfCheckpointsToRetain,
+			ClassLoader userClassLoader) {
+		return store.computeIfAbsent(jobId, jId ->
+			completedCheckpointStorePerJobFactory.apply(maxNumberOfCheckpointsToRetain));
 	}
 
 	@Override
-	public CheckpointIDCounter createCheckpointIDCounter(JobID jobId) throws Exception {
-		return counter;
+	public CheckpointIDCounter createCheckpointIDCounter(JobID jobId) {
+		return counter.computeIfAbsent(jobId, jId -> checkpointIDCounterPerJobFactory.get());
+	}
+
+	public static CheckpointRecoveryFactory createSamePerJob(

Review comment:
       Maybe rename it to `useSameServicesForAllJobs()`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
##########
@@ -54,12 +56,13 @@ public MiniClusterConfiguration(
 			Configuration configuration,
 			int numTaskManagers,
 			RpcServiceSharing rpcServiceSharing,
-			@Nullable String commonBindAddress) {
-
+			@Nullable String commonBindAddress,
+			boolean enableEmbeddedHaLeadershipControl) {

Review comment:
       I generally prefer passing enums instead of booleans, because enum values are more descriptive than `true` and `false`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
##########
@@ -428,11 +434,29 @@ DispatcherResourceManagerComponentFactory createDispatcherResourceManagerCompone
 	}
 
 	@VisibleForTesting
-	protected HighAvailabilityServices createHighAvailabilityServices(Configuration configuration, Executor executor) throws Exception {
+	protected HighAvailabilityServices createHighAvailabilityServices(
+			Configuration configuration,
+			Executor executor) throws Exception {
 		LOG.info("Starting high-availability services");
-		return HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
-			configuration,
-			executor);
+		return miniClusterConfiguration.embeddedHaLeadershipControlEnabled() ?
+			new EmbeddedHaServicesWithLeadershipControl(executor) :
+			HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration, executor);
+	}
+
+	/**
+	 * Returns {@link HaLeadershipControl} if enabled.
+	 *
+	 * <p>{@link HaLeadershipControl} allows granting and revoking leadership of HA components,
+	 * e.g. JobManager. The method return {@link Optional#empty()} if the control is not enabled in
+	 * {@link MiniClusterConfiguration}.
+	 *
+	 * <p>Enabling this feature disables {@link HighAvailabilityOptions#HA_MODE} option.

Review comment:
       I guess this paragraph should be added to the `enableHaLeadershipControl` method on the builder.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org