You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by mi...@apache.org on 2023/04/24 08:59:55 UTC

[shardingsphere-on-cloud] branch main updated: add reconcile logic and verfity Spec

This is an automated email from the ASF dual-hosted git repository.

miaoliyao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git


The following commit(s) were added to refs/heads/main by this push:
     new 0632954  add reconcile logic and verfity Spec
     new ef582e2  Merge pull request #316 from moomman/main
0632954 is described below

commit 063295453bcc3f3c629f5e02c598bdcfba5a87ab
Author: moonman <ag...@163.com>
AuthorDate: Fri Apr 21 15:25:54 2023 +0800

    add reconcile logic and verfity Spec
---
 .../api/v1alpha1/shardingsphere_chaos_types.go     |  20 ++
 .../api/v1alpha1/zz_generated.deepcopy.go          |  57 ++-
 shardingsphere-operator/build/tools/Dockerfile     |   2 +
 .../cmd/shardingsphere-operator/manager/manager.go |   1 -
 .../cmd/shardingsphere-operator/manager/option.go  |   7 +
 .../controllers/shardingsphere_chaos_controller.go | 391 +++++++++++++++++----
 .../pkg/reconcile/shardingspherechaos/configmap.go |   9 +-
 .../pkg/reconcile/shardingspherechaos/job.go       |  47 ++-
 .../shardingspherechaos_suite_test.go              |   7 +
 9 files changed, 460 insertions(+), 81 deletions(-)

