You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@submarine.apache.org by pi...@apache.org on 2021/10/24 18:23:32 UTC

[submarine] branch master updated: SUBMARINE-1021. Experiment Watcher

This is an automated email from the ASF dual-hosted git repository.

pingsutw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git


The following commit(s) were added to refs/heads/master by this push:
     new 71ccbf5  SUBMARINE-1021. Experiment Watcher
71ccbf5 is described below

commit 71ccbf50701da9eb0c5ef8f5d1fae372a44a1154
Author: noidname01 <ti...@gmail.com>
AuthorDate: Sun Oct 24 16:38:13 2021 +0800

    SUBMARINE-1021. Experiment Watcher
    
    ### What is this PR for?
    
    Use the Kubernetes Java client to build watchers for TFJobs and PyTorchJobs that log the status whenever an experiment's status changes.
    [Watcher examples](https://github.com/kubernetes-client/java/blob/master/examples/examples-release-12/src/main/java/io/kubernetes/client/examples/WatchExample.java)
    
    We will create a websocket connection between server and workbench, and modify the frontend logic of workbench in the following PRs.
    
    ### What type of PR is it?
    [Feature]
    
    ### Todos
    
    None
    
    ### What is the Jira issue?
    
    https://issues.apache.org/jira/projects/SUBMARINE/issues/SUBMARINE-1021
    
    ### How should this be tested?
    
    The watchers start when the k8s submitter is initialized and then keep watching the experiments.
    You can see the status in the log whenever an experiment's status changes.
    
    ### Screenshots (if appropriate)
    
    https://user-images.githubusercontent.com/55401762/136215425-5dcf4c15-3810-42b5-9511-ceb00b781405.mp4
    
    ### Questions:
    * Do the license files need updating? No
    * Are there breaking changes for older versions? No
    * Does this need new documentation? No
    
    Author: noidname01 <ti...@gmail.com>
    
    Signed-off-by: Kevin <pi...@apache.org>
    
    Closes #767 from noidname01/SUBMARINE-1021 and squashes the following commits:
    
    9daa4af2 [noidname01] add error handling
    00b341d8 [noidname01] delete debugging
    423434ca [noidname01] add error log
    69acfb58 [noidname01] delete comment
    1b50a75e [noidname01] experiment watcher complete
    9c6ace57 [noidname01] informer(WIP)
    9f86dd53 [noidname01] add informer(WIP)
---
 .../server/submitter/k8s/K8sSubmitter.java         | 111 ++++++++++++++++++++-
 1 file changed, 106 insertions(+), 5 deletions(-)

diff --git a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sSubmitter.java b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sSubmitter.java
index 0a23e58..cac7398 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sSubmitter.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sSubmitter.java
@@ -26,9 +26,14 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
+import com.google.common.reflect.TypeToken;
 import com.google.gson.Gson;
 import com.google.gson.JsonSyntaxException;
+import com.squareup.okhttp.OkHttpClient;
 import io.kubernetes.client.ApiClient;
 import io.kubernetes.client.ApiException;
 import io.kubernetes.client.Configuration;
@@ -46,8 +51,10 @@ import io.kubernetes.client.models.V1Pod;
 import io.kubernetes.client.models.V1PodList;
 import io.kubernetes.client.models.V1Service;
 import io.kubernetes.client.models.V1Status;
+import io.kubernetes.client.util.Watch;
 import io.kubernetes.client.util.ClientBuilder;
 import io.kubernetes.client.util.KubeConfig;
+
 import org.apache.submarine.commons.utils.SubmarineConfiguration;
 import org.apache.submarine.commons.utils.exception.SubmarineRuntimeException;
 import org.apache.submarine.server.api.Submitter;
@@ -68,6 +75,8 @@ import org.apache.submarine.server.submitter.k8s.model.ingressroute.IngressRoute
 import org.apache.submarine.server.submitter.k8s.model.ingressroute.IngressRouteSpec;
 import org.apache.submarine.server.submitter.k8s.model.ingressroute.SpecRoute;
 import org.apache.submarine.server.submitter.k8s.model.middlewares.Middlewares;
+import org.apache.submarine.server.submitter.k8s.model.pytorchjob.PyTorchJob;
+import org.apache.submarine.server.submitter.k8s.model.tfjob.TFJob;
 import org.apache.submarine.server.submitter.k8s.parser.ExperimentSpecParser;
 import org.apache.submarine.server.submitter.k8s.parser.NotebookSpecParser;
 import org.apache.submarine.server.submitter.k8s.parser.ServeSpecParser;
@@ -79,7 +88,6 @@ import org.apache.submarine.server.submitter.k8s.util.OwnerReferenceUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * JobSubmitter for Kubernetes Cluster.
  */
@@ -100,12 +108,13 @@ public class K8sSubmitter implements Submitter {
 
   private AppsV1Api appsV1Api;
 
+  private ApiClient client = null;
+
   public K8sSubmitter() {
   }
 
   @Override
   public void initialize(SubmarineConfiguration conf) {
-    ApiClient client = null;
     try {
       String path = System.getenv(KUBECONFIG_ENV);
       KubeConfig config = KubeConfig.loadKubeConfig(new FileReader(path));
@@ -119,6 +128,10 @@ public class K8sSubmitter implements Submitter {
         throw new SubmarineRuntimeException(500, "Initialize K8s submitter failed.");
       }
     } finally {
+      // let watcher can wait until the next change
+      OkHttpClient httpClient = client.getHttpClient();
+      httpClient.setReadTimeout(0, TimeUnit.SECONDS);
+      client.setHttpClient(httpClient);
       Configuration.setDefaultApiClient(client);
     }
 
@@ -133,7 +146,12 @@ public class K8sSubmitter implements Submitter {
       appsV1Api = new AppsV1Api();
     }
 
-    client.setDebugging(true);
+    try {
+      watchExperiment();
+    } catch (Exception e){
+      LOG.error("Experiment watch failed. " + e.getMessage(), e);
+    }
+
   }
 
   @Override
@@ -164,7 +182,7 @@ public class K8sSubmitter implements Submitter {
     try {
       MLJob mlJob = ExperimentSpecParser.parseJob(spec);
       mlJob.getMetadata().setNamespace(getServerNamespace());
-      
+
       Object object = api.getNamespacedCustomObject(mlJob.getGroup(), mlJob.getVersion(),
           mlJob.getMetadata().getNamespace(), mlJob.getPlural(), mlJob.getMetadata().getName());
       experiment = parseExperimentResponseObject(object, ParseOp.PARSE_OP_RESULT);
@@ -451,7 +469,7 @@ public class K8sSubmitter implements Submitter {
         if (latestEvent.getReason().equalsIgnoreCase("Pulling")) {
           notebook.setStatus(Notebook.Status.STATUS_PULLING.getValue());
           notebook.setReason(latestEvent.getReason());
-        }  
+        }
       }
     } catch (ApiException e) {
       throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
@@ -565,6 +583,89 @@ public class K8sSubmitter implements Submitter {
     }
   }
 
+  public void watchExperiment() throws ApiException{
+
+    Watch<MLJob> watchTF = Watch.createWatch(
+              client,
+              api.listNamespacedCustomObjectCall(
+                      TFJob.CRD_TF_GROUP_V1,
+                      TFJob.CRD_TF_VERSION_V1,
+                      getServerNamespace(),
+                      TFJob.CRD_TF_PLURAL_V1,
+                      "true",
+                      null,
+                      null,
+                      null,
+                      null,
+                      Boolean.TRUE,
+                      null,
+                      null
+              ),
+              new TypeToken<Watch.Response<MLJob>>() {}.getType()
+      );
+
+    Watch<MLJob> watchPytorch = Watch.createWatch(
+            client,
+            api.listNamespacedCustomObjectCall(
+                    PyTorchJob.CRD_PYTORCH_GROUP_V1,
+                    PyTorchJob.CRD_PYTORCH_VERSION_V1,
+                    getServerNamespace(),
+                    PyTorchJob.CRD_PYTORCH_PLURAL_V1,
+                    "true",
+                    null,
+                    null,
+                    null,
+                    null,
+                    Boolean.TRUE,
+                    null,
+                    null
+            ),
+            new TypeToken<Watch.Response<MLJob>>() {}.getType()
+    );
+
+    ExecutorService experimentThread = Executors.newFixedThreadPool(2);
+
+    experimentThread.execute(new Runnable() {
+        @Override
+        public void run() {
+            try {
+              LOG.info("Start watching on TFJobs...");
+              for (Watch.Response<MLJob> experiment : watchTF) {
+                LOG.info("{}", experiment.object.getStatus());
+              }
+            } finally {
+              LOG.info("WATCH TFJob END");
+              try {
+                watchTF.close();
+              } catch (Exception e){
+                LOG.error("{}", e.getMessage());
+              }
+              throw new RuntimeException();
+            }
+        }
+    });
+
+    experimentThread.execute(new Runnable() {
+        @Override
+        public void run() {
+            try {
+              LOG.info("Start watching on PytorchJobs...");
+              for (Watch.Response<MLJob> experiment : watchPytorch) {
+                LOG.info("{}", experiment.object.getStatus());
+              }
+            } finally {
+              LOG.info("WATCH PytorchJob END");
+              try {
+                watchPytorch.close();
+              } catch (Exception e){
+                LOG.error("{}", e.getMessage());
+              }
+              throw new RuntimeException();
+            }
+        }
+    });
+  }
+
   public void createPersistentVolumeClaim(String pvcName, String namespace, String scName, String storage)
       throws ApiException {
     V1PersistentVolumeClaim pvc = VolumeSpecParser.parsePersistentVolumeClaim(pvcName, scName, storage);

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@submarine.apache.org
For additional commands, e-mail: dev-help@submarine.apache.org