You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/10/01 09:44:36 UTC

[GitHub] asfgit closed pull request #6733: [FLINK-10291] Generate JobGraph with fixed/configurable JobID in StandaloneJobClusterEntrypoint

asfgit closed pull request #6733: [FLINK-10291] Generate JobGraph with fixed/configurable JobID in StandaloneJobClusterEntrypoint
URL: https://github.com/apache/flink/pull/6733
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff of a foreign (forked) pull request
once it is merged, the diff is displayed below for the sake of provenance:

diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
index 94fc109c47b..59ab4065804 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.client.program;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
@@ -39,18 +40,21 @@
 public class PackagedProgramUtils {
 
 	/**
-	 * Creates a {@link JobGraph} from the given {@link PackagedProgram}.
+	 * Creates a {@link JobGraph} with a specified {@link JobID}
+	 * from the given {@link PackagedProgram}.
 	 *
 	 * @param packagedProgram to extract the JobGraph from
 	 * @param configuration to use for the optimizer and job graph generator
 	 * @param defaultParallelism for the JobGraph
+	 * @param jobID the pre-generated job id
 	 * @return JobGraph extracted from the PackagedProgram
 	 * @throws ProgramInvocationException if the JobGraph generation failed
 	 */
 	public static JobGraph createJobGraph(
 			PackagedProgram packagedProgram,
 			Configuration configuration,
-			int defaultParallelism) throws ProgramInvocationException {
+			int defaultParallelism,
+			JobID jobID) throws ProgramInvocationException {
 		Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
 		final Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration);
 		final FlinkPlan flinkPlan;
@@ -79,11 +83,11 @@ public static JobGraph createJobGraph(
 		final JobGraph jobGraph;
 
 		if (flinkPlan instanceof StreamingPlan) {
-			jobGraph = ((StreamingPlan) flinkPlan).getJobGraph();
+			jobGraph = ((StreamingPlan) flinkPlan).getJobGraph(jobID);
 			jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings());
 		} else {
 			final JobGraphGenerator jobGraphGenerator = new JobGraphGenerator(configuration);
-			jobGraph = jobGraphGenerator.compileJobGraph((OptimizedPlan) flinkPlan);
+			jobGraph = jobGraphGenerator.compileJobGraph((OptimizedPlan) flinkPlan, jobID);
 		}
 
 		for (URL url : packagedProgram.getAllLibraries()) {
@@ -99,5 +103,22 @@ public static JobGraph createJobGraph(
 		return jobGraph;
 	}
 
+	/**
+	 * Creates a {@link JobGraph} with a random {@link JobID}
+	 * from the given {@link PackagedProgram}.
+	 *
+	 * @param packagedProgram to extract the JobGraph from
+	 * @param configuration to use for the optimizer and job graph generator
+	 * @param defaultParallelism for the JobGraph
+	 * @return JobGraph extracted from the PackagedProgram
+	 * @throws ProgramInvocationException if the JobGraph generation failed
+	 */
+	public static JobGraph createJobGraph(
+		PackagedProgram packagedProgram,
+		Configuration configuration,
+		int defaultParallelism) throws ProgramInvocationException {
+		return createJobGraph(packagedProgram, configuration, defaultParallelism, null);
+	}
+
 	private PackagedProgramUtils() {}
 }
diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
index 3e0645d6859..d769b6848e7 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.container.entrypoint;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.PackagedProgramUtils;
 import org.apache.flink.client.program.ProgramInvocationException;
@@ -45,6 +46,8 @@
 	@Nonnull
 	private final String[] programArguments;
 
+	public static final JobID FIXED_JOB_ID = new JobID(0, 0);
+
 	public ClassPathJobGraphRetriever(
 			@Nonnull String jobClassName,
 			@Nonnull SavepointRestoreSettings savepointRestoreSettings,
@@ -59,7 +62,11 @@ public JobGraph retrieveJobGraph(Configuration configuration) throws FlinkExcept
 		final PackagedProgram packagedProgram = createPackagedProgram();
 		final int defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
 		try {
-			final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, configuration, defaultParallelism);
+			final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(
+				packagedProgram,
+				configuration,
+				defaultParallelism,
+				FIXED_JOB_ID);
 			jobGraph.setAllowQueuedScheduling(true);
 			jobGraph.setSavepointRestoreSettings(savepointRestoreSettings);
 
diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java
index 6e460e1f894..83696e37c90 100644
--- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java
+++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java
@@ -29,6 +29,7 @@
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
 /**
@@ -53,6 +54,7 @@ public void testJobGraphRetrieval() throws FlinkException {
 
 		assertThat(jobGraph.getName(), is(equalTo(TestJob.class.getCanonicalName() + "-suffix")));
 		assertThat(jobGraph.getMaximumParallelism(), is(parallelism));
+		assertEquals(jobGraph.getJobID(), ClassPathJobGraphRetriever.FIXED_JOB_ID);
 	}
 
 	@Test
@@ -68,5 +70,6 @@ public void testSavepointRestoreSettings() throws FlinkException {
 		final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(configuration);
 
 		assertThat(jobGraph.getSavepointRestoreSettings(), is(equalTo(savepointRestoreSettings)));
+		assertEquals(jobGraph.getJobID(), ClassPathJobGraphRetriever.FIXED_JOB_ID);
 	}
 }
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java
index 880f2e3d5db..764134f391e 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java
@@ -21,15 +21,29 @@
 import java.io.File;
 import java.io.IOException;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 
+import javax.annotation.Nullable;
+
 /**
  * Abstract class representing Flink Streaming plans
  * 
  */
 public abstract class StreamingPlan implements FlinkPlan {
 
-	public abstract JobGraph getJobGraph();
+	/**
+	 * Gets the assembled {@link JobGraph} with a random {@link JobID}.
+	 */
+	@SuppressWarnings("deprecation")
+	public JobGraph getJobGraph() {
+		return getJobGraph(null);
+	}
+
+	/**
+	 * Gets the assembled {@link JobGraph} with a specified {@link JobID}.
+	 */
+	public abstract JobGraph getJobGraph(@Nullable JobID jobID);
 
 	public abstract String getStreamingPlanAsJSON();
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 01768ad6df7..46a4ce22112 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -19,6 +19,7 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -652,19 +653,20 @@ private void removeVertex(StreamNode toRemove) {
 	}
 
 	/**
-	 * Gets the assembled {@link JobGraph}.
+	 * Gets the assembled {@link JobGraph} with a given job id.
 	 */
 	@SuppressWarnings("deprecation")
-	public JobGraph getJobGraph() {
+	@Override
+	public JobGraph getJobGraph(@Nullable JobID jobID) {
 		// temporarily forbid checkpointing for iterative jobs
 		if (isIterative() && checkpointConfig.isCheckpointingEnabled() && !checkpointConfig.isForceCheckpointing()) {
 			throw new UnsupportedOperationException(
-					"Checkpointing is currently not supported by default for iterative jobs, as we cannot guarantee exactly once semantics. "
-							+ "State checkpoints happen normally, but records in-transit during the snapshot will be lost upon failure. "
-							+ "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)");
+				"Checkpointing is currently not supported by default for iterative jobs, as we cannot guarantee exactly once semantics. "
+					+ "State checkpoints happen normally, but records in-transit during the snapshot will be lost upon failure. "
+					+ "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)");
 		}
 
-		return StreamingJobGraphGenerator.createJobGraph(this);
+		return StreamingJobGraphGenerator.createJobGraph(this, jobID);
 	}
 
 	@Override
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 0ce52250920..69213024975 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -19,6 +19,7 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
@@ -62,6 +63,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -83,7 +86,11 @@
 	// ------------------------------------------------------------------------
 
 	public static JobGraph createJobGraph(StreamGraph streamGraph) {
-		return new StreamingJobGraphGenerator(streamGraph).createJobGraph();
+		return createJobGraph(streamGraph, null);
+	}
+
+	public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {
+		return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph();
 	}
 
 	// ------------------------------------------------------------------------
@@ -108,6 +115,10 @@ public static JobGraph createJobGraph(StreamGraph streamGraph) {
 	private final List<StreamGraphHasher> legacyStreamGraphHashers;
 
 	private StreamingJobGraphGenerator(StreamGraph streamGraph) {
+		this(streamGraph, null);
+	}
+
+	private StreamingJobGraphGenerator(StreamGraph streamGraph, @Nullable JobID jobID) {
 		this.streamGraph = streamGraph;
 		this.defaultStreamGraphHasher = new StreamGraphHasherV2();
 		this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphUserHashHasher());
@@ -121,7 +132,7 @@ private StreamingJobGraphGenerator(StreamGraph streamGraph) {
 		this.chainedPreferredResources = new HashMap<>();
 		this.physicalEdgesInOrder = new ArrayList<>();
 
-		jobGraph = new JobGraph(streamGraph.getJobName());
+		jobGraph = new JobGraph(jobID, streamGraph.getJobName());
 	}
 
 	private JobGraph createJobGraph() {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services