diff --git a/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go b/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go
index 32c2126..50ac996 100644
--- a/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go
+++ b/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go
@@ -44,6 +44,11 @@ type ShardingSphereChaos struct {
 type ShardingSphereChaosSpec struct {
 	InjectJob  JobSpec `json:"injectJob,omitempty"`
 	EmbedChaos `json:",inline"`
+	Expect     Expect `json:"expect,omitempty"`
+}
+
+type Expect struct {
+	Verify string `json:"verify,omitempty"`
 }
 
 // JobSpec Specifies the config of job to create
@@ -54,6 +59,8 @@ type JobSpec struct {
 	Pressure string `json:"pressure,omitempty"`
 	// +optional
 	Position string `json:"position,omitempty"`
+	// +optional
+	Verify string `json:"verify,omitempty"`
 }
 
 type EmbedChaos struct {
@@ -78,6 +85,18 @@ const (
 type ShardingSphereChaosStatus struct {
 	ChaosCondition ChaosCondition `json:"chaosCondition"`
 	Phase          Phase          `json:"phase"`
+	Result         []Result       `json:"result"`
+}
+
+// Result represents the result of the ShardingSphereChaos
+type Result struct {
+	Success bool   `json:"success"`
+	Detail  Detail `json:"details"`
+}
+
+type Detail struct {
+	Time metav1.Time `json:"time"`
+	Msg  string      `json:"message"`
 }
 
 type Phase string
@@ -85,6 +104,7 @@ type Phase string
 var (
 	PhaseBeforeExperiment Phase = "BeforeReq"
 	PhaseAfterExperiment  Phase = "AfterReq"
+	PhaseCreatingChaos    Phase = "Creating"
 	PhaseInChaos          Phase = "Injected"
 	PhaseRecoveredChaos   Phase = "Recovered"
 )
diff --git a/shardingsphere-operator/api/v1alpha1/zz_generated.deepcopy.go b/shardingsphere-operator/api/v1alpha1/zz_generated.deepcopy.go
index 9f4063b..5f9622f 100644
--- a/shardingsphere-operator/api/v1alpha1/zz_generated.deepcopy.go
+++ b/shardingsphere-operator/api/v1alpha1/zz_generated.deepcopy.go
@@ -491,6 +491,22 @@ func (in *DelayActionParams) DeepCopy() *DelayActionParams {
 	return out
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *Detail) DeepCopyInto(out *Detail) {
+	*out = *in
+	in.Time.DeepCopyInto(&out.Time)
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Detail.
+func (in *Detail) DeepCopy() *Detail {
+	if in == nil {
+		return nil
+	}
+	out := new(Detail)
+	in.DeepCopyInto(out)
+	return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *DuplicateActionParams) DeepCopyInto(out *DuplicateActionParams) {
 	*out = *in
@@ -531,6 +547,21 @@ func (in *EmbedChaos) DeepCopy() *EmbedChaos {
 	return out
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *Expect) DeepCopyInto(out *Expect) {
+	*out = *in
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Expect.
+func (in *Expect) DeepCopy() *Expect {
+	if in == nil {
+		return nil
+	}
+	out := new(Expect)
+	in.DeepCopyInto(out)
+	return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *JobSpec) DeepCopyInto(out *JobSpec) {
 	*out = *in
@@ -1158,6 +1189,22 @@ func (in *RepositoryConfig) DeepCopy() *RepositoryConfig {
 	return out
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *Result) DeepCopyInto(out *Result) {
+	*out = *in
+	in.Detail.DeepCopyInto(&out.Detail)
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Result.
+func (in *Result) DeepCopy() *Result {
+	if in == nil {
+		return nil
+	}
+	out := new(Result)
+	in.DeepCopyInto(out)
+	return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *ServerConfig) DeepCopyInto(out *ServerConfig) {
 	*out = *in
@@ -1225,7 +1272,7 @@ func (in *ShardingSphereChaos) DeepCopyInto(out *ShardingSphereChaos) {
 	out.TypeMeta = in.TypeMeta
 	in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
 	in.Spec.DeepCopyInto(&out.Spec)
-	out.Status = in.Status
+	in.Status.DeepCopyInto(&out.Status)
 }
 
 // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ShardingSphereChaos.
@@ -1283,6 +1330,7 @@ func (in *ShardingSphereChaosSpec) DeepCopyInto(out *ShardingSphereChaosSpec) {
 	*out = *in
 	out.InjectJob = in.InjectJob
 	in.EmbedChaos.DeepCopyInto(&out.EmbedChaos)
+	out.Expect = in.Expect
 }
 
 // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ShardingSphereChaosSpec.
@@ -1298,6 +1346,13 @@ func (in *ShardingSphereChaosSpec) DeepCopy() *ShardingSphereChaosSpec {
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *ShardingSphereChaosStatus) DeepCopyInto(out *ShardingSphereChaosStatus) {
 	*out = *in
+	if in.Result != nil {
+		in, out := &in.Result, &out.Result
+		*out = make([]Result, len(*in))
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
+	}
 }
 
 // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ShardingSphereChaosStatus.
diff --git a/shardingsphere-operator/build/tools/Dockerfile b/shardingsphere-operator/build/tools/Dockerfile
index d73511b..fc3ad0f 100644
--- a/shardingsphere-operator/build/tools/Dockerfile
+++ b/shardingsphere-operator/build/tools/Dockerfile
@@ -33,6 +33,8 @@ RUN set -eux; \
         apt-get install -y --no-install-recommends libncursesw6 libreadline8 libssh-4; \
         apt-get install -y --no-install-recommends openjdk-11-jre-headless; \
         echo "export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-11-amd64" >> /etc/profile; \
+        echo "alias mysql=\"mysqlsh\"" >> /etc/profile; \
+        . /etc/profile; \
         \
         rm -rf /var/lib/apt/lists/*; \
         \
diff --git a/shardingsphere-operator/cmd/shardingsphere-operator/manager/manager.go b/shardingsphere-operator/cmd/shardingsphere-operator/manager/manager.go
index 06166a9..05681f0 100644
--- a/shardingsphere-operator/cmd/shardingsphere-operator/manager/manager.go
+++ b/shardingsphere-operator/cmd/shardingsphere-operator/manager/manager.go
@@ -49,7 +49,6 @@ func SetupWithOptions(opts *Options) *Manager {
 		logger.Error(err, "unable to start manager")
 		os.Exit(1)
 	}
-
 	if err = (&controllers.ProxyReconciler{
 		Client: mgr.GetClient(),
 		Scheme: mgr.GetScheme(),
diff --git a/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go b/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
index 10ff9f5..e1e1fe5 100644
--- a/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
+++ b/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
@@ -21,6 +21,8 @@ import (
 	"flag"
 	"strings"
 
+	clientset "k8s.io/client-go/kubernetes"
+
 	"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/job"
 
 	"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/chaos"
@@ -139,6 +141,10 @@ var featureGatesHandlers = map[string]FeatureGateHandler{
 		return nil
 	},
 	"ShardingSphereChaos": func(mgr manager.Manager) error {
+		clientset, err := clientset.NewForConfig(mgr.GetConfig())
+		if err != nil {
+			return err
+		}
 		if err := (&controllers.ShardingSphereChaosReconciler{
 			Client:    mgr.GetClient(),
 			Scheme:    mgr.GetScheme(),
@@ -147,6 +153,7 @@ var featureGatesHandlers = map[string]FeatureGateHandler{
 			Job:       job.NewJob(mgr.GetClient()),
 			ConfigMap: configmap.NewConfigMap(mgr.GetClient()),
 			Events:    mgr.GetEventRecorderFor("shardingsphere-chaos-controller"),
+			ClientSet: clientset,
 		}).SetupWithManager(mgr); err != nil {
 			logger.Error(err, "unable to create controller", "controller", "ShardingSphereChaos")
 			return err
diff --git a/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go b/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go
index 95dfa97..b84b0be 100644
--- a/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go
+++ b/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go
@@ -19,9 +19,18 @@ package controllers
 
 import (
 	"context"
+	"errors"
 	"fmt"
+	"strings"
 	"time"
 
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/util/retry"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+	clientset "k8s.io/client-go/kubernetes"
+
 	"k8s.io/client-go/tools/record"
 
 	"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/configmap"
@@ -44,6 +53,20 @@ import (
 const (
 	ShardingSphereChaosControllerName = "shardingsphere-chaos-controller"
 	ssChaosDefaultEnqueueTime         = 10 * time.Second
+	VerifyJobCheck                    = "Verify"
+)
+
+var (
+	ErrNoPod = errors.New("no pod in list")
+)
+
+type JobCondition string
+
+var (
+	CompleteJob JobCondition = "complete"
+	FailureJob  JobCondition = "failure"
+	SuspendJob  JobCondition = "suspend"
+	ActiveJob   JobCondition = "active"
 )
 
 // ShardingSphereChaosReconciler is a controller for the ShardingSphereChaos
@@ -55,6 +78,7 @@ type ShardingSphereChaosReconciler struct { //
 	Job       job.Job
 	ConfigMap configmap.ConfigMap
 	Events    record.EventRecorder
+	ClientSet *clientset.Clientset
 }
 
 // Reconcile handles main function of this controller
@@ -120,72 +144,92 @@ func (r *ShardingSphereChaosReconciler) getRuntimeSSChaos(ctx context.Context, n
 
 func (r *ShardingSphereChaosReconciler) reconcileChaos(ctx context.Context, ssChao *sschaosv1alpha1.ShardingSphereChaos) error {
 	logger := r.Log.WithValues("reconcile chaos", ssChao.Name)
+
 	if ssChao.Status.Phase == sschaosv1alpha1.PhaseBeforeExperiment || ssChao.Status.Phase == "" {
-		fmt.Println("reach here")
 		return nil
 	}
-	fmt.Println("reach here  after")
 	namespaceName := types.NamespacedName{Namespace: ssChao.Namespace, Name: ssChao.Name}
+
 	if ssChao.Spec.EmbedChaos.PodChaos != nil {
-		chao, isExist, err := r.getPodChaosByNamespacedName(ctx, namespaceName)
+		chao, err := r.getPodChaosByNamespacedName(ctx, namespaceName)
 		if err != nil {
 			logger.Error(err, "pod chaos err")
 			return err
 		}
-		if isExist {
+		if chao != nil {
 			return r.updatePodChaos(ctx, ssChao, chao)
 		}
 		return r.CreatePodChaos(ctx, ssChao)
 	} else if ssChao.Spec.EmbedChaos.NetworkChaos != nil {
-		chao, isExist, err := r.getNetworkChaosByNamespacedName(ctx, namespaceName)
+		chao, err := r.getNetworkChaosByNamespacedName(ctx, namespaceName)
 		if err != nil {
 			logger.Error(err, "network chao err")
 			return err
 		}
-		if isExist {
+		if chao != nil {
 			return r.updateNetWorkChaos(ctx, ssChao, chao)
 		}
 		return r.CreateNetworkChaos(ctx, ssChao)
 	}
+
 	return nil
 }
 
 func (r *ShardingSphereChaosReconciler) reconcileConfigMap(ctx context.Context, ssChaos *sschaosv1alpha1.ShardingSphereChaos) error {
 	logger := r.Log.WithValues("reconcile configmap", ssChaos.Name)
 	namespaceName := types.NamespacedName{Namespace: ssChaos.Namespace, Name: ssChaos.Name}
-	rConfigmap, isExist, err := r.getConfigMapByNamespacedName(ctx, namespaceName)
+	rConfigmap, err := r.getConfigMapByNamespacedName(ctx, namespaceName)
 	if err != nil {
 		logger.Error(err, "get configmap error")
 		return err
 	}
 
-	if isExist {
+	if rConfigmap != nil {
 		return r.updateConfigMap(ctx, ssChaos, rConfigmap)
 	}
 
-	return r.CreateConfigMap(ctx, ssChaos)
+	err = r.CreateConfigMap(ctx, ssChaos)
+	if err != nil {
+		r.Events.Event(ssChaos, "Warning", "Created", fmt.Sprintf("configmap created fail %s", err))
+		return err
+	}
+
+	r.Events.Event(ssChaos, "Normal", "Created", "configmap created successfully")
+	return nil
 }
 
 func (r *ShardingSphereChaosReconciler) reconcileJob(ctx context.Context, ssChaos *sschaosv1alpha1.ShardingSphereChaos) error {
 	logger := r.Log.WithValues("reconcile job", ssChaos.Name)
-	namespaceName := types.NamespacedName{Namespace: ssChaos.Namespace, Name: ssChaos.Name}
-	rJob, isExist, err := r.getJobByNamespacedName(ctx, namespaceName)
-	if err != nil {
-		logger.Error(err, "get job err")
-		return err
-	}
+
 	var nowInjectRequirement reconcile.InjectRequirement
-	if ssChaos.Status.Phase == "" || ssChaos.Status.Phase == sschaosv1alpha1.PhaseBeforeExperiment || ssChaos.Status.Phase == sschaosv1alpha1.PhaseAfterExperiment {
+	switch {
+	case ssChaos.Status.Phase == "" || ssChaos.Status.Phase == sschaosv1alpha1.PhaseBeforeExperiment || ssChaos.Status.Phase == sschaosv1alpha1.PhaseAfterExperiment:
 		nowInjectRequirement = reconcile.Experimental
-	}
-	if ssChaos.Status.Phase == sschaosv1alpha1.PhaseInChaos || ssChaos.Status.Phase == sschaosv1alpha1.PhaseRecoveredChaos {
+	case ssChaos.Status.Phase == sschaosv1alpha1.PhaseInChaos:
 		nowInjectRequirement = reconcile.Pressure
+	case ssChaos.Status.Phase == sschaosv1alpha1.PhaseRecoveredChaos:
+		nowInjectRequirement = reconcile.Verify
+	}
+
+	namespaceName := types.NamespacedName{Namespace: ssChaos.Namespace, Name: reconcile.SetJobNamespaceName(ssChaos.Name, nowInjectRequirement)}
+
+	rJob, err := r.getJobByNamespacedName(ctx, namespaceName)
+	if err != nil {
+		logger.Error(err, "get job err")
+		return err
 	}
-	if isExist {
+
+	if rJob != nil {
 		return r.updateJob(ctx, nowInjectRequirement, ssChaos, rJob)
 	}
 
-	return r.createJob(ctx, nowInjectRequirement, ssChaos)
+	err = r.createJob(ctx, nowInjectRequirement, ssChaos)
+	if err != nil {
+		return err
+	}
+
+	r.Events.Event(ssChaos, "Normal", "Created", fmt.Sprintf("%s job created successfully", string(nowInjectRequirement)))
+	return nil
 }
 
 func (r *ShardingSphereChaosReconciler) reconcileStatus(ctx context.Context, namespacedName types.NamespacedName) error {
@@ -193,17 +237,30 @@ func (r *ShardingSphereChaosReconciler) reconcileStatus(ctx context.Context, nam
 	if err != nil {
 		return err
 	}
-	if ssChaos.Status.Phase == "" {
-		ssChaos.Status.Phase = sschaosv1alpha1.PhaseBeforeExperiment
-	}
-	rJob := &batchV1.Job{}
-	if err := r.Get(ctx, namespacedName, rJob); err != nil {
+
+	setDefault(ssChaos)
+
+	jobName := getRequirement(ssChaos)
+	rJob, err := r.getJobByNamespacedName(ctx, types.NamespacedName{Namespace: ssChaos.Namespace, Name: reconcile.SetJobNamespaceName(ssChaos.Name, jobName)})
+	if err != nil || rJob == nil {
 		return err
 	}
 
 	if ssChaos.Status.Phase == sschaosv1alpha1.PhaseBeforeExperiment && rJob.Status.Succeeded == 1 {
 		ssChaos.Status.Phase = sschaosv1alpha1.PhaseAfterExperiment
 	}
+	jobConditions := rJob.Status.Conditions
+	condition := getJobCondition(jobConditions)
+
+	if condition == FailureJob {
+		r.Events.Event(ssChaos, "Warning", "failed", fmt.Sprintf("job: %s", rJob.Name))
+	}
+	if ssChaos.Status.Phase == sschaosv1alpha1.PhaseRecoveredChaos {
+		if err := r.updateRecoveredJob(ctx, ssChaos, rJob); err != nil {
+			r.Events.Event(ssChaos, "Warning", "getPodLog", err.Error())
+			return err
+		}
+	}
 
 	if err := r.updatePhaseStart(ctx, ssChaos); err != nil {
 		return err
@@ -213,10 +270,180 @@ func (r *ShardingSphereChaosReconciler) reconcileStatus(ctx context.Context, nam
 	if err != nil {
 		return err
 	}
-	rt.Status = ssChaos.Status
+	setRtStatus(rt, ssChaos)
 	return r.Status().Update(ctx, rt)
 }
 
+func getRequirement(ssChaos *sschaosv1alpha1.ShardingSphereChaos) reconcile.InjectRequirement {
+	var jobName reconcile.InjectRequirement
+	if ssChaos.Status.Phase == sschaosv1alpha1.PhaseBeforeExperiment || ssChaos.Status.Phase == sschaosv1alpha1.PhaseAfterExperiment {
+		jobName = reconcile.Experimental
+	}
+	if ssChaos.Status.Phase == sschaosv1alpha1.PhaseInChaos {
+		jobName = reconcile.Pressure
+	}
+	if ssChaos.Status.Phase == sschaosv1alpha1.PhaseRecoveredChaos {
+		jobName = reconcile.Verify
+	}
+	return jobName
+}
+
+func getJobCondition(conditions []batchV1.JobCondition) JobCondition {
+	var ret = ActiveJob
+	for i := range conditions {
+		p := &conditions[i]
+		switch {
+		case p.Type == batchV1.JobComplete && p.Status == v1.ConditionTrue:
+			ret = CompleteJob
+		case p.Type == batchV1.JobFailed && p.Status == v1.ConditionTrue:
+			ret = FailureJob
+		case p.Type == batchV1.JobSuspended && p.Status == v1.ConditionTrue:
+			ret = SuspendJob
+		case p.Type == batchV1.JobFailureTarget:
+			ret = FailureJob
+		}
+	}
+	return ret
+}
+
+func setDefault(ssChaos *sschaosv1alpha1.ShardingSphereChaos) {
+	if ssChaos.Status.Phase == "" {
+		ssChaos.Status.Phase = sschaosv1alpha1.PhaseBeforeExperiment
+	}
+	if ssChaos.Status.Result == nil {
+		ssChaos.Status.Result = []sschaosv1alpha1.Result{}
+	}
+}
+
+func setRtStatus(rt *sschaosv1alpha1.ShardingSphereChaos, ssChaos *sschaosv1alpha1.ShardingSphereChaos) {
+	rt.Status.Result = []sschaosv1alpha1.Result{}
+	for i := range ssChaos.Status.Result {
+		r := &ssChaos.Status.Result[i]
+		rt.Status.Result = append(rt.Status.Result, sschaosv1alpha1.Result{
+			Success: r.Success,
+			Detail: sschaosv1alpha1.Detail{
+				Time: metav1.Time{Time: time.Now()},
+				Msg:  r.Detail.Msg,
+			},
+		})
+	}
+
+	rt.Status.Phase = ssChaos.Status.Phase
+	rt.Status.ChaosCondition = ssChaos.Status.ChaosCondition
+}
+
+func (r *ShardingSphereChaosReconciler) updateRecoveredJob(ctx context.Context, ssChaos *sschaosv1alpha1.ShardingSphereChaos, rJob *batchV1.Job) error {
+	if isResultExist(rJob) {
+		return nil
+	}
+
+	for i := range ssChaos.Status.Result {
+		r := &ssChaos.Status.Result[i]
+		if strings.HasPrefix(r.Detail.Msg, VerifyJobCheck) {
+			return nil
+		}
+	}
+
+	logOpts := &v1.PodLogOptions{}
+	pod, err := r.getPodHaveLog(ctx, rJob)
+	if err != nil || pod == nil {
+		return err
+	}
+	podNamespacedName := types.NamespacedName{
+		Namespace: pod.Namespace,
+		Name:      pod.Name,
+	}
+	condition := getJobCondition(rJob.Status.Conditions)
+	result := &sschaosv1alpha1.Result{}
+
+	if condition == CompleteJob {
+		log, err := r.getPodLog(ctx, podNamespacedName, logOpts)
+		if err != nil {
+			return err
+		}
+		if ssChaos.Spec.Expect.Verify == "" || ssChaos.Spec.Expect.Verify == log {
+			result.Success = true
+			result.Detail = sschaosv1alpha1.Detail{
+				Time: metav1.Time{Time: time.Now()},
+				Msg:  fmt.Sprintf("%s: job succeeded", VerifyJobCheck),
+			}
+		} else {
+			result.Success = false
+			result.Detail = sschaosv1alpha1.Detail{
+				Time: metav1.Time{Time: time.Now()},
+				Msg:  fmt.Sprintf("%s: %s", VerifyJobCheck, log),
+			}
+		}
+	}
+
+	if condition == FailureJob {
+		log, err := r.getPodLog(ctx, podNamespacedName, logOpts)
+		if err != nil {
+			return err
+		}
+		result.Success = false
+		result.Detail = sschaosv1alpha1.Detail{
+			Time: metav1.Time{Time: time.Now()},
+			Msg:  fmt.Sprintf("%s: %s", VerifyJobCheck, log),
+		}
+	}
+
+	ssChaos.Status.Result = updateResult(ssChaos.Status.Result, *result, VerifyJobCheck)
+
+	return nil
+}
+
+func (r *ShardingSphereChaosReconciler) getPodHaveLog(ctx context.Context, rJob *batchV1.Job) (*v1.Pod, error) {
+	pods := &v1.PodList{}
+	if err := r.List(ctx, pods, client.MatchingLabels{"controller-uid": rJob.Spec.Template.Labels["controller-uid"]}); err != nil {
+		return nil, err
+	}
+	if pods.Items == nil {
+		return nil, nil
+	}
+	var pod *v1.Pod
+	for i := range pods.Items {
+		pod = &pods.Items[i]
+		break
+	}
+	return pod, nil
+}
+
+func isResultExist(rJob *batchV1.Job) bool {
+	for _, cmd := range rJob.Spec.Template.Spec.Containers[0].Args {
+		if strings.Contains(cmd, string(reconcile.Verify)) {
+			return true
+		}
+	}
+	return false
+}
+
+func updateResult(results []sschaosv1alpha1.Result, r sschaosv1alpha1.Result, check string) []sschaosv1alpha1.Result {
+	for i := range results {
+		msg := results[i].Detail.Msg
+		if strings.HasPrefix(msg, check) && strings.HasPrefix(r.Detail.Msg, check) {
+			results[i] = r
+			return results
+		}
+	}
+	results = append(results, r)
+	return results
+}
+
+func (r *ShardingSphereChaosReconciler) getPodLog(ctx context.Context, namespacedName types.NamespacedName, options *v1.PodLogOptions) (string, error) {
+	req := r.ClientSet.CoreV1().Pods(namespacedName.Namespace).GetLogs(namespacedName.Name, options)
+	res := req.Do(ctx)
+	if res.Error() != nil {
+		return "", res.Error()
+	}
+	var ret []byte
+	ret, err := res.Raw()
+	if err != nil {
+		return "", err
+	}
+	return string(ret), nil
+}
+
 func (r *ShardingSphereChaosReconciler) updatePhaseStart(ctx context.Context, ssChaos *sschaosv1alpha1.ShardingSphereChaos) error {
 	if ssChaos.Status.Phase != sschaosv1alpha1.PhaseBeforeExperiment {
 		if err := r.updateChaosCondition(ctx, ssChaos); err != nil {
@@ -257,50 +484,37 @@ func (r *ShardingSphereChaosReconciler) updateChaosCondition(ctx context.Context
 	return nil
 }
 
-func (r *ShardingSphereChaosReconciler) getNetworkChaosByNamespacedName(ctx context.Context, namespacedName types.NamespacedName) (reconcile.NetworkChaos, bool, error) {
+func (r *ShardingSphereChaosReconciler) getNetworkChaosByNamespacedName(ctx context.Context, namespacedName types.NamespacedName) (reconcile.NetworkChaos, error) {
 	nc, err := r.Chaos.GetNetworkChaosByNamespacedName(ctx, namespacedName)
 	if err != nil {
-		return nil, false, err
-	}
-	if nc == nil {
-		return nil, false, nil
+		return nil, err
 	}
-	return nc, true, nil
+	return nc, nil
 }
 
-func (r *ShardingSphereChaosReconciler) getPodChaosByNamespacedName(ctx context.Context, namespacedName types.NamespacedName) (reconcile.PodChaos, bool, error) {
+func (r *ShardingSphereChaosReconciler) getPodChaosByNamespacedName(ctx context.Context, namespacedName types.NamespacedName) (reconcile.PodChaos, error) {
 	pc, err := r.Chaos.GetPodChaosByNamespacedName(ctx, namespacedName)
 	if err != nil {
-		return nil, false, err
-	}
-	if pc == nil {
-		return nil, false, nil
+		return nil, err
 	}
-	return pc, true, nil
+	return pc, nil
 }
 
-func (r *ShardingSphereChaosReconciler) getConfigMapByNamespacedName(ctx context.Context, namespacedName types.NamespacedName) (*v1.ConfigMap, bool, error) {
+func (r *ShardingSphereChaosReconciler) getConfigMapByNamespacedName(ctx context.Context, namespacedName types.NamespacedName) (*v1.ConfigMap, error) {
 	config, err := r.ConfigMap.GetByNamespacedName(ctx, namespacedName)
 	if err != nil {
-		return nil, false, err
-	}
-	if config == nil {
-		return nil, false, nil
+		return nil, err
 	}
 
-	return config, true, nil
+	return config, nil
 }
 
-func (r *ShardingSphereChaosReconciler) getJobByNamespacedName(ctx context.Context, namespacedName types.NamespacedName) (*batchV1.Job, bool, error) {
+func (r *ShardingSphereChaosReconciler) getJobByNamespacedName(ctx context.Context, namespacedName types.NamespacedName) (*batchV1.Job, error) {
 	injectJob, err := r.Job.GetByNamespacedName(ctx, namespacedName)
 	if err != nil {
-		return nil, false, err
-	}
-	if injectJob == nil {
-		return nil, false, nil
+		return nil, err
 	}
-
-	return injectJob, true, nil
+	return injectJob, nil
 }
 
 func (r *ShardingSphereChaosReconciler) updateConfigMap(ctx context.Context, chao *sschaosv1alpha1.ShardingSphereChaos, cur *v1.ConfigMap) error {
@@ -325,39 +539,87 @@ func (r *ShardingSphereChaosReconciler) CreateConfigMap(ctx context.Context, cha
 }
 
 func (r *ShardingSphereChaosReconciler) updateJob(ctx context.Context, requirement reconcile.InjectRequirement, chao *sschaosv1alpha1.ShardingSphereChaos, cur *batchV1.Job) error {
-	exp, err := reconcile.UpdateJob(chao, requirement, cur)
+	isEqual, err := reconcile.IsJobChanged(chao, requirement, cur)
 	if err != nil {
 		return err
 	}
-	if exp != nil {
-		if err := r.Delete(ctx, cur); err != nil {
-			return err
-		}
-		if err := ctrl.SetControllerReference(chao, exp, r.Scheme); err != nil {
-			return err
-		}
-		if err := r.Create(ctx, exp); err != nil {
+	if !isEqual {
+		if err := r.Delete(ctx, cur); err != nil && !apierrors.IsNotFound(err) {
 			return err
 		}
+		r.Events.Event(chao, "Normal", "Updated", "job Updated")
 	}
 	return nil
 }
 
-// todo:
 func (r *ShardingSphereChaosReconciler) createJob(ctx context.Context, requirement reconcile.InjectRequirement, chao *sschaosv1alpha1.ShardingSphereChaos) error {
 	injectJob, err := reconcile.NewJob(chao, requirement)
+	if err != nil {
+		return err
+	}
 	if err := ctrl.SetControllerReference(chao, injectJob, r.Scheme); err != nil {
 		return err
 	}
+
+	err = r.Create(ctx, injectJob)
 	if err != nil {
+		return client.IgnoreAlreadyExists(err)
+	}
+
+	rJob := &batchV1.Job{}
+	backoff := wait.Backoff{
+		Steps:    6,
+		Duration: 500 * time.Millisecond,
+		Factor:   5.0,
+		Jitter:   0.1,
+	}
+
+	if err := retry.OnError(backoff, func(e error) bool {
+		return true
+	}, func() error {
+		return r.Get(ctx, types.NamespacedName{Namespace: chao.Namespace, Name: reconcile.SetJobNamespaceName(chao.Name, requirement)}, rJob)
+	}); err != nil {
 		return err
 	}
-	err = r.Create(ctx, injectJob)
-	if err == nil && apierrors.IsAlreadyExists(err) {
+
+	podList := &v1.PodList{}
+	if err := retry.OnError(backoff, func(e error) bool {
+		return e != nil
+	}, func() error {
+		if err := r.List(ctx, podList, client.MatchingLabels{"controller-uid": rJob.Spec.Template.Labels["controller-uid"]}); err != nil {
+			return err
+		}
+		if len(podList.Items) == 0 {
+			return ErrNoPod
+		}
 		return nil
+	}); err != nil {
+		return err
 	}
 
-	return err
+	for i := range podList.Items {
+		rPod := &podList.Items[i]
+		if err := ctrl.SetControllerReference(rJob, rPod, r.Scheme); err != nil {
+			return err
+		}
+
+		exp := rPod.DeepCopy()
+		updateBackoff := wait.Backoff{
+			Steps:    5,
+			Duration: 30 * time.Millisecond,
+			Factor:   5.0,
+			Jitter:   0.1,
+		}
+		if err := retry.RetryOnConflict(updateBackoff, func() error {
+			if err := r.Update(ctx, exp); err != nil {
+				return err
+			}
+			return nil
+		}); err != nil {
+			return err
+		}
+	}
+	return nil
 }
 
 func (r *ShardingSphereChaosReconciler) updatePodChaos(ctx context.Context, chao *sschaosv1alpha1.ShardingSphereChaos, podChaos reconcile.PodChaos) error {
@@ -417,6 +679,7 @@ func (r *ShardingSphereChaosReconciler) SetupWithManager(mgr ctrl.Manager) error
 		For(&sschaosv1alpha1.ShardingSphereChaos{}).
 		Owns(&chaosv1alpha1.PodChaos{}).
 		Owns(&chaosv1alpha1.NetworkChaos{}).
+		Owns(&v1.ConfigMap{}).
 		Owns(&batchV1.Job{}).
 		Complete(r)
 }
diff --git a/shardingsphere-operator/pkg/reconcile/shardingspherechaos/configmap.go b/shardingsphere-operator/pkg/reconcile/shardingspherechaos/configmap.go
index f3054be..48d2f20 100644
--- a/shardingsphere-operator/pkg/reconcile/shardingspherechaos/configmap.go
+++ b/shardingsphere-operator/pkg/reconcile/shardingspherechaos/configmap.go
@@ -29,6 +29,7 @@ import (
 const (
 	configExperimental = "experimental.sh"
 	configPressure     = "pressure.sh"
+	configVerify       = "verify.sh"
 )
 
 const (
@@ -40,7 +41,7 @@ func NewSSConfigMap(chaos *v1alpha1.ShardingSphereChaos) *v1.ConfigMap {
 
 	cmb.SetName(chaos.Name).SetNamespace(chaos.Namespace).SetLabels(chaos.Labels)
 
-	cmb.SetExperimental(chaos.Spec.InjectJob.Experimental).SetPressure(chaos.Spec.InjectJob.Pressure)
+	cmb.SetExperimental(chaos.Spec.InjectJob.Experimental).SetPressure(chaos.Spec.InjectJob.Pressure).SetVerify(chaos.Spec.InjectJob.Verify)
 
 	return cmb.Build()
 }
@@ -50,6 +51,7 @@ type SSConfigMapBuilder interface {
 	common.ConfigMapBuilder
 	SetExperimental(string) SSConfigMapBuilder
 	SetPressure(string) SSConfigMapBuilder
+	SetVerify(string) SSConfigMapBuilder
 }
 
 type configmapBuilder struct {
@@ -75,6 +77,11 @@ func (c *configmapBuilder) SetPressure(cmd string) SSConfigMapBuilder {
 	return c
 }
 
+func (c *configmapBuilder) SetVerify(cmd string) SSConfigMapBuilder {
+	c.configmap.Data[configVerify] = cmd
+	return c
+}
+
 // defaultConfigMap returns a ConfigMap filling with default expected values
 func defaultConfigMap() *v1.ConfigMap {
 	return &v1.ConfigMap{
diff --git a/shardingsphere-operator/pkg/reconcile/shardingspherechaos/job.go b/shardingsphere-operator/pkg/reconcile/shardingspherechaos/job.go
index 9f0ac74..273a8a9 100644
--- a/shardingsphere-operator/pkg/reconcile/shardingspherechaos/job.go
+++ b/shardingsphere-operator/pkg/reconcile/shardingspherechaos/job.go
@@ -30,12 +30,16 @@ import (
 )
 
 const (
-	DefaultImageName     = "agoiyanzsa/tools-runtime:1.0"
+	DefaultImageName     = "agoiyanzsa/tools-runtime:2.0"
 	DefaultContainerName = "tools-runtime"
 	DefaultWorkPath      = "/app/start"
 	DefaultConfigName    = "cmd-conf"
 )
 
+var (
+	DefaultTTLSecondsAfterFinished int32 = 300
+)
+
 var DefaultFileMode int32 = 493
 
 const (
@@ -52,11 +56,16 @@ type InjectRequirement string
 var (
 	Experimental InjectRequirement = "experimental"
 	Pressure     InjectRequirement = "pressure"
+	Verify       InjectRequirement = "verify"
 )
 
+func SetJobNamespaceName(name string, requirement InjectRequirement) string {
+	return fmt.Sprintf("%s-%s", name, string(requirement))
+}
+
 func NewJob(ssChaos *v1alpha1.ShardingSphereChaos, requirement InjectRequirement) (*v1.Job, error) {
 	jbd := NewJobBuilder()
-	jbd.SetNamespace(ssChaos.Namespace).SetLabels(ssChaos.Labels).SetName(ssChaos.Name)
+	jbd.SetNamespace(ssChaos.Namespace).SetLabels(ssChaos.Labels).SetName(SetJobNamespaceName(ssChaos.Name, requirement))
 
 	if v, ok := ssChaos.Annotations[completions]; ok {
 		value, err := MustInt32(v)
@@ -117,11 +126,10 @@ func NewJob(ssChaos *v1alpha1.ShardingSphereChaos, requirement InjectRequirement
 
 	vm := &corev1.VolumeMount{Name: DefaultConfigName, MountPath: DefaultWorkPath}
 	cbd := common.NewContainerBuilder()
-	//todo: replace as DefaultImageName
 	cbd.SetImage(DefaultImageName)
 	cbd.SetName(DefaultContainerName)
 	cbd.SetVolumeMount(vm)
-	cbd.SetCommand([]string{"sh"})
+	cbd.SetCommand([]string{"sh", "-c"})
 	container := cbd.Build()
 	container.Args = NewCmds(requirement)
 	jbd.SetContainers(container)
@@ -131,12 +139,14 @@ func NewJob(ssChaos *v1alpha1.ShardingSphereChaos, requirement InjectRequirement
 
 func NewCmds(requirement InjectRequirement) []string {
 	var cmds []string
-	cmds = append(cmds, "-c")
 	if requirement == Experimental {
 		cmds = append(cmds, fmt.Sprintf("%s/%s", DefaultWorkPath, configExperimental))
 	}
 	if requirement == Pressure {
-		cmds = append(cmds, fmt.Sprintf("%s/%s", DefaultWorkPath, configExperimental), fmt.Sprintf("%s/%s", DefaultWorkPath, configPressure))
+		cmds = append(cmds, fmt.Sprintf("%s/%s;%s/%s", DefaultWorkPath, configPressure, DefaultWorkPath, configExperimental))
+	}
+	if requirement == Verify {
+		cmds = append(cmds, fmt.Sprintf("%s/%s", DefaultWorkPath, configVerify))
 	}
 	return cmds
 }
@@ -149,17 +159,17 @@ func MustInt32(s string) (int32, error) {
 	return int32(v), nil
 }
 
-func UpdateJob(ssChaos *v1alpha1.ShardingSphereChaos, requirement InjectRequirement, cur *v1.Job) (*v1.Job, error) {
+func IsJobChanged(ssChaos *v1alpha1.ShardingSphereChaos, requirement InjectRequirement, cur *v1.Job) (bool, error) {
 	now, err := NewJob(ssChaos, requirement)
 	if err != nil {
-		return nil, err
+		return false, err
 	}
 	isEqual := judgeJobEqual(cur, now)
 	if isEqual {
-		return nil, nil
+		return true, nil
 	}
 
-	return now, nil
+	return false, nil
 }
 
 func judgeJobEqual(now *v1.Job, exp *v1.Job) bool {
@@ -194,10 +204,15 @@ func judgeJobConfigEqual(now *v1.Job, exp *v1.Job) bool {
 	return true
 }
 func judgeTTLSecondsAfterFinished(cur *int32, exp *int32) bool {
-	if exp != nil && *cur != *exp {
-		return false
+	if cur == nil && exp == nil {
+		return true
 	}
-	return true
+	if cur != nil && exp != nil {
+		if *cur == *exp {
+			return true
+		}
+	}
+	return false
 }
 func judgeActiveDeadlineSeconds(cur *int64, exp *int64) bool {
 	if exp != nil && *cur != *exp {
@@ -212,6 +227,9 @@ func judgeContainerEqual(now *corev1.Container, exp *corev1.Container) bool {
 	if !reflect.DeepEqual(now.Command, exp.Command) {
 		return false
 	}
+	if !reflect.DeepEqual(now.Args, exp.Args) {
+		return false
+	}
 	if now.Image != exp.Image {
 		return false
 	}
@@ -300,7 +318,8 @@ func (j *jobBuilder) SetContainers(container *corev1.Container) JobBuilder {
 }
 
 func (j *jobBuilder) SetTTLSecondsAfterFinished(i int32) JobBuilder {
-	j.job.Spec.TTLSecondsAfterFinished = &i
+	ret := i
+	j.job.Spec.TTLSecondsAfterFinished = &ret
 	return j
 }
 
diff --git a/shardingsphere-operator/pkg/reconcile/shardingspherechaos/shardingspherechaos_suite_test.go b/shardingsphere-operator/pkg/reconcile/shardingspherechaos/shardingspherechaos_suite_test.go
index 33a6a42..cd10e4f 100644
--- a/shardingsphere-operator/pkg/reconcile/shardingspherechaos/shardingspherechaos_suite_test.go
+++ b/shardingsphere-operator/pkg/reconcile/shardingspherechaos/shardingspherechaos_suite_test.go
@@ -26,3 +26,10 @@ func TestShardingSphereChaos(t *testing.T) {
 	RegisterFailHandler(Fail)
 	RunSpecs(t, "ShardingSphereChaos Suite")
 }
+
+//func installChaosMesh() {
+//	var (
+//		installCmd = "curl -sSL https://mirrors.chaos-mesh.org/v2.5.1/install.sh | bash -s -- --docker-mirror"
+//	)
+//
+//}