You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@submarine.apache.org by cd...@apache.org on 2023/03/11 10:20:46 UTC

[submarine] branch master updated: SUBMARINE-1362. Refactor submarine-k8s-agent using JOSDK

This is an automated email from the ASF dual-hosted git repository.

cdmikechen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git


The following commit(s) were added to refs/heads/master by this push:
     new 48cdf7bb SUBMARINE-1362. Refactor submarine-k8s-agent using JOSDK
48cdf7bb is described below

commit 48cdf7bbc82af714a5a5485c92d7da4f2fba3891
Author: cdmikechen <cd...@apache.org>
AuthorDate: Sat Mar 11 17:20:55 2023 +0800

    SUBMARINE-1362. Refactor submarine-k8s-agent using JOSDK
    
    ### What is this PR for?
    A unified agent service replaces the previous one for each experiment and notebook. The unified agent will listen to both experiments and notebooks and update their status in real time, instead of having to start them independently, which would have resulted in untimely status updates.
    We will use the Java Operator SDK (https://github.com/java-operator-sdk/java-operator-sdk)
    
    ### What type of PR is it?
    Feature
    
    ### Todos
    * [x] - Use Java Operator SDK to replace old agent
    * [x] - Remove agent pod deployment when creating/deleting resource
    * [x] - Operator support agent deployment
    * [x] - Add agent test cases
    
    ### What is the Jira issue?
    https://issues.apache.org/jira/browse/SUBMARINE-1362
    
    ### How should this be tested?
    Agent test cases
    
    ### Screenshots (if appropriate)
    NA
    
    ### Questions:
    * Do the license files need updating? Yes
    * Are there breaking changes for older versions? Yes
    * Does this need new documentation? No
    
    Author: cdmikechen <cd...@apache.org>
    
    Signed-off-by: cdmikechen <cd...@apache.org>
    
    Closes #1038 from cdmikechen/SUBMARINE-1362 and squashes the following commits:
    
    0e91b022 [cdmikechen] Update libs version in LICENSE-binary
    e8718928 [cdmikechen] Fix cicd
    b9497d7c [cdmikechen] Add cicd
    d5d9d0af [cdmikechen] Unit tests update
    ee87f8d5 [cdmikechen] Add Apache2.0
    e8f109fb [cdmikechen] Add agent test
    fa74b7ca [cdmikechen] Add Apache2.0
    b78d9cc2 [cdmikechen] Add Test Framework
    c98425bb [cdmikechen] Add agent deployment in operator
    de4035b2 [cdmikechen] Remove agent pod deployment when creating/deleting resource
    b8f01ee3 [cdmikechen] Use Java Operator SDK to replace old agent
---
 .github/workflows/master.yml                       |  19 +-
 LICENSE-binary                                     |  10 +-
 dev-support/docker-images/agent/Dockerfile         |  17 +-
 dev-support/docker-images/agent/build.sh           |  14 +-
 pom.xml                                            |   9 +-
 submarine-cloud-v3/api/v1alpha1/submarine_types.go |  12 +-
 .../api/v1alpha1/zz_generated.deepcopy.go          |  20 ++
 submarine-cloud-v3/artifacts/submarine-agent.yaml  |  47 +++
 .../crd/bases/submarine.apache.org_submarines.yaml |  13 +-
 submarine-cloud-v3/controllers/submarine_agent.go  | 139 +++++++++
 .../controllers/submarine_agent_test.go            |  49 +++
 .../controllers/submarine_controller.go            |  11 +-
 .../controllers/submarine_database.go              |  10 +-
 .../controllers/submarine_virtualservice.go        |   4 +-
 submarine-server/server-core/pom.xml               |   4 +
 .../experiment/mappers/ExperimentMapper.java       |  26 ++
 .../database/notebook/mappers/NotebookMapper.java  |   8 +
 .../database/mappers/ExperimentMapper.xml          |  24 ++
 .../submarine/database/mappers/NotebookMapper.xml  |   8 +
 .../server/k8s/utils/OwnerReferenceConfig.java     |  66 ++++
 .../server-submitter/submarine-k8s-agent/README.md |  46 +++
 .../server-submitter/submarine-k8s-agent/pom.xml   | 233 +++++++-------
 .../src/assembly/distribution.xml                  |  69 -----
 .../src/assembly/src-distribution.xml              |  53 ----
 .../submarine/server/k8s/agent/SubmarineAgent.java |  84 ------
 .../server/k8s/agent/SubmarineAgentListener.java   |  82 +++++
 .../k8s/agent/handler/CustomResourceHandler.java   | 130 --------
 .../server/k8s/agent/handler/NotebookHandler.java  | 154 ----------
 .../k8s/agent/handler/PyTorchJobHandler.java       | 114 -------
 .../server/k8s/agent/handler/TFJobHandler.java     | 112 -------
 .../k8s/agent/model/notebook/NotebookResource.java |  51 ++++
 .../model/notebook/status/NotebookCondition.java   | 116 +++++++
 .../model/notebook/status/NotebookStatus.java      |  94 ++++++
 .../training/JobResource.java}                     |  35 ++-
 .../agent/model/training/resource/PyTorchJob.java  |  42 +++
 .../k8s/agent/model/training/resource/TFJob.java   |  42 +++
 .../agent/model/training/resource/XGBoostJob.java  |  42 +++
 .../agent/model/training/status/JobCondition.java  | 128 ++++++++
 .../k8s/agent/model/training/status/JobStatus.java | 118 ++++++++
 .../agent/model/training/status/ReplicaStatus.java | 101 +++++++
 .../server/k8s/agent/reconciler/JobReconciler.java | 180 +++++++++++
 .../k8s/agent/reconciler/NotebookReconciler.java   | 197 ++++++++++++
 .../k8s/agent/reconciler/PyTorchJobReconciler.java |  54 ++++
 .../k8s/agent/reconciler/TFJobReconciler.java      |  54 ++++
 .../k8s/agent/reconciler/XGBoostJobReconciler.java |  54 ++++
 .../server/k8s/agent/util/RestClient.java          |  55 ----
 .../src/main/resources/log4j.properties            |   4 +
 .../src/main/resources/mybatis-config.xml          |  48 +++
 .../server/k8s/agent/SubmitSubmarineAgentTest.java | 336 +++++++++++++++++++++
 .../test/resources/custom-resources/notebook.yml   |  71 +++++
 .../resources/custom-resources/pytorchjobs.yaml    |  61 ++++
 .../test/resources/custom-resources/tfjobs.yaml    |  59 ++++
 .../resources/custom-resources/xgboostjobs.yaml    |  60 ++++
 .../src/test/resources/db/agent-init.sql           |  51 ++++
 .../src/{main => test}/resources/log4j.properties  |   7 +
 .../server/submitter/k8s/K8sSubmitter.java         |  31 +-
 .../server/submitter/k8s/model/AgentPod.java       |   5 +
 .../submitter/k8s/util/OwnerReferenceUtils.java    |  17 +-
 .../submitter/k8s/SubmitterNotebookApiTest.java    |  21 +-
 .../k8s/mljob/SubmitterPyTorchApiTest.java         |  19 +-
 .../k8s/mljob/SubmitterTensorflowApiTest.java      |  19 +-
 .../k8s/mljob/SubmitterXGBoostApiTest.java         |  20 +-
 62 files changed, 2735 insertions(+), 1044 deletions(-)

diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml
index 02b04f34..c474fd39 100644
--- a/.github/workflows/master.yml
+++ b/.github/workflows/master.yml
@@ -517,6 +517,12 @@ jobs:
           path: |
             ./submarine-server/server-submitter/submitter-k8s/target/jacoco.exec
           key: ${{ runner.os }}-docker-${{ github.sha }}
+      - name: Cache submitter-k8s-agent jacoco.exec
+        uses: actions/cache@v2
+        with:
+          path: |
+            ./submarine-server/server-submitter/submarine-k8s-agent/target/jacoco.exec
+          key: ${{ runner.os }}-docker-${{ github.sha }}
       - name: Set up JDK 11
         uses: actions/setup-java@v1
         with:
@@ -531,16 +537,17 @@ jobs:
           java -version
       - name: Build
         env:
-          MODULES: "-pl :submarine-server-submitter,:submarine-submitter-k8s"
+          MODULES: "-pl :submarine-server-submitter,:submarine-submitter-k8s,:submarine-k8s-agent"
         run: |
           echo ">>> mvn $BUILD_FLAG $MODULES -B"
           mvn $BUILD_FLAG $MODULES -B
       - name: Test
         env:
-          # There is a `submarine-submitter-k8s` package under the `submarine-server-submitter` that also needs to be tested
-          TEST_MODULES: "-pl :submarine-server-submitter,:submarine-submitter-k8s"
+          # There are some `submitter-k8s` packages under the `submarine-server-submitter` that also needs to be tested
+          TEST_MODULES: "-pl :submarine-server-submitter,:submarine-submitter-k8s,:submarine-k8s-agent"
         run: |
           echo ">>> mvn $TEST_FLAG $TEST_MODULES -B"
+          export SUBMARINE_UID="dfea05c4-dbf2-43cc-833c-62ff329566a5"
           mvn $TEST_FLAG $TEST_MODULES -B
   rat:
     name: Check License
@@ -677,6 +684,12 @@ jobs:
           path: |
             ./submarine-server/server-submitter/submitter-k8s/target/jacoco.exec
           key: ${{ runner.os }}-docker-${{ github.sha }}
+      - name: Cache submarine-k8s-agent data
+        uses: actions/cache@v2
+        with:
+          path: |
+            ./submarine-server/server-submitter/submarine-k8s-agent/target/jacoco.exec
+          key: ${{ runner.os }}-docker-${{ github.sha }}
       - name: Cache SonarCloud packages
         uses: actions/cache@v1
         with:
diff --git a/LICENSE-binary b/LICENSE-binary
index 50a3c95d..77585881 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -206,10 +206,10 @@
 This project bundles some components that are also licensed under the Apache
 License Version 2.0:
 cglib:cglib:2.2.2
-com.fasterxml.jackson.core:jackson-annotations:2.7.8
-com.fasterxml.jackson.core:jackson-databind:2.7.8
-com.fasterxml.jackson.core:jackson-core:2.7.8
-com.fasterxml.jackson.module:jackson-module-jaxb-annotations:2.8.10
+com.fasterxml.jackson.core:jackson-annotations:2.13.4
+com.fasterxml.jackson.core:jackson-databind:2.13.4
+com.fasterxml.jackson.core:jackson-core:2.13.4
+com.fasterxml.jackson.module:jackson-module-jaxb-annotations:2.13.4
 com.fasterxml.woodstox:woodstox-core:5.0.3
 com.github.stephenc.jcip:jcip-annotations:1.0-1
 com.google.code.findbugs:jsr305:3.0.0
@@ -237,6 +237,8 @@ commons-lang:commons-lang:2.6
 commons-logging:commons-logging:1.1.3
 commons-logging:commons-logging:1.1.1
 commons-net:commons-net:3.1
+io.fabric8:kubernetes-server-mock:6.2.0
+io.javaoperatorsdk:operator-framework:4.1.1
 io.netty:netty:3.7.0.Final
 io.netty:netty-all:4.0.23.Final
 javax.inject:javax.inject:1
diff --git a/dev-support/docker-images/agent/Dockerfile b/dev-support/docker-images/agent/Dockerfile
index 34fd4b4b..0dd3d7e8 100644
--- a/dev-support/docker-images/agent/Dockerfile
+++ b/dev-support/docker-images/agent/Dockerfile
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-FROM alpine:3.10
+FROM alpine:3.16.3
 MAINTAINER Apache Software Foundation <de...@submarine.apache.org>
 
 # If you are in China, enabling the following two lines of code can speed up the build of the image, but it may cause failure in travis.
@@ -23,21 +23,18 @@ MAINTAINER Apache Software Foundation <de...@submarine.apache.org>
 
 # INSTALL openjdk
 RUN apk update && \
