You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@submarine.apache.org by pi...@apache.org on 2021/10/24 18:23:32 UTC
[submarine] branch master updated: SUBMARINE-1021. Experiment
Watcher
This is an automated email from the ASF dual-hosted git repository.
pingsutw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git
The following commit(s) were added to refs/heads/master by this push:
new 71ccbf5 SUBMARINE-1021. Experiment Watcher
71ccbf5 is described below
commit 71ccbf50701da9eb0c5ef8f5d1fae372a44a1154
Author: noidname01 <ti...@gmail.com>
AuthorDate: Sun Oct 24 16:38:13 2021 +0800
SUBMARINE-1021. Experiment Watcher
### What is this PR for?
Use the Kubernetes Java client to build watchers for TFJobs and PyTorchJobs that log the status whenever an experiment's status changes.
[Watcher examples](https://github.com/kubernetes-client/java/blob/master/examples/examples-release-12/src/main/java/io/kubernetes/client/examples/WatchExample.java)
We will create a websocket connection between server and workbench, and modify the frontend logic of workbench in the following PRs.
### What type of PR is it?
[Feature]
### Todos
None
### What is the Jira issue?
https://issues.apache.org/jira/projects/SUBMARINE/issues/SUBMARINE-1021
### How should this be tested?
The watchers start when the k8s submitter is initialized and then keep watching the experiments.
You can see the status in the log whenever the status of an experiment changes.
### Screenshots (if appropriate)
https://user-images.githubusercontent.com/55401762/136215425-5dcf4c15-3810-42b5-9511-ceb00b781405.mp4
### Questions:
* Do the license files need updating? No
* Are there breaking changes for older versions? No
* Does this need new documentation? No
Author: noidname01 <ti...@gmail.com>
Signed-off-by: Kevin <pi...@apache.org>
Closes #767 from noidname01/SUBMARINE-1021 and squashes the following commits:
9daa4af2 [noidname01] add error handling
00b341d8 [noidname01] delete debugging
423434ca [noidname01] add error log
69acfb58 [noidname01] delete comment
1b50a75e [noidname01] experiment watcher complete
9c6ace57 [noidname01] informer(WIP)
9f86dd53 [noidname01] add informer(WIP)
---
.../server/submitter/k8s/K8sSubmitter.java | 111 ++++++++++++++++++++-
1 file changed, 106 insertions(+), 5 deletions(-)
diff --git a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sSubmitter.java b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sSubmitter.java
index 0a23e58..cac7398 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sSubmitter.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sSubmitter.java
@@ -26,9 +26,14 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
+import com.squareup.okhttp.OkHttpClient;
import io.kubernetes.client.ApiClient;
import io.kubernetes.client.ApiException;
import io.kubernetes.client.Configuration;
@@ -46,8 +51,10 @@ import io.kubernetes.client.models.V1Pod;
import io.kubernetes.client.models.V1PodList;
import io.kubernetes.client.models.V1Service;
import io.kubernetes.client.models.V1Status;
+import io.kubernetes.client.util.Watch;
import io.kubernetes.client.util.ClientBuilder;
import io.kubernetes.client.util.KubeConfig;
+
import org.apache.submarine.commons.utils.SubmarineConfiguration;
import org.apache.submarine.commons.utils.exception.SubmarineRuntimeException;
import org.apache.submarine.server.api.Submitter;
@@ -68,6 +75,8 @@ import org.apache.submarine.server.submitter.k8s.model.ingressroute.IngressRoute
import org.apache.submarine.server.submitter.k8s.model.ingressroute.IngressRouteSpec;
import org.apache.submarine.server.submitter.k8s.model.ingressroute.SpecRoute;
import org.apache.submarine.server.submitter.k8s.model.middlewares.Middlewares;
+import org.apache.submarine.server.submitter.k8s.model.pytorchjob.PyTorchJob;
+import org.apache.submarine.server.submitter.k8s.model.tfjob.TFJob;
import org.apache.submarine.server.submitter.k8s.parser.ExperimentSpecParser;
import org.apache.submarine.server.submitter.k8s.parser.NotebookSpecParser;
import org.apache.submarine.server.submitter.k8s.parser.ServeSpecParser;
@@ -79,7 +88,6 @@ import org.apache.submarine.server.submitter.k8s.util.OwnerReferenceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* JobSubmitter for Kubernetes Cluster.
*/
@@ -100,12 +108,13 @@ public class K8sSubmitter implements Submitter {
private AppsV1Api appsV1Api;
+ private ApiClient client = null;
+
public K8sSubmitter() {
}
@Override
public void initialize(SubmarineConfiguration conf) {
- ApiClient client = null;
try {
String path = System.getenv(KUBECONFIG_ENV);
KubeConfig config = KubeConfig.loadKubeConfig(new FileReader(path));
@@ -119,6 +128,10 @@ public class K8sSubmitter implements Submitter {
throw new SubmarineRuntimeException(500, "Initialize K8s submitter failed.");
}
} finally {
+ // let watcher can wait until the next change
+ OkHttpClient httpClient = client.getHttpClient();
+ httpClient.setReadTimeout(0, TimeUnit.SECONDS);
+ client.setHttpClient(httpClient);
Configuration.setDefaultApiClient(client);
}
@@ -133,7 +146,12 @@ public class K8sSubmitter implements Submitter {
appsV1Api = new AppsV1Api();
}
- client.setDebugging(true);
+ try {
+ watchExperiment();
+ } catch (Exception e){
+ LOG.error("Experiment watch failed. " + e.getMessage(), e);
+ }
+
}
@Override
@@ -164,7 +182,7 @@ public class K8sSubmitter implements Submitter {
try {
MLJob mlJob = ExperimentSpecParser.parseJob(spec);
mlJob.getMetadata().setNamespace(getServerNamespace());
-
+
Object object = api.getNamespacedCustomObject(mlJob.getGroup(), mlJob.getVersion(),
mlJob.getMetadata().getNamespace(), mlJob.getPlural(), mlJob.getMetadata().getName());
experiment = parseExperimentResponseObject(object, ParseOp.PARSE_OP_RESULT);
@@ -451,7 +469,7 @@ public class K8sSubmitter implements Submitter {
if (latestEvent.getReason().equalsIgnoreCase("Pulling")) {
notebook.setStatus(Notebook.Status.STATUS_PULLING.getValue());
notebook.setReason(latestEvent.getReason());
- }
+ }
}
} catch (ApiException e) {
throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
@@ -565,6 +583,89 @@ public class K8sSubmitter implements Submitter {
}
}
+  /**
+   * Starts two background watchers, one for TFJobs and one for PyTorchJobs, in the server
+   * namespace and logs the status carried by every watch event.
+   *
+   * <p>The method returns as soon as both watch tasks have been handed to the executor; the
+   * watches themselves keep running on the worker threads until their stream ends or fails.
+   *
+   * @throws ApiException if one of the watch calls cannot be created
+   */
+  public void watchExperiment() throws ApiException{
+
+    final Watch<MLJob> watchTF = createExperimentWatch(
+        TFJob.CRD_TF_GROUP_V1,
+        TFJob.CRD_TF_VERSION_V1,
+        TFJob.CRD_TF_PLURAL_V1);
+
+    final Watch<MLJob> watchPytorch;
+    try {
+      watchPytorch = createExperimentWatch(
+          PyTorchJob.CRD_PYTORCH_GROUP_V1,
+          PyTorchJob.CRD_PYTORCH_VERSION_V1,
+          PyTorchJob.CRD_PYTORCH_PLURAL_V1);
+    } catch (ApiException | RuntimeException e) {
+      // Do not leak the already-open TFJob watch when the second watch cannot be created.
+      closeExperimentWatch(watchTF);
+      throw e;
+    }
+
+    ExecutorService experimentThread = Executors.newFixedThreadPool(2);
+    experimentThread.execute(newExperimentWatchTask("TFJob", watchTF));
+    experimentThread.execute(newExperimentWatchTask("PytorchJob", watchPytorch));
+    // No further tasks are submitted to this pool. shutdown() lets the two submitted tasks
+    // run to completion and then releases the worker threads instead of keeping them idle.
+    experimentThread.shutdown();
+  }
+
+  /**
+   * Opens a watch on one kind of experiment custom resource in the server namespace.
+   *
+   * @param group API group of the custom resource
+   * @param version API version of the custom resource
+   * @param plural plural name of the custom resource
+   * @return the opened watch; the caller is responsible for closing it
+   * @throws ApiException if the watch call cannot be created
+   */
+  private Watch<MLJob> createExperimentWatch(String group, String version, String plural)
+      throws ApiException {
+    return Watch.createWatch(
+        client,
+        api.listNamespacedCustomObjectCall(
+            group,
+            version,
+            getServerNamespace(),
+            plural,
+            "true",
+            null,
+            null,
+            null,
+            null,
+            Boolean.TRUE,
+            null,
+            null
+        ),
+        new TypeToken<Watch.Response<MLJob>>() {}.getType()
+    );
+  }
+
+  /**
+   * Builds the task that drains a watch, logs the status of every received experiment and
+   * closes the watch when the stream ends or fails.
+   *
+   * @param kind label used in the log messages, e.g. "TFJob"
+   * @param watch the watch to drain
+   * @return a task suitable for an executor
+   */
+  private Runnable newExperimentWatchTask(final String kind, final Watch<MLJob> watch) {
+    return new Runnable() {
+      @Override
+      public void run() {
+        try {
+          LOG.info("Start watching on {}s...", kind);
+          for (Watch.Response<MLJob> experiment : watch) {
+            if (experiment.object == null) {
+              // Guard against events that carry no job object.
+              LOG.warn("{} watch event of type {} has no object", kind, experiment.type);
+              continue;
+            }
+            LOG.info("{}", experiment.object.getStatus());
+          }
+        } catch (RuntimeException e) {
+          // Log the real cause here. Throwing from the finally block would replace it with
+          // an empty exception and would also fire when the stream ends normally.
+          LOG.error("Watch on " + kind + " failed. " + e.getMessage(), e);
+        } finally {
+          LOG.info("WATCH {} END", kind);
+          closeExperimentWatch(watch);
+        }
+      }
+    };
+  }
+
+  /**
+   * Closes a watch and logs, rather than propagates, any failure to do so.
+   *
+   * @param watch the watch to close
+   */
+  private void closeExperimentWatch(Watch<MLJob> watch) {
+    try {
+      watch.close();
+    } catch (Exception e) {
+      LOG.error("Failed to close experiment watch. " + e.getMessage(), e);
+    }
+  }
+
public void createPersistentVolumeClaim(String pvcName, String namespace, String scName, String storage)
throws ApiException {
V1PersistentVolumeClaim pvc = VolumeSpecParser.parsePersistentVolumeClaim(pvcName, scName, storage);
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@submarine.apache.org
For additional commands, e-mail: dev-help@submarine.apache.org