You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/02 16:29:54 UTC

[GitHub] [flink] tillrohrmann commented on a change in pull request #13530: [FLINK-19123] Use shared MiniCluster for executeAsync() in TestStreamEnvironment

tillrohrmann commented on a change in pull request #13530:
URL: https://github.com/apache/flink/pull/13530#discussion_r498922308



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobClient.java
##########
@@ -43,23 +43,30 @@
 /**
  * A {@link JobClient} for a {@link MiniCluster}.
  */
-public final class PerJobMiniClusterJobClient implements JobClient, CoordinationRequestGateway {
+public final class MiniClusterJobClient implements JobClient, CoordinationRequestGateway {
 
-	private static final Logger LOG = LoggerFactory.getLogger(PerJobMiniClusterJobClient.class);
+	private static final Logger LOG = LoggerFactory.getLogger(MiniClusterJobClient.class);
 
 	private final JobID jobID;
 	private final MiniCluster miniCluster;
-	private final CompletableFuture<JobResult> jobResultFuture;
 	private final ClassLoader classLoader;
+	private final CompletableFuture<JobResult> jobResultFuture;
 
-	public PerJobMiniClusterJobClient(JobID jobID, MiniCluster miniCluster, ClassLoader classLoader) {
+	/**
+	 * Creates a {@link MiniClusterJobClient} for the given {@link JobID} and {@link MiniCluster}.
+	 * This will shut down the {@code MiniCluster} after job result retrieval if {@code
+	 * shutdownCluster} is {@code true}.
+	 */
+	public MiniClusterJobClient(JobID jobID, MiniCluster miniCluster, ClassLoader classLoader, boolean shutdownCluster) {

Review comment:
       I tend to prefer enums instead of booleans for parameters because it makes it explicit what `true` and `false` mean.

##########
File path: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
##########
@@ -19,52 +19,37 @@
 package org.apache.flink.test.util;
 
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
-import org.apache.flink.client.deployment.executors.LocalExecutor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.minicluster.JobExecutor;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.util.Preconditions;
 
 import java.net.URL;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 
 /**
- * A {@link ExecutionEnvironment} implementation which executes its jobs on a
- * {@link MiniCluster}.
+ * A {@link ExecutionEnvironment} implementation which executes its jobs on a {@link MiniCluster}.
  */
 public class TestEnvironment extends ExecutionEnvironment {
 
-	private final JobExecutor jobExecutor;
-
-	private final Collection<Path> jarFiles;
-
-	private final Collection<URL> classPaths;
+	private final MiniCluster miniCluster;
 
 	private TestEnvironment lastEnv;
 
 	public TestEnvironment(
-			JobExecutor jobExecutor,
+			MiniCluster miniCluster,
 			int parallelism,
 			boolean isObjectReuseEnabled,
 			Collection<Path> jarFiles,
 			Collection<URL> classPaths) {
-		this.jobExecutor = Preconditions.checkNotNull(jobExecutor);
-		this.jarFiles = Preconditions.checkNotNull(jarFiles);
-		this.classPaths = Preconditions.checkNotNull(classPaths);
-		getConfiguration().set(DeploymentOptions.TARGET, LocalExecutor.NAME);
-		getConfiguration().set(DeploymentOptions.ATTACHED, true);
+		super(
+				new MiniClusterPipelineExecutorServiceLoader(miniCluster),
+				MiniClusterPipelineExecutorServiceLoader.createConfiguration(jarFiles, classPaths),
+				null);

Review comment:
       nit: this looks too deeply indented.

##########
File path: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
##########
@@ -78,58 +63,33 @@ public TestEnvironment(
 	}
 
 	public TestEnvironment(
-			JobExecutor executor,
+			MiniCluster executor,
 			int parallelism,
 			boolean isObjectReuseEnabled) {
 		this(
-			executor,
-			parallelism,
-			isObjectReuseEnabled,
-			Collections.emptyList(),
-			Collections.emptyList());
+				executor,
+				parallelism,
+				isObjectReuseEnabled,
+				Collections.emptyList(),
+				Collections.emptyList());

Review comment:
       I think an extra tab of indentation is probably not correct here.

##########
File path: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
##########
@@ -174,15 +129,15 @@ public ExecutionEnvironment createExecutionEnvironment() {
 	 * environment executes the given jobs on a Flink mini cluster with the given default
 	 * parallelism and the additional jar files and class paths.
 	 *
-	 * @param jobExecutor The executor to run the jobs on
+	 * @param miniCluster The MiniCluster to execute jobs on.
 	 * @param parallelism The default parallelism
 	 */
-	public static void setAsContext(final JobExecutor jobExecutor, final int parallelism) {
+	public static void setAsContext(final MiniCluster miniCluster, final int parallelism) {
 		setAsContext(
-			jobExecutor,
-			parallelism,
-			Collections.emptyList(),
-			Collections.emptyList());
+				miniCluster,
+				parallelism,
+				Collections.emptyList(),
+				Collections.emptyList());

Review comment:
       nit: I am not sure about this indentation here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org