You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by mi...@apache.org on 2023/05/15 06:37:33 UTC
[shardingsphere-on-cloud] branch main updated: feat(operator): add-pressure-exec replace job(get pod log)
This is an automated email from the ASF dual-hosted git repository.
miaoliyao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git
The following commit(s) were added to refs/heads/main by this push:
new 6b4ae4c feat(operator): add-pressure-exec replace job(get pod log)
new 8e895b9 Merge pull request #358 from moomman/add-pressure-exec
6b4ae4c is described below
commit 6b4ae4caa25978541cc0314d0919398f9348aa7d
Author: moonman <ag...@163.com>
AuthorDate: Wed May 10 18:44:58 2023 +0800
feat(operator): add-pressure-exec replace job(get pod log)
---
.../api/v1alpha1/shardingsphere_chaos_types.go | 21 +-
.../api/v1alpha1/zz_generated.deepcopy.go | 61 ++-
.../cmd/shardingsphere-operator/manager/option.go | 19 +-
shardingsphere-operator/go.mod | 4 +-
shardingsphere-operator/go.sum | 6 +
.../controllers/shardingsphere_chaos_controller.go | 417 ++++++---------------
shardingsphere-operator/pkg/pressure/pressure.go | 203 ++++++++++
.../pkg/pressure/pressure_test.go | 91 +++++
8 files changed, 483 insertions(+), 339 deletions(-)
diff --git a/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go b/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go
index e1c5e90..43431e4 100644
--- a/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go
+++ b/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go
@@ -42,13 +42,24 @@ type ShardingSphereChaos struct {
// ShardingSphereChaosSpec defines the desired state of ShardingSphereChaos
type ShardingSphereChaosSpec struct {
- InjectJob JobSpec `json:"injectJob,omitempty"`
- EmbedChaos `json:",inline"`
- Expect Expect `json:"expect,omitempty"`
+ InjectJob JobSpec `json:"injectJob,omitempty"`
+ EmbedChaos `json:",inline"`
+ PressureCfg PressureCfg `json:"pressureCfg"`
}
-type Expect struct {
- Verify string `json:"verify,omitempty"`
+// PressureCfg configures the SQL load generated by one pressure run.
+type PressureCfg struct {
+ // ZkHost is not read by any code in this change.
+ // NOTE(review): presumably a ZooKeeper address — confirm.
+ ZkHost string `json:"zkHost,omitempty"`
+ // SsHost is passed to sql.Open("mysql", ...) as the data source name.
+ SsHost string `json:"ssHost"`
+ // Duration is the timeout of the context that bounds one pressure run.
+ Duration metav1.Duration `json:"duration"`
+ // ReqTime is the ticker interval between two bursts of requests.
+ ReqTime metav1.Duration `json:"reqTime"`
+ // DistSQLs are the statements executed in every request round.
+ DistSQLs []DistSQL `json:"distSQLs,omitempty"`
+ // ConcurrentNum is the number of goroutines started on each tick.
+ ConcurrentNum int `json:"concurrentNum"`
+ // ReqNum is how many rounds over DistSQLs each of those goroutines performs.
+ ReqNum int `json:"reqNum"`
+}
+
+// DistSQL is one statement of a pressure run together with its argument templates.
+type DistSQL struct {
+ // SQL is the statement text handed to the database on every execution.
+ SQL string `json:"sql"`
+ // Args are base values; a random suffix is appended to each one before every execution.
+ Args []string `json:"args"`
}
type Script string
diff --git a/shardingsphere-operator/api/v1alpha1/zz_generated.deepcopy.go b/shardingsphere-operator/api/v1alpha1/zz_generated.deepcopy.go
index 466979d..f2e6b08 100644
--- a/shardingsphere-operator/api/v1alpha1/zz_generated.deepcopy.go
+++ b/shardingsphere-operator/api/v1alpha1/zz_generated.deepcopy.go
@@ -550,6 +550,26 @@ func (in *DelayParams) DeepCopy() *DelayParams {
return out
}
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *DistSQL) DeepCopyInto(out *DistSQL) {
+ // Copy every field by value first; the slice field is re-allocated below.
+ *out = *in
+ if in.Args != nil {
+ // Give out its own backing array so it does not alias in.Args.
+ in, out := &in.Args, &out.Args
+ *out = make([]string, len(*in))
+ copy(*out, *in)
+ }
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DistSQL.
+func (in *DistSQL) DeepCopy() *DistSQL {
+ // A nil receiver yields a nil copy instead of dereferencing nil.
+ if in == nil {
+ return nil
+ }
+ out := new(DistSQL)
+ in.DeepCopyInto(out)
+ return out
+}
+
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *DuplicationParams) DeepCopyInto(out *DuplicationParams) {
*out = *in
@@ -605,21 +625,6 @@ func (in *Endpoint) DeepCopy() *Endpoint {
return out
}
-// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
-func (in *Expect) DeepCopyInto(out *Expect) {
- *out = *in
-}
-
-// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Expect.
-func (in *Expect) DeepCopy() *Expect {
- if in == nil {
- return nil
- }
- out := new(Expect)
- in.DeepCopyInto(out)
- return out
-}
-
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *InstanceStatus) DeepCopyInto(out *InstanceStatus) {
*out = *in
@@ -1047,6 +1052,30 @@ func (in *PortBinding) DeepCopy() *PortBinding {
return out
}
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *PressureCfg) DeepCopyInto(out *PressureCfg) {
+ // Copy every field by value first; the slice field is re-allocated below.
+ *out = *in
+ out.Duration = in.Duration
+ out.ReqTime = in.ReqTime
+ if in.DistSQLs != nil {
+ in, out := &in.DistSQLs, &out.DistSQLs
+ *out = make([]DistSQL, len(*in))
+ // Each DistSQL owns a slice itself, so elements are deep-copied one by one.
+ for i := range *in {
+ (*in)[i].DeepCopyInto(&(*out)[i])
+ }
+ }
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PressureCfg.
+func (in *PressureCfg) DeepCopy() *PressureCfg {
+ // A nil receiver yields a nil copy instead of dereferencing nil.
+ if in == nil {
+ return nil
+ }
+ out := new(PressureCfg)
+ in.DeepCopyInto(out)
+ return out
+}
+
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Privilege) DeepCopyInto(out *Privilege) {
*out = *in
@@ -1436,7 +1465,7 @@ func (in *ShardingSphereChaosSpec) DeepCopyInto(out *ShardingSphereChaosSpec) {
*out = *in
out.InjectJob = in.InjectJob
in.EmbedChaos.DeepCopyInto(&out.EmbedChaos)
- out.Expect = in.Expect
+ in.PressureCfg.DeepCopyInto(&out.PressureCfg)
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ShardingSphereChaosSpec.
diff --git a/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go b/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
index ab69d45..9314332 100644
--- a/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
+++ b/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
@@ -21,6 +21,8 @@ import (
"flag"
"strings"
+ "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/pressure"
+
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/controllers"
sschaos "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/chaosmesh"
@@ -179,14 +181,15 @@ var featureGatesHandlers = map[string]FeatureGateHandler{
return err
}
if err := (&controllers.ShardingSphereChaosReconciler{
- Client: mgr.GetClient(),
- Scheme: mgr.GetScheme(),
- Log: mgr.GetLogger(),
- Chaos: sschaos.NewChaos(mgr.GetClient()),
- Job: job.NewJob(mgr.GetClient()),
- ConfigMap: configmap.NewConfigMapClient(mgr.GetClient()),
- Events: mgr.GetEventRecorderFor("shardingsphere-chaos-controller"),
- ClientSet: clientset,
+ Client: mgr.GetClient(),
+ Scheme: mgr.GetScheme(),
+ Log: mgr.GetLogger(),
+ Chaos: sschaos.NewChaos(mgr.GetClient()),
+ Job: job.NewJob(mgr.GetClient()),
+ ExecRecorder: make([]*pressure.Pressure, 0),
+ ConfigMap: configmap.NewConfigMapClient(mgr.GetClient()),
+ Events: mgr.GetEventRecorderFor("shardingsphere-chaos-controller"),
+ ClientSet: clientset,
}).SetupWithManager(mgr); err != nil {
logger.Error(err, "unable to create controller", "controller", "ShardingSphereChaos")
return err
diff --git a/shardingsphere-operator/go.mod b/shardingsphere-operator/go.mod
index d20fc79..621957a 100644
--- a/shardingsphere-operator/go.mod
+++ b/shardingsphere-operator/go.mod
@@ -6,9 +6,9 @@ require (
bou.ke/monkey v1.0.2
github.com/antlr/antlr4 v0.0.0-20181218183524-be58ebffde8e
github.com/chaos-mesh/chaos-mesh/api v0.0.0-20230410023700-25a841a23cd2
- github.com/golang/mock v1.6.0
github.com/database-mesh/golang-sdk v0.0.0-20230420101548-53265cd9883a
github.com/go-logr/logr v1.2.4
+ github.com/golang/mock v1.6.0
github.com/onsi/ginkgo/v2 v2.8.0
github.com/onsi/gomega v1.26.0
github.com/prometheus/client_golang v1.14.0
@@ -25,6 +25,7 @@ require (
require github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
require (
+ github.com/DATA-DOG/go-sqlmock v1.5.0 // indirect
github.com/aws/aws-sdk-go-v2 v1.17.5 // indirect
github.com/aws/aws-sdk-go-v2/config v1.18.4 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.13.4 // indirect
@@ -50,6 +51,7 @@ require (
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
+ github.com/go-sql-driver/mysql v1.7.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
diff --git a/shardingsphere-operator/go.sum b/shardingsphere-operator/go.sum
index eee5ba1..863342f 100644
--- a/shardingsphere-operator/go.sum
+++ b/shardingsphere-operator/go.sum
@@ -3,6 +3,8 @@ bou.ke/monkey v1.0.2/go.mod h1:OqickVX3tNx6t33n1xvtTtu85YN5s6cKwVug+oHMaIA=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60=
+github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 h1:s6gZFSlWYmbqAuRjVTiNNhvNRfY2Wxp9nhfyel4rklc=
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE=
@@ -93,6 +95,10 @@ github.com/go-openapi/jsonreference v0.20.2 h1:3sVjiK66+uXK/6oQ8xgcRKcFgQ5KXa2Kv
github.com/go-openapi/jsonreference v0.20.2/go.mod h1:Bl1zwGIM8/wsvqjsOQLJ/SH+En5Ap4rVB5KVcIDZG2k=
github.com/go-openapi/swag v0.22.3 h1:yMBqmnQ0gyZvEb/+KzuWZOXgllrXT4SADYbvDaXHv/g=
github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14=
+github.com/go-sql-driver/mysql v1.7.0 h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ7YPc=
+github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
+github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI=
+github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
diff --git a/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go b/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go
index 2123747..089abfe 100644
--- a/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go
+++ b/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go
@@ -21,7 +21,8 @@ import (
"context"
"errors"
"fmt"
- "time"
+
+ "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/pressure"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
@@ -37,10 +38,8 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
- "k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
- "k8s.io/client-go/util/retry"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)
@@ -70,9 +69,10 @@ type ShardingSphereChaosReconciler struct {
Events record.EventRecorder
ClientSet *clientset.Clientset
- Chaos sschaos.Chaos
- Job job.Job
- ConfigMap configmap.ConfigMap
+ Chaos sschaos.Chaos
+ Job job.Job
+ ExecRecorder []*pressure.Pressure
+ ConfigMap configmap.ConfigMap
}
// Reconcile handles main function of this controller
@@ -114,12 +114,10 @@ func (r *ShardingSphereChaosReconciler) Reconcile(ctx context.Context, req ctrl.
r.Events.Event(ssChaos, "Warning", "configmap error", err.Error())
}
- if err := r.reconcileJob(ctx, ssChaos); err != nil {
+ if err := r.reconcilePressure(ctx, ssChaos); err != nil {
if err != nil {
errors = append(errors, err)
}
- logger.Error(err, "reconcile job error")
- r.Events.Event(ssChaos, "Warning", "job error", err.Error())
}
if err := r.reconcileStatus(ctx, ssChaos); err != nil {
@@ -134,6 +132,106 @@ func (r *ShardingSphereChaosReconciler) Reconcile(ctx context.Context, req ctrl.
return ctrl.Result{RequeueAfter: defaultRequeueTime}, nil
}
+// reconcilePressure makes sure a pressure run is recorded for the phase the
+// chaos is currently in. When the recorder holds no run under the phase's
+// name, a new one is created, started in its own goroutine and recorded.
+// It always returns nil.
+func (r *ShardingSphereChaosReconciler) reconcilePressure(ctx context.Context, chao *v1alpha1.ShardingSphereChaos) error {
+	// a run for this phase is already recorded, nothing to start
+	if r.getNeedExec(chao) != nil {
+		return nil
+	}
+
+	started := pressure.NewPressure(getExecName(chao), chao.Spec.PressureCfg.DistSQLs)
+	go started.Run(ctx, &chao.Spec.PressureCfg)
+	r.ExecRecorder = append(r.ExecRecorder, started)
+
+	return nil
+}
+
+// reconcileStatus brings the in-memory status of chaos up to date (default
+// phase, phase transition of a finished pressure run, chaos condition) and
+// writes that status onto the freshly fetched object through the status
+// sub-resource. The first error encountered is returned.
+func (r *ShardingSphereChaosReconciler) reconcileStatus(ctx context.Context, chaos *v1alpha1.ShardingSphereChaos) error {
+	key := types.NamespacedName{Name: chaos.Name, Namespace: chaos.Namespace}
+
+	setDefaultStatus(chaos)
+	r.updatePhaseExec(chaos)
+
+	if err := r.updateChaosCondition(ctx, chaos); err != nil {
+		return err
+	}
+
+	// update against the latest stored object, carrying over only the status
+	latest, err := r.getRuntimeChaos(ctx, key)
+	if err != nil {
+		return err
+	}
+	latest.Status = chaos.Status
+
+	return r.Status().Update(ctx, latest)
+}
+
+// updatePhaseExec advances the chaos phase once the pressure run of the
+// current phase has finished, storing the run's summary in the matching
+// result slot. It does nothing while no run is recorded or the run is still
+// active, and leaves phases other than BeforeSteady/BeforeChaos untouched.
+func (r *ShardingSphereChaosReconciler) updatePhaseExec(chaos *v1alpha1.ShardingSphereChaos) {
+	run := r.getNeedExec(chaos)
+	if run == nil {
+		return
+	}
+	if run.Active {
+		return
+	}
+
+	//todo: judge error
+
+	summary := generateMsgFromExec(run)
+	// the run is over: record its summary and move to the next phase
+	phase := chaos.Status.Phase
+	if phase == v1alpha1.BeforeSteady {
+		chaos.Status.Result.Steady = *summary
+		chaos.Status.Phase = v1alpha1.BeforeChaos
+	} else if phase == v1alpha1.BeforeChaos {
+		chaos.Status.Result.Chaos = *summary
+		chaos.Status.Phase = v1alpha1.AfterChaos
+	}
+}
+
+// generateMsgFromExec condenses a pressure run into a status message.
+//
+// Result is the success rate as a whole-number percentage (0-100), or "0"
+// when the run recorded no request. Duration is the run's measured duration
+// and FailureDetails carries exec.Err when one was recorded.
+func generateMsgFromExec(exec *pressure.Pressure) *v1alpha1.Msg {
+	//todo: wait to change result compute way
+	rate := 0
+	// Multiply before dividing: Success / Total on its own truncates to 0 in
+	// integer arithmetic unless every single request succeeded. The guard
+	// also avoids a division by zero for a run without requests.
+	if exec.Result.Total > 0 {
+		rate = exec.Result.Success * 100 / exec.Result.Total
+	}
+
+	msg := v1alpha1.Msg{
+		Result:   fmt.Sprintf("%d", rate),
+		Duration: exec.Result.Duration.String(),
+	}
+	if exec.Err != nil {
+		msg.FailureDetails = exec.Err.Error()
+	}
+
+	return &msg
+}
+
+func getExecName(chao *v1alpha1.ShardingSphereChaos) string {
+ var execName string
+ if chao.Status.Phase == v1alpha1.BeforeSteady || chao.Status.Phase == v1alpha1.AfterSteady {
+ execName = reconcile.MakeJobName(chao.Name, reconcile.InSteady)
+ }
+ if chao.Status.Phase == v1alpha1.BeforeChaos || chao.Status.Phase == v1alpha1.AfterChaos {
+ execName = reconcile.MakeJobName(chao.Name, reconcile.InChaos)
+ }
+
+ return execName
+}
+
+// getNeedExec looks up the recorded pressure run that belongs to the chaos'
+// current phase. It returns nil when no run with that name has been recorded.
+func (r *ShardingSphereChaosReconciler) getNeedExec(chao *v1alpha1.ShardingSphereChaos) *pressure.Pressure {
+	wanted := getExecName(chao)
+
+	for _, recorded := range r.ExecRecorder {
+		if recorded.Name == wanted {
+			return recorded
+		}
+	}
+
+	return nil
+}
+
func (r *ShardingSphereChaosReconciler) getRuntimeChaos(ctx context.Context, name types.NamespacedName) (*v1alpha1.ShardingSphereChaos, error) {
var rt = &v1alpha1.ShardingSphereChaos{}
err := r.Get(ctx, name, rt)
@@ -215,7 +313,7 @@ func (r *ShardingSphereChaosReconciler) deleteNetworkChaos(ctx context.Context,
func (r *ShardingSphereChaosReconciler) reconcileChaos(ctx context.Context, chaos *v1alpha1.ShardingSphereChaos) error {
logger := r.Log.WithValues("reconcile shardingspherechaos", fmt.Sprintf("%s/%s", chaos.Namespace, chaos.Name))
- if chaos.Status.Phase == v1alpha1.BeforeSteady || chaos.Status.Phase == v1alpha1.AfterSteady {
+ if chaos.Status.Phase == "" || chaos.Status.Phase == v1alpha1.BeforeSteady || chaos.Status.Phase == v1alpha1.AfterSteady {
return nil
}
@@ -336,221 +434,12 @@ func (r *ShardingSphereChaosReconciler) reconcileConfigMap(ctx context.Context,
return nil
}
-func (r *ShardingSphereChaosReconciler) reconcileJob(ctx context.Context, chaos *v1alpha1.ShardingSphereChaos) error {
- var jobType reconcile.JobType
-
- switch chaos.Status.Phase {
- case v1alpha1.BeforeChaos, v1alpha1.AfterChaos:
- jobType = reconcile.InChaos
- case v1alpha1.BeforeSteady, v1alpha1.AfterSteady:
- jobType = reconcile.InSteady
- default:
- jobType = reconcile.InSteady
- }
-
- namespaceName := types.NamespacedName{
- Namespace: chaos.Namespace,
- Name: reconcile.MakeJobName(chaos.Name, jobType),
- }
-
- job, err := r.getJobByNamespacedName(ctx, namespaceName)
- if err != nil {
- return err
- }
-
- if job != nil {
- return r.updateJob(ctx, jobType, chaos, job)
- }
-
- return r.createJob(ctx, jobType, chaos)
-}
-
-func (r *ShardingSphereChaosReconciler) reconcileStatus(ctx context.Context, chaos *v1alpha1.ShardingSphereChaos) error {
- namespacedName := types.NamespacedName{
- Name: chaos.Name,
- Namespace: chaos.Namespace,
- }
- chaos, err := r.getRuntimeChaos(ctx, namespacedName)
- if err != nil {
- if apierrors.IsNotFound(err) {
- return nil
- }
- return err
- }
- setDefaultStatus(chaos)
-
- req := getJobType(chaos)
- job, err := r.getJobByNamespacedName(ctx, types.NamespacedName{Namespace: chaos.Namespace, Name: reconcile.MakeJobName(chaos.Name, req)})
- if err != nil || job == nil {
- return err
- }
-
- updatePhase(chaos, job)
-
- condition := getJobCondition(job.Status.Conditions)
- if condition == FailureJob {
- r.Events.Event(chaos, "Warning", "failed", fmt.Sprintf("job: %s", job.Name))
- }
-
- //update result,when one part job finished,rely on current status
- if chaos.Status.Phase == v1alpha1.AfterSteady || chaos.Status.Phase == v1alpha1.AfterChaos {
- if err := r.collectJobMsg(ctx, chaos, job); err != nil {
- r.Events.Event(chaos, "Warning", "getPodLog", err.Error())
- return err
- }
- }
-
- if err := r.updateChaosCondition(ctx, chaos); err != nil {
- return err
- }
-
- rt, err := r.getRuntimeChaos(ctx, namespacedName)
- if err != nil {
- return err
- }
- rt.Status = chaos.Status
-
- return r.Status().Update(ctx, rt)
-}
-
-func updatePhase(chaos *v1alpha1.ShardingSphereChaos, job *batchV1.Job) {
- switch {
- //in this phase,update to next,and collect job msg
- case chaos.Status.Phase == v1alpha1.BeforeSteady && job.Status.Succeeded == 1:
- chaos.Status.Phase = v1alpha1.AfterSteady
- //update in next reconcile,wait steady msg collection
- case chaos.Status.Phase == v1alpha1.AfterSteady && chaos.Status.Result.Steady.Result != "":
- chaos.Status.Phase = v1alpha1.BeforeChaos
- case chaos.Status.Phase == v1alpha1.BeforeChaos && job.Status.Succeeded == 1:
- chaos.Status.Phase = v1alpha1.AfterChaos
- }
-
-}
-
// setDefaultStatus initialises an empty phase to BeforeSteady and leaves any
// phase that is already set untouched.
func setDefaultStatus(chaos *v1alpha1.ShardingSphereChaos) {
	if chaos.Status.Phase != "" {
		return
	}
	chaos.Status.Phase = v1alpha1.BeforeSteady
}
-// getJobType to get the coming job requirement
-// * BeforeSteady: it hasn't been started, could start a new experiment
-// * AfterSteady: it has been finished, could start a new experiment
-// * InjectChaos: it has been started, could start some pressure
-// * recoveredChaos: it has been recovered, could start to verify
-func getJobType(ssChaos *v1alpha1.ShardingSphereChaos) reconcile.JobType {
- var jobType reconcile.JobType
-
- if ssChaos.Status.Phase == v1alpha1.BeforeSteady || ssChaos.Status.Phase == v1alpha1.AfterSteady {
- jobType = reconcile.InSteady
- }
-
- if ssChaos.Status.Phase == v1alpha1.BeforeChaos || ssChaos.Status.Phase == v1alpha1.AfterChaos {
- jobType = reconcile.InChaos
- }
-
- return jobType
-}
-
-func (r *ShardingSphereChaosReconciler) getJobByNamespacedName(ctx context.Context, namespacedName types.NamespacedName) (*batchV1.Job, error) {
- job, err := r.Job.GetByNamespacedName(ctx, namespacedName)
- if err != nil {
- return nil, err
- }
- return job, nil
-}
-
-func getJobCondition(conditions []batchV1.JobCondition) JobCondition {
- var ret = ActiveJob
- for i := range conditions {
- p := &conditions[i]
- switch {
- case p.Type == batchV1.JobComplete && p.Status == corev1.ConditionTrue:
- ret = CompleteJob
- case p.Type == batchV1.JobFailed && p.Status == corev1.ConditionTrue:
- ret = FailureJob
- case p.Type == batchV1.JobSuspended && p.Status == corev1.ConditionTrue:
- ret = SuspendJob
- case p.Type == batchV1.JobFailureTarget:
- ret = FailureJob
- }
-
- }
- return ret
-}
-
-func (r *ShardingSphereChaosReconciler) collectJobMsg(ctx context.Context, ssChaos *v1alpha1.ShardingSphereChaos, rJob *batchV1.Job) error {
- if isExistResult(ssChaos) {
- return nil
- }
-
- condition := getJobCondition(rJob.Status.Conditions)
- var result *v1alpha1.Msg
- if ssChaos.Status.Phase == v1alpha1.AfterSteady {
- result = &ssChaos.Status.Result.Steady
- } else if ssChaos.Status.Phase == v1alpha1.AfterChaos {
- result = &ssChaos.Status.Result.Chaos
- }
-
- if condition == CompleteJob {
- log, err := r.getPodLog(ctx, rJob)
- if err != nil {
- return err
- }
- //todo: unpack msg with json
- //if err := json.Unmarshal(log, result); err != nil {
- // return err
- //}
- result.Result = string(log)
- }
-
- if condition == FailureJob {
- log, err := r.getPodLog(ctx, rJob)
- if err != nil {
- return err
- }
- result.FailureDetails = string(log)
- }
-
- return nil
-}
-
-func isExistResult(ssChaos *v1alpha1.ShardingSphereChaos) bool {
- if ssChaos.Status.Phase == v1alpha1.AfterSteady && ssChaos.Status.Result.Steady.Result == "" {
- return true
- }
- if ssChaos.Status.Phase == v1alpha1.AfterChaos && ssChaos.Status.Result.Chaos.Result == "" {
- return true
- }
-
- return false
-}
-
-// FIXME: this will broke if the log is too long
-func (r *ShardingSphereChaosReconciler) getPodLog(ctx context.Context, rJob *batchV1.Job) ([]byte, error) {
- pods := &corev1.PodList{}
-
- if err := r.List(ctx, pods, client.MatchingLabels{"controller-uid": rJob.Spec.Template.Labels["controller-uid"]}); err != nil {
- return nil, err
- }
- if pods.Items == nil || len(pods.Items) == 0 {
- return nil, nil
- }
-
- pod := &pods.Items[0]
-
- req := r.ClientSet.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &corev1.PodLogOptions{})
- res := req.Do(ctx)
- if res.Error() != nil {
- return []byte{}, res.Error()
- }
- ret, err := res.Raw()
- if err != nil {
- return []byte{}, err
- }
- return ret, nil
-}
-
func (r *ShardingSphereChaosReconciler) updateChaosCondition(ctx context.Context, chaos *v1alpha1.ShardingSphereChaos) error {
namespacedName := types.NamespacedName{
Namespace: chaos.Namespace,
@@ -617,96 +506,6 @@ func (r *ShardingSphereChaosReconciler) createConfigMap(ctx context.Context, cha
return err
}
-// TODO: consider a new job name pattern
-func (r *ShardingSphereChaosReconciler) updateJob(ctx context.Context, requirement reconcile.JobType, chao *v1alpha1.ShardingSphereChaos, cur *batchV1.Job) error {
- isEqual, err := reconcile.IsJobChanged(chao, requirement, cur)
- if err != nil {
- return err
- }
- if !isEqual {
- if err := r.Delete(ctx, cur); err != nil && !apierrors.IsNotFound(err) {
- return err
- }
- }
- return nil
-}
-
-func (r *ShardingSphereChaosReconciler) createJob(ctx context.Context, requirement reconcile.JobType, chao *v1alpha1.ShardingSphereChaos) error {
- injectJob, err := reconcile.NewJob(chao, requirement)
- if err != nil {
- return err
- }
- if err := ctrl.SetControllerReference(chao, injectJob, r.Scheme); err != nil {
- return err
- }
-
- err = r.Create(ctx, injectJob)
- if err != nil {
- return client.IgnoreAlreadyExists(err)
- }
- //FIXME: consider remove the following L620-L676, perhaps don't need to pay much attention to the pod
- rJob := &batchV1.Job{}
- backoff := wait.Backoff{
- Steps: 6,
- Duration: 500 * time.Millisecond,
- Factor: 5.0,
- Jitter: 0.1,
- }
-
- if err := retry.OnError(
- backoff,
- func(e error) bool {
- return true
- },
- func() error {
- return r.Get(ctx, types.NamespacedName{Namespace: chao.Namespace, Name: reconcile.MakeJobName(chao.Name, requirement)}, rJob)
- },
- ); err != nil {
- return err
- }
-
- podList := &corev1.PodList{}
- if err := retry.OnError(backoff, func(e error) bool {
- return e != nil
- }, func() error {
- if err := r.List(ctx, podList, client.MatchingLabels{"controller-uid": rJob.Spec.Template.Labels["controller-uid"]}); err != nil {
- return err
- }
- if len(podList.Items) == 0 {
- return ErrNoPod
- }
- return nil
- }); err != nil {
- return err
- }
-
- for i := range podList.Items {
- rPod := &podList.Items[i]
- if err := ctrl.SetControllerReference(rJob, rPod, r.Scheme); err != nil {
- return err
- }
-
- exp := rPod.DeepCopy()
- updateBackoff := wait.Backoff{
- Steps: 5,
- Duration: 30 * time.Millisecond,
- Factor: 5.0,
- Jitter: 0.1,
- }
- if err := retry.RetryOnConflict(updateBackoff, func() error {
- if err := r.Update(ctx, exp); err != nil {
- return err
- }
- return nil
- }); err != nil {
- return err
- }
- }
-
- r.Events.Event(chao, "Normal", "Created", fmt.Sprintf("%s job created", requirement))
- return nil
-}
-
// SetupWithManager sets up the controller with the Manager.
func (r *ShardingSphereChaosReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
diff --git a/shardingsphere-operator/pkg/pressure/pressure.go b/shardingsphere-operator/pkg/pressure/pressure.go
new file mode 100644
index 0000000..061bc3b
--- /dev/null
+++ b/shardingsphere-operator/pkg/pressure/pressure.go
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package pressure
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/database-mesh/golang-sdk/pkg/random"
+
+ "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
+ _ "github.com/go-sql-driver/mysql"
+)
+
+// Pressure is one load run: it repeatedly executes Tasks against the database
+// and accumulates the outcome in Result.
+type Pressure struct {
+ // Active is set to true when Run starts and to false once Run has collected all results.
+ // NOTE(review): it is written by Run without synchronisation — confirm readers tolerate that.
+ Active bool
+ // Name is the identifier given to NewPressure.
+ Name string
+ // Result holds the counters and the measured duration of the run.
+ Result Result
+ // Err is set by Run when opening or closing the database connection fails.
+ Err error
+ // Tasks are the statements executed in every request round.
+ Tasks []v1alpha1.DistSQL
+ // finishSignalCh receives one value from handleResponse after the last result was counted.
+ finishSignalCh chan struct{}
+ // wg tracks the exec goroutines started by Run.
+ wg sync.WaitGroup
+}
+
+// Package-level state shared by every Pressure value.
+// NOTE(review): neither variable is guarded by a lock — confirm that runs never overlap.
+var (
+ // db is the connection used for all statements; Run opens it when it is nil,
+ // and tests assign a mock to it directly.
+ db *sql.DB
+ // totalReq is reset at the start of each Run and grows by ReqNum for every
+ // goroutine Run starts; nothing in this file reads it.
+ totalReq int
+)
+
+// Result aggregates the outcome of one pressure run.
+type Result struct {
+ // Total is the number of executed statements whose outcome was counted.
+ Total int
+ // Success is the number of those statements that returned no error.
+ Success int
+ //todo: get total or get every exec
+
+ // Duration is the time between the start of the request loop and the
+ // return of the last exec goroutine.
+ Duration time.Duration
+}
+
+// NewPressure returns an inactive Pressure called name that will execute
+// tasks. Counters, error and wait group start at their zero values; the
+// finish-signal channel is unbuffered.
+func NewPressure(name string, tasks []v1alpha1.DistSQL) *Pressure {
+	p := new(Pressure)
+	p.Name = name
+	p.Tasks = tasks
+	p.finishSignalCh = make(chan struct{})
+	return p
+}
+
+// todo: get conn args by labels over string
+func initDB(connArgs string) error {
+ var err error
+ db, err = sql.Open("mysql", connArgs)
+ if err != nil {
+ return err
+ }
+ if err := db.Ping(); err != nil {
+ return err
+ }
+ db.SetConnMaxLifetime(60 * time.Second)
+ return nil
+}
+
+// Run executes the pressure described by pressureCfg and blocks until it is
+// over. Every ReqTime it starts ConcurrentNum goroutines, each performing
+// ReqNum rounds over p.Tasks, until pressureCfg.Duration has elapsed or ctx is
+// cancelled. Outcomes are accumulated in p.Result.
+//
+// p.Active is true for the whole call and false again on every exit path.
+// p.Err is set when ReqTime is not positive or when the database connection
+// cannot be opened or closed.
+//
+// NOTE(review): p.Active, p.Err and the package-level db/totalReq are written
+// without synchronisation — confirm that callers only poll them loosely.
+func (p *Pressure) Run(ctx context.Context, pressureCfg *v1alpha1.PressureCfg) {
+	p.Active = true
+	// registered first so it runs last: Active only drops once Err is final,
+	// and it also drops on the early returns below
+	defer func() { p.Active = false }()
+	totalReq = 0
+
+	// time.NewTicker panics on a non-positive interval
+	if pressureCfg.ReqTime.Duration <= 0 {
+		p.Err = fmt.Errorf("invalid reqTime %s: must be positive", pressureCfg.ReqTime.Duration)
+		return
+	}
+
+	//judge nil for simplify test
+	conn := db
+	if conn == nil {
+		if err := initDB(pressureCfg.SsHost); err != nil {
+			p.Err = err
+			return
+		}
+		conn = db
+		defer func() {
+			// forget the handle before closing it, otherwise the next run
+			// would skip initDB and work on a closed connection
+			if db == conn {
+				db = nil
+			}
+			if err := conn.Close(); err != nil {
+				p.Err = err
+			}
+		}()
+	}
+
+	pressureCtx, cancel := context.WithTimeout(context.Background(), pressureCfg.Duration.Duration)
+	defer cancel()
+	ticker := time.NewTicker(pressureCfg.ReqTime.Duration)
+	defer ticker.Stop()
+	resCh := make(chan bool, 1000)
+
+	//handle result
+	go p.handleResponse(pressureCtx, resCh, &p.Result)
+
+	//statistics the running time
+	start := time.Now()
+FOR:
+	for {
+		select {
+		case <-ctx.Done():
+			break FOR
+		case <-pressureCtx.Done():
+			break FOR
+		case <-ticker.C:
+			for i := 0; i < pressureCfg.ConcurrentNum; i++ {
+				totalReq += pressureCfg.ReqNum
+				//todo: handle err
+
+				//put wg here to prevent: when root ctx is closed,but some exec task do not start yet
+				p.wg.Add(1)
+				go p.exec(pressureCtx, conn, pressureCfg.ReqNum, resCh)
+			}
+		}
+	}
+
+	//occur when pressureCtx or root ctx closed
+
+	//wait all exec calls return,we can safely close the result channel
+	p.wg.Wait()
+	p.Result.Duration = time.Since(start)
+	close(resCh)
+	// every producer has returned; cancelling here releases a collector that
+	// is still waiting on the context when ctx ended before the timeout
+	cancel()
+
+	//wait collect results channel finished
+	<-p.finishSignalCh
+}
+
+// exec performs times rounds over p.Tasks on conn and reports on res whether
+// each statement succeeded. It stops between rounds once ctx is done and
+// signals p.wg when it returns.
+func (p *Pressure) exec(ctx context.Context, conn *sql.DB, times int, res chan bool) {
+	defer p.wg.Done()
+	if len(p.Tasks) == 0 {
+		return
+	}
+	for i := 0; i < times; i++ {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+		}
+		for j := range p.Tasks {
+			//generate diff sql, put result into channel
+			randomized := randomArgs(p.Tasks[j].Args)
+			// Exec takes one value per placeholder, so the strings are spread
+			// into a variadic list instead of passing the slice as one value
+			params := make([]interface{}, len(randomized))
+			for k := range randomized {
+				params[k] = randomized[k]
+			}
+			_, err := conn.Exec(p.Tasks[j].SQL, params...)
+			res <- err == nil
+		}
+	}
+}
+
+// handleResponse counts every outcome received on resCh into result and, once
+// resCh has been closed and drained, sends one value on p.finishSignalCh.
+//
+// The context parameter is kept for signature compatibility only. Termination
+// is driven by the close of resCh: selecting on the context as well would keep
+// receiving zero values from the closed channel and count them as failed
+// requests until the context expired.
+func (p *Pressure) handleResponse(_ context.Context, resCh chan bool, result *Result) {
+	//todo: add more msg
+	for ret := range resCh {
+		handle(ret, result)
+	}
+
+	//when all handle finish,put a signal to finish chan
+	p.finishSignalCh <- struct{}{}
+}
+
+// handle records one outcome in result: every call counts towards Total and a
+// true ret additionally counts towards Success.
+//
+//todo:add more logic and change ret type(bool ---> struct)
+func handle(ret bool, result *Result) {
+	result.Total++
+	if !ret {
+		return
+	}
+	result.Success++
+}
+
+// randomArgs returns a copy of args in which every element is followed by "-"
+// and a random suffix from random.StringN(4). It returns nil for empty input.
+func randomArgs(args []string) []string {
+	var randomized []string
+	for _, base := range args {
+		randomized = append(randomized, fmt.Sprintf("%s-%s", base, random.StringN(4)))
+	}
+	return randomized
+}
diff --git a/shardingsphere-operator/pkg/pressure/pressure_test.go b/shardingsphere-operator/pkg/pressure/pressure_test.go
new file mode 100644
index 0000000..ac2b2f3
--- /dev/null
+++ b/shardingsphere-operator/pkg/pressure/pressure_test.go
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package pressure
+
+import (
+ "bou.ke/monkey"
+ "context"
+ "database/sql"
+ "github.com/DATA-DOG/go-sqlmock"
+ "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "regexp"
+ "testing"
+ "time"
+)
+
+// TestPressure is the go test entry point: it registers Fail as the failure
+// handler and runs the specs declared in this package.
+// NOTE(review): the suite is labelled "Controllers Suite" although it lives in
+// the pressure package — looks copied, confirm the intended name.
+func TestPressure(t *testing.T) {
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "Controllers Suite")
+}
+
+// Specs for Pressure.Run, executed against a go-sqlmock database.
+var _ = Describe("test pressure", func() {
+ var (
+ dbmock sqlmock.Sqlmock
+ err error
+ )
+ BeforeEach(func() {
+ // Assigning the package-level db makes Run skip initDB, which is only
+ // called while db is nil.
+ db, dbmock, err = sqlmock.New()
+ Expect(err).To(BeNil())
+ Expect(db).NotTo(BeNil())
+ Expect(dbmock).NotTo(BeNil())
+
+ // Any sql.Open call made during a spec gets the mock handle back.
+ monkey.Patch(sql.Open, func(driverName, dataSourceName string) (*sql.DB, error) {
+ return db, nil
+ })
+ })
+
+ AfterEach(func() {
+ monkey.Unpatch(sql.Open)
+ // NOTE(review): db is closed but not reset to nil, so it stays assigned
+ // after the spec — confirm no later spec depends on a nil db.
+ db.Close()
+ })
+
+ Context("test Run function", func() {
+ It("should Run successfully", func() {
+ // Run blocks for the whole Duration, so this spec takes about 20s.
+ registerStorageUnitCase := &v1alpha1.PressureCfg{
+ ZkHost: "",
+ SsHost: "test",
+ Duration: metav1.Duration{Duration: 20 * time.Second},
+ ReqTime: metav1.Duration{Duration: 5 * time.Second},
+ DistSQLs: []v1alpha1.DistSQL{
+ {
+ SQL: "REGISTER STORAGE UNIT ?",
+ Args: []string{
+ "**",
+ },
+ },
+ },
+ ConcurrentNum: 2,
+ ReqNum: 5,
+ }
+
+ // Only a single Exec expectation is registered.
+ // NOTE(review): later Exec calls presumably fail against the mock, which
+ // would explain why Success is only required to be >= 0 — confirm.
+ dbmock.ExpectExec(regexp.QuoteMeta("REGISTER STORAGE UNIT")).WillReturnResult(sqlmock.NewResult(1, 1))
+ pressure := NewPressure("verify", registerStorageUnitCase.DistSQLs)
+ pressure.Run(context.TODO(), registerStorageUnitCase)
+
+ Expect(pressure.Result.Total > 0).To(BeTrue())
+ Expect(pressure.Result.Success >= 0).To(BeTrue())
+ Expect(pressure.Result.Total >= pressure.Result.Success).To(BeTrue())
+ // The run must have lasted at least the configured Duration.
+ Expect(pressure.Result.Duration.Milliseconds() >= registerStorageUnitCase.Duration.Milliseconds()).To(BeTrue())
+ Expect(pressure.Active).To(BeFalse())
+ })
+ })
+
+})