-    apk add --no-cache openjdk8 tzdata bash tini&& \
+    apk add --no-cache openjdk11 tzdata bash tini&& \
     cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && \
     echo Asia/Shanghai > /etc/timezone && \
     apk del tzdata && \
     rm -rf /tmp/* /var/cache/apk/*
 
-ENV JAVA_HOME /usr/lib/jvm/java-1.8-openjdk/jre
+ENV JAVA_HOME /usr/lib/jvm/java-11-openjdk/jre
 
 # Install Submarine
-ADD ./tmp/submarine-k8s-agent-*.tar.gz /opt/
-RUN ln -s /opt/submarine-k8s-agent-* "/opt/submarine-current"
-ADD ./tmp/submarine-site.xml "/opt/submarine-current/conf/"
+RUN mkdir "/opt/submarine-agent"
+ADD ./tmp/submarine-k8s-agent-*.jar /opt/submarine-agent/submarine-k8s-agent.jar
 
-WORKDIR /opt/submarine-current
+WORKDIR /opt/submarine-agent
 
-ENTRYPOINT ["/sbin/tini", "--"]
-
-CMD ["/bin/bash", "-c", "/opt/submarine-current/bin/agent.sh"]
+CMD ["java", "-jar", "/opt/submarine-agent/submarine-k8s-agent.jar"]
diff --git a/dev-support/docker-images/agent/build.sh b/dev-support/docker-images/agent/build.sh
index a519b024..5330f637 100755
--- a/dev-support/docker-images/agent/build.sh
+++ b/dev-support/docker-images/agent/build.sh
@@ -30,19 +30,15 @@ export SUBMARINE_HOME=${CURRENT_PATH}/../../..
 if [ ! -d "${SUBMARINE_HOME}/submarine-server/server-submitter/submarine-k8s-agent/target" ]; then
   mkdir -p "${SUBMARINE_HOME}/submarine-server/server-submitter/submarine-k8s-agent/target"
 fi
-submarine_dist_exists=$(find -L "${SUBMARINE_HOME}/submarine-server/server-submitter/submarine-k8s-agent/target" -name "submarine-k8s-agent-${SUBMARINE_VERSION}.tar.gz")
+submarine_jar_exists=$(find -L "${SUBMARINE_HOME}/submarine-server/server-submitter/submarine-k8s-agent/target" -name "submarine-k8s-agent-${SUBMARINE_VERSION}.jar")
 # Build source code if the package doesn't exist.
-if [[ -z "${submarine_dist_exists}" ]]; then
-  cd "${SUBMARINE_HOME}/submarine-server/server-submitter/submarine-k8s-agent"
-  mvn clean package -DskipTests
+if [[ -z "${submarine_jar_exists}" ]]; then
+  cd ${SUBMARINE_HOME}
+  mvn clean install -DskipTests -pl submarine-server/server-submitter/submarine-k8s-agent -am
 fi
 
 mkdir -p "${CURRENT_PATH}/tmp"
-cp ${SUBMARINE_HOME}/submarine-server/server-submitter/submarine-k8s-agent/target/submarine-k8s-agent-${SUBMARINE_VERSION}.tar.gz "${CURRENT_PATH}/tmp"
-
-# Replace the submarine.server.addr in the submarine-site.xml file with the link name of the submarine container
-# `submarine-server` is submarine server container name
-cp ${SUBMARINE_HOME}/conf/submarine-site.xml "${CURRENT_PATH}/tmp/"
+cp ${SUBMARINE_HOME}/submarine-server/server-submitter/submarine-k8s-agent/target/submarine-k8s-agent-${SUBMARINE_VERSION}.jar "${CURRENT_PATH}/tmp"
 
 # build image
 cd ${CURRENT_PATH}
diff --git a/pom.xml b/pom.xml
index b1df264f..63f3cadc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -74,10 +74,10 @@
     <pagehelper.version>5.1.10</pagehelper.version>
 
     <gson.version>2.8.9</gson.version>
-    <jackson-databind.version>2.11.0</jackson-databind.version>
-    <jackson-annotations.version>2.11.0</jackson-annotations.version>
-    <jackson-core.version>2.11.0</jackson-core.version>
-    <jackson-module-jaxb-annotations.version>2.11.0</jackson-module-jaxb-annotations.version>
+    <jackson-databind.version>2.13.4</jackson-databind.version>
+    <jackson-annotations.version>2.13.4</jackson-annotations.version>
+    <jackson-core.version>2.13.4</jackson-core.version>
+    <jackson-module-jaxb-annotations.version>2.13.4</jackson-module-jaxb-annotations.version>
     <commons-configuration.version>1.10</commons-configuration.version>
     <commons-httpclient.version>3.1</commons-httpclient.version>
 
@@ -135,6 +135,7 @@
     <json.version>20211205</json.version>
     <!--  Submarine on Kubernetes  -->
     <k8s.client-java.version>11.0.1</k8s.client-java.version>
+    <k8s.fabric8.version>6.2.0</k8s.fabric8.version>
     <jersey.test-framework>2.27</jersey.test-framework>
     <!-- integration test-->
     <plugin.failsafe.version>2.17</plugin.failsafe.version>
diff --git a/submarine-cloud-v3/api/v1alpha1/submarine_types.go b/submarine-cloud-v3/api/v1alpha1/submarine_types.go
index 3ad43720..ae3a85bb 100644
--- a/submarine-cloud-v3/api/v1alpha1/submarine_types.go
+++ b/submarine-cloud-v3/api/v1alpha1/submarine_types.go
@@ -58,6 +58,8 @@ type SubmarineSpec struct {
 	Minio *SubmarineMinioSpec `json:"minio"`
 	// Common is the spec that defines some submarine common configurations
 	Common *SubmarineCommon `json:"common,omitempty"`
+	// SubmarineAgent is the spec that defines the submarine agent
+	Agent *SubmarineAgent `json:"agent,omitempty"`
 }
 
 // SubmarineServerSpec defines the desired submarine server
@@ -77,8 +79,8 @@ type SubmarineDatabaseSpec struct {
 	Image string `json:"image,omitempty"`
 	// StorageSize is the storage size of the database
 	StorageSize string `json:"storageSize"`
-	// MysqlRootPasswordSecret is the mysql root password secret, this secret need password key: MYSQL_ROOT_PASSWORD
-	MysqlRootPasswordSecret string `json:"mysqlRootPasswordSecret"`
+	// MysqlRootPasswordSecret is the mysql root password secret, secret must have key MYSQL_ROOT_PASSWORD as root password
+	MysqlRootPasswordSecret string `json:"mysqlRootPasswordSecret,omitempty"`
 }
 
 // SubmarineVirtualserviceSpec defines the desired submarine virtualservice
@@ -152,6 +154,12 @@ type SubmarineStatus struct {
 	SubmarineState `json:"submarineState,omitempty"`
 }
 
+// SubmarineAgent defines the observed submarine agent
+type SubmarineAgent struct {
+	// Image is the submarine agent's docker image
+	Image string `json:"image,omitempty"`
+}
+
 // SubmarineStateType represents the type of the current state of a submarine.
 type SubmarineStateType string
 
diff --git a/submarine-cloud-v3/api/v1alpha1/zz_generated.deepcopy.go b/submarine-cloud-v3/api/v1alpha1/zz_generated.deepcopy.go
index c09b18b3..d5ab834a 100644
--- a/submarine-cloud-v3/api/v1alpha1/zz_generated.deepcopy.go
+++ b/submarine-cloud-v3/api/v1alpha1/zz_generated.deepcopy.go
@@ -74,6 +74,21 @@ func (in *Submarine) DeepCopyObject() runtime.Object {
 	return nil
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *SubmarineAgent) DeepCopyInto(out *SubmarineAgent) {
+	*out = *in
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SubmarineAgent.
+func (in *SubmarineAgent) DeepCopy() *SubmarineAgent {
+	if in == nil {
+		return nil
+	}
+	out := new(SubmarineAgent)
+	in.DeepCopyInto(out)
+	return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *SubmarineCommon) DeepCopyInto(out *SubmarineCommon) {
 	*out = *in
@@ -242,6 +257,11 @@ func (in *SubmarineSpec) DeepCopyInto(out *SubmarineSpec) {
 		*out = new(SubmarineCommon)
 		(*in).DeepCopyInto(*out)
 	}
+	if in.Agent != nil {
+		in, out := &in.Agent, &out.Agent
+		*out = new(SubmarineAgent)
+		**out = **in
+	}
 }
 
 // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SubmarineSpec.
diff --git a/submarine-cloud-v3/artifacts/submarine-agent.yaml b/submarine-cloud-v3/artifacts/submarine-agent.yaml
new file mode 100644
index 00000000..90c6c040
--- /dev/null
+++ b/submarine-cloud-v3/artifacts/submarine-agent.yaml
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: "submarine-agent"
+spec:
+  selector:
+    matchLabels:
+      app: "submarine-agent"
+  replicas: 1
+  template:
+    metadata:
+      labels:
+        app: "submarine-agent"
+    spec:
+      serviceAccountName: "submarine-server"
+      containers:
+      - name: "submarine-server"
+        livenessProbe:
+          httpGet:
+            path: /health
+            port: 8080
+          initialDelaySeconds: 10
+          failureThreshold: 3
+          periodSeconds: 10
+        env:
+        - name: JDBC_URL
+          value: "jdbc:mysql://submarine-database:3306/submarine?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&allowMultiQueries=true&failOverReadOnly=false&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=UTC&useTimezone=true&useLegacyDatetimeCode=true"
+        image: "apache/submarine:agent-0.8.0-SNAPSHOT"
+        imagePullPolicy: IfNotPresent
+        ports:
+        - containerPort: 8080
diff --git a/submarine-cloud-v3/config/crd/bases/submarine.apache.org_submarines.yaml b/submarine-cloud-v3/config/crd/bases/submarine.apache.org_submarines.yaml
index b9a5f7ba..be88a207 100644
--- a/submarine-cloud-v3/config/crd/bases/submarine.apache.org_submarines.yaml
+++ b/submarine-cloud-v3/config/crd/bases/submarine.apache.org_submarines.yaml
@@ -52,6 +52,14 @@ spec:
           spec:
             description: SubmarineSpec defines the desired state of Submarine
             properties:
+              agent:
+                description: SubmarineAgent is the spec that defines the submarine
+                  agent
+                properties:
+                  image:
+                    description: Image is the submarine agent's docker image
+                    type: string
+                type: object
               common:
                 description: Common is the spec that defines some submarine common
                   configurations
@@ -87,14 +95,13 @@ spec:
                     description: Image is the submarine database's docker image
                     type: string
                   mysqlRootPasswordSecret:
-                    description: 'MysqlRootPasswordSecret is the mysql root password
-                      secret, this secret need password key: MYSQL_ROOT_PASSWORD'
+                    description: MysqlRootPasswordSecret is the mysql root password
+                      secret, secret must have key MYSQL_ROOT_PASSWORD as root password
                     type: string
                   storageSize:
                     description: StorageSize is the storage size of the database
                     type: string
                 required:
-                - mysqlRootPasswordSecret
                 - storageSize
                 type: object
               minio:
diff --git a/submarine-cloud-v3/controllers/submarine_agent.go b/submarine-cloud-v3/controllers/submarine_agent.go
new file mode 100644
index 00000000..f3c8de3f
--- /dev/null
+++ b/submarine-cloud-v3/controllers/submarine_agent.go
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package controllers
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/apache/submarine/submarine-cloud-v3/controllers/util"
+
+	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
+
+	submarineapacheorgv1alpha1 "github.com/apache/submarine/submarine-cloud-v3/api/v1alpha1"
+)
+
+func (r *SubmarineReconciler) newSubmarineAgentDeployment(ctx context.Context, submarine *submarineapacheorgv1alpha1.Submarine) *appsv1.Deployment {
+	deployment, err := util.ParseDeploymentYaml(agentYamlPath)
+	if err != nil {
+		r.Log.Error(err, "ParseDeploymentYaml")
+	}
+	deployment.Namespace = submarine.Namespace
+	err = controllerutil.SetControllerReference(submarine, deployment, r.Scheme)
+	if err != nil {
+		r.Log.Error(err, "Set Deployment ControllerReference")
+	}
+
+	// env
+	env := []corev1.EnvVar{
+		{
+			Name:  "SUBMARINE_UID",
+			Value: string(submarine.UID),
+		},
+	}
+	deployment.Spec.Template.Spec.Containers[0].Env = append(deployment.Spec.Template.Spec.Containers[0].Env, env...)
+
+	// agent image
+	if submarine.Spec.Agent != nil {
+		agentImage := submarine.Spec.Agent.Image
+		if agentImage != "" {
+			deployment.Spec.Template.Spec.Containers[0].Image = agentImage
+		} else {
+			deployment.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("apache/submarine:agent-%s", submarine.Spec.Version)
+		}
+	}
+	// pull secrets
+	pullSecrets := util.GetSubmarineCommonImage(submarine).PullSecrets
+	if pullSecrets != nil {
+		deployment.Spec.Template.Spec.ImagePullSecrets = r.CreatePullSecrets(&pullSecrets)
+	}
+
+	return deployment
+}
+
+// createSubmarineAgent is a function to create submarine-agent.
+// Reference: https://github.com/apache/submarine/blob/master/submarine-cloud-v3/artifacts/submarine-agent.yaml
+func (r *SubmarineReconciler) createSubmarineAgent(ctx context.Context, submarine *submarineapacheorgv1alpha1.Submarine) error {
+	r.Log.Info("Enter createSubmarineAgent")
+
+	// Step1: Create Deployment
+	deployment := &appsv1.Deployment{}
+	err := r.Get(ctx, types.NamespacedName{Name: agentName, Namespace: submarine.Namespace}, deployment)
+	// If the resource doesn't exist, we'll create it
+	if errors.IsNotFound(err) {
+		deployment = r.newSubmarineAgentDeployment(ctx, submarine)
+		err = r.Create(ctx, deployment)
+		r.Log.Info("Create Deployment", "name", deployment.Name)
+	} else {
+		newDeployment := r.newSubmarineAgentDeployment(ctx, submarine)
+		// compare if there are same
+		if !r.CompareAgentDeployment(deployment, newDeployment) {
+			// update meta with uid
+			newDeployment.ObjectMeta = deployment.ObjectMeta
+			err = r.Update(ctx, newDeployment)
+			r.Log.Info("Update Deployment", "name", deployment.Name)
+		}
+	}
+
+	// If an error occurs during Get/Create, we'll requeue the item so we can
+	// attempt processing again later. This could have been caused by a
+	// temporary network failure, or any other transient reason.
+	if err != nil {
+		if errors.IsNotFound(err) {
+			return nil
+		}
+		return err
+	}
+
+	if !metav1.IsControlledBy(deployment, submarine) {
+		msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
+		r.Recorder.Event(submarine, corev1.EventTypeWarning, ErrResourceExists, msg)
+		return fmt.Errorf(msg)
+	}
+
+	return nil
+}
+
+// CompareAgentDeployment will determine if two Deployments are equal
+func (r *SubmarineReconciler) CompareAgentDeployment(oldDeployment, newDeployment *appsv1.Deployment) bool {
+	// spec.replicas
+	if *oldDeployment.Spec.Replicas != *newDeployment.Spec.Replicas {
+		return false
+	}
+	if len(oldDeployment.Spec.Template.Spec.Containers) != 1 {
+		return false
+	}
+	// spec.template.spec.containers[0].env
+	if !util.CompareEnv(oldDeployment.Spec.Template.Spec.Containers[0].Env, newDeployment.Spec.Template.Spec.Containers[0].Env) {
+		return false
+	}
+	// spec.template.spec.containers[0].image
+	if oldDeployment.Spec.Template.Spec.Containers[0].Image != newDeployment.Spec.Template.Spec.Containers[0].Image {
+		return false
+	}
+	// spec.template.spec.imagePullSecrets
+	if !util.ComparePullSecrets(oldDeployment.Spec.Template.Spec.ImagePullSecrets, newDeployment.Spec.Template.Spec.ImagePullSecrets) {
+		return false
+	}
+	return true
+}
diff --git a/submarine-cloud-v3/controllers/submarine_agent_test.go b/submarine-cloud-v3/controllers/submarine_agent_test.go
new file mode 100644
index 00000000..2278c390
--- /dev/null
+++ b/submarine-cloud-v3/controllers/submarine_agent_test.go
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package controllers
+
+import (
+	"context"
+	"testing"
+
+	submarineapacheorgv1alpha1 "github.com/apache/submarine/submarine-cloud-v3/api/v1alpha1"
+
+	. "github.com/apache/submarine/submarine-cloud-v3/controllers/util"
+	. "github.com/onsi/gomega"
+)
+
+func TestSubmarineAgent(t *testing.T) {
+	g := NewGomegaWithT(t)
+	r := createSubmarineReconciler(&SubmarineReconciler{Namespace: "submarine"})
+	submarine, err := MakeSubmarineFromYamlByNamespace("../config/samples/_v1alpha1_submarine.yaml", "submarine")
+	g.Expect(err).To(BeNil())
+
+	ArtifactBasePath = "../"
+	deployment1 := r.newSubmarineAgentDeployment(context.TODO(), submarine)
+	g.Expect(deployment1).NotTo(BeNil())
+
+	// change image
+	submarine.Spec.Agent = &submarineapacheorgv1alpha1.SubmarineAgent{
+		Image: "apache/submarine:agent",
+	}
+	deployment2 := r.newSubmarineAgentDeployment(context.TODO(), submarine)
+	g.Expect(deployment2.Spec.Template.Spec.Containers[0].Image).To(Equal("apache/submarine:agent"))
+
+	// compare
+	g.Expect(r.CompareAgentDeployment(deployment1, deployment2)).To(Equal(false))
+}
diff --git a/submarine-cloud-v3/controllers/submarine_controller.go b/submarine-cloud-v3/controllers/submarine_controller.go
index c8e3303b..f8a7c567 100644
--- a/submarine-cloud-v3/controllers/submarine_controller.go
+++ b/submarine-cloud-v3/controllers/submarine_controller.go
@@ -42,6 +42,7 @@ import (
 // Defines resource names and path to artifact yaml files
 // Reference: https://github.com/apache/submarine/blob/master/submarine-cloud-v3/artifacts/
 const (
+	agentName              = "submarine-agent"
 	serverName             = "submarine-server"
 	observerName           = "submarine-observer"
 	databaseName           = "submarine-database"
@@ -58,6 +59,7 @@ const (
 	minioPvcName           = minioName + "-pvc"
 	minioServiceName       = minioName + "-service"
 	artifactPath           = "./artifacts/"
+	agentYamlPath          = artifactPath + "submarine-agent.yaml"
 	databaseYamlPath       = artifactPath + "submarine-database.yaml"
 	minioYamlPath          = artifactPath + "submarine-minio.yaml"
 	serveYamlPath          = artifactPath + "submarine-serve.yaml"
@@ -317,12 +319,12 @@ func (r *SubmarineReconciler) createSubmarine(ctx context.Context, submarine *su
 		return err
 	}
 
-	err = r.createSubmarineServer(ctx, submarine)
+	err = r.createSubmarineDatabase(ctx, submarine)
 	if err != nil && !errors.IsAlreadyExists(err) {
 		return err
 	}
 
-	err = r.createSubmarineDatabase(ctx, submarine)
+	err = r.createSubmarineServer(ctx, submarine)
 	if err != nil && !errors.IsAlreadyExists(err) {
 		return err
 	}
@@ -352,6 +354,11 @@ func (r *SubmarineReconciler) createSubmarine(ctx context.Context, submarine *su
 		return err
 	}
 
+	err = r.createSubmarineAgent(ctx, submarine)
+	if err != nil && !errors.IsAlreadyExists(err) {
+		return err
+	}
+
 	return nil
 }
 
diff --git a/submarine-cloud-v3/controllers/submarine_database.go b/submarine-cloud-v3/controllers/submarine_database.go
index 9e83f643..f068f24d 100644
--- a/submarine-cloud-v3/controllers/submarine_database.go
+++ b/submarine-cloud-v3/controllers/submarine_database.go
@@ -86,7 +86,15 @@ func (r *SubmarineReconciler) newSubmarineDatabaseStatefulSet(ctx context.Contex
 	}
 	// password secret
 	if submarine.Spec.Database.MysqlRootPasswordSecret != "" {
-		statefulset.Spec.Template.Spec.Containers[0].Env[0].ValueFrom.SecretKeyRef.Name = submarine.Spec.Database.MysqlRootPasswordSecret
+		statefulset.Spec.Template.Spec.Containers[0].Env[0].Value = ""
+		statefulset.Spec.Template.Spec.Containers[0].Env[0].ValueFrom = &corev1.EnvVarSource{
+			SecretKeyRef: &corev1.SecretKeySelector{
+				LocalObjectReference: corev1.LocalObjectReference{
+					Name: submarine.Spec.Database.MysqlRootPasswordSecret,
+				},
+				Key: "MYSQL_ROOT_PASSWORD",
+			},
+		}
 	}
 
 	return statefulset
diff --git a/submarine-cloud-v3/controllers/submarine_virtualservice.go b/submarine-cloud-v3/controllers/submarine_virtualservice.go
index ea729cb8..ab3ed902 100644
--- a/submarine-cloud-v3/controllers/submarine_virtualservice.go
+++ b/submarine-cloud-v3/controllers/submarine_virtualservice.go
@@ -47,12 +47,12 @@ func (r *SubmarineReconciler) newSubmarineVirtualService(ctx context.Context, su
 	specVirtual := submarine.Spec.Virtualservice
 	if specVirtual != nil {
 		virtualserviceHosts := specVirtual.Hosts
-		if virtualserviceHosts != nil {
+		if virtualserviceHosts != nil && len(virtualserviceHosts) > 0 {
 			// Use `Hosts` defined in submarine spec
 			virtualService.Spec.Hosts = virtualserviceHosts
 		}
 		virtualserviceGateways := specVirtual.Gateways
-		if virtualserviceGateways != nil {
+		if virtualserviceGateways != nil && len(virtualserviceGateways) > 0 {
 			// Use `Gateways` defined in submarine spec
 			virtualService.Spec.Gateways = virtualserviceGateways
 		} else {
diff --git a/submarine-server/server-core/pom.xml b/submarine-server/server-core/pom.xml
index f582c69b..ebe8cb44 100644
--- a/submarine-server/server-core/pom.xml
+++ b/submarine-server/server-core/pom.xml
@@ -397,6 +397,10 @@
           <groupId>org.apache.commons</groupId>
           <artifactId>commons-text</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
 
diff --git a/submarine-server/server-database/src/main/java/org/apache/submarine/server/database/experiment/mappers/ExperimentMapper.java b/submarine-server/server-database/src/main/java/org/apache/submarine/server/database/experiment/mappers/ExperimentMapper.java
index 7b6c9033..f99bc942 100644
--- a/submarine-server/server-database/src/main/java/org/apache/submarine/server/database/experiment/mappers/ExperimentMapper.java
+++ b/submarine-server/server-database/src/main/java/org/apache/submarine/server/database/experiment/mappers/ExperimentMapper.java
@@ -19,15 +19,41 @@
 
 package org.apache.submarine.server.database.experiment.mappers;
 
+import org.apache.ibatis.annotations.Param;
 import org.apache.submarine.server.database.experiment.entity.ExperimentEntity;
 
+import java.util.Date;
 import java.util.List;
 
 public interface ExperimentMapper {
+
   List<ExperimentEntity> selectAll();
+
   ExperimentEntity select(String id);
 
   int insert(ExperimentEntity experiment);
+
   int update(ExperimentEntity experiment);
+
   int delete(String id);
+
+  /**
+   * Update experimentStatus to 'Created'
+   */
+  int create(@Param("id") String id, @Param("acceptedTime") Date acceptedTime);
+
+  /**
+   * Update experimentStatus to 'Succeeded'
+   */
+  int succeed(@Param("id") String id, @Param("finishedTime") Date finishedTime);
+
+  /**
+   * Update experimentStatus to 'Failed'
+   */
+  int failed(@Param("id") String id, @Param("finishedTime") Date finishedTime);
+
+  /**
+   * Update experimentStatus to 'Running'
+   */
+  int running(@Param("id") String id, @Param("runningTime") Date runningTime);
 }
diff --git a/submarine-server/server-database/src/main/java/org/apache/submarine/server/database/notebook/mappers/NotebookMapper.java b/submarine-server/server-database/src/main/java/org/apache/submarine/server/database/notebook/mappers/NotebookMapper.java
index b5c626f8..97040130 100644
--- a/submarine-server/server-database/src/main/java/org/apache/submarine/server/database/notebook/mappers/NotebookMapper.java
+++ b/submarine-server/server-database/src/main/java/org/apache/submarine/server/database/notebook/mappers/NotebookMapper.java
@@ -19,8 +19,10 @@
 
 package org.apache.submarine.server.database.notebook.mappers;
 
+import org.apache.ibatis.annotations.Param;
 import org.apache.submarine.server.database.notebook.entity.NotebookEntity;
 
