You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by mi...@apache.org on 2023/05/15 06:37:33 UTC

[shardingsphere-on-cloud] branch main updated: feat(operator): add-pressure-exec replace job(get pod log)

This is an automated email from the ASF dual-hosted git repository.

miaoliyao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git


The following commit(s) were added to refs/heads/main by this push:
     new 6b4ae4c  feat(operator): add-pressure-exec replace job(get pod log)
     new 8e895b9  Merge pull request #358 from moomman/add-pressure-exec
6b4ae4c is described below

commit 6b4ae4caa25978541cc0314d0919398f9348aa7d
Author: moonman <ag...@163.com>
AuthorDate: Wed May 10 18:44:58 2023 +0800

    feat(operator): add-pressure-exec replace job(get pod log)
---
 .../api/v1alpha1/shardingsphere_chaos_types.go     |  21 +-
 .../api/v1alpha1/zz_generated.deepcopy.go          |  61 ++-
 .../cmd/shardingsphere-operator/manager/option.go  |  19 +-
 shardingsphere-operator/go.mod                     |   4 +-
 shardingsphere-operator/go.sum                     |   6 +
 .../controllers/shardingsphere_chaos_controller.go | 417 ++++++---------------
 shardingsphere-operator/pkg/pressure/pressure.go   | 203 ++++++++++
 .../pkg/pressure/pressure_test.go                  |  91 +++++
 8 files changed, 483 insertions(+), 339 deletions(-)

