You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/26 12:55:46 UTC

[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #112: [FLINK-26787] Initial implementation of FlinkSessionJobController and…

bgeng777 commented on a change in pull request #112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/112#discussion_r835761998



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -116,6 +136,83 @@ public void submitSessionCluster(FlinkDeployment deployment, Configuration conf)
         LOG.info("Session cluster successfully deployed");
     }
 
+    public void submitJobToSessionCluster(FlinkSessionJob sessionJob, Configuration conf)
+            throws Exception {
+        JarRunResponseBody jarRunResponseBody =
+                jarUpload(sessionJob, conf)
+                        .handle(
+                                (response, throwable) -> {
+                                    if (throwable != null) {
+                                        LOG.error("Failed to upload user jar.", throwable);
+                                        throw new FlinkRuntimeException(throwable);
+                                    } else {
+                                        return jarRun(sessionJob, response, conf);
+                                    }
+                                })
+                        .get();
+
+        String jobID = jarRunResponseBody.getJobId().toHexString();
+        LOG.info("Submitted job: {} to session cluster.", jobID);
+        sessionJob.getStatus().setJobStatus(JobStatus.builder().jobId(jobID).build());
+    }
+
+    private JarRunResponseBody jarRun(
+            FlinkSessionJob sessionJob, JarUploadResponseBody response, Configuration conf) {
+        final String jarId =
+                response.getFilename().substring(response.getFilename().lastIndexOf("/") + 1);
+        // we generate jobID in advance to help deduplicate job submission.
+        JobID jobID = new JobID();
+        try (final RestClusterClient<String> clusterClient =
+                (RestClusterClient<String>) getClusterClient(conf)) {
+            final JarRunHeaders headers = JarRunHeaders.getInstance();
+            final JarRunMessageParameters parameters = headers.getUnresolvedMessageParameters();
+            parameters.jarIdPathParameter.resolve(jarId);
+            final JobSpec job = sessionJob.getSpec().getJob();
+            final JarRunRequestBody runRequestBody =
+                    new JarRunRequestBody(
+                            job.getEntryClass(),
+                            null,
+                            job.getArgs() == null ? null : Arrays.asList(job.getArgs()),
+                            job.getParallelism() > 0 ? job.getParallelism() : null,
+                            jobID,
+                            null,
+                            sessionJob.getSpec().getJob().getInitialSavepointPath());
+            LOG.info("Submitting job: {} to session cluster.", jobID.toHexString());
+            return clusterClient
+                    .sendRequest(headers, parameters, runRequestBody)
+                    .get(1, TimeUnit.MINUTES);
+        } catch (Exception e) {
+            LOG.error("Failed to submit job to session cluster.", e);
+            throw new FlinkRuntimeException(e);
+        }
+    }
+
+    private CompletableFuture<JarUploadResponseBody> jarUpload(
+            FlinkSessionJob sessionJob, final Configuration conf) throws Exception {
+        Path path = jarResolver.resolve(sessionJob.getSpec().getJob().getJarURI());
+        JarUploadHeaders headers = JarUploadHeaders.getInstance();
+        String clusterId = sessionJob.getSpec().getClusterId();
+        String namespace = sessionJob.getMetadata().getNamespace();

Review comment:
       nit: we can add some `final` modifiers to make codes nicer.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org