+import java.util.Date;
 import java.util.List;
 
 public interface NotebookMapper {
@@ -33,4 +35,10 @@ public interface NotebookMapper {
   int update(NotebookEntity notebook);
 
   int delete(String id);
+
+  /**
+   * Update notebook status
+   */
+  int updateStatus(@Param("id") String id, @Param("status") String status,
+                   @Param("reason") String reason, @Param("updateTime")Date updateTime);
 }
diff --git a/submarine-server/server-database/src/main/resources/org/apache/submarine/database/mappers/ExperimentMapper.xml b/submarine-server/server-database/src/main/resources/org/apache/submarine/database/mappers/ExperimentMapper.xml
index 58a23d65..4e8a789f 100644
--- a/submarine-server/server-database/src/main/resources/org/apache/submarine/database/mappers/ExperimentMapper.xml
+++ b/submarine-server/server-database/src/main/resources/org/apache/submarine/database/mappers/ExperimentMapper.xml
@@ -82,4 +82,28 @@
     where id = #{id,jdbcType=VARCHAR}
   </update>
 
+  <update id="create" >
+    update experiment
+    set experiment_status='Created', accepted_time=#{acceptedTime,jdbcType=TIMESTAMP}, update_time=now()
+    where id = #{id,jdbcType=VARCHAR}
+  </update>
+
+  <update id="succeed" >
+    update experiment
+    set experiment_status='Succeeded', finished_time=#{finishedTime,jdbcType=TIMESTAMP}, update_time=now()
+    where id = #{id,jdbcType=VARCHAR}
+  </update>
+
+  <update id="failed" >
+    update experiment
+    set experiment_status='Failed', finished_time=#{finishedTime,jdbcType=TIMESTAMP}, update_time=now()
+    where id = #{id,jdbcType=VARCHAR}
+  </update>
+
+  <update id="running" >
+    update experiment
+    set experiment_status='Running', running_time=#{runningTime,jdbcType=TIMESTAMP}, update_time=now()
+    where id = #{id,jdbcType=VARCHAR}
+  </update>
+
 </mapper>
diff --git a/submarine-server/server-database/src/main/resources/org/apache/submarine/database/mappers/NotebookMapper.xml b/submarine-server/server-database/src/main/resources/org/apache/submarine/database/mappers/NotebookMapper.xml
index 578fb499..8ee20949 100644
--- a/submarine-server/server-database/src/main/resources/org/apache/submarine/database/mappers/NotebookMapper.xml
+++ b/submarine-server/server-database/src/main/resources/org/apache/submarine/database/mappers/NotebookMapper.xml
@@ -82,4 +82,12 @@
     where id = #{id,jdbcType=VARCHAR}
   </update>
 
+  <update id="updateStatus" >
+    update notebook
+    set notebook_status = #{status,jdbcType=VARCHAR},
+        reason = #{reason,jdbcType=VARCHAR},
+        update_time = #{updateTime,jdbcType=TIMESTAMP}
+    where id = #{id,jdbcType=VARCHAR}
+  </update>
+
 </mapper>
diff --git a/submarine-server/server-submitter/k8s-utils/src/main/java/org/apache/submarine/server/k8s/utils/OwnerReferenceConfig.java b/submarine-server/server-submitter/k8s-utils/src/main/java/org/apache/submarine/server/k8s/utils/OwnerReferenceConfig.java
new file mode 100644
index 00000000..db3e37aa
--- /dev/null
+++ b/submarine-server/server-submitter/k8s-utils/src/main/java/org/apache/submarine/server/k8s/utils/OwnerReferenceConfig.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.k8s.utils;
+
+/**
+ * OwnerReference config
+ * We currently get the configuration by environment variables
+ */
+public class OwnerReferenceConfig {
+
+  public static final String SUBMARINE_APIVERSION = "SUBMARINE_APIVERSION";
+  public static final String SUBMARINE_KIND = "SUBMARINE_KIND";
+  public static final String SUBMARINE_NAME = "SUBMARINE_NAME";
+  public static final String SUBMARINE_UID = "SUBMARINE_UID";
+
+  public static final String DEFAULT_SUBMARINE_APIVERSION = "submarine.apache.org/v1alpha1";
+  public static final String DEFAULT_SUBMARINE_KIND = "Submarine";
+
+  /**
+   * Get submarine apiVersion
+   */
+  public static String getSubmarineApiversion() {
+    String apiVersion = System.getenv(SUBMARINE_APIVERSION);
+    return apiVersion == null || apiVersion.isEmpty() ? DEFAULT_SUBMARINE_APIVERSION : apiVersion;
+  }
+
+  /**
+   * Get submarine kind
+   */
+  public static String getSubmarineKind() {
+    String kind = System.getenv(SUBMARINE_KIND);
+    return kind == null || kind.isEmpty() ? DEFAULT_SUBMARINE_KIND : kind;
+  }
+
+  /**
+   * Get submarine CR name
+   */
+  public static String getSubmarineName() {
+    return System.getenv(SUBMARINE_NAME);
+  }
+
+  /**
+   * Get submarine owner references uid
+   */
+  public static String getSubmarineUid() {
+    return System.getenv(SUBMARINE_UID);
+  }
+
+}
diff --git a/submarine-server/server-submitter/submarine-k8s-agent/README.md b/submarine-server/server-submitter/submarine-k8s-agent/README.md
new file mode 100644
index 00000000..5907e45f
--- /dev/null
+++ b/submarine-server/server-submitter/submarine-k8s-agent/README.md
@@ -0,0 +1,46 @@
+<!---
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+# Submarine Agent
+
+## Development Guide
+
+The `submarine-server` created with operator contains a `SUBMARINE_UID` environment variable,
+which we will also need to configure locally during the development phase.
+```shell
+export SUBMARINE_UID=${submarine_uid}
+```
+
+Also, we need use `port-forward` to link the database port to a local connection
+```shell
+kubectl port-forward service/submarine-database 3306:3306 -n submarine-user-test
+```
+
+## Test
+
+If you want to run a test case, you need to set the environment variables before the test case starts
+to ensure that the watch event is listening to the relevant resources correctly.
+
+```shell
+export SUBMARINE_UID=${submarine_uid}
+mvn test -pl submarine-server/server-submitter/submarine-k8s-agent
+```
+
+## Build Image
+
+We already have a script to automate the image build
+```shell
+cd ./dev-support/docker-images/agent
+./build.sh
+```
diff --git a/submarine-server/server-submitter/submarine-k8s-agent/pom.xml b/submarine-server/server-submitter/submarine-k8s-agent/pom.xml
index 982cdf0d..0f0035b1 100644
--- a/submarine-server/server-submitter/submarine-k8s-agent/pom.xml
+++ b/submarine-server/server-submitter/submarine-k8s-agent/pom.xml
@@ -34,120 +34,129 @@
 			<version>${slf4j.version}</version>
 		</dependency>
 
+    <dependency>
+      <groupId>io.javaoperatorsdk</groupId>
+      <artifactId>operator-framework</artifactId>
+      <version>4.1.1</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+
+    <dependency>
+      <groupId>io.fabric8</groupId>
+      <artifactId>kubernetes-server-mock</artifactId>
+      <version>${k8s.fabric8.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>${jackson-databind.version}</version>
+    </dependency>
+
 		<dependency>
 			<groupId>org.apache.submarine</groupId>
-			<artifactId>submarine-submitter-k8s</artifactId>
+			<artifactId>submarine-server-database</artifactId>
 			<version>${project.version}</version>
 		</dependency>
-	</dependencies>
-	<profiles>
-		<profile>
-			<id>src</id>
-			<activation>
-				<activeByDefault>false</activeByDefault>
-			</activation>
-			<build>
-				<plugins>
-					<plugin>
-						<groupId>org.apache.maven.plugins</groupId>
-						<artifactId>maven-checkstyle-plugin</artifactId>
-						<configuration>
-							<skip>false</skip>
-						</configuration>
-					</plugin>
-					<plugin>
-						<artifactId>maven-enforcer-plugin</artifactId>
-					</plugin>
-					<plugin>
-						<groupId>org.apache.maven.plugins</groupId>
-						<artifactId>maven-assembly-plugin</artifactId>
-						<executions>
-							<execution>
-								<id>src-dist</id>
-								<phase>package</phase>
-								<goals>
-									<goal>single</goal>
-								</goals>
-								<configuration>
-									<finalName>${project.artifactId}-${project.version}-src</finalName>
-									<appendAssemblyId>false</appendAssemblyId>
-									<attach>false</attach>
-									<descriptors>
-										<descriptor>src/assembly/src-distribution.xml</descriptor>
-									</descriptors>
-								</configuration>
-							</execution>
-						</executions>
-					</plugin>
-				</plugins>
-			</build>
-		</profile>
-	</profiles>
-	<build>
-		<plugins>
-
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-checkstyle-plugin</artifactId>
-				<configuration>
-					<skip>false</skip>
-				</configuration>
-			</plugin>
-
-			<plugin>
-				<artifactId>maven-enforcer-plugin</artifactId>
-			</plugin>
-
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-dependency-plugin</artifactId>
-				<executions>
-					<execution>
-						<id>copy-dependencies-runtime</id>
-						<phase>package</phase>
-						<goals>
-							<goal>copy-dependencies</goal>
-						</goals>
-						<configuration>
-							<includeScope>runtime</includeScope>
-						</configuration>
-					</execution>
-					<execution>
-						<id>copy-dependencies-system</id>
-						<phase>package</phase>
-						<goals>
-							<goal>copy-dependencies</goal>
-						</goals>
-						<configuration>
-							<includeScope>system</includeScope>
-							<excludeTransitive>true</excludeTransitive>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-assembly-plugin</artifactId>
-				<version>${plugin.maven.assembly.version}</version>
-				<executions>
-					<execution>
-						<id>dist</id>
-						<phase>package</phase>
-						<goals>
-							<goal>single</goal>
-						</goals>
-						<configuration>
-							<finalName>${project.artifactId}-${project.version}</finalName>
-							<appendAssemblyId>false</appendAssemblyId>
-							<attach>false</attach>
-							<descriptors>
-								<descriptor>src/assembly/distribution.xml</descriptor>
-							</descriptors>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-	</build>
+
+    <dependency>
+      <groupId>org.takes</groupId>
+      <artifactId>takes</artifactId>
+      <version>1.24.4</version>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>mysql</groupId>
+      <artifactId>mysql-connector-java</artifactId>
+      <version>${mysql-connector-java.version}</version>
+      <!-- mysql-connector-java uses the GPL license. When we release the version in Submarine-dist, we exclude mysql-connector-java jar -->
+    </dependency>
+
+    <dependency>
+      <groupId>org.reflections</groupId>
+      <artifactId>reflections</artifactId>
+      <version>0.10.2</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.submarine</groupId>
+      <artifactId>submarine-k8s-utils</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <!--  Unit Tests  -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.h2database</groupId>
+      <artifactId>h2</artifactId>
+      <version>${h2-connector-java.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>${plugin.shade.version}</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <createDependencyReducedPom>false</createDependencyReducedPom>
+              <transformers>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                  <manifestEntries>
+                    <Main-Class>org.apache.submarine.server.k8s.agent.SubmarineAgentListener</Main-Class>
+                  </manifestEntries>
+                </transformer>
+              </transformers>
+              <filters>
+                <filter>
+                  <artifact>io.fabric8:openshift-client</artifact>
+                  <excludes>
+                    <exclude>io/fabric8/kubernetes/client/Config*</exclude>
+                  </excludes>
+                </filter>
+              </filters>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
 </project>
diff --git a/submarine-server/server-submitter/submarine-k8s-agent/src/assembly/distribution.xml b/submarine-server/server-submitter/submarine-k8s-agent/src/assembly/distribution.xml
deleted file mode 100644
index 463ffbb6..00000000
--- a/submarine-server/server-submitter/submarine-k8s-agent/src/assembly/distribution.xml
+++ /dev/null
@@ -1,69 +0,0 @@
-<!--
-  Licensed under the Apache License, Version 2.0 (the "License");
-  you may not use this file except in compliance with the License.
-  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License. See accompanying LICENSE file.
--->
-
-<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
-    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-    xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd">
-  <id>distribution</id>
-  <formats>
-    <format>dir</format>
-    <format>tar.gz</format>
-  </formats>
-
-
-  <files>
-    <file>
-      <source>../../../LICENSE-binary</source>
-      <outputDirectory>/</outputDirectory>
-      <destName>LICENSE</destName>
-    </file>
-    <file>
-      <source>../../../NOTICE-binary</source>
-      <outputDirectory>/</outputDirectory>
-      <destName>NOTICE</destName>
-    </file>
-  </files>
-
-  <fileSets>
-    <fileSet>
-      <directory>../../../licenses-binary</directory>
-      <outputDirectory>/licenses</outputDirectory>
-    </fileSet>
-
-    <fileSet>
-      <directory>../../../conf</directory>
-      <outputDirectory>/conf</outputDirectory>
-    </fileSet>
-
-    <fileSet>
-      <directory>../../../bin</directory>
-      <outputDirectory>/bin</outputDirectory>
-    </fileSet>
-    
-    <fileSet>
-      <directory>./target</directory>
-      <outputDirectory>/lib</outputDirectory>
-      <includes>
-        <include>submarine-k8s-agent-${project.version}.jar</include>
-      </includes>
-    </fileSet>
-    
-    <fileSet>
-      <directory>./target/dependency</directory>
-      <outputDirectory>/lib</outputDirectory>
-    </fileSet>
-
-  </fileSets>
-
-</assembly>
diff --git a/submarine-server/server-submitter/submarine-k8s-agent/src/assembly/src-distribution.xml b/submarine-server/server-submitter/submarine-k8s-agent/src/assembly/src-distribution.xml
deleted file mode 100644
index c3e2a895..00000000
--- a/submarine-server/server-submitter/submarine-k8s-agent/src/assembly/src-distribution.xml
+++ /dev/null
@@ -1,53 +0,0 @@
-<!--
-  Licensed under the Apache License, Version 2.0 (the "License");
-  you may not use this file except in compliance with the License.
-  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License. See accompanying LICENSE file.
--->
-
-<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
-    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-    xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd">
-  <id>src</id>
-  <formats>
-    <format>dir</format>
-    <format>tar.gz</format>
-  </formats>
-
-  <fileSets>
-    <fileSet>
-      <directory>../</directory>
-      <useDefaultExcludes>true</useDefaultExcludes>
-      <excludes>
-        <exclude>.git/**</exclude>
-        <exclude>**/.gitignore</exclude>
-        <exclude>**/.svn</exclude>
-        <exclude>**/*.iws</exclude>
-        <exclude>**/*.ipr</exclude>
-        <exclude>**/*.iml</exclude>
-        <exclude>**/.classpath</exclude>
-        <exclude>**/.project</exclude>
-        <exclude>**/.settings</exclude>
-        <exclude>**/target/**</exclude>
-        <exclude>submodules/**/target/**</exclude>
-        <exclude>submarine-workbench/workbench-web/dist/**</exclude>
-        <exclude>submarine-workbench/workbench-web/node_modules/**</exclude>
-        <!-- until the code that does this is fixed -->
-        <exclude>**/*.log</exclude>
-        <exclude>**/build/**</exclude>
-        <exclude>**/file:/**</exclude>
-        <exclude>**/SecurityAuth.audit*</exclude>
-        <exclude>submarine-cloud/bin/**</exclude>
-        <exclude>dev-support/submarine-installer/package/hadoop/yarn/lib/native/**</exclude>
-      </excludes>
-    </fileSet>
-  </fileSets>
-
-</assembly>
diff --git a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/SubmarineAgent.java b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/SubmarineAgent.java
deleted file mode 100644
index 7e1f4f6f..00000000
--- a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/SubmarineAgent.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.server.k8s.agent;
-
-import java.io.IOException;
-
-import org.apache.submarine.server.api.common.CustomResourceType;
-import org.apache.submarine.server.k8s.agent.handler.CustomResourceHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class SubmarineAgent {
-  private static final Logger LOG = LoggerFactory.getLogger(SubmarineAgent.class);
-  private String serverHost;
-  private Integer serverPort;
-  private String namespace;
-  private String customResourceType;
-  private String customResourceName;
-  private String resourceId;
-  private CustomResourceType type;
-  private CustomResourceHandler handler;
-    
-    
-  public SubmarineAgent(String serverHost, Integer serverPort, String namespace,
-            String customResourceType, String customResourceName,
-            String resourceId) throws ClassNotFoundException,
-  InstantiationException, IllegalAccessException, IOException {
-    this.serverHost = serverHost;
-    this.serverPort = serverPort;
-    this.namespace = namespace;
-    this.customResourceType = customResourceType;
-    this.customResourceName = customResourceName;
-    this.resourceId = resourceId;
-    this.type = CustomResourceType.valueOf(customResourceType);
-    this.handler = HandlerFactory.getHandler(this.type);
-    this.handler.init(serverHost, serverPort, namespace, customResourceName, resourceId);
-  }
-    
-  public void start() {
-    handler.run();
-  }
-    
-  public static void main(String[] args) 
-          throws ClassNotFoundException, InstantiationException, IllegalAccessException, IOException {
-    String serverHost = System.getenv("SERVER_HOST");
-    Integer serverPort = Integer.parseInt(System.getenv("SERVER_PORT"));
-    LOG.info(String.format("SERVER_HOST:%s", serverHost));
-    LOG.info(String.format("SERVER_PORT:%d", serverPort));
-        
-    String namespace = System.getenv("NAMESPACE");
-    String customResourceType = System.getenv("CUSTOM_RESOURCE_TYPE");
-    String customResourceName = System.getenv("CUSTOM_RESOURCE_NAME");
-    String customResourceId = System.getenv("CUSTOM_RESOURCE_ID");
-        
-    LOG.info(String.format("NAMESPACE:%s", namespace));
-    LOG.info(String.format("CUSTOM_RESOURCE_TYPE:%s", customResourceType));
-    LOG.info(String.format("CUSTOM_RESOURCE_NAME:%s", customResourceName));
-    LOG.info(String.format("CUSTOM_RESOURCE_ID:%s", customResourceId));
-
-    SubmarineAgent agent = new SubmarineAgent(serverHost, serverPort, namespace,
-            customResourceType, customResourceName, customResourceId);
-        
-    agent.start();
-  }
-    
-}
diff --git a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/SubmarineAgentListener.java b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/SubmarineAgentListener.java
new file mode 100644
index 00000000..1ae7ec9c
--- /dev/null
+++ b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/SubmarineAgentListener.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.k8s.agent;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientBuilder;
+import io.javaoperatorsdk.operator.Operator;
+import io.javaoperatorsdk.operator.api.config.ControllerConfigurationOverrider;
+import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
+import org.apache.submarine.commons.utils.exception.SubmarineRuntimeException;
+import org.apache.submarine.server.k8s.utils.OwnerReferenceConfig;
+import org.reflections.Reflections;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.takes.facets.fork.FkRegex;
+import org.takes.facets.fork.TkFork;
+import org.takes.http.Exit;
+import org.takes.http.FtBasic;
+
+import java.io.IOException;
+import java.time.format.DateTimeFormatter;
+import java.util.Set;
+
+/**
+ * Submarine agent listener
+ * Listen for changes in the associated kubeflow resources and update their status
+ */
+public class SubmarineAgentListener {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SubmarineAgentListener.class);
+
+  public static final DateTimeFormatter DTF = DateTimeFormatter.ISO_DATE_TIME;
+
+  public static void main(String[] args) throws IOException {
+    // create kubernetes client
+    KubernetesClient client = new KubernetesClientBuilder().build();
+    // create operator
+    Operator operator = new Operator(client);
+    // scan all Reconciler implemented subclasses
+    Reflections reflections = new Reflections("org.apache.submarine.server.k8s.agent");
+    Set<Class<? extends Reconciler>> reconcilers = reflections.getSubTypesOf(Reconciler.class);
+    reconcilers.forEach(reconciler ->
+        {
+          try {
+            LOGGER.info("Register {} ...", reconciler.getName());
+            operator.register(reconciler.getDeclaredConstructor().newInstance(),
+                ControllerConfigurationOverrider::watchingOnlyCurrentNamespace
+            );
+          } catch (Exception e) {
+            throw new SubmarineRuntimeException("Can not new instance " + reconciler.getName());
+          }
+        }
+    );
+    LOGGER.info("Starting agent with SUBMARINE_UID={}", OwnerReferenceConfig.getSubmarineUid());
+    // Adds a shutdown hook that automatically calls stop() when the app shuts down.
+    operator.installShutdownHook();
+    // start operator
+    operator.start();
+    // Provide a lightweight service to handle health checks
+    new FtBasic(
+            new TkFork(new FkRegex("/health", "ALL GOOD.")), 8080
+    ).start(Exit.NEVER);
+  }
+
+}
diff --git a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/CustomResourceHandler.java b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/CustomResourceHandler.java
deleted file mode 100644
index 4a9eb506..00000000
--- a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/CustomResourceHandler.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.server.k8s.agent.handler;
-
-import java.io.FileReader;
-import java.io.IOException;
-
-import org.apache.submarine.commons.utils.exception.SubmarineRuntimeException;
-import org.apache.submarine.server.k8s.agent.util.RestClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.kubernetes.client.openapi.ApiClient;
-import io.kubernetes.client.openapi.Configuration;
-import io.kubernetes.client.openapi.apis.CoreV1Api;
-import io.kubernetes.client.openapi.apis.CustomObjectsApi;
-import io.kubernetes.client.util.ClientBuilder;
-import io.kubernetes.client.util.KubeConfig;
-import okhttp3.OkHttpClient;
-
-public abstract class CustomResourceHandler {
-  private static final Logger LOG = LoggerFactory.getLogger(CustomResourceHandler.class);
-  private static final String KUBECONFIG_ENV = "KUBECONFIG";
-    
-  protected ApiClient client = null;  
-  protected CustomObjectsApi customObjectsApi = null;
-  protected CoreV1Api coreV1Api = null;
-  protected String namespace;
-  protected String crType;
-  protected String crName;
-  protected String serverHost;
-  protected Integer serverPort;
-  protected String resourceId;
-  protected RestClient restClient;
-    
-  public CustomResourceHandler() throws IOException {
-    try {
-      String path = System.getenv(KUBECONFIG_ENV);
-      LOG.info("PATH:" + path);
-      KubeConfig config = KubeConfig.loadKubeConfig(new FileReader(path));
-      client = ClientBuilder.kubeconfig(config).build();
-    } catch (Exception e) {
-      LOG.info("Maybe in cluster mode, try to initialize the client again.");
-      try {
-        client = ClientBuilder.cluster().build();
-      } catch (IOException e1) {
-        LOG.error("Initialize K8s submitter failed. " + e.getMessage(), e1);
-        throw new SubmarineRuntimeException(500, "Initialize K8s submitter failed.");
-      }
-    } finally {
-      // let watcher can wait until the next change
-      client.setReadTimeout(0);
-      OkHttpClient httpClient = client.getHttpClient();
-      this.client.setHttpClient(httpClient);
-      Configuration.setDefaultApiClient(client);
-    }    
-    customObjectsApi = new CustomObjectsApi(client);
-    coreV1Api = new CoreV1Api(client);
-  }
-    
-  public abstract void init(String serverHost, Integer serverPort,
-        String namespace, String crName, String resourceId);
-  public abstract void run();
-
-  public String getNamespace() {
-    return namespace;
-  }
-
-  public void setNamespace(String namespace) {
-    this.namespace = namespace;
-  }
-
-  public String getCrType() {
-    return crType;
-  }
-
-  public void setCrType(String crType) {
-    this.crType = crType;
-  }
-
-  public String getCrName() {
-    return crName;
-  }
-
-  public void setCrName(String crName) {
-    this.crName = crName;
-  }
-
-  public String getServerHost() {
-    return serverHost;
-  }
-
-  public void setServerHost(String serverHost) {
-    this.serverHost = serverHost;
-  }
-
-  public Integer getServerPort() {
-    return serverPort;
-  }
-
-  public void setServerPort(Integer serverPort) {
-    this.serverPort = serverPort;
-  }
-
-  public RestClient getRestClient() {
-    return restClient;
-  }
-
-  public void setRestClient(RestClient restClient) {
-    this.restClient = restClient;
-  }
-    
-}
diff --git a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/NotebookHandler.java b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/NotebookHandler.java
deleted file mode 100644
index 6452e1ed..00000000
--- a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/NotebookHandler.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.server.k8s.agent.handler;
-
-import java.io.IOException;
-
-import io.kubernetes.client.openapi.models.CoreV1EventList;
-import io.kubernetes.client.openapi.models.V1Pod;
-import io.kubernetes.client.openapi.models.V1PodList;
-import io.kubernetes.client.util.generic.options.ListOptions;
-import org.apache.submarine.server.api.common.CustomResourceType;
-import org.apache.submarine.server.api.notebook.Notebook;
-import org.apache.submarine.server.k8s.agent.util.RestClient;
-import org.apache.submarine.server.submitter.k8s.model.notebook.NotebookCR;
-import org.apache.submarine.server.submitter.k8s.model.notebook.NotebookCRList;
-import org.apache.submarine.server.submitter.k8s.util.NotebookUtils;
-import io.kubernetes.client.util.generic.GenericKubernetesApi;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.kubernetes.client.openapi.ApiException;
-import io.kubernetes.client.openapi.models.CoreV1Event;
-import io.kubernetes.client.util.Watch.Response;
-import io.kubernetes.client.util.Watchable;
-
-public class NotebookHandler extends CustomResourceHandler {
-
-  private static final Logger LOG = LoggerFactory.getLogger(NotebookHandler.class);
-  private Watchable<CoreV1Event> watcher;
-
-  private GenericKubernetesApi<V1Pod, V1PodList> podClient;
-  private GenericKubernetesApi<CoreV1Event, CoreV1EventList> eventClient;
-  private GenericKubernetesApi<NotebookCR, NotebookCRList> notebookCRClient;
-
-  private String uid;
-
-  public NotebookHandler() throws IOException {
-    super();
-  }
-
-  @Override
-  public void init(String serverHost, Integer serverPort, String namespace,
-          String crName, String resourceId) {
-    this.serverHost = serverHost;
-    this.serverPort = serverPort;
-    this.namespace = namespace;
-    this.crName = crName;
-    this.resourceId = resourceId;
-
-    podClient =
-            new GenericKubernetesApi<>(
-                    V1Pod.class, V1PodList.class,
-                    "", "v1", "pods", client);
-    eventClient =
-            new GenericKubernetesApi<>(
-                    CoreV1Event.class, CoreV1EventList.class,
-                    "", "v1", "events", client);
-    notebookCRClient =
-            new GenericKubernetesApi<>(
-                    NotebookCR.class, NotebookCRList.class,
-                    NotebookCR.CRD_NOTEBOOK_GROUP_V1, NotebookCR.CRD_NOTEBOOK_VERSION_V1,
-                    NotebookCR.CRD_NOTEBOOK_PLURAL_V1, client);
-
-    try {
-      ListOptions listOptions = new ListOptions();
-      String podLabelSelector = String.format("%s=%s", NotebookCR.NOTEBOOK_ID, this.resourceId);
-      listOptions.setLabelSelector(podLabelSelector);
-      
-      
-      V1PodList podList = podClient.list(namespace, listOptions).throwsApiException().getObject();
-      this.uid = podList.getItems().get(podList.getItems().size() - 1).getMetadata().getUid();
-      
-      listOptions = new ListOptions();
-      String fieldSelector = String.format("involvedObject.uid=%s", this.uid);
-
-      listOptions.setFieldSelector(fieldSelector);
-      watcher = eventClient.watch(namespace, listOptions);
-
-    } catch (ApiException e) {
-      e.printStackTrace();
-    }
-    restClient = new RestClient(serverHost, serverPort);
-  }
-
-  @Override
-  public void run() {
-    Notebook notebook = null;
-    while (true) {
-      for (Response<CoreV1Event> event: watcher) {
-        String reason = event.object.getReason();
-        Object object = null;
-        try {
-          switch (reason) {
-            case "Created":
-            case "Scheduled":
-              object = notebookCRClient.get(namespace, crName).throwsApiException().getObject();
-              notebook = NotebookUtils.parseObject(object, NotebookUtils.ParseOpt.PARSE_OPT_GET);
-              notebook.setStatus(Notebook.Status.STATUS_CREATING.getValue());
-              restClient.callStatusUpdate(CustomResourceType.Notebook, this.resourceId, notebook);
-              break;
-            case "Started":
-            case "Pulled":
-              object = notebookCRClient.get(namespace, crName).throwsApiException().getObject();
-              notebook = NotebookUtils.parseObject(object, NotebookUtils.ParseOpt.PARSE_OPT_GET);
-              notebook.setStatus(Notebook.Status.STATUS_RUNNING.getValue());
-              restClient.callStatusUpdate(CustomResourceType.Notebook, this.resourceId, notebook);
-              break;
-            case "BackOff":
-            case "Failed":
-              object = notebookCRClient.get(namespace, crName).throwsApiException().getObject();
-              notebook = NotebookUtils.parseObject(object, NotebookUtils.ParseOpt.PARSE_OPT_GET);
-              notebook.setStatus(Notebook.Status.STATUS_FAILED.getValue());
-              restClient.callStatusUpdate(CustomResourceType.Notebook, this.resourceId, notebook);
-              break;
-            case "Pulling":
-              object = notebookCRClient.get(namespace, crName).throwsApiException().getObject();
-              notebook = NotebookUtils.parseObject(object, NotebookUtils.ParseOpt.PARSE_OPT_GET);
-              notebook.setStatus(Notebook.Status.STATUS_PULLING.getValue());
-              restClient.callStatusUpdate(CustomResourceType.Notebook, this.resourceId, notebook);
-              break;
-            case "Killing":
-              object = notebookCRClient.get(namespace, crName).throwsApiException().getObject();
-              notebook = NotebookUtils.parseObject(object, NotebookUtils.ParseOpt.PARSE_OPT_GET);
-              notebook.setStatus(Notebook.Status.STATUS_TERMINATING.getValue());
-              restClient.callStatusUpdate(CustomResourceType.Notebook, this.resourceId, notebook);
-              LOG.info("Receive terminating event, exit progress");
-              return;
-            default:
-              LOG.info(String.format("Unprocessed event type:%s", reason));
-          }
-        } catch (ApiException e) {
-          LOG.error("error while accessing k8s", e);
-        }
-      }
-    }
-  }
-}
diff --git a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/PyTorchJobHandler.java b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/PyTorchJobHandler.java
deleted file mode 100644
index ffcc6246..00000000
--- a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/PyTorchJobHandler.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.server.k8s.agent.handler;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Objects;
-
-import org.apache.submarine.server.api.common.CustomResourceType;
-import org.apache.submarine.server.api.experiment.Experiment;
-import org.apache.submarine.server.k8s.agent.util.RestClient;
-import org.apache.submarine.server.submitter.k8s.model.pytorchjob.PyTorchJob;
-import org.apache.submarine.server.submitter.k8s.model.pytorchjob.PyTorchJobList;
-import org.apache.submarine.server.submitter.k8s.util.MLJobConverter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.gson.reflect.TypeToken;
-
-import io.kubernetes.client.openapi.ApiException;
-import io.kubernetes.client.openapi.models.CoreV1Event;
-import io.kubernetes.client.openapi.models.V1JobCondition;
-import io.kubernetes.client.util.Watch.Response;
-import io.kubernetes.client.util.Watch;
-import io.kubernetes.client.util.Watchable;
-import io.kubernetes.client.util.generic.GenericKubernetesApi;
-import okhttp3.Call;
-
-public class PyTorchJobHandler extends CustomResourceHandler {
-  private static final Logger LOG = LoggerFactory.getLogger(PyTorchJobHandler.class);
-  private GenericKubernetesApi<PyTorchJob, PyTorchJobList> pytorchJobClient;
-  private Watchable<CoreV1Event> watcher;
-  public PyTorchJobHandler() throws IOException {
-    super();
-  }
-
-
-  @Override
-  public void init(String serverHost, Integer serverPort,
-          String namespace, String crName, String resourceId) {
-    this.serverHost = serverHost;
-    this.serverPort = serverPort;
-    this.namespace = namespace;
-    this.crName = crName;
-    this.resourceId = resourceId;
-    pytorchJobClient =
-            new GenericKubernetesApi<>(
-                    PyTorchJob.class, PyTorchJobList.class,
-                    PyTorchJob.CRD_PYTORCH_GROUP_V1, PyTorchJob.CRD_PYTORCH_VERSION_V1,
-                    PyTorchJob.CRD_PYTORCH_PLURAL_V1, client);
-    try {
-      String fieldSelector = String.format("involvedObject.name=%s", resourceId);
-      LOG.info("fieldSelector:" + fieldSelector);
-      Call call =  coreV1Api.listNamespacedEventCall(namespace, null, null, null, fieldSelector,
-            null, null, null, null, null, true, null);
-
-      watcher = Watch.createWatch(client, call, new TypeToken<Response<CoreV1Event>>(){}.getType());
-    } catch (ApiException e) {
-      e.printStackTrace();
-    }
-    restClient = new RestClient(serverHost, serverPort);
-  }
-
-  @Override
-  public void run() {
-    while (true) {
-      for (Response<CoreV1Event> event: watcher) {
-        try {
-          PyTorchJob job = pytorchJobClient.get(this.namespace, this.resourceId).getObject();
-          List<V1JobCondition> conditionList = job.getStatus().getConditions();
-          if (conditionList == null || conditionList.isEmpty()) continue;
-          V1JobCondition lastCondition = conditionList.get(conditionList.size() - 1);
-          Experiment experiment = MLJobConverter.toJobFromMLJob(job);
-
-          this.restClient.callStatusUpdate(CustomResourceType.PyTorchJob, resourceId, experiment);
-          LOG.info(String.format("receiving condition:%s", lastCondition.getReason()));
-          LOG.info(String.format("current status of PyTorchjob:%s is %s", resourceId,
-              experiment.getStatus()));
-
-          // The reason value can refer to https://github.com/kubeflow/common/blob/master/pkg/util/status.go
-          switch (Objects.requireNonNull(lastCondition.getReason())) {
-            case "JobSucceeded":
-              LOG.info(String.format("PyTorchjob:%s is succeeded, exit", this.resourceId));
-              return;
-            case "JobFailed":
-              LOG.info(String.format("PyTorchjob:%s is failed, exit", this.resourceId));
-              return;
-            default:
-              break;
-          }
-        } catch (Exception e) {
-          LOG.error("Exception while processing the PyTorch event! " + e.getMessage(), e);
-        }
-      }
-    }
-  }
-}
diff --git a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/TFJobHandler.java b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/TFJobHandler.java
deleted file mode 100644
index dfd870ca..00000000
--- a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/TFJobHandler.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.server.k8s.agent.handler;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Objects;
-
-import org.apache.submarine.server.api.common.CustomResourceType;
-import org.apache.submarine.server.api.experiment.Experiment;
-import org.apache.submarine.server.k8s.agent.util.RestClient;
-import org.apache.submarine.server.submitter.k8s.model.tfjob.TFJob;
-import org.apache.submarine.server.submitter.k8s.model.tfjob.TFJobList;
-import org.apache.submarine.server.submitter.k8s.util.MLJobConverter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.gson.reflect.TypeToken;
-
-import io.kubernetes.client.openapi.ApiException;
-import io.kubernetes.client.openapi.models.CoreV1Event;
-import io.kubernetes.client.openapi.models.V1JobCondition;
-import io.kubernetes.client.util.Watch.Response;
-import io.kubernetes.client.util.Watch;
-import io.kubernetes.client.util.Watchable;
-import io.kubernetes.client.util.generic.GenericKubernetesApi;
-import okhttp3.Call;
-
-public class TFJobHandler extends CustomResourceHandler {
-  private static final Logger LOG = LoggerFactory.getLogger(TFJobHandler.class);
-  private GenericKubernetesApi<TFJob, TFJobList> tfJobClient;
-  private Watchable<CoreV1Event> watcher;
-  public TFJobHandler() throws IOException {
-    super();
-  }
-
-  @Override
-  public void init(String serverHost, Integer serverPort,
-          String namespace, String crName, String resourceId) {
-    this.serverHost = serverHost;
-    this.serverPort = serverPort;
-    this.namespace = namespace;
-    this.crName = crName;
-    this.resourceId = resourceId;
-    tfJobClient =
-            new GenericKubernetesApi<>(
-                    TFJob.class, TFJobList.class,
-                    TFJob.CRD_TF_GROUP_V1, TFJob.CRD_TF_VERSION_V1,
-                    TFJob.CRD_TF_PLURAL_V1, client);
-    try {
-      String fieldSelector = String.format("involvedObject.name=%s", resourceId);
-      LOG.info("fieldSelector:" + fieldSelector);
-      Call call =  coreV1Api.listNamespacedEventCall(namespace, null, null, null, fieldSelector,
-            null, null, null, null, null, true, null);
-
-      watcher = Watch.createWatch(client, call, new TypeToken<Response<CoreV1Event>>(){}.getType());
-    } catch (ApiException e) {
-      e.printStackTrace();
-    }
-    restClient = new RestClient(serverHost, serverPort);
-  }
-
-  @Override
-  public void run() {
-    while (true) {
-      for (Response<CoreV1Event> event: watcher) {
-        try {
-          TFJob job = tfJobClient.get(this.namespace, this.resourceId).getObject();
-          List<V1JobCondition> conditionList = job.getStatus().getConditions();
-          if (conditionList == null || conditionList.isEmpty()) continue;
-          V1JobCondition lastCondition = conditionList.get(conditionList.size() - 1);
-          Experiment experiment = MLJobConverter.toJobFromMLJob(job);
-
-          this.restClient.callStatusUpdate(CustomResourceType.TFJob, resourceId, experiment);
-          LOG.info(String.format("receiving condition:%s", lastCondition.getReason()));
-          LOG.info(String.format("current status of tfjob:%s is %s", resourceId, experiment.getStatus()));
-
-          // The reason value can refer to https://github.com/kubeflow/common/blob/master/pkg/util/status.go
-          switch (Objects.requireNonNull(lastCondition.getReason())) {
-            case "JobSucceeded":
-              LOG.info(String.format("TfJob:%s is succeeded, exit", this.resourceId));
-              return;
-            case "JobFailed":
-              LOG.info(String.format("TfJob:%s is failed, exit", this.resourceId));
-              return;
-            default:
-              break;
-          }
-        } catch (Exception e) {
-          LOG.error("Exception while processing the TfJob event! " + e.getMessage(), e);
-        }
-      }
-    }
-  }
-}
diff --git a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/model/notebook/NotebookResource.java b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/model/notebook/NotebookResource.java
new file mode 100644
index 00000000..8ff9e310
--- /dev/null
+++ b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/model/notebook/NotebookResource.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.k8s.agent.model.notebook;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import io.fabric8.kubernetes.api.model.Namespaced;
+import io.fabric8.kubernetes.client.CustomResource;
+import io.fabric8.kubernetes.model.annotation.Group;
+import io.fabric8.kubernetes.model.annotation.Kind;
+import io.fabric8.kubernetes.model.annotation.Version;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.submarine.server.k8s.agent.model.notebook.status.NotebookStatus;
+
+@JsonDeserialize(
+        using = JsonDeserializer.None.class
+)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@JsonPropertyOrder({"apiVersion", "kind", "metadata", "spec", "status"})
+@JsonIgnoreProperties(ignoreUnknown = true)
+@Version("v1")
+@Group("kubeflow.org")
+@Kind("Notebook")
+public class NotebookResource extends CustomResource<Void, NotebookStatus> implements Namespaced {
+
+  public String toString() {
+    return ToStringBuilder.reflectionToString(this, ToStringStyle.JSON_STYLE);
+  }
+
+}
diff --git a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/model/notebook/status/NotebookCondition.java b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/model/notebook/status/NotebookCondition.java
new file mode 100644
index 00000000..b54194fb
--- /dev/null
+++ b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/model/notebook/status/NotebookCondition.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.k8s.agent.model.notebook.status;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import io.fabric8.kubernetes.api.model.KubernetesResource;
+
+import java.util.Objects;
+
+@JsonDeserialize(
+        using = JsonDeserializer.None.class
+)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@JsonPropertyOrder({"lastProbeTime", "message", "reason", "status", "type"})
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class NotebookCondition implements KubernetesResource {
+
+  private String lastProbeTime;
+
+  private String message;
+
+  private String reason;
+
+  private String status;
+
+  private String type;
+
+  public String getLastProbeTime() {
+    return lastProbeTime;
+  }
+
+  public void setLastProbeTime(String lastProbeTime) {
+    this.lastProbeTime = lastProbeTime;
+  }
+
+  public String getMessage() {
+    return message;
+  }
+
+  public void setMessage(String message) {
+    this.message = message;
+  }
+
+  public String getReason() {
+    return reason;
+  }
+
+  public void setReason(String reason) {
+    this.reason = reason;
+  }
+
+  public String getStatus() {
+    return status;
+  }
+
+  public void setStatus(String status) {
+    this.status = status;
+  }
+
+  public String getType() {
+    return type;
+  }
+
+  public void setType(String type) {
+    this.type = type;
+  }
+
+  @Override
+  public String toString() {
+    return "NotebookCondition{" +
+            "lastProbeTime='" + lastProbeTime + '\'' +
+            ", message='" + message + '\'' +
+            ", reason='" + reason + '\'' +
+            ", status='" + status + '\'' +
+            ", type='" + type + '\'' +
+            '}';
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    NotebookCondition that = (NotebookCondition) o;
+    return Objects.equals(lastProbeTime, that.lastProbeTime)
+            && Objects.equals(message, that.message)
+            && Objects.equals(reason, that.reason)
+            && Objects.equals(status, that.status)
+            && Objects.equals(type, that.type);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(lastProbeTime, message, reason, status, type);
+  }
+}
diff --git a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/model/notebook/status/NotebookStatus.java b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/model/notebook/status/NotebookStatus.java
new file mode 100644
index 00000000..8fc59faf
--- /dev/null
+++ b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/model/notebook/status/NotebookStatus.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.k8s.agent.model.notebook.status;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import io.fabric8.kubernetes.api.model.ContainerState;
+import io.fabric8.kubernetes.api.model.KubernetesResource;
+
+import java.util.List;
+import java.util.Objects;
+
+@JsonDeserialize(
+        using = JsonDeserializer.None.class
+)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@JsonPropertyOrder({"conditions", "readyReplicas", "containerState"})
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class NotebookStatus implements KubernetesResource {
+
+  private List<NotebookCondition> conditions;
+
+  private Integer readyReplicas;
+
+  private ContainerState containerState;
+
+  public List<NotebookCondition> getConditions() {
+    return conditions;
+  }
+
+  public void setConditions(List<NotebookCondition> conditions) {
+    this.conditions = conditions;
+  }
+
+  public Integer getReadyReplicas() {
+    return readyReplicas;
+  }
+
+  public void setReadyReplicas(Integer readyReplicas) {
+    this.readyReplicas = readyReplicas;
+  }
+
+  public ContainerState getContainerState() {
+    return containerState;
+  }
+
+  public void setContainerState(ContainerState containerState) {
+    this.containerState = containerState;
+  }
+
+  @Override
+  public String toString() {
+    return "NotebookStatus{" +
+            "conditions=" + conditions +
+            ", readyReplicas=" + readyReplicas +
+            ", containerState=" + containerState +
+            '}';
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    NotebookStatus that = (NotebookStatus) o;
+    return Objects.equals(conditions, that.conditions)
+            && Objects.equals(readyReplicas, that.readyReplicas)
+            && Objects.equals(containerState, that.containerState);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(conditions, readyReplicas, containerState);
+  }
+}
diff --git a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/HandlerFactory.java b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/model/training/JobResource.java
similarity index 51%
rename from submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/HandlerFactory.java
rename to submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/model/training/JobResource.java
index 9f030fbb..2dbb47b0 100644
--- a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/HandlerFactory.java
+++ b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/model/training/JobResource.java
@@ -17,20 +17,31 @@
  * under the License.
  */
 
-package org.apache.submarine.server.k8s.agent;
+package org.apache.submarine.server.k8s.agent.model.training;
 
-import org.apache.submarine.server.api.common.CustomResourceType;
-import org.apache.submarine.server.k8s.agent.handler.CustomResourceHandler;
+import io.fabric8.kubernetes.api.model.Namespaced;
+import io.fabric8.kubernetes.client.CustomResource;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.submarine.server.k8s.agent.model.training.status.JobStatus;
 
-public class HandlerFactory {
+/**
+ * Kubeflow training operator custom resource
+ */
+public class JobResource extends CustomResource<Void, JobStatus> implements Namespaced {
+
+  @Override
+  public void setSpec(Void spec) {
+    super.setSpec(spec);
+  }
 
-  private static String HANDLER_POSTFIX = "Handler";
-  private static String HANDLER_PACKAGE = "org.apache.submarine.server.k8s.agent.handler";
-    
-  public static CustomResourceHandler getHandler(CustomResourceType crType) 
-          throws ClassNotFoundException, InstantiationException, IllegalAccessException {
-    String handlerClassStr = HANDLER_PACKAGE + "." +  crType.toString() + HANDLER_POSTFIX;
-    Class handlerClass = Class.forName(handlerClassStr);
-    return (CustomResourceHandler) handlerClass.newInstance();
+  @Override
+  public void setStatus(JobStatus status) {
+    super.setStatus(status);
   }
+
+  public String toString() {
+        return ToStringBuilder.reflectionToString(this, ToStringStyle.JSON_STYLE);
+    }
+
 }
diff --git a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/model/training/resource/PyTorchJob.java b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/model/training/resource/PyTorchJob.java
new file mode 100644
index 00000000..dff1f2bc
--- /dev/null
+++ b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/model/training/resource/PyTorchJob.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.k8s.agent.model.training.resource;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import io.fabric8.kubernetes.model.annotation.Group;
+import io.fabric8.kubernetes.model.annotation.Kind;
+import io.fabric8.kubernetes.model.annotation.Version;
+import org.apache.submarine.server.k8s.agent.model.training.JobResource;
+
+@JsonDeserialize(
+        using = JsonDeserializer.None.class
+)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@JsonPropertyOrder({"apiVersion", "kind", "metadata", "spec", "status"})
+@JsonIgnoreProperties(ignoreUnknown = true)
+@Version("v1")
+@Group("kubeflow.org")
+@Kind("PyTorchJob")
+public class PyTorchJob extends JobResource {
+}
diff --git a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/model/training/resource/TFJob.java b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/model/training/resource/TFJob.java
new file mode 100644
index 00000000..05e708b7
--- /dev/null
+++ b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/model/training/resource/TFJob.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.k8s.agent.model.training.resource;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import io.fabric8.kubernetes.model.annotation.Group;
+import io.fabric8.kubernetes.model.annotation.Kind;
+import io.fabric8.kubernetes.model.annotation.Version;
+import org.apache.submarine.server.k8s.agent.model.training.JobResource;
+
+@JsonDeserialize(
+        using = JsonDeserializer.None.class
+)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@JsonPropertyOrder({"apiVersion", "kind", "metadata", "spec", "status"})
+@JsonIgnoreProperties(ignoreUnknown = true)
+@Version("v1")
+@Group("kubeflow.org")
+@Kind("TFJob")
+public class TFJob extends JobResource {
+}
diff --git a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/model/training/resource/XGBoostJob.java b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/model/training/resource/XGBoostJob.java
new file mode 100644
index 00000000..291475bb
--- /dev/null
+++ b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/model/training/resource/XGBoostJob.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.k8s.agent.model.training.resource;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import io.fabric8.kubernetes.model.annotation.Group;
+import io.fabric8.kubernetes.model.annotation.Kind;
+import io.fabric8.kubernetes.model.annotation.Version;
+import org.apache.submarine.server.k8s.agent.model.training.JobResource;
+
+@JsonDeserialize(
+        using = JsonDeserializer.None.class
+)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@JsonPropertyOrder({"apiVersion", "kind", "metadata", "spec", "status"})
+@JsonIgnoreProperties(ignoreUnknown = true)
+@Version("v1")
+@Group("kubeflow.org")
+@Kind("XGBoostJob")
+public class XGBoostJob extends JobResource {
+}
diff --git a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/model/training/status/JobCondition.java b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/model/training/status/JobCondition.java
new file mode 100644
index 00000000..7bd180ec
--- /dev/null
+++ b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/model/training/status/JobCondition.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.k8s.agent.model.training.status;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import io.fabric8.kubernetes.api.model.KubernetesResource;
+
+import java.util.Objects;
+
+@JsonDeserialize(
+        using = JsonDeserializer.None.class
+)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@JsonPropertyOrder({"lastTransitionTime", "lastUpdateTime", "message", "reason", "status", "type"})
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class JobCondition implements KubernetesResource {
+
+  private String lastTransitionTime;
+
+  private String lastUpdateTime;
+
+  private String message;
+
+  private String reason;
+
+  private String status;
+
+  private String type;
+
+  public String getLastTransitionTime() {
+    return lastTransitionTime;
+  }
+
+  public void setLastTransitionTime(String lastTransitionTime) {
+    this.lastTransitionTime = lastTransitionTime;
+  }
+
+  public String getLastUpdateTime() {
+    return lastUpdateTime;
+  }
+
+  public void setLastUpdateTime(String lastUpdateTime) {
+    this.lastUpdateTime = lastUpdateTime;
+  }
+
+  public String getMessage() {
+    return message;
+  }
+
+  public void setMessage(String message) {
+    this.message = message;
+  }
+
+  public String getReason() {
+    return reason;
+  }
+
+  public void setReason(String reason) {
+    this.reason = reason;
+  }
+
+  public String getStatus() {
+    return status;
+  }
+
+  public void setStatus(String status) {
+    this.status = status;
+  }
+
+  public String getType() {
+    return type;
+  }
+
+  public void setType(String type) {
+    this.type = type;
+  }
+
+  @Override
+  public String toString() {
+    return "JobCondition{" +
+            "lastTransitionTime='" + lastTransitionTime + '\'' +
+            ", lastUpdateTime='" + lastUpdateTime + '\'' +
+            ", message='" + message + '\'' +
+            ", reason='" + reason + '\'' +
+            ", status='" + status + '\'' +
+            ", type='" + type + '\'' +
+            '}';
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    JobCondition that = (JobCondition) o;
+    return Objects.equals(lastTransitionTime, that.lastTransitionTime)
+            && Objects.equals(lastUpdateTime, that.lastUpdateTime)
+            && Objects.equals(message, that.message)
+            && Objects.equals(reason, that.reason)
+            && Objects.equals(status, that.status)
+            && Objects.equals(type, that.type);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(lastTransitionTime, lastUpdateTime, message, reason, status, type);
+  }
+}
diff --git a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/model/training/status/JobStatus.java b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/model/training/status/JobStatus.java
new file mode 100644
index 00000000..c0d4f170
--- /dev/null
+++ b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/model/training/status/JobStatus.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.k8s.agent.model.training.status;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import io.fabric8.kubernetes.api.model.KubernetesResource;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+@JsonDeserialize(
+        using = JsonDeserializer.None.class
+)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@JsonPropertyOrder({"apiVersion", "kind", "metadata", "completionTime", "conditions", "lastReconcileTime", "replicaStatuses", "startTime"})
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class JobStatus implements KubernetesResource {
+
+    private String completionTime;
+
+    private List<JobCondition> conditions;
+
+    private String lastReconcileTime;
+
+    private Map<String, ReplicaStatus> replicaStatuses;
+
+    private String startTime;
+
+    public String getCompletionTime() {
+        return completionTime;
+    }
+
+    public void setCompletionTime(String completionTime) {
+        this.completionTime = completionTime;
+    }
+
+    public List<JobCondition> getConditions() {
+        return conditions;
+    }
+
+    public void setConditions(List<JobCondition> conditions) {
+        this.conditions = conditions;
+    }
+
+    public String getLastReconcileTime() {
+        return lastReconcileTime;
+    }
+
+    public void setLastReconcileTime(String lastReconcileTime) {
+        this.lastReconcileTime = lastReconcileTime;
+    }
+
+    public Map<String, ReplicaStatus> getReplicaStatuses() {
+        return replicaStatuses;
+    }
+
+    public void setReplicaStatuses(Map<String, ReplicaStatus> replicaStatuses) {
+        this.replicaStatuses = replicaStatuses;
+    }
+
+    public String getStartTime() {
+        return startTime;
+    }
+
+    public void setStartTime(String startTime) {
+        this.startTime = startTime;
+    }
+
+    @Override
+    public String toString() {
+        return "JobStatus{" +
+                "completionTime='" + completionTime + '\'' +
+                ", conditions=" + conditions +
+                ", lastReconcileTime='" + lastReconcileTime + '\'' +
+                ", replicaStatuses=" + replicaStatuses +
+                ", startTime='" + startTime + '\'' +
+                '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        JobStatus jobStatus = (JobStatus) o;
+        return Objects.equals(completionTime, jobStatus.completionTime)
+                && Objects.equals(conditions, jobStatus.conditions)
+                && Objects.equals(lastReconcileTime, jobStatus.lastReconcileTime)
+                && Objects.equals(replicaStatuses, jobStatus.replicaStatuses)
+                && Objects.equals(startTime, jobStatus.startTime);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(completionTime, conditions, lastReconcileTime, replicaStatuses, startTime);
+    }
+}
diff --git a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/model/training/status/ReplicaStatus.java b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/model/training/status/ReplicaStatus.java
new file mode 100644
index 00000000..80a7a276
--- /dev/null
+++ b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/model/training/status/ReplicaStatus.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.k8s.agent.model.training.status;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import io.fabric8.kubernetes.api.model.KubernetesResource;
+
+import java.util.Objects;
+
+@JsonDeserialize(
+        using = JsonDeserializer.None.class
+)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@JsonPropertyOrder({"apiVersion", "kind", "metadata", "active", "failed", "succeeded"})
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ReplicaStatus implements KubernetesResource {
+
+  public ReplicaStatus() {
+  }
+
+  public ReplicaStatus(Integer active, Integer failed, Integer succeeded) {
+    this.active = active;
+    this.failed = failed;
+    this.succeeded = succeeded;
+  }
+
+  private Integer active;
+
+    private Integer failed;
+
+    private Integer succeeded;
+
+    public Integer getActive() {
+        return active;
+    }
+
+    public void setActive(Integer active) {
+        this.active = active;
+    }
+
+    public Integer getFailed() {
+        return failed;
+    }
+
+    public void setFailed(Integer failed) {
+        this.failed = failed;
+    }
+
+    public Integer getSucceeded() {
+        return succeeded;
+    }
+
+    public void setSucceeded(Integer succeeded) {
+        this.succeeded = succeeded;
+    }
+
+    @Override
+    public String toString() {
+        return "ReplicaStatus{" +
+                "active=" + active +
+                ", failed=" + failed +
+                ", succeeded=" + succeeded +
+                '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ReplicaStatus that = (ReplicaStatus) o;
+        return Objects.equals(active, that.active)
+                && Objects.equals(failed, that.failed)
+                && Objects.equals(succeeded, that.succeeded);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(active, failed, succeeded);
+    }
+}
diff --git a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/reconciler/JobReconciler.java b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/reconciler/JobReconciler.java
new file mode 100644
index 00000000..306586e2
--- /dev/null
+++ b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/reconciler/JobReconciler.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.k8s.agent.reconciler;
+
+import io.fabric8.kubernetes.api.model.OwnerReference;
+import org.apache.ibatis.session.SqlSession;
+import org.apache.submarine.server.api.common.CustomResourceType;
+import org.apache.submarine.server.database.experiment.mappers.ExperimentMapper;
+import org.apache.submarine.server.database.utils.MyBatisUtil;
+import org.apache.submarine.server.k8s.agent.model.training.JobResource;
+import org.apache.submarine.server.k8s.agent.model.training.status.JobCondition;
+import org.apache.submarine.server.k8s.utils.OwnerReferenceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.ZonedDateTime;
+import java.util.Date;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.submarine.server.k8s.agent.SubmarineAgentListener.DTF;
+
+/**
+ * Training Operator CR Job Reconciler
+ */
+public abstract class JobReconciler<T extends JobResource> {
+
+  public abstract CustomResourceType type();
+
+  private final Logger LOGGER = LoggerFactory.getLogger(getClass());
+
+  /**
+   * Update experiment status to 'Created'
+   */
+  protected void create(String id, Date acceptedTime) {
+    try (SqlSession sqlSession = MyBatisUtil.getSqlSession()) {
+      ExperimentMapper mapper = sqlSession.getMapper(ExperimentMapper.class);
+      mapper.create(id, acceptedTime);
+      sqlSession.commit();
+    } catch (Exception e) {
+      LOGGER.error(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Update experiment status to 'Succeeded'
+   */
+  protected void succeed(String id, Date finishedTime) {
+    try (SqlSession sqlSession = MyBatisUtil.getSqlSession()) {
+      ExperimentMapper mapper = sqlSession.getMapper(ExperimentMapper.class);
+      mapper.succeed(id, finishedTime);
+      sqlSession.commit();
+    } catch (Exception e) {
+      LOGGER.error(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Update experiment status to 'Failed'
+   */
+  protected void failed(String id, Date finishedTime) {
+    try (SqlSession sqlSession = MyBatisUtil.getSqlSession()) {
+      ExperimentMapper mapper = sqlSession.getMapper(ExperimentMapper.class);
+      mapper.failed(id, finishedTime);
+      sqlSession.commit();
+    } catch (Exception e) {
+      LOGGER.error(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Update experiment status to 'Running'
+   */
+  protected void running(String id, Date runningTime) {
+    try (SqlSession sqlSession = MyBatisUtil.getSqlSession()) {
+      ExperimentMapper mapper = sqlSession.getMapper(ExperimentMapper.class);
+      mapper.running(id, runningTime);
+      sqlSession.commit();
+    } catch (Exception e) {
+      LOGGER.error(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Trigger status
+   */
+  protected void triggerStatus(T resource) {
+    LOGGER.debug("Reconciling {}: {}", type(), resource);
+    if (!resource.hasOwnerReferenceFor(OwnerReferenceConfig.getSubmarineUid())) {
+      LOGGER.trace("OwnerReference is {}, Skip the processing of this job",
+          resource.getMetadata().getOwnerReferences().stream()
+              .map(OwnerReference::getUid).findFirst().orElse(null));
+      return;
+    }
+    /*
+     * get conditions, Here is an example yaml of a state
+     * status:
+     *   completionTime: "2022-11-23T02:23:21Z"
+     *   conditions:
+     *   - lastTransitionTime: "2022-11-23T02:20:51Z"
+     *     lastUpdateTime: "2022-11-23T02:20:51Z"
+     *     message: TFJob experiment-1669169951603-0001 is created.
+     *     reason: TFJobCreated
+     *     status: "True"
+     *     type: Created
+     *   - lastTransitionTime: "2022-11-23T02:20:52Z"
+     *     lastUpdateTime: "2022-11-23T02:20:52Z"
+     *     message: TFJob submarine-user-test/experiment-1669169951603-0001 is running.
+     *     reason: TFJobRunning
+     *     status: "False"
+     *     type: Running
+     *   - lastTransitionTime: "2022-11-23T02:23:21Z"
+     *     lastUpdateTime: "2022-11-23T02:23:21Z"
+     *     message: TFJob submarine-user-test/experiment-1669169951603-0001 successfully
+     *       completed.
+     *     reason: TFJobSucceeded
+     *     status: "True"
+     *     type: Succeeded
+     *   replicaStatuses:
+     *     Worker:
+     *       succeeded: 2
+     *   startTime: "2022-11-23T02:20:51Z"
+     */
+    if (resource.getStatus() == null) return;
+    List<JobCondition> conditions = resource.getStatus().getConditions();
+    // find experiment name/experiment_id
+    String name = resource.getMetadata().getName();
+    if (conditions == null || conditions.isEmpty()) {
+      LOGGER.warn("{} conditions is empty, skip ...", name);
+    } else {
+      // get condition and update experiment
+      JobCondition lastCondition = conditions.get(conditions.size() - 1);
+      // The reason value can refer to https://github.com/kubeflow/common/blob/master/pkg/util/status.go
+      String reason = Objects.requireNonNull(lastCondition.getReason());
+      // The type value can refer to https://github.com/kubeflow/common/blob/master/pkg/apis/common/v1/types.go#L112
+      String type = Objects.requireNonNull(lastCondition.getType());
+      // time
+      ZonedDateTime zdt = ZonedDateTime.parse(lastCondition.getLastTransitionTime(), DTF);
+      Date date = Date.from(zdt.toInstant());
+      LOGGER.info("current type/status/reason of {} is {} / {} / {}",
+          name, type, lastCondition.getStatus(), reason);
+      switch (type) {
+        case "Created":
+          create(name, date);
+          break;
+        case "Restarting":
+        case "Running":
+          running(name, date);
+          break;
+        case "Succeeded":
+          succeed(name, date);
+          break;
+        case "Failed":
+          failed(name, date);
+          break;
+        default:
+          LOGGER.warn("Unprocessed event type: {}, skip it...", type);
+          break;
+      }
+    }
+  }
+
+}
diff --git a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/reconciler/NotebookReconciler.java b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/reconciler/NotebookReconciler.java
new file mode 100644
index 00000000..4bd08088
--- /dev/null
+++ b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/reconciler/NotebookReconciler.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.k8s.agent.reconciler;
+
+import io.fabric8.kubernetes.api.model.OwnerReference;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import org.apache.ibatis.session.SqlSession;
+import org.apache.submarine.server.api.notebook.Notebook;
+import org.apache.submarine.server.database.notebook.mappers.NotebookMapper;
+import org.apache.submarine.server.database.utils.MyBatisUtil;
+import org.apache.submarine.server.k8s.agent.model.notebook.NotebookResource;
+import org.apache.submarine.server.k8s.agent.model.notebook.status.NotebookCondition;
+import org.apache.submarine.server.k8s.utils.OwnerReferenceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.ZonedDateTime;
+import java.util.Date;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.submarine.server.k8s.agent.SubmarineAgentListener.DTF;
+
+/**
+ * Notebook Reconciler
+ * <p>
+ * Submarine will add `notebook-id` and `notebook-owner-id` labels when creating the notebook,
+ * so we need to do the filtering.
+ * <p>
+ * Label selectors reference:
+ * https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#api
+ */
+@ControllerConfiguration(
+    labelSelector = "notebook-id,notebook-owner-id",
+    generationAwareEventProcessing = false
+)
+public class NotebookReconciler implements Reconciler<NotebookResource> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(NotebookReconciler.class);
+
+  /* reasons */
+  private static final String CREATING_REASON = "The notebook instance is creating";
+  private static final String RUNNING_REASON = "The notebook instance is running";
+  private static final String FAILED_REASON = "The notebook instance is failed";
+  private static final String TERMINATING_REASON = "The notebook instance is terminating";
+
+  @Override
+  public UpdateControl<NotebookResource> reconcile(NotebookResource notebook,
+                                                   Context<NotebookResource> context) {
+    LOGGER.debug("Reconciling Notebook: {}", notebook);
+    if (!notebook.hasOwnerReferenceFor(OwnerReferenceConfig.getSubmarineUid())) {
+      LOGGER.trace("OwnerReference is {}, Skip the processing of this notebook",
+          notebook.getMetadata().getOwnerReferences() == null ? "" :
+              notebook.getMetadata().getOwnerReferences().stream()
+                  .map(OwnerReference::getUid).findFirst().orElse(null));
+    } else {
+      triggerStatus(notebook);
+    }
+    return UpdateControl.noUpdate();
+  }
+
+  /**
+   * Trigger status
+   */
+  private void triggerStatus(NotebookResource notebook) {
+    if (notebook.getStatus() == null) return;
+    List<NotebookCondition> conditions = notebook.getStatus().getConditions();
+    // find notebook name/notebook_id
+    String name = notebook.getMetadata().getName();
+    if (conditions == null || conditions.isEmpty()) {
+      LOGGER.warn("{} conditions is empty, skip ...", name);
+    } else {
+      /*
+       * get conditions and update notebook, Here is an example yaml of a state
+       * status:
+       *   conditions:
+       *   - lastProbeTime: "2022-11-24T01:07:12Z"
+       *     type: Running
+       *   - lastProbeTime: "2022-11-24T01:07:07Z"
+       *     message: Error
+       *     reason: Error
+       *     type: Terminated
+       *   - lastProbeTime: "2022-11-23T10:24:57Z"
+       *     type: Running
+       *   - lastProbeTime: "2022-11-23T10:24:36Z"
+       *     reason: PodInitializing
+       *     type: Waiting
+       *   containerState:
+       *     running:
+       *       startedAt: "2022-11-24T01:07:00Z"
+       *   readyReplicas: 1
+       */
+      // get sorted latest status
+      // Sometimes the status will be out of order after the notebook-controller restarts
+      NotebookCondition lastCondition = conditions.stream()
+          .max((c1, c2) -> getLastProbeTime(c1).compareTo(getLastProbeTime(c2))).get();
+      // The type value can refer to
+      // https://github.com/kubeflow/kubeflow/blob/master/components/notebook-controller/api/v1/notebook_types.go#L48
+      // Possible values are Running|Waiting|Terminated
+      String type = Objects.requireNonNull(lastCondition.getType());
+      // The reason value can refer to
+      // https://github.com/kubeflow/kubeflow/blob/master/components/notebook-controller/api/v1/notebook_types.go#L46
+      // it may be optional
+      String reason = getReason(lastCondition);
+      // time
+      Date date = getLastProbeTime(lastCondition);
+      LOGGER.info("current type/status/reason of {} is {} / {} / {}",
+          name, type, lastCondition.getStatus(), reason);
+      String id = notebook.getMetadata().getLabels().get("notebook-id");
+      switch (reason) {
+        case "Created":
+        case "Scheduled":
+          updateNotebookStatus(id, Notebook.Status.STATUS_CREATING, CREATING_REASON, date);
+          break;
+        case "Started":
+        case "Pulled":
+          updateNotebookStatus(id, Notebook.Status.STATUS_RUNNING, RUNNING_REASON, date);
+          break;
+        case "BackOff":
+        case "Failed":
+          updateNotebookStatus(id, Notebook.Status.STATUS_FAILED, FAILED_REASON, date);
+          break;
+        case "Pulling":
+          updateNotebookStatus(id, Notebook.Status.STATUS_PULLING, CREATING_REASON, date);
+          break;
+        case "Killing":
+          updateNotebookStatus(id, Notebook.Status.STATUS_TERMINATING, TERMINATING_REASON, date);
+          break;
+        default:
+          LOGGER.warn("Unprocessed event type: {}, skip it...", type);
+          break;
+      }
+    }
+  }
+
+  /**
+   * Get condition reason
+   */
+  private String getReason(NotebookCondition condition) {
+    String reason = condition.getReason();
+    if (reason == null || reason.isEmpty()) {
+      switch (condition.getType()) {
+        case "Running":
+          reason = "Started";
+          break;
+        case "Terminated":
+          reason = "Killing";
+          break;
+        default:
+          reason = "Waiting";
+          break;
+      }
+    }
+    return reason;
+  }
+
+  /**
+   * Get condition lastProbeTime with date type
+   */
+  private Date getLastProbeTime(NotebookCondition condition) {
+    ZonedDateTime zdt = ZonedDateTime.parse(condition.getLastProbeTime(), DTF);
+    return Date.from(zdt.toInstant());
+  }
+
+  /**
+   * Update notebook status
+   */
+  private void updateNotebookStatus(String id, Notebook.Status status, String reason, Date updateTime) {
+    try (SqlSession sqlSession = MyBatisUtil.getSqlSession()) {
+      NotebookMapper mapper = sqlSession.getMapper(NotebookMapper.class);
+      mapper.updateStatus(id, status.getValue(), reason, updateTime);
+      sqlSession.commit();
+    } catch (Exception e) {
+      LOGGER.error(e.getMessage(), e);
+    }
+  }
+}
diff --git a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/reconciler/PyTorchJobReconciler.java b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/reconciler/PyTorchJobReconciler.java
new file mode 100644
index 00000000..b02bda9d
--- /dev/null
+++ b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/reconciler/PyTorchJobReconciler.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.k8s.agent.reconciler;
+
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import org.apache.submarine.server.api.common.CustomResourceType;
+import org.apache.submarine.server.k8s.agent.model.training.resource.PyTorchJob;
+
+/**
+ * PyTorch Job Reconciler
+ * <p>
+ * Submarine will add `submarine-experiment-name` label when creating the experiment,
+ * so we need to do the filtering.
+ * <p>
+ * Label selectors reference:
+ * https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#api
+ */
+@ControllerConfiguration(
+    labelSelector = "submarine-experiment-name",
+    generationAwareEventProcessing = false
+)
+public class PyTorchJobReconciler extends JobReconciler<PyTorchJob> implements Reconciler<PyTorchJob> {
+
+  @Override
+  public UpdateControl<PyTorchJob> reconcile(PyTorchJob pyTorchJob, Context<PyTorchJob> context) {
+    triggerStatus(pyTorchJob);
+    return UpdateControl.noUpdate();
+  }
+
+  @Override
+  public CustomResourceType type() {
+    return CustomResourceType.PyTorchJob;
+  }
+}
diff --git a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/reconciler/TFJobReconciler.java b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/reconciler/TFJobReconciler.java
new file mode 100644
index 00000000..ccdd2304
--- /dev/null
+++ b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/reconciler/TFJobReconciler.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.k8s.agent.reconciler;
+
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import org.apache.submarine.server.api.common.CustomResourceType;
+import org.apache.submarine.server.k8s.agent.model.training.resource.TFJob;
+
+/**
+ * TF Job Reconciler
+ * <p>
+ * Submarine will add `submarine-experiment-name` label when creating the experiment,
+ * so we need to do the filtering.
+ * <p>
+ * Label selectors reference:
+ * https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#api
+ */
+@ControllerConfiguration(
+    labelSelector = "submarine-experiment-name",
+    generationAwareEventProcessing = false
+)
+public class TFJobReconciler extends JobReconciler<TFJob> implements Reconciler<TFJob> {
+
+  @Override
+  public UpdateControl<TFJob> reconcile(TFJob tfJob, Context<TFJob> context) {
+    triggerStatus(tfJob);
+    return UpdateControl.noUpdate();
+  }
+
+  @Override
+  public CustomResourceType type() {
+    return CustomResourceType.TFJob;
+  }
+}
diff --git a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/reconciler/XGBoostJobReconciler.java b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/reconciler/XGBoostJobReconciler.java
new file mode 100644
index 00000000..c3510887
--- /dev/null
+++ b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/reconciler/XGBoostJobReconciler.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.k8s.agent.reconciler;
+
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import org.apache.submarine.server.api.common.CustomResourceType;
+import org.apache.submarine.server.k8s.agent.model.training.resource.XGBoostJob;
+
+/**
+ * XGBoost Job Reconciler
+ * <p>
+ * Submarine will add `submarine-experiment-name` label when creating the experiment,
+ * so we need to do the filtering.
+ * <p>
+ * Label selectors reference:
+ * https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#api
+ */
+@ControllerConfiguration(
+    labelSelector = "submarine-experiment-name",
+    generationAwareEventProcessing = false
+)
+public class XGBoostJobReconciler extends JobReconciler<XGBoostJob> implements Reconciler<XGBoostJob> {
+
+  @Override
+  public UpdateControl<XGBoostJob> reconcile(XGBoostJob xgBoostJob, Context<XGBoostJob> context) throws Exception {
+    triggerStatus(xgBoostJob);
+    return UpdateControl.noUpdate();
+  }
+
+  @Override
+  public CustomResourceType type() {
+    return CustomResourceType.XGBoost;
+  }
+}
diff --git a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/util/RestClient.java b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/util/RestClient.java
deleted file mode 100644
index 1ff9427c..00000000
--- a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/util/RestClient.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.server.k8s.agent.util;
-
-import javax.ws.rs.client.Client;
-import javax.ws.rs.client.ClientBuilder;
-import javax.ws.rs.client.Entity;
-import javax.ws.rs.core.MediaType;
-
-import org.apache.submarine.server.api.common.CustomResourceType;
-import org.apache.submarine.server.rest.RestConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class RestClient {
-  private static final Logger LOG = LoggerFactory.getLogger(RestClient.class);
-  
-  private Client client = ClientBuilder.newClient();
-  private final String API_SERVER_URL;
-  public RestClient(String host, Integer port) {
-    LOG.info("SERVER_HOST:" + host);
-    LOG.info("SERVER_PORT:" + port);
-    API_SERVER_URL = String.format("http://%s:%d/", host, port);
-  }
-  
-  public void callStatusUpdate(CustomResourceType type, String resourceId, Object updateObject) {
-
-    String uri = String.format("api/%s/%s/%s/%s", RestConstants.V1,
-          RestConstants.INTERNAL, type.toString(), resourceId);
-    LOG.info("Targeting uri:" + uri);
-            
-    client.target(API_SERVER_URL)
-      .path(uri)      
-      .request(MediaType.APPLICATION_JSON)
-      .post(Entity.entity(updateObject, MediaType.APPLICATION_JSON), String.class);        
-  }
-
-}
diff --git a/submarine-server/server-submitter/submarine-k8s-agent/src/main/resources/log4j.properties b/submarine-server/server-submitter/submarine-k8s-agent/src/main/resources/log4j.properties
index 55e02b6d..5a7d55b4 100644
--- a/submarine-server/server-submitter/submarine-k8s-agent/src/main/resources/log4j.properties
+++ b/submarine-server/server-submitter/submarine-k8s-agent/src/main/resources/log4j.properties
@@ -15,3 +15,7 @@ log4j.appender.stdout = org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.Target = System.out
 log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n
+log4j.appender.stdout.encoding=UTF-8
+
+# mybatis sql debug
+log4j.logger.org.apache.submarine.server.database=DEBUG
diff --git a/submarine-server/server-submitter/submarine-k8s-agent/src/main/resources/mybatis-config.xml b/submarine-server/server-submitter/submarine-k8s-agent/src/main/resources/mybatis-config.xml
new file mode 100755
index 00000000..a11c0c88
--- /dev/null
+++ b/submarine-server/server-submitter/submarine-k8s-agent/src/main/resources/mybatis-config.xml
@@ -0,0 +1,48 @@
+<?xml version='1.0' encoding='UTF-8' ?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<!DOCTYPE configuration PUBLIC '-//mybatis.org//DTD Config 3.0//EN'
+    'http://mybatis.org/dtd/mybatis-3-config.dtd'>
+<configuration>
+  <settings>
+    <setting name="cacheEnabled" value="true"/>
+    <setting name="lazyLoadingEnabled" value="false"/>
+    <setting name="aggressiveLazyLoading" value="true"/>
+    <setting name="logImpl" value="SLF4J"/>
+  </settings>
+
+  <environments default="development">
+    <environment id="development">
+      <transactionManager type="JDBC"/>
+      <dataSource type="POOLED">
+        <property name="driver" value="${jdbc.driverClassName}"/>
+        <property name="url" value="${jdbc.url}"/>
+        <property name="username" value="${jdbc.username}"/>
+        <property name="password" value="${jdbc.password}"/>
+        <property name="poolPingQuery" value="SELECT NOW()"/>
+        <property name="poolPingEnabled" value="true"/>
+      </dataSource>
+    </environment>
+  </environments>
+
+  <mappers>
+    <mapper resource='org/apache/submarine/database/mappers/ExperimentMapper.xml'/>
+    <mapper resource='org/apache/submarine/database/mappers/NotebookMapper.xml'/>
+  </mappers>
+</configuration>
diff --git a/submarine-server/server-submitter/submarine-k8s-agent/src/test/java/org/apache/submarine/server/k8s/agent/SubmitSubmarineAgentTest.java b/submarine-server/server-submitter/submarine-k8s-agent/src/test/java/org/apache/submarine/server/k8s/agent/SubmitSubmarineAgentTest.java
new file mode 100644
index 00000000..c03a0642
--- /dev/null
+++ b/submarine-server/server-submitter/submarine-k8s-agent/src/test/java/org/apache/submarine/server/k8s/agent/SubmitSubmarineAgentTest.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.k8s.agent;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
+import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
+import io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinition;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.KubernetesServer;
+import io.fabric8.kubernetes.internal.KubernetesDeserializer;
+import io.javaoperatorsdk.operator.Operator;
+import org.apache.ibatis.session.SqlSession;
+import org.apache.submarine.commons.utils.SubmarineConfiguration;
+import org.apache.submarine.server.database.experiment.entity.ExperimentEntity;
+import org.apache.submarine.server.database.experiment.mappers.ExperimentMapper;
+import org.apache.submarine.server.database.notebook.entity.NotebookEntity;
+import org.apache.submarine.server.database.notebook.mappers.NotebookMapper;
+import org.apache.submarine.server.database.utils.MyBatisUtil;
+import org.apache.submarine.server.k8s.agent.model.notebook.NotebookResource;
+import org.apache.submarine.server.k8s.agent.model.notebook.status.NotebookCondition;
+import org.apache.submarine.server.k8s.agent.model.notebook.status.NotebookStatus;
+import org.apache.submarine.server.k8s.agent.model.training.resource.PyTorchJob;
+import org.apache.submarine.server.k8s.agent.model.training.resource.TFJob;
+import org.apache.submarine.server.k8s.agent.model.training.resource.XGBoostJob;
+import org.apache.submarine.server.k8s.agent.model.training.status.JobCondition;
+import org.apache.submarine.server.k8s.agent.model.training.status.JobStatus;
+import org.apache.submarine.server.k8s.agent.model.training.status.ReplicaStatus;
+import org.apache.submarine.server.k8s.agent.reconciler.NotebookReconciler;
+import org.apache.submarine.server.k8s.agent.reconciler.PyTorchJobReconciler;
+import org.apache.submarine.server.k8s.agent.reconciler.TFJobReconciler;
+import org.apache.submarine.server.k8s.agent.reconciler.XGBoostJobReconciler;
+import org.apache.submarine.server.k8s.utils.OwnerReferenceConfig;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class SubmitSubmarineAgentTest {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SubmitSubmarineAgentTest.class);
+
+  @ClassRule
+  public static KubernetesServer server = new KubernetesServer(true, true);
+
+  private static KubernetesClient client;
+
+  private static Operator operator;
+
+  private static final SubmarineConfiguration conf = SubmarineConfiguration.getInstance();
+
+  private static final String H2_JDBC_URL = "jdbc:h2:mem:submarine-test;MODE=MYSQL;DB_CLOSE_DELAY=-1";
+  private static final String H2_JDBC_DRIVERCLASS = "org.h2.Driver";
+  private static final String H2_JDBC_USERNAME = "root";
+  private static final String H2_JDBC_PASSWORD = "";
+
+  @BeforeClass
+  public static void beforeInit() {
+    // setup h2 database
+    conf.setJdbcUrl(H2_JDBC_URL);
+    conf.setJdbcDriverClassName(H2_JDBC_DRIVERCLASS);
+    conf.setJdbcUserName(H2_JDBC_USERNAME);
+    conf.setJdbcPassword(H2_JDBC_PASSWORD);
+    try (Connection conn = DriverManager.getConnection(H2_JDBC_URL,
+            H2_JDBC_USERNAME, H2_JDBC_PASSWORD);
+         Statement stmt = conn.createStatement()) {
+      stmt.execute("RUNSCRIPT FROM 'classpath:/db/agent-init.sql'");
+    } catch (SQLException e) {
+      e.printStackTrace();
+    }
+
+    // set client and operator
+    client = server.getClient();
+    operator = new Operator(client);
+
+    // create notbook resource
+    KubernetesDeserializer.registerCustomKind("apiextensions.k8s.io/v1beta1", "Notebook", NotebookResource.class);
+    CustomResourceDefinition notebookCrd = client
+            .apiextensions().v1()
+            .customResourceDefinitions()
+            .load(SubmitSubmarineAgentTest.class.getResourceAsStream("/custom-resources/notebook.yml"))
+            .get();
+    LOGGER.info("Create Notebook CRD ...");
+    client.apiextensions().v1().customResourceDefinitions().create(notebookCrd);
+
+    // create tf resource
+    KubernetesDeserializer.registerCustomKind("apiextensions.k8s.io/v1", "TFJob", TFJob.class);
+    CustomResourceDefinition tfCrd = client
+            .apiextensions().v1()
+            .customResourceDefinitions()
+            .load(SubmitSubmarineAgentTest.class.getResourceAsStream("/custom-resources/tfjobs.yaml"))
+            .get();
+    LOGGER.info("Create TF CRD ...");
+    client.apiextensions().v1().customResourceDefinitions().create(tfCrd);
+
+    // create pytorch resource
+    KubernetesDeserializer.registerCustomKind("apiextensions.k8s.io/v1", "PyTorchJob", PyTorchJob.class);
+    CustomResourceDefinition ptCrd = client
+            .apiextensions().v1()
+            .customResourceDefinitions()
+            .load(SubmitSubmarineAgentTest.class.getResourceAsStream("/custom-resources/pytorchjobs.yaml"))
+            .get();
+    LOGGER.info("Create PyTorch CRD ...");
+    client.apiextensions().v1().customResourceDefinitions().create(ptCrd);
+
+    // create xgboost resource
+    KubernetesDeserializer.registerCustomKind("apiextensions.k8s.io/v1", "XGBoostJob", XGBoostJob.class);
+    CustomResourceDefinition xgbCrd = client
+            .apiextensions().v1()
+            .customResourceDefinitions()
+            .load(SubmitSubmarineAgentTest.class.getResourceAsStream("/custom-resources/xgboostjobs.yaml"))
+            .get();
+    LOGGER.info("Create XGBoost CRD ...");
+    client.apiextensions().v1().customResourceDefinitions().create(xgbCrd);
+
+    // add reconcilers to listen custom resources
+    operator.register(new NotebookReconciler());
+    operator.register(new TFJobReconciler());
+    operator.register(new PyTorchJobReconciler());
+    operator.register(new XGBoostJobReconciler());
+
+    // start operator
+    operator.start();
+  }
+
+  @Test
+  public void testTfJobAgent() throws InterruptedException {
+    // add notebook
+    JobStatus status = new JobStatus();
+    JobCondition condition = new JobCondition();
+    condition.setMessage("TFJob test/experiment-1659167632755-0001 is running.");
+    condition.setReason("TFJobRunning");
+    condition.setStatus("True");
+    condition.setType("Running");
+    condition.setLastTransitionTime(LocalDateTime.now().atZone(ZoneOffset.UTC).toString());
+    condition.setLastUpdateTime(LocalDateTime.now().atZone(ZoneOffset.UTC).toString());
+    status.setConditions(List.of(condition));
+    status.setReplicaStatuses(Map.of("PS", new ReplicaStatus(1, 0, 0),
+            "Worker", new ReplicaStatus(1, 0, 0)));
+    ObjectMeta meta = new ObjectMetaBuilder()
+            .withName("experiment-1659167632755-0001")
+            .withNamespace(client.getNamespace())
+            .withLabels(Map.of("submarine-experiment-name", "test-tfjob"))
+            .addToOwnerReferences(new OwnerReferenceBuilder()
+                    .withUid(OwnerReferenceConfig.getSubmarineUid())
+                    .withApiVersion(OwnerReferenceConfig.DEFAULT_SUBMARINE_APIVERSION)
+                    .withKind(OwnerReferenceConfig.DEFAULT_SUBMARINE_KIND)
+                    .build())
+            .build();
+    TFJob resource = new TFJob();
+    resource.setMetadata(meta);
+    resource.setStatus(status);
+    client.resources(TFJob.class)
+            .inNamespace(client.getNamespace())
+            .withName("experiment-1659167632755-0001")
+            .createOrReplace(resource);
+
+    // left 5s to process
+    Thread.sleep(TimeUnit.SECONDS.toMillis(5));
+
+    // check status have changed
+    try (SqlSession sqlSession = MyBatisUtil.getSqlSession()) {
+      ExperimentMapper mapper = sqlSession.getMapper(ExperimentMapper.class);
+      ExperimentEntity tfjob = mapper.select("experiment-1659167632755-0001");
+      Assert.assertEquals("Running", tfjob.getExperimentStatus());
+    } catch (Exception e) {
+      LOGGER.error(e.getMessage(), e);
+      throw e;
+    }
+  }
+
+  @Test
+  public void testPytorchJobAgent() throws InterruptedException {
+    // add notebook
+    JobStatus status = new JobStatus();
+    JobCondition condition = new JobCondition();
+    condition.setMessage("PytorchJob test/experiment-1659167632755-0002 is running.");
+    condition.setReason("PytorchJobRunning");
+    condition.setStatus("True");
+    condition.setType("Running");
+    condition.setLastTransitionTime(LocalDateTime.now().atZone(ZoneOffset.UTC).toString());
+    condition.setLastUpdateTime(LocalDateTime.now().atZone(ZoneOffset.UTC).toString());
+    status.setConditions(List.of(condition));
+    ObjectMeta meta = new ObjectMetaBuilder()
+            .withName("experiment-1659167632755-0002")
+            .withNamespace(client.getNamespace())
+            .withLabels(Map.of("submarine-experiment-name", "test-pytorchjob"))
+            .addToOwnerReferences(new OwnerReferenceBuilder()
+                    .withUid(OwnerReferenceConfig.getSubmarineUid())
+                    .withApiVersion(OwnerReferenceConfig.DEFAULT_SUBMARINE_APIVERSION)
+                    .withKind(OwnerReferenceConfig.DEFAULT_SUBMARINE_KIND)
+                    .build())
+            .build();
+    PyTorchJob resource = new PyTorchJob();
+    resource.setMetadata(meta);
+    resource.setStatus(status);
+    client.resources(PyTorchJob.class)
+            .inNamespace(client.getNamespace())
+            .withName("experiment-1659167632755-0002")
+            .createOrReplace(resource);
+
+    // left 5s to process
+    Thread.sleep(TimeUnit.SECONDS.toMillis(5));
+
+    // check status have changed
+    try (SqlSession sqlSession = MyBatisUtil.getSqlSession()) {
+      ExperimentMapper mapper = sqlSession.getMapper(ExperimentMapper.class);
+      ExperimentEntity tfjob = mapper.select("experiment-1659167632755-0002");
+      Assert.assertEquals("Running", tfjob.getExperimentStatus());
+    } catch (Exception e) {
+      LOGGER.error(e.getMessage(), e);
+      throw e;
+    }
+  }
+
+  @Test
+  public void testXGBoostJobAgent() throws InterruptedException {
+    // add notebook
+    JobStatus status = new JobStatus();
+    JobCondition condition = new JobCondition();
+    condition.setMessage("XGBoostJob test/experiment-1659167632755-0003 is running.");
+    condition.setReason("XGBoostJobRunning");
+    condition.setStatus("True");
+    condition.setType("Running");
+    condition.setLastTransitionTime(LocalDateTime.now().atZone(ZoneOffset.UTC).toString());
+    condition.setLastUpdateTime(LocalDateTime.now().atZone(ZoneOffset.UTC).toString());
+    status.setConditions(List.of(condition));
+    ObjectMeta meta = new ObjectMetaBuilder()
+            .withName("experiment-1659167632755-0003")
+            .withNamespace(client.getNamespace())
+            .withLabels(Map.of("submarine-experiment-name", "test-xgboostjob"))
+            .addToOwnerReferences(new OwnerReferenceBuilder()
+                    .withUid(OwnerReferenceConfig.getSubmarineUid())
+                    .withApiVersion(OwnerReferenceConfig.DEFAULT_SUBMARINE_APIVERSION)
+                    .withKind(OwnerReferenceConfig.DEFAULT_SUBMARINE_KIND)
+                    .build())
+            .build();
+    XGBoostJob resource = new XGBoostJob();
+    resource.setMetadata(meta);
+    resource.setStatus(status);
+    client.resources(XGBoostJob.class)
+            .inNamespace(client.getNamespace())
+            .withName("experiment-1659167632755-0003")
+            .createOrReplace(resource);
+
+    // left 5s to process
+    Thread.sleep(TimeUnit.SECONDS.toMillis(5));
+
+    // check status have changed
+    try (SqlSession sqlSession = MyBatisUtil.getSqlSession()) {
+      ExperimentMapper mapper = sqlSession.getMapper(ExperimentMapper.class);
+      ExperimentEntity tfjob = mapper.select("experiment-1659167632755-0003");
+      Assert.assertEquals("Running", tfjob.getExperimentStatus());
+    } catch (Exception e) {
+      LOGGER.error(e.getMessage(), e);
+      throw e;
+    }
+  }
+
+
+  @Test
+  public void testNotebookAgent() throws InterruptedException {
+    // add notebook
+    NotebookStatus status = new NotebookStatus();
+    status.setReadyReplicas(1);
+    NotebookCondition condition = new NotebookCondition();
+    condition.setType("Running");
+    condition.setLastProbeTime(LocalDateTime.now().atZone(ZoneOffset.UTC).toString());
+    status.setConditions(List.of(condition));
+    ObjectMeta meta = new ObjectMetaBuilder()
+            .withName("notebook-1642402491519-0003-test-notebook")
+            .withNamespace(client.getNamespace())
+            .withLabels(Map.of("notebook-id", "notebook_1642402491519_0003",
+                    "notebook-owner-id", "e9ca23d68d884d4ebb19d07889727dae"))
+            .addToOwnerReferences(new OwnerReferenceBuilder()
+                    .withUid(OwnerReferenceConfig.getSubmarineUid())
+                    .withApiVersion(OwnerReferenceConfig.DEFAULT_SUBMARINE_APIVERSION)
+                    .withKind(OwnerReferenceConfig.DEFAULT_SUBMARINE_KIND)
+                    .build())
+            .build();
+    NotebookResource resource = new NotebookResource();
+    resource.setMetadata(meta);
+    resource.setStatus(status);
+    client.resource(resource).createOrReplace();
+
+    // left 5s to process
+    Thread.sleep(TimeUnit.SECONDS.toMillis(5));
+
+    // check status have changed
+    try (SqlSession sqlSession = MyBatisUtil.getSqlSession()) {
+      NotebookMapper mapper = sqlSession.getMapper(NotebookMapper.class);
+      NotebookEntity notebook = mapper.select("notebook_1642402491519_0003");
+      Assert.assertEquals("running", notebook.getNotebookStatus());
+    } catch (Exception e) {
+      LOGGER.error(e.getMessage(), e);
+      throw e;
+    }
+  }
+
+  @AfterClass
+  public static void close() {
+    operator.stop();
+    client.close();
+  }
+
+}
diff --git a/submarine-server/server-submitter/submarine-k8s-agent/src/test/resources/custom-resources/notebook.yml b/submarine-server/server-submitter/submarine-k8s-agent/src/test/resources/custom-resources/notebook.yml
new file mode 100644
index 00000000..5528f46b
--- /dev/null
+++ b/submarine-server/server-submitter/submarine-k8s-agent/src/test/resources/custom-resources/notebook.yml
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+apiVersion: apiextensions.k8s.io/v1beta1
+kind: CustomResourceDefinition
+metadata:
+  name: notebooks.kubeflow.org
+spec:
+  group: kubeflow.org
+  names:
+    kind: Notebook
+    plural: notebooks
+    singular: notebook
+  scope: Namespaced
+  subresources:
+    status: {}
+  versions:
+    - name: v1alpha1
+      served: true
+      storage: false
+    - name: v1beta1
+      served: true
+      storage: true
+    - name: v1
+      served: true
+      storage: false
+  validation:
+    openAPIV3Schema:
+      properties:
+        apiVersion:
+          type: string
+        kind:
+          type: string
+        metadata:
+          type: object
+        spec:
+          properties:
+            template:
+              properties:
+                spec:
+                  type: object
+              type: object
+          type: object
+        status:
+          properties:
+            conditions:
+              items:
+                properties:
+                  type:
+                    type: string
+                required:
+                  - type
+                type: object
+              type: array
+          required:
+            - conditions
+          type: object
diff --git a/submarine-server/server-submitter/submarine-k8s-agent/src/test/resources/custom-resources/pytorchjobs.yaml b/submarine-server/server-submitter/submarine-k8s-agent/src/test/resources/custom-resources/pytorchjobs.yaml
new file mode 100644
index 00000000..0b99c44d
--- /dev/null
+++ b/submarine-server/server-submitter/submarine-k8s-agent/src/test/resources/custom-resources/pytorchjobs.yaml
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+apiVersion: apiextensions.k8s.io/v1beta1
+kind: CustomResourceDefinition
+metadata:
+  name: pytorchjobs.kubeflow.org
+spec:
+  group: kubeflow.org
+  names:
+    kind: PyTorchJob
+    listKind: PyTorchJobList
+    plural: pytorchjobs
+    singular: pytorchjob
+  scope: Namespaced
+  subresources:
+    status: {}
+  versions:
+    - name: v1
+      served: true
+      storage: false
+  validation:
+    openAPIV3Schema:
+      properties:
+        apiVersion:
+          type: string
+        kind:
+          type: string
+        metadata:
+          type: object
+        spec:
+          type: object
+        status:
+          properties:
+            conditions:
+              items:
+                properties:
+                  type:
+                    type: string
+                required:
+                  - type
+                type: object
+              type: array
+          required:
+            - conditions
+          type: object
+
diff --git a/submarine-server/server-submitter/submarine-k8s-agent/src/test/resources/custom-resources/tfjobs.yaml b/submarine-server/server-submitter/submarine-k8s-agent/src/test/resources/custom-resources/tfjobs.yaml
new file mode 100644
index 00000000..50f2c215
--- /dev/null
+++ b/submarine-server/server-submitter/submarine-k8s-agent/src/test/resources/custom-resources/tfjobs.yaml
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+apiVersion: apiextensions.k8s.io/v1beta1
+kind: CustomResourceDefinition
+metadata:
+  name: tfjobs.kubeflow.org
+spec:
+  group: kubeflow.org
+  names:
+    kind: TFJob
+    plural: tfjobs
+    singular: tfjob
+  scope: Namespaced
+  subresources:
+    status: {}
+  versions:
+    - name: v1
+      served: true
+      storage: false
+  validation:
+    openAPIV3Schema:
+      properties:
+        apiVersion:
+          type: string
+        kind:
+          type: string
+        metadata:
+          type: object
+        spec:
+          type: object
+        status:
+          properties:
+            conditions:
+              items:
+                properties:
+                  type:
+                    type: string
+                required:
+                  - type
+                type: object
+              type: array
+          required:
+            - conditions
+          type: object
diff --git a/submarine-server/server-submitter/submarine-k8s-agent/src/test/resources/custom-resources/xgboostjobs.yaml b/submarine-server/server-submitter/submarine-k8s-agent/src/test/resources/custom-resources/xgboostjobs.yaml
new file mode 100644
index 00000000..589f050e
--- /dev/null
+++ b/submarine-server/server-submitter/submarine-k8s-agent/src/test/resources/custom-resources/xgboostjobs.yaml
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+apiVersion: apiextensions.k8s.io/v1beta1
+kind: CustomResourceDefinition
+metadata:
+  name: xgboostjobs.kubeflow.org
+spec:
+  group: kubeflow.org
+  names:
+    kind: XGBoostJob
+    listKind: XGBoostJobList
+    plural: xgboostjobs
+    singular: xgboostjob
+  scope: Namespaced
+  subresources:
+    status: {}
+  versions:
+    - name: v1
+      served: true
+      storage: false
+  validation:
+    openAPIV3Schema:
+      properties:
+        apiVersion:
+          type: string
+        kind:
+          type: string
+        metadata:
+          type: object
+        spec:
+          type: object
+        status:
+          properties:
+            conditions:
+              items:
+                properties:
+                  type:
+                    type: string
+                required:
+                  - type
+                type: object
+              type: array
+          required:
+            - conditions
+          type: object
diff --git a/submarine-server/server-submitter/submarine-k8s-agent/src/test/resources/db/agent-init.sql b/submarine-server/server-submitter/submarine-k8s-agent/src/test/resources/db/agent-init.sql
new file mode 100644
index 00000000..484a7d01
--- /dev/null
+++ b/submarine-server/server-submitter/submarine-k8s-agent/src/test/resources/db/agent-init.sql
@@ -0,0 +1,51 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--    http://www.apache.org/licenses/LICENSE-2.0
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+CREATE TABLE `notebook`
+(
+    `id`              varchar(64) PRIMARY KEY,
+    `notebook_spec`   text,
+    `create_by`       varchar(32),
+    `create_time`     datetime,
+    `update_by`       varchar(32),
+    `update_time`     datetime,
+    `notebook_status` varchar(20),
+    `notebook_url`    varchar(256),
+    `reason`          varchar(512),
+    `deleted_time`    datetime
+);
+
+insert into notebook (id, notebook_status)
+values ('notebook_1642402491519_0003', 'starting');
+
+CREATE TABLE `experiment`
+(
+    `id`                varchar(64) primary key,
+    `experiment_spec`   text,
+    `create_by`         varchar(32),
+    `create_time`       datetime,
+    `update_by`         varchar(32),
+    `update_time`       datetime,
+    `experiment_status` varchar(20),
+    `accepted_time`     datetime,
+    `running_time`      datetime,
+    `finished_time`     datetime,
+    `uid`               varchar(64)
+);
+
+insert into experiment (id, experiment_status)
+values ('experiment-1659167632755-0001', 'Starting');
+insert into experiment (id, experiment_status)
+values ('experiment-1659167632755-0002', 'Starting');
+insert into experiment (id, experiment_status)
+values ('experiment-1659167632755-0003', 'Starting');
diff --git a/submarine-server/server-submitter/submarine-k8s-agent/src/main/resources/log4j.properties b/submarine-server/server-submitter/submarine-k8s-agent/src/test/resources/log4j.properties
similarity index 79%
copy from submarine-server/server-submitter/submarine-k8s-agent/src/main/resources/log4j.properties
copy to submarine-server/server-submitter/submarine-k8s-agent/src/test/resources/log4j.properties
index 55e02b6d..0162fd5c 100644
--- a/submarine-server/server-submitter/submarine-k8s-agent/src/main/resources/log4j.properties
+++ b/submarine-server/server-submitter/submarine-k8s-agent/src/test/resources/log4j.properties
@@ -15,3 +15,10 @@ log4j.appender.stdout = org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.Target = System.out
 log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n
+log4j.appender.stdout.encoding=UTF-8
+
+# mybatis sql debug
+log4j.logger.org.apache.submarine.server.database=DEBUG
+# operator debug
+log4j.logger.io.javaoperatorsdk.operator=DEBUG
+log4j.logger.io.fabric8.kubernetes=DEBUG
diff --git a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sSubmitter.java b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sSubmitter.java
index 301fc38d..ccb7be24 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sSubmitter.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sSubmitter.java
@@ -40,7 +40,6 @@ import org.apache.submarine.commons.utils.SubmarineConfiguration;
 import org.apache.submarine.commons.utils.exception.SubmarineRuntimeException;
 import org.apache.submarine.server.k8s.utils.K8sUtils;
 import org.apache.submarine.server.api.Submitter;
-import org.apache.submarine.server.api.common.CustomResourceType;
 import org.apache.submarine.server.api.exception.InvalidSpecException;
 import org.apache.submarine.server.api.experiment.Experiment;
 import org.apache.submarine.server.api.experiment.ExperimentLog;
@@ -52,7 +51,6 @@ import org.apache.submarine.server.api.spec.ExperimentSpec;
 import org.apache.submarine.server.api.spec.NotebookSpec;
 import org.apache.submarine.server.submitter.k8s.client.K8sClient;
 import org.apache.submarine.server.submitter.k8s.client.K8sDefaultClient;
-import org.apache.submarine.server.submitter.k8s.model.AgentPod;
 import org.apache.submarine.server.submitter.k8s.model.K8sResource;
 import org.apache.submarine.server.submitter.k8s.model.common.Configmap;
 import org.apache.submarine.server.submitter.k8s.model.istio.IstioVirtualService;
@@ -196,12 +194,7 @@ public class K8sSubmitter implements Submitter {
       // MLJob K8s resource object
       MLJob mlJob = MLJobFactory.getMLJob(spec);
       mlJob.getMetadata().setOwnerReferences(OwnerReferenceUtils.getOwnerReference());
-      // Agent pod K8s resource object
-      AgentPod agentPod = new AgentPod(getServerNamespace(), spec.getMeta().getName(),
-          mlJob.getResourceType(), spec.getMeta().getExperimentId());
-      // commit resources/CRD with transaction
-      List<Object> values = resourceTransaction(mlJob, agentPod);
-      return (Experiment) values.get(0);
+      return mlJob.create(k8sClient);
     } catch (InvalidSpecException e) {
       LOG.error(String.format("K8s submitter: parse %s object failed by %s",
               spec.getMeta().getFramework(), e.getMessage()), e);
@@ -240,13 +233,9 @@ public class K8sSubmitter implements Submitter {
     try {
       // MLJob K8s resource object
       MLJob mlJob = MLJobFactory.getMLJob(spec);
-      // Agent pod K8s resource object
-      AgentPod agentPod = new AgentPod(getServerNamespace(), spec.getMeta().getName(),
-              mlJob.getResourceType(), spec.getMeta().getExperimentId());
-      // Delete with transaction
-      return deleteResourcesTransaction(mlJob, agentPod);
+      return mlJob.delete(k8sClient);
     } catch (InvalidSpecException e) {
-      throw new SubmarineRuntimeException(200, e.getMessage());
+      throw new SubmarineRuntimeException(500, e.getMessage());
     }
   }
 
@@ -317,7 +306,7 @@ public class K8sSubmitter implements Submitter {
     V1Deployment deploy = k8sClient.getAppsV1Api()
             .readNamespacedDeploymentStatus(name, getServerNamespace(), "true");
     return deploy == null ? false : Optional.ofNullable(deploy.getStatus().getAvailableReplicas())
-        .map(ar -> ar > 0).orElse(false); // at least one replica is running
+            .map(ar -> ar > 0).orElse(false); // at least one replica is running
   }
 
   @Override
@@ -339,14 +328,12 @@ public class K8sSubmitter implements Submitter {
       overwrite = new Configmap(namespace, String.format("%s-%s", NotebookUtils.OVERWRITE_PREFIX, name),
               NotebookUtils.DEFAULT_OVERWRITE_FILE_NAME, OVERWRITE_JSON);
     }
-    // index-4: agent
-    AgentPod agentPod = new AgentPod(namespace, name, CustomResourceType.Notebook, notebookId);
-    // index-5: notebook VirtualService custom resource
+    // index-4: notebook VirtualService custom resource
     IstioVirtualService istioVirtualService = new IstioVirtualService(createMeta(namespace, name));
 
     // commit resources/CRD with transaction
     List<Object> values = resourceTransaction(workspace, userset, overwrite, notebookCR,
-            agentPod, istioVirtualService);
+            istioVirtualService);
     return (Notebook) values.get(3);
   }
 
@@ -386,12 +373,6 @@ public class K8sSubmitter implements Submitter {
       dependents.add(new Configmap(namespace, String.format("%s-%s", NotebookUtils.OVERWRITE_PREFIX, name)));
     }
 
-    // delete agent
-    AgentPod agentPod = new AgentPod(namespace, name, CustomResourceType.Notebook, notebookId);
-    LOG.info(String.format("Notebook:%s had been deleted, start to delete agent pod:%s",
-            spec.getMeta().getName(), agentPod.getMetadata().getName()));
-    dependents.add(agentPod);
-
     // delete resources
     return deleteResourcesTransaction(notebookCR, dependents.toArray(dependents.toArray(new K8sResource[0])));
   }
diff --git a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/AgentPod.java b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/AgentPod.java
index d3ad1bbf..9f4937bd 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/AgentPod.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/AgentPod.java
@@ -37,6 +37,11 @@ import org.apache.submarine.server.submitter.k8s.util.YamlUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Starting from 0.8.0, we deploy agent as a separate service.
+ * Therefore, we no longer need to install when creating/deleting resources
+ */
+@Deprecated(since = "0.8.0")
 public class AgentPod extends V1Pod implements K8sResource<AgentPod> {
 
   private static final Logger LOG = LoggerFactory.getLogger(AgentPod.class);
diff --git a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/util/OwnerReferenceUtils.java b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/util/OwnerReferenceUtils.java
index 3951ac31..78fac31c 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/util/OwnerReferenceUtils.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/util/OwnerReferenceUtils.java
@@ -22,21 +22,18 @@ package org.apache.submarine.server.submitter.k8s.util;
 import java.util.ArrayList;
 
 import io.kubernetes.client.openapi.models.V1OwnerReference;
+import org.apache.submarine.server.k8s.utils.OwnerReferenceConfig;
 
 public class OwnerReferenceUtils {
-  private static final String SUBMARINE_APIVERSION = "SUBMARINE_APIVERSION";
-  private static final String SUBMARINE_KIND = "SUBMARINE_KIND";
-  private static final String SUBMARINE_NAME = "SUBMARINE_NAME";
-  private static final String SUBMARINE_UID = "SUBMARINE_UID";
-
+  
   public static ArrayList<V1OwnerReference> getOwnerReference() {
     ArrayList<V1OwnerReference> ownerReferences = new ArrayList<>();
     V1OwnerReference owner = new V1OwnerReference();
-    if (System.getenv(SUBMARINE_UID) != null) {
-      String apiVersion = System.getenv(SUBMARINE_APIVERSION);
-      String kind = System.getenv(SUBMARINE_KIND);
-      String name = System.getenv(SUBMARINE_NAME);
-      String uid = System.getenv(SUBMARINE_UID);
+    String uid = OwnerReferenceConfig.getSubmarineUid();
+    if (uid != null) {
+      String apiVersion = OwnerReferenceConfig.getSubmarineApiversion();
+      String kind = OwnerReferenceConfig.getSubmarineKind();
+      String name = OwnerReferenceConfig.getSubmarineName();
       Boolean blockOwnerDeletion = true;
       Boolean controller = true;
       owner.setApiVersion(apiVersion);
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/SubmitterNotebookApiTest.java b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/SubmitterNotebookApiTest.java
index 5a08b534..663190c5 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/SubmitterNotebookApiTest.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/SubmitterNotebookApiTest.java
@@ -23,7 +23,6 @@ import com.github.tomakehurst.wiremock.client.MappingBuilder;
 import com.github.tomakehurst.wiremock.junit.WireMockRule;
 import com.github.tomakehurst.wiremock.matching.EqualToPattern;
 import org.apache.submarine.commons.utils.SubmarineConfiguration;
-import org.apache.submarine.server.api.common.CustomResourceType;
 import org.apache.submarine.server.api.notebook.Notebook;
 import org.apache.submarine.server.api.spec.EnvironmentSpec;
 import org.apache.submarine.server.api.spec.NotebookMeta;
@@ -32,7 +31,6 @@ import org.apache.submarine.server.api.spec.NotebookSpec;
 import org.apache.submarine.server.submitter.k8s.client.K8sClient;
 import org.apache.submarine.server.submitter.k8s.client.K8sMockClient;
 import org.apache.submarine.server.submitter.k8s.client.MockClientUtil;
-import org.apache.submarine.server.submitter.k8s.model.AgentPod;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -100,14 +98,6 @@ public class SubmitterNotebookApiTest {
             aResponse()
                 .withStatus(200)
                 .withBody(getResourceFileContent("client/notebook/notebook-read-api.json")));
-    //  save pod agent url
-    String agentName = AgentPod.getNormalizePodName(CustomResourceType.Notebook, notebookId);
-    MappingBuilder agentPost = post(urlPathEqualTo("/api/v1/namespaces/default/pods"))
-        .withHeader("Content-Type", new EqualToPattern("application/json; charset=UTF-8"))
-        .willReturn(
-            aResponse()
-                .withStatus(200)
-                .withBody("{\"metadata\":{\"name\":\"" + agentName + "\",\"namespace\":\"default\"}}"));
     //  save istio url
     MappingBuilder istioPost = post(urlPathEqualTo(
         "/apis/networking.istio.io/v1beta1/namespaces/default/virtualservices"))
@@ -154,17 +144,10 @@ public class SubmitterNotebookApiTest {
             aResponse()
                 .withStatus(200)
                 .withBody(MockClientUtil.getMockSuccessStatus(configmapName)));
-    //  delete agent pod url
-    MappingBuilder podDelete = delete(urlPathEqualTo(MockClientUtil.getPodUrl(namespace, agentName)))
-        .withHeader("Content-Type", new EqualToPattern("application/json; charset=UTF-8"))
-        .willReturn(
-            aResponse()
-                .withStatus(200)
-                .withBody(MockClientUtil.getMockSuccessStatus(agentName)));
     K8sClient k8sClient = new K8sMockClient(
         notebookGet, // get endpoint
-        pvcPost, configmapPost, notebookPost, agentPost, istioPost, // save endpoint
-        notebookDelete, istioDelete, pvcDelete, userPvcDelete, configmapDelete, podDelete // delete endpoint
+        pvcPost, configmapPost, notebookPost, istioPost, // save endpoint
+        notebookDelete, istioDelete, pvcDelete, userPvcDelete, configmapDelete // delete endpoint
     );
 
     // init notebook spec
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/mljob/SubmitterPyTorchApiTest.java b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/mljob/SubmitterPyTorchApiTest.java
index d7883390..49af2b09 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/mljob/SubmitterPyTorchApiTest.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/mljob/SubmitterPyTorchApiTest.java
@@ -22,7 +22,6 @@ package org.apache.submarine.server.submitter.k8s.mljob;
 import com.github.tomakehurst.wiremock.client.MappingBuilder;
 import com.github.tomakehurst.wiremock.junit.WireMockRule;
 import com.github.tomakehurst.wiremock.matching.EqualToPattern;
-import org.apache.submarine.server.api.common.CustomResourceType;
 import org.apache.submarine.server.api.experiment.Experiment;
 import org.apache.submarine.server.api.spec.ExperimentSpec;
 import org.apache.submarine.server.submitter.k8s.K8sSubmitter;
@@ -30,7 +29,6 @@ import org.apache.submarine.server.submitter.k8s.SpecBuilder;
 import org.apache.submarine.server.submitter.k8s.client.K8sClient;
 import org.apache.submarine.server.submitter.k8s.client.K8sMockClient;
 import org.apache.submarine.server.submitter.k8s.client.MockClientUtil;
-import org.apache.submarine.server.submitter.k8s.model.AgentPod;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -74,14 +72,6 @@ public class SubmitterPyTorchApiTest {
             aResponse()
               .withStatus(200)
               .withBody(getResourceFileContent("client/experiment/pytorch-read-api.json")));
-    // save pod agent url
-    String agentName = AgentPod.getNormalizePodName(CustomResourceType.PyTorchJob, experimentId);
-    MappingBuilder agentPost = post(urlPathEqualTo("/api/v1/namespaces/default/pods"))
-        .withHeader("Content-Type", new EqualToPattern("application/json; charset=UTF-8"))
-        .willReturn(
-            aResponse()
-                .withStatus(200)
-                .withBody("{\"metadata\":{\"name\":\"" + agentName + "\"," + "\"namespace\":\"default\"}}"));
 
     // get pytorch url
     MappingBuilder pytorchGet = get(urlPathEqualTo(
@@ -100,15 +90,8 @@ public class SubmitterPyTorchApiTest {
             aResponse()
                 .withStatus(200)
                 .withBody(getResourceFileContent("client/experiment/pytorch-delete-api.json")));
-    //  delete agent pod url
-    MappingBuilder agentDelete = delete(urlPathEqualTo(MockClientUtil.getPodUrl(namespace, agentName)))
-        .withHeader("Content-Type", new EqualToPattern("application/json; charset=UTF-8"))
-        .willReturn(
-            aResponse()
-                .withStatus(200)
-                .withBody(MockClientUtil.getMockSuccessStatus(agentName)));
 
-    K8sClient k8sClient = new K8sMockClient(pytorchPost, agentPost, pytorchGet, pytorchDelete, agentDelete);
+    K8sClient k8sClient = new K8sMockClient(pytorchPost, pytorchGet, pytorchDelete);
     try {
       submitter = new K8sSubmitter(k8sClient);
       submitter.initialize(null);
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/mljob/SubmitterTensorflowApiTest.java b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/mljob/SubmitterTensorflowApiTest.java
index 6a5682c9..5d1e912e 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/mljob/SubmitterTensorflowApiTest.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/mljob/SubmitterTensorflowApiTest.java
@@ -22,7 +22,6 @@ package org.apache.submarine.server.submitter.k8s.mljob;
 import com.github.tomakehurst.wiremock.client.MappingBuilder;
 import com.github.tomakehurst.wiremock.junit.WireMockRule;
 import com.github.tomakehurst.wiremock.matching.EqualToPattern;
-import org.apache.submarine.server.api.common.CustomResourceType;
 import org.apache.submarine.server.api.experiment.Experiment;
 import org.apache.submarine.server.api.spec.ExperimentSpec;
 import org.apache.submarine.server.submitter.k8s.K8sSubmitter;
@@ -30,7 +29,6 @@ import org.apache.submarine.server.submitter.k8s.SpecBuilder;
 import org.apache.submarine.server.submitter.k8s.client.K8sClient;
 import org.apache.submarine.server.submitter.k8s.client.K8sMockClient;
 import org.apache.submarine.server.submitter.k8s.client.MockClientUtil;
-import org.apache.submarine.server.submitter.k8s.model.AgentPod;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -74,14 +72,6 @@ public class SubmitterTensorflowApiTest {
             aResponse()
                 .withStatus(200)
                 .withBody(getResourceFileContent("client/experiment/tf-read-api.json")));
-    // save pod agent url
-    String agentName = AgentPod.getNormalizePodName(CustomResourceType.TFJob, experimentId);
-    MappingBuilder agentPost = post(urlPathEqualTo("/api/v1/namespaces/default/pods"))
-        .withHeader("Content-Type", new EqualToPattern("application/json; charset=UTF-8"))
-        .willReturn(
-            aResponse()
-                .withStatus(200)
-                .withBody("{\"metadata\":{\"name\":\"" + agentName + "\"," + "\"namespace\":\"default\"}}"));
 
     // get tf url
     MappingBuilder tfGet = get(urlPathEqualTo(
@@ -100,15 +90,8 @@ public class SubmitterTensorflowApiTest {
             aResponse()
                 .withStatus(200)
                 .withBody(getResourceFileContent("client/experiment/tf-delete-api.json")));
-    //  delete agent pod url
-    MappingBuilder agentDelete = delete(urlPathEqualTo(MockClientUtil.getPodUrl(namespace, agentName)))
-        .withHeader("Content-Type", new EqualToPattern("application/json; charset=UTF-8"))
-        .willReturn(
-            aResponse()
-                .withStatus(200)
-                .withBody(MockClientUtil.getMockSuccessStatus(agentName)));
 
-    K8sClient k8sClient = new K8sMockClient(tfPost, agentPost, tfGet, tfDelete, agentDelete);
+    K8sClient k8sClient = new K8sMockClient(tfPost, tfGet, tfDelete);
     try {
       submitter = new K8sSubmitter(k8sClient);
       submitter.initialize(null);
diff --git a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/mljob/SubmitterXGBoostApiTest.java b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/mljob/SubmitterXGBoostApiTest.java
index d5bf9137..d1a72c62 100644
--- a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/mljob/SubmitterXGBoostApiTest.java
+++ b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/mljob/SubmitterXGBoostApiTest.java
@@ -22,7 +22,6 @@ package org.apache.submarine.server.submitter.k8s.mljob;
 import com.github.tomakehurst.wiremock.client.MappingBuilder;
 import com.github.tomakehurst.wiremock.junit.WireMockRule;
 import com.github.tomakehurst.wiremock.matching.EqualToPattern;
-import org.apache.submarine.server.api.common.CustomResourceType;
 import org.apache.submarine.server.api.experiment.Experiment;
 import org.apache.submarine.server.api.spec.ExperimentSpec;
 import org.apache.submarine.server.submitter.k8s.K8sSubmitter;
@@ -30,7 +29,6 @@ import org.apache.submarine.server.submitter.k8s.SpecBuilder;
 import org.apache.submarine.server.submitter.k8s.client.K8sClient;
 import org.apache.submarine.server.submitter.k8s.client.K8sMockClient;
 import org.apache.submarine.server.submitter.k8s.client.MockClientUtil;
-import org.apache.submarine.server.submitter.k8s.model.AgentPod;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -74,14 +72,6 @@ public class SubmitterXGBoostApiTest {
             aResponse()
                 .withStatus(200)
                 .withBody(getResourceFileContent("client/experiment/xgboost-read-api.json")));
-    // save pod agent url
-    String agentName = AgentPod.getNormalizePodName(CustomResourceType.XGBoost, experimentId);
-    MappingBuilder agentPost = post(urlPathEqualTo("/api/v1/namespaces/default/pods"))
-        .withHeader("Content-Type", new EqualToPattern("application/json; charset=UTF-8"))
-        .willReturn(
-            aResponse()
-                .withStatus(200)
-                .withBody("{\"metadata\":{\"name\":\"" + agentName + "\"," + "\"namespace\":\"default\"}}"));
 
     // get xgboost url
     MappingBuilder xgboostGet = get(urlPathEqualTo(
@@ -100,16 +90,8 @@ public class SubmitterXGBoostApiTest {
             aResponse()
                 .withStatus(200)
                 .withBody(getResourceFileContent("client/experiment/xgboost-delete-api.json")));
-    //  delete agent pod url
-    MappingBuilder agentDelete = delete(urlPathEqualTo(MockClientUtil.getPodUrl(namespace, agentName)))
-        .withHeader("Content-Type", new EqualToPattern("application/json; charset=UTF-8"))
-        .willReturn(
-            aResponse()
-                .withStatus(200)
-                .withBody(MockClientUtil.getMockSuccessStatus(agentName)));
 
-    K8sClient k8sClient = new K8sMockClient(xgboostPost, agentPost,
-        xgboostGet, xgboostDelete, agentDelete);
+    K8sClient k8sClient = new K8sMockClient(xgboostPost, xgboostGet, xgboostDelete);
     try {
       submitter = new K8sSubmitter(k8sClient);
       submitter.initialize(null);


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@submarine.apache.org
For additional commands, e-mail: dev-help@submarine.apache.org