diff --git a/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go b/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go
index e1c5e90..43431e4 100644
--- a/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go
+++ b/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go
@@ -42,13 +42,24 @@ type ShardingSphereChaos struct {
 
 // ShardingSphereChaosSpec defines the desired state of ShardingSphereChaos
 type ShardingSphereChaosSpec struct {
-	InjectJob  JobSpec `json:"injectJob,omitempty"`
-	EmbedChaos `json:",inline"`
-	Expect     Expect `json:"expect,omitempty"`
+	InjectJob   JobSpec `json:"injectJob,omitempty"`
+	EmbedChaos  `json:",inline"`
+	PressureCfg PressureCfg `json:"pressureCfg"`
 }
 
-type Expect struct {
-	Verify string `json:"verify,omitempty"`
+type PressureCfg struct {
+	ZkHost        string          `json:"zkHost,omitempty"`
+	SsHost        string          `json:"ssHost"`
+	Duration      metav1.Duration `json:"duration"`
+	ReqTime       metav1.Duration `json:"reqTime"`
+	DistSQLs      []DistSQL       `json:"distSQLs,omitempty"`
+	ConcurrentNum int             `json:"concurrentNum"`
+	ReqNum        int             `json:"reqNum"`
+}
+
+type DistSQL struct {
+	SQL  string   `json:"sql"`
+	Args []string `json:"args"`
 }
 
 type Script string
diff --git a/shardingsphere-operator/api/v1alpha1/zz_generated.deepcopy.go b/shardingsphere-operator/api/v1alpha1/zz_generated.deepcopy.go
index 466979d..f2e6b08 100644
--- a/shardingsphere-operator/api/v1alpha1/zz_generated.deepcopy.go
+++ b/shardingsphere-operator/api/v1alpha1/zz_generated.deepcopy.go
@@ -550,6 +550,26 @@ func (in *DelayParams) DeepCopy() *DelayParams {
 	return out
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *DistSQL) DeepCopyInto(out *DistSQL) {
+	*out = *in
+	if in.Args != nil {
+		in, out := &in.Args, &out.Args
+		*out = make([]string, len(*in))
+		copy(*out, *in)
+	}
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DistSQL.
+func (in *DistSQL) DeepCopy() *DistSQL {
+	if in == nil {
+		return nil
+	}
+	out := new(DistSQL)
+	in.DeepCopyInto(out)
+	return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *DuplicationParams) DeepCopyInto(out *DuplicationParams) {
 	*out = *in
@@ -605,21 +625,6 @@ func (in *Endpoint) DeepCopy() *Endpoint {
 	return out
 }
 
-// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
-func (in *Expect) DeepCopyInto(out *Expect) {
-	*out = *in
-}
-
-// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Expect.
-func (in *Expect) DeepCopy() *Expect {
-	if in == nil {
-		return nil
-	}
-	out := new(Expect)
-	in.DeepCopyInto(out)
-	return out
-}
-
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *InstanceStatus) DeepCopyInto(out *InstanceStatus) {
 	*out = *in
@@ -1047,6 +1052,30 @@ func (in *PortBinding) DeepCopy() *PortBinding {
 	return out
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *PressureCfg) DeepCopyInto(out *PressureCfg) {
+	*out = *in
+	out.Duration = in.Duration
+	out.ReqTime = in.ReqTime
+	if in.DistSQLs != nil {
+		in, out := &in.DistSQLs, &out.DistSQLs
+		*out = make([]DistSQL, len(*in))
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
+	}
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PressureCfg.
+func (in *PressureCfg) DeepCopy() *PressureCfg {
+	if in == nil {
+		return nil
+	}
+	out := new(PressureCfg)
+	in.DeepCopyInto(out)
+	return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *Privilege) DeepCopyInto(out *Privilege) {
 	*out = *in
@@ -1436,7 +1465,7 @@ func (in *ShardingSphereChaosSpec) DeepCopyInto(out *ShardingSphereChaosSpec) {
 	*out = *in
 	out.InjectJob = in.InjectJob
 	in.EmbedChaos.DeepCopyInto(&out.EmbedChaos)
-	out.Expect = in.Expect
+	in.PressureCfg.DeepCopyInto(&out.PressureCfg)
 }
 
 // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ShardingSphereChaosSpec.
diff --git a/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go b/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
index ab69d45..9314332 100644
--- a/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
+++ b/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
@@ -21,6 +21,8 @@ import (
 	"flag"
 	"strings"
 
+	"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/pressure"
+
 	"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
 	"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/controllers"
 	sschaos "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/chaosmesh"
@@ -179,14 +181,15 @@ var featureGatesHandlers = map[string]FeatureGateHandler{
 			return err
 		}
 		if err := (&controllers.ShardingSphereChaosReconciler{
-			Client:    mgr.GetClient(),
-			Scheme:    mgr.GetScheme(),
-			Log:       mgr.GetLogger(),
-			Chaos:     sschaos.NewChaos(mgr.GetClient()),
-			Job:       job.NewJob(mgr.GetClient()),
-			ConfigMap: configmap.NewConfigMapClient(mgr.GetClient()),
-			Events:    mgr.GetEventRecorderFor("shardingsphere-chaos-controller"),
-			ClientSet: clientset,
+			Client:       mgr.GetClient(),
+			Scheme:       mgr.GetScheme(),
+			Log:          mgr.GetLogger(),
+			Chaos:        sschaos.NewChaos(mgr.GetClient()),
+			Job:          job.NewJob(mgr.GetClient()),
+			ExecRecorder: make([]*pressure.Pressure, 0),
+			ConfigMap:    configmap.NewConfigMapClient(mgr.GetClient()),
+			Events:       mgr.GetEventRecorderFor("shardingsphere-chaos-controller"),
+			ClientSet:    clientset,
 		}).SetupWithManager(mgr); err != nil {
 			logger.Error(err, "unable to create controller", "controller", "ShardingSphereChaos")
 			return err
diff --git a/shardingsphere-operator/go.mod b/shardingsphere-operator/go.mod
index d20fc79..621957a 100644
--- a/shardingsphere-operator/go.mod
+++ b/shardingsphere-operator/go.mod
@@ -6,9 +6,9 @@ require (
 	bou.ke/monkey v1.0.2
 	github.com/antlr/antlr4 v0.0.0-20181218183524-be58ebffde8e
 	github.com/chaos-mesh/chaos-mesh/api v0.0.0-20230410023700-25a841a23cd2
-	github.com/golang/mock v1.6.0
 	github.com/database-mesh/golang-sdk v0.0.0-20230420101548-53265cd9883a
 	github.com/go-logr/logr v1.2.4
+	github.com/golang/mock v1.6.0
 	github.com/onsi/ginkgo/v2 v2.8.0
 	github.com/onsi/gomega v1.26.0
 	github.com/prometheus/client_golang v1.14.0
@@ -25,6 +25,7 @@ require (
 require github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
 
 require (
+	github.com/DATA-DOG/go-sqlmock v1.5.0 // indirect
 	github.com/aws/aws-sdk-go-v2 v1.17.5 // indirect
 	github.com/aws/aws-sdk-go-v2/config v1.18.4 // indirect
 	github.com/aws/aws-sdk-go-v2/credentials v1.13.4 // indirect
@@ -50,6 +51,7 @@ require (
 	github.com/go-openapi/jsonpointer v0.19.6 // indirect
 	github.com/go-openapi/jsonreference v0.20.2 // indirect
 	github.com/go-openapi/swag v0.22.3 // indirect
+	github.com/go-sql-driver/mysql v1.7.0 // indirect
 	github.com/gogo/protobuf v1.3.2 // indirect
 	github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
 	github.com/golang/protobuf v1.5.3 // indirect
diff --git a/shardingsphere-operator/go.sum b/shardingsphere-operator/go.sum
index eee5ba1..863342f 100644
--- a/shardingsphere-operator/go.sum
+++ b/shardingsphere-operator/go.sum
@@ -3,6 +3,8 @@ bou.ke/monkey v1.0.2/go.mod h1:OqickVX3tNx6t33n1xvtTtu85YN5s6cKwVug+oHMaIA=
 cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
 cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60=
+github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
 github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 h1:s6gZFSlWYmbqAuRjVTiNNhvNRfY2Wxp9nhfyel4rklc=
 github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE=
@@ -93,6 +95,10 @@ github.com/go-openapi/jsonreference v0.20.2 h1:3sVjiK66+uXK/6oQ8xgcRKcFgQ5KXa2Kv
 github.com/go-openapi/jsonreference v0.20.2/go.mod h1:Bl1zwGIM8/wsvqjsOQLJ/SH+En5Ap4rVB5KVcIDZG2k=
 github.com/go-openapi/swag v0.22.3 h1:yMBqmnQ0gyZvEb/+KzuWZOXgllrXT4SADYbvDaXHv/g=
 github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14=
+github.com/go-sql-driver/mysql v1.7.0 h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ7YPc=
+github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
+github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI=
+github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
 github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
 github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
diff --git a/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go b/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go
index 2123747..089abfe 100644
--- a/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go
+++ b/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go
@@ -21,7 +21,8 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"time"
+
+	"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/pressure"
 
 	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
 
@@ -37,10 +38,8 @@ import (
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
-	"k8s.io/apimachinery/pkg/util/wait"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/record"
-	"k8s.io/client-go/util/retry"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 )
@@ -70,9 +69,10 @@ type ShardingSphereChaosReconciler struct {
 	Events    record.EventRecorder
 	ClientSet *clientset.Clientset
 
-	Chaos     sschaos.Chaos
-	Job       job.Job
-	ConfigMap configmap.ConfigMap
+	Chaos        sschaos.Chaos
+	Job          job.Job
+	ExecRecorder []*pressure.Pressure
+	ConfigMap    configmap.ConfigMap
 }
 
 // Reconcile handles main function of this controller
@@ -114,12 +114,10 @@ func (r *ShardingSphereChaosReconciler) Reconcile(ctx context.Context, req ctrl.
 		r.Events.Event(ssChaos, "Warning", "configmap error", err.Error())
 	}
 
-	if err := r.reconcileJob(ctx, ssChaos); err != nil {
+	if err := r.reconcilePressure(ctx, ssChaos); err != nil {
 		if err != nil {
 			errors = append(errors, err)
 		}
-		logger.Error(err, "reconcile job error")
-		r.Events.Event(ssChaos, "Warning", "job error", err.Error())
 	}
 
 	if err := r.reconcileStatus(ctx, ssChaos); err != nil {
@@ -134,6 +132,106 @@ func (r *ShardingSphereChaosReconciler) Reconcile(ctx context.Context, req ctrl.
 	return ctrl.Result{RequeueAfter: defaultRequeueTime}, nil
 }
 
+func (r *ShardingSphereChaosReconciler) reconcilePressure(ctx context.Context, chao *v1alpha1.ShardingSphereChaos) error {
+	exec := r.getNeedExec(chao)
+
+	//if exec in this phase do not exist,create it
+	if exec == nil {
+		exec := pressure.NewPressure(getExecName(chao), chao.Spec.PressureCfg.DistSQLs)
+		go exec.Run(ctx, &chao.Spec.PressureCfg)
+		r.ExecRecorder = append(r.ExecRecorder, exec)
+	}
+
+	return nil
+}
+
+func (r *ShardingSphereChaosReconciler) reconcileStatus(ctx context.Context, chaos *v1alpha1.ShardingSphereChaos) error {
+	namespacedName := types.NamespacedName{
+		Name:      chaos.Name,
+		Namespace: chaos.Namespace,
+	}
+
+	setDefaultStatus(chaos)
+	r.updatePhaseExec(chaos)
+
+	if err := r.updateChaosCondition(ctx, chaos); err != nil {
+		return err
+	}
+
+	rt, err := r.getRuntimeChaos(ctx, namespacedName)
+	if err != nil {
+		return err
+	}
+	rt.Status = chaos.Status
+
+	return r.Status().Update(ctx, rt)
+}
+
+func (r *ShardingSphereChaosReconciler) updatePhaseExec(chaos *v1alpha1.ShardingSphereChaos) {
+	exec := r.getNeedExec(chaos)
+	if exec == nil || exec.Active {
+		return
+	}
+
+	//todo: judge error
+
+	msg := generateMsgFromExec(exec)
+	//when exec finished, update phase
+	switch chaos.Status.Phase {
+	case v1alpha1.BeforeSteady:
+		chaos.Status.Result.Steady = *msg
+		chaos.Status.Phase = v1alpha1.BeforeChaos
+	case v1alpha1.BeforeChaos:
+		chaos.Status.Result.Chaos = *msg
+		chaos.Status.Phase = v1alpha1.AfterChaos
+	}
+
+}
+
+func generateMsgFromExec(exec *pressure.Pressure) *v1alpha1.Msg {
+	//todo: wait to change result compute way
+	rate := 0
+	if exec.Result.Total == 0 {
+		rate = 0
+	} else {
+		rate = exec.Result.Success / exec.Result.Total
+	}
+	msg := v1alpha1.Msg{
+		Result:   fmt.Sprintf("%d", rate),
+		Duration: exec.Result.Duration.String(),
+	}
+	if exec.Err != nil {
+		msg.FailureDetails = exec.Err.Error()
+	}
+
+	return &msg
+}
+
+func getExecName(chao *v1alpha1.ShardingSphereChaos) string {
+	var execName string
+	if chao.Status.Phase == v1alpha1.BeforeSteady || chao.Status.Phase == v1alpha1.AfterSteady {
+		execName = reconcile.MakeJobName(chao.Name, reconcile.InSteady)
+	}
+	if chao.Status.Phase == v1alpha1.BeforeChaos || chao.Status.Phase == v1alpha1.AfterChaos {
+		execName = reconcile.MakeJobName(chao.Name, reconcile.InChaos)
+	}
+
+	return execName
+}
+
+func (r *ShardingSphereChaosReconciler) getNeedExec(chao *v1alpha1.ShardingSphereChaos) *pressure.Pressure {
+	jobName := getExecName(chao)
+
+	//if pressure do not exist,run it
+	for i := range r.ExecRecorder {
+		if r.ExecRecorder[i].Name == jobName {
+			return r.ExecRecorder[i]
+		}
+	}
+
+	return nil
+}
+
 func (r *ShardingSphereChaosReconciler) getRuntimeChaos(ctx context.Context, name types.NamespacedName) (*v1alpha1.ShardingSphereChaos, error) {
 	var rt = &v1alpha1.ShardingSphereChaos{}
 	err := r.Get(ctx, name, rt)
@@ -215,7 +313,7 @@ func (r *ShardingSphereChaosReconciler) deleteNetworkChaos(ctx context.Context,
 func (r *ShardingSphereChaosReconciler) reconcileChaos(ctx context.Context, chaos *v1alpha1.ShardingSphereChaos) error {
 	logger := r.Log.WithValues("reconcile shardingspherechaos", fmt.Sprintf("%s/%s", chaos.Namespace, chaos.Name))
 
-	if chaos.Status.Phase == v1alpha1.BeforeSteady || chaos.Status.Phase == v1alpha1.AfterSteady {
+	if chaos.Status.Phase == "" || chaos.Status.Phase == v1alpha1.BeforeSteady || chaos.Status.Phase == v1alpha1.AfterSteady {
 		return nil
 	}
 
@@ -336,221 +434,12 @@ func (r *ShardingSphereChaosReconciler) reconcileConfigMap(ctx context.Context,
 	return nil
 }
 
-func (r *ShardingSphereChaosReconciler) reconcileJob(ctx context.Context, chaos *v1alpha1.ShardingSphereChaos) error {
-	var jobType reconcile.JobType
-
-	switch chaos.Status.Phase {
-	case v1alpha1.BeforeChaos, v1alpha1.AfterChaos:
-		jobType = reconcile.InChaos
-	case v1alpha1.BeforeSteady, v1alpha1.AfterSteady:
-		jobType = reconcile.InSteady
-	default:
-		jobType = reconcile.InSteady
-	}
-
-	namespaceName := types.NamespacedName{
-		Namespace: chaos.Namespace,
-		Name:      reconcile.MakeJobName(chaos.Name, jobType),
-	}
-
-	job, err := r.getJobByNamespacedName(ctx, namespaceName)
-	if err != nil {
-		return err
-	}
-
-	if job != nil {
-		return r.updateJob(ctx, jobType, chaos, job)
-	}
-
-	return r.createJob(ctx, jobType, chaos)
-}
-
-func (r *ShardingSphereChaosReconciler) reconcileStatus(ctx context.Context, chaos *v1alpha1.ShardingSphereChaos) error {
-	namespacedName := types.NamespacedName{
-		Name:      chaos.Name,
-		Namespace: chaos.Namespace,
-	}
-	chaos, err := r.getRuntimeChaos(ctx, namespacedName)
-	if err != nil {
-		if apierrors.IsNotFound(err) {
-			return nil
-		}
-		return err
-	}
-	setDefaultStatus(chaos)
-
-	req := getJobType(chaos)
-	job, err := r.getJobByNamespacedName(ctx, types.NamespacedName{Namespace: chaos.Namespace, Name: reconcile.MakeJobName(chaos.Name, req)})
-	if err != nil || job == nil {
-		return err
-	}
-
-	updatePhase(chaos, job)
-
-	condition := getJobCondition(job.Status.Conditions)
-	if condition == FailureJob {
-		r.Events.Event(chaos, "Warning", "failed", fmt.Sprintf("job: %s", job.Name))
-	}
-
-	//update result,when one part job finished,rely on current status
-	if chaos.Status.Phase == v1alpha1.AfterSteady || chaos.Status.Phase == v1alpha1.AfterChaos {
-		if err := r.collectJobMsg(ctx, chaos, job); err != nil {
-			r.Events.Event(chaos, "Warning", "getPodLog", err.Error())
-			return err
-		}
-	}
-
-	if err := r.updateChaosCondition(ctx, chaos); err != nil {
-		return err
-	}
-
-	rt, err := r.getRuntimeChaos(ctx, namespacedName)
-	if err != nil {
-		return err
-	}
-	rt.Status = chaos.Status
-
-	return r.Status().Update(ctx, rt)
-}
-
-func updatePhase(chaos *v1alpha1.ShardingSphereChaos, job *batchV1.Job) {
-	switch {
-	//in this phase,update to next,and collect job msg
-	case chaos.Status.Phase == v1alpha1.BeforeSteady && job.Status.Succeeded == 1:
-		chaos.Status.Phase = v1alpha1.AfterSteady
-		//update in next reconcile,wait steady msg collection
-	case chaos.Status.Phase == v1alpha1.AfterSteady && chaos.Status.Result.Steady.Result != "":
-		chaos.Status.Phase = v1alpha1.BeforeChaos
-	case chaos.Status.Phase == v1alpha1.BeforeChaos && job.Status.Succeeded == 1:
-		chaos.Status.Phase = v1alpha1.AfterChaos
-	}
-
-}
-
 func setDefaultStatus(chaos *v1alpha1.ShardingSphereChaos) {
 	if chaos.Status.Phase == "" {
 		chaos.Status.Phase = v1alpha1.BeforeSteady
 	}
 }
 
-// getJobType to get the coming job requirement
-// * BeforeSteady: it hasn't been started, could start a new experiment
-// * AfterSteady: it has been finished, could start a new experiment
-// * InjectChaos: it has been started, could start some pressure
-// * recoveredChaos: it has been recovered, could start to verify
-func getJobType(ssChaos *v1alpha1.ShardingSphereChaos) reconcile.JobType {
-	var jobType reconcile.JobType
-
-	if ssChaos.Status.Phase == v1alpha1.BeforeSteady || ssChaos.Status.Phase == v1alpha1.AfterSteady {
-		jobType = reconcile.InSteady
-	}
-
-	if ssChaos.Status.Phase == v1alpha1.BeforeChaos || ssChaos.Status.Phase == v1alpha1.AfterChaos {
-		jobType = reconcile.InChaos
-	}
-
-	return jobType
-}
-
-func (r *ShardingSphereChaosReconciler) getJobByNamespacedName(ctx context.Context, namespacedName types.NamespacedName) (*batchV1.Job, error) {
-	job, err := r.Job.GetByNamespacedName(ctx, namespacedName)
-	if err != nil {
-		return nil, err
-	}
-	return job, nil
-}
-
-func getJobCondition(conditions []batchV1.JobCondition) JobCondition {
-	var ret = ActiveJob
-	for i := range conditions {
-		p := &conditions[i]
-		switch {
-		case p.Type == batchV1.JobComplete && p.Status == corev1.ConditionTrue:
-			ret = CompleteJob
-		case p.Type == batchV1.JobFailed && p.Status == corev1.ConditionTrue:
-			ret = FailureJob
-		case p.Type == batchV1.JobSuspended && p.Status == corev1.ConditionTrue:
-			ret = SuspendJob
-		case p.Type == batchV1.JobFailureTarget:
-			ret = FailureJob
-		}
-
-	}
-	return ret
-}
-
-func (r *ShardingSphereChaosReconciler) collectJobMsg(ctx context.Context, ssChaos *v1alpha1.ShardingSphereChaos, rJob *batchV1.Job) error {
-	if isExistResult(ssChaos) {
-		return nil
-	}
-
-	condition := getJobCondition(rJob.Status.Conditions)
-	var result *v1alpha1.Msg
-	if ssChaos.Status.Phase == v1alpha1.AfterSteady {
-		result = &ssChaos.Status.Result.Steady
-	} else if ssChaos.Status.Phase == v1alpha1.AfterChaos {
-		result = &ssChaos.Status.Result.Chaos
-	}
-
-	if condition == CompleteJob {
-		log, err := r.getPodLog(ctx, rJob)
-		if err != nil {
-			return err
-		}
-		//todo: unpack msg with json
-		//if err := json.Unmarshal(log, result); err != nil {
-		//	return err
-		//}
-		result.Result = string(log)
-	}
-
-	if condition == FailureJob {
-		log, err := r.getPodLog(ctx, rJob)
-		if err != nil {
-			return err
-		}
-		result.FailureDetails = string(log)
-	}
-
-	return nil
-}
-
-func isExistResult(ssChaos *v1alpha1.ShardingSphereChaos) bool {
-	if ssChaos.Status.Phase == v1alpha1.AfterSteady && ssChaos.Status.Result.Steady.Result == "" {
-		return true
-	}
-	if ssChaos.Status.Phase == v1alpha1.AfterChaos && ssChaos.Status.Result.Chaos.Result == "" {
-		return true
-	}
-
-	return false
-}
-
-// FIXME: this will broke if the log is too long
-func (r *ShardingSphereChaosReconciler) getPodLog(ctx context.Context, rJob *batchV1.Job) ([]byte, error) {
-	pods := &corev1.PodList{}
-
-	if err := r.List(ctx, pods, client.MatchingLabels{"controller-uid": rJob.Spec.Template.Labels["controller-uid"]}); err != nil {
-		return nil, err
-	}
-	if pods.Items == nil || len(pods.Items) == 0 {
-		return nil, nil
-	}
-
-	pod := &pods.Items[0]
-
-	req := r.ClientSet.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &corev1.PodLogOptions{})
-	res := req.Do(ctx)
-	if res.Error() != nil {
-		return []byte{}, res.Error()
-	}
-	ret, err := res.Raw()
-	if err != nil {
-		return []byte{}, err
-	}
-	return ret, nil
-}
-
 func (r *ShardingSphereChaosReconciler) updateChaosCondition(ctx context.Context, chaos *v1alpha1.ShardingSphereChaos) error {
 	namespacedName := types.NamespacedName{
 		Namespace: chaos.Namespace,
@@ -617,96 +506,6 @@ func (r *ShardingSphereChaosReconciler) createConfigMap(ctx context.Context, cha
 	return err
 }
 
-// TODO: consider a new job name pattern
-func (r *ShardingSphereChaosReconciler) updateJob(ctx context.Context, requirement reconcile.JobType, chao *v1alpha1.ShardingSphereChaos, cur *batchV1.Job) error {
-	isEqual, err := reconcile.IsJobChanged(chao, requirement, cur)
-	if err != nil {
-		return err
-	}
-	if !isEqual {
-		if err := r.Delete(ctx, cur); err != nil && !apierrors.IsNotFound(err) {
-			return err
-		}
-	}
-	return nil
-}
-
-func (r *ShardingSphereChaosReconciler) createJob(ctx context.Context, requirement reconcile.JobType, chao *v1alpha1.ShardingSphereChaos) error {
-	injectJob, err := reconcile.NewJob(chao, requirement)
-	if err != nil {
-		return err
-	}
-	if err := ctrl.SetControllerReference(chao, injectJob, r.Scheme); err != nil {
-		return err
-	}
-
-	err = r.Create(ctx, injectJob)
-	if err != nil {
-		return client.IgnoreAlreadyExists(err)
-	}
-	//FIXME: consider remove the following L620-L676, perhaps don't need to pay much attention to the pod
-	rJob := &batchV1.Job{}
-	backoff := wait.Backoff{
-		Steps:    6,
-		Duration: 500 * time.Millisecond,
-		Factor:   5.0,
-		Jitter:   0.1,
-	}
-
-	if err := retry.OnError(
-		backoff,
-		func(e error) bool {
-			return true
-		},
-		func() error {
-			return r.Get(ctx, types.NamespacedName{Namespace: chao.Namespace, Name: reconcile.MakeJobName(chao.Name, requirement)}, rJob)
-		},
-	); err != nil {
-		return err
-	}
-
-	podList := &corev1.PodList{}
-	if err := retry.OnError(backoff, func(e error) bool {
-		return e != nil
-	}, func() error {
-		if err := r.List(ctx, podList, client.MatchingLabels{"controller-uid": rJob.Spec.Template.Labels["controller-uid"]}); err != nil {
-			return err
-		}
-		if len(podList.Items) == 0 {
-			return ErrNoPod
-		}
-		return nil
-	}); err != nil {
-		return err
-	}
-
-	for i := range podList.Items {
-		rPod := &podList.Items[i]
-		if err := ctrl.SetControllerReference(rJob, rPod, r.Scheme); err != nil {
-			return err
-		}
-
-		exp := rPod.DeepCopy()
-		updateBackoff := wait.Backoff{
-			Steps:    5,
-			Duration: 30 * time.Millisecond,
-			Factor:   5.0,
-			Jitter:   0.1,
-		}
-		if err := retry.RetryOnConflict(updateBackoff, func() error {
-			if err := r.Update(ctx, exp); err != nil {
-				return err
-			}
-			return nil
-		}); err != nil {
-			return err
-		}
-	}
-
-	r.Events.Event(chao, "Normal", "Created", fmt.Sprintf("%s job created", requirement))
-	return nil
-}
-
 // SetupWithManager sets up the controller with the Manager.
 func (r *ShardingSphereChaosReconciler) SetupWithManager(mgr ctrl.Manager) error {
 	return ctrl.NewControllerManagedBy(mgr).
diff --git a/shardingsphere-operator/pkg/pressure/pressure.go b/shardingsphere-operator/pkg/pressure/pressure.go
new file mode 100644
index 0000000..061bc3b
--- /dev/null
+++ b/shardingsphere-operator/pkg/pressure/pressure.go
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package pressure
+
+import (
+	"context"
+	"database/sql"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/database-mesh/golang-sdk/pkg/random"
+
+	"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
+	_ "github.com/go-sql-driver/mysql"
+)
+
+type Pressure struct {
+	Active         bool
+	Name           string
+	Result         Result
+	Err            error
+	Tasks          []v1alpha1.DistSQL
+	finishSignalCh chan struct{}
+	wg             sync.WaitGroup
+}
+
+var (
+	db       *sql.DB
+	totalReq int
+)
+
+type Result struct {
+	//total exec req Number
+	Total int
+	//total success req Number
+	Success int
+	//todo: get total or get every exec
+
+	//total time in this Pressure execution
+	Duration time.Duration
+}
+
+func NewPressure(name string, tasks []v1alpha1.DistSQL) *Pressure {
+	return &Pressure{
+		Active:         false,
+		Name:           name,
+		Result:         Result{},
+		Err:            nil,
+		Tasks:          tasks,
+		wg:             sync.WaitGroup{},
+		finishSignalCh: make(chan struct{}),
+	}
+}
+
+// todo: get conn args by labels over string
+func initDB(connArgs string) error {
+	var err error
+	db, err = sql.Open("mysql", connArgs)
+	if err != nil {
+		return err
+	}
+	if err := db.Ping(); err != nil {
+		return err
+	}
+	db.SetConnMaxLifetime(60 * time.Second)
+	return nil
+}
+
+func (p *Pressure) Run(ctx context.Context, pressureCfg *v1alpha1.PressureCfg) {
+	p.Active = true
+	totalReq = 0
+
+	//judge nil for simplify test
+	if db == nil {
+		if err := initDB(pressureCfg.SsHost); err != nil {
+			p.Err = err
+			return
+		}
+		defer func() {
+			if err := db.Close(); err != nil {
+				p.Err = err
+			}
+		}()
+	}
+
+	result := &p.Result
+	pressureCtx, cancel := context.WithTimeout(context.Background(), pressureCfg.Duration.Duration)
+	defer cancel()
+	ticker := time.NewTicker(pressureCfg.ReqTime.Duration)
+	resCh := make(chan bool, 1000)
+
+	//handle result
+	go p.handleResponse(pressureCtx, resCh, result)
+
+	//statistics the running time
+	start := time.Now()
+FOR:
+	for {
+		select {
+		case <-ctx.Done():
+			break FOR
+		case <-pressureCtx.Done():
+			break FOR
+		case <-ticker.C:
+			for i := 0; i < pressureCfg.ConcurrentNum; i++ {
+				totalReq += pressureCfg.ReqNum
+				//todo: handle err
+
+				//put wg here to prevent: when root ctx is closed,but some exec task do not start yet
+				p.wg.Add(1)
+				go p.exec(pressureCtx, pressureCfg.ReqNum, resCh)
+			}
+		}
+	}
+
+	//occur when pressureCtx or root ctx closed
+
+	//wait all exec calls return,we can safely close the result channel
+	p.wg.Wait()
+	end := time.Now()
+	p.Result.Duration = end.Sub(start)
+	close(resCh)
+
+	//wait collect results channel finished
+	<-p.finishSignalCh
+
+	//when all task finished,update active
+	p.Active = false
+}
+
+func (p *Pressure) exec(ctx context.Context, times int, res chan bool) {
+	defer p.wg.Done()
+	for i := 0; i < times; i++ {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+		}
+		if len(p.Tasks) == 0 {
+			return
+		}
+		for i := range p.Tasks {
+			//generate diff sql, put result into channel
+			args := randomArgs(p.Tasks[i].Args)
+			_, err := db.Exec(p.Tasks[i].SQL, args)
+			res <- err == nil
+		}
+	}
+}
+
+func (p *Pressure) handleResponse(ctx context.Context, resCh chan bool, result *Result) {
+For:
+	for {
+		select {
+		case <-ctx.Done():
+			break For
+		case ret := <-resCh:
+			//todo: add more msg
+			handle(ret, result)
+		}
+	}
+
+	//get left handleResponse
+	for ret := range resCh {
+		handle(ret, result)
+	}
+
+	//when all handle finish,put a signal to finish chan
+	p.finishSignalCh <- struct{}{}
+}
+
+//todo:add more logic and change ret type(bool ---> struct)
+func handle(ret bool, result *Result) {
+	if ret {
+		result.Success++
+	}
+	result.Total++
+}
+
+func randomArgs(args []string) []string {
+	var ret []string
+	for i := range args {
+		randomArg := fmt.Sprintf("%s-%s", args[i], random.StringN(4))
+		ret = append(ret, randomArg)
+	}
+	return ret
+}
diff --git a/shardingsphere-operator/pkg/pressure/pressure_test.go b/shardingsphere-operator/pkg/pressure/pressure_test.go
new file mode 100644
index 0000000..ac2b2f3
--- /dev/null
+++ b/shardingsphere-operator/pkg/pressure/pressure_test.go
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package pressure
+
+import (
+	"bou.ke/monkey"
+	"context"
+	"database/sql"
+	"github.com/DATA-DOG/go-sqlmock"
+	"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"regexp"
+	"testing"
+	"time"
+)
+
+func TestPressure(t *testing.T) {
+	RegisterFailHandler(Fail)
+	RunSpecs(t, "Controllers Suite")
+}
+
+var _ = Describe("test pressure", func() {
+	var (
+		dbmock sqlmock.Sqlmock
+		err    error
+	)
+	BeforeEach(func() {
+		db, dbmock, err = sqlmock.New()
+		Expect(err).To(BeNil())
+		Expect(db).NotTo(BeNil())
+		Expect(dbmock).NotTo(BeNil())
+
+		monkey.Patch(sql.Open, func(driverName, dataSourceName string) (*sql.DB, error) {
+			return db, nil
+		})
+	})
+
+	AfterEach(func() {
+		monkey.Unpatch(sql.Open)
+		db.Close()
+	})
+
+	Context("test Run function", func() {
+		It("should Run successfully", func() {
+			registerStorageUnitCase := &v1alpha1.PressureCfg{
+				ZkHost:   "",
+				SsHost:   "test",
+				Duration: metav1.Duration{Duration: 20 * time.Second},
+				ReqTime:  metav1.Duration{Duration: 5 * time.Second},
+				DistSQLs: []v1alpha1.DistSQL{
+					{
+						SQL: "REGISTER STORAGE UNIT ?",
+						Args: []string{
+							"**",
+						},
+					},
+				},
+				ConcurrentNum: 2,
+				ReqNum:        5,
+			}
+
+			dbmock.ExpectExec(regexp.QuoteMeta("REGISTER STORAGE UNIT")).WillReturnResult(sqlmock.NewResult(1, 1))
+			pressure := NewPressure("verify", registerStorageUnitCase.DistSQLs)
+			pressure.Run(context.TODO(), registerStorageUnitCase)
+
+			Expect(pressure.Result.Total > 0).To(BeTrue())
+			Expect(pressure.Result.Success >= 0).To(BeTrue())
+			Expect(pressure.Result.Total >= pressure.Result.Success).To(BeTrue())
+			Expect(pressure.Result.Duration.Milliseconds() >= registerStorageUnitCase.Duration.Milliseconds()).To(BeTrue())
+			Expect(pressure.Active).To(BeFalse())
+		})
+	})
+
+})