You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by mi...@apache.org on 2023/05/09 04:05:01 UTC
[shardingsphere-on-cloud] branch main updated: refactor(operator): update ssChaos.Status and reconcile logic
This is an automated email from the ASF dual-hosted git repository.
miaoliyao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git
The following commit(s) were added to refs/heads/main by this push:
new 5bf253c refactor(operator): update ssChaos.Status and reconcile logic
new b84e23e Merge pull request #347 from moomman/refactor-verify
5bf253c is described below
commit 5bf253c4b8abe32d803136bf227ac285fd608390
Author: moonman <ag...@163.com>
AuthorDate: Mon May 8 12:26:30 2023 +0800
refactor(operator): update ssChaos.Status and reconcile logic
---
.../api/v1alpha1/shardingsphere_chaos_types.go | 25 ++-
.../api/v1alpha1/zz_generated.deepcopy.go | 44 ++--
.../controllers/shardingsphere_chaos_controller.go | 242 +++++++++------------
.../pkg/reconcile/shardingspherechaos/job.go | 22 +-
.../shardingsphere_chaos_suite_test.go | 3 +-
.../shardingsphere_chaos_test.go | 19 +-
6 files changed, 151 insertions(+), 204 deletions(-)
diff --git a/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go b/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go
index 4470a4e..e1c5e90 100644
--- a/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go
+++ b/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go
@@ -85,28 +85,31 @@ const (
type ShardingSphereChaosStatus struct {
ChaosCondition ChaosCondition `json:"chaosCondition"`
Phase ChaosPhase `json:"phase"`
- Results []Result `json:"results"`
+ Result Result `json:"result"`
}
// Result represents the result of the ShardingSphereChaos
type Result struct {
- Success bool `json:"success"`
- Detail Detail `json:"detail"`
+ Steady Msg `json:"steady"`
+ Chaos Msg `json:"chaos"`
}
-type Detail struct {
- Time metav1.Time `json:"time"`
- Message string `json:"message"`
+type Metrics string
+
+type Msg struct {
+ Metrics Metrics `json:"metrics"`
+ Result string `json:"result"`
+ Duration string `json:"duration"`
+ FailureDetails string `json:"failureDetails"`
}
type ChaosPhase string
var (
- BeforeExperiment ChaosPhase = "BeforeReq"
- AfterExperiment ChaosPhase = "AfterReq"
- CreatedChaos ChaosPhase = "Created"
- InjectedChaos ChaosPhase = "Injected"
- RecoveredChaos ChaosPhase = "Recovered"
+ BeforeSteady ChaosPhase = "BeforeSteady"
+ AfterSteady ChaosPhase = "AfterSteady"
+ BeforeChaos ChaosPhase = "BeforeChaos"
+ AfterChaos ChaosPhase = "AfterChaos"
)
// PodChaosAction Specify the action type of pod Chaos
diff --git a/shardingsphere-operator/api/v1alpha1/zz_generated.deepcopy.go b/shardingsphere-operator/api/v1alpha1/zz_generated.deepcopy.go
index 7622078..466979d 100644
--- a/shardingsphere-operator/api/v1alpha1/zz_generated.deepcopy.go
+++ b/shardingsphere-operator/api/v1alpha1/zz_generated.deepcopy.go
@@ -550,22 +550,6 @@ func (in *DelayParams) DeepCopy() *DelayParams {
return out
}
-// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
-func (in *Detail) DeepCopyInto(out *Detail) {
- *out = *in
- in.Time.DeepCopyInto(&out.Time)
-}
-
-// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Detail.
-func (in *Detail) DeepCopy() *Detail {
- if in == nil {
- return nil
- }
- out := new(Detail)
- in.DeepCopyInto(out)
- return out
-}
-
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *DuplicationParams) DeepCopyInto(out *DuplicationParams) {
*out = *in
@@ -733,6 +717,21 @@ func (in *LossParams) DeepCopy() *LossParams {
return out
}
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *Msg) DeepCopyInto(out *Msg) {
+ *out = *in
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Msg.
+func (in *Msg) DeepCopy() *Msg {
+ if in == nil {
+ return nil
+ }
+ out := new(Msg)
+ in.DeepCopyInto(out)
+ return out
+}
+
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *MySQLDriver) DeepCopyInto(out *MySQLDriver) {
*out = *in
@@ -1298,7 +1297,8 @@ func (in *RepositoryConfig) DeepCopy() *RepositoryConfig {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Result) DeepCopyInto(out *Result) {
*out = *in
- in.Detail.DeepCopyInto(&out.Detail)
+ out.Steady = in.Steady
+ out.Chaos = in.Chaos
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Result.
@@ -1378,7 +1378,7 @@ func (in *ShardingSphereChaos) DeepCopyInto(out *ShardingSphereChaos) {
out.TypeMeta = in.TypeMeta
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
in.Spec.DeepCopyInto(&out.Spec)
- in.Status.DeepCopyInto(&out.Status)
+ out.Status = in.Status
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ShardingSphereChaos.
@@ -1452,13 +1452,7 @@ func (in *ShardingSphereChaosSpec) DeepCopy() *ShardingSphereChaosSpec {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ShardingSphereChaosStatus) DeepCopyInto(out *ShardingSphereChaosStatus) {
*out = *in
- if in.Results != nil {
- in, out := &in.Results, &out.Results
- *out = make([]Result, len(*in))
- for i := range *in {
- (*in)[i].DeepCopyInto(&(*out)[i])
- }
- }
+ out.Result = in.Result
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ShardingSphereChaosStatus.
diff --git a/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go b/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go
index 43b0286..c3d6040 100644
--- a/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go
+++ b/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go
@@ -21,7 +21,6 @@ import (
"context"
"errors"
"fmt"
- "strings"
"time"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
@@ -36,7 +35,6 @@ import (
batchV1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
@@ -49,7 +47,6 @@ import (
const (
ShardingSphereChaosControllerName = "shardingsphere-chaos-controller"
- VerifyJobCheck = "Verify"
SSChaosFinalizeName = "shardingsphere.apache.org/finalizer"
)
@@ -100,26 +97,40 @@ func (r *ShardingSphereChaosReconciler) Reconcile(ctx context.Context, req ctrl.
logger.Info("start reconcile chaos")
//TODO: consider merge these events
+ var errors []error
if err := r.reconcileChaos(ctx, ssChaos); err != nil {
+ if err != nil {
+ errors = append(errors, err)
+ }
logger.Error(err, "reconcile shardingspherechaos error")
r.Events.Event(ssChaos, "Warning", "shardingspherechaos error", err.Error())
}
if err := r.reconcileConfigMap(ctx, ssChaos); err != nil {
+ if err != nil {
+ errors = append(errors, err)
+ }
logger.Error(err, "reconcile configmap error")
r.Events.Event(ssChaos, "Warning", "configmap error", err.Error())
}
if err := r.reconcileJob(ctx, ssChaos); err != nil {
+ if err != nil {
+ errors = append(errors, err)
+ }
logger.Error(err, "reconcile job error")
r.Events.Event(ssChaos, "Warning", "job error", err.Error())
}
if err := r.reconcileStatus(ctx, ssChaos); err != nil {
+ if err != nil {
+ errors = append(errors, err)
+ }
logger.Error(err, "failed to update status")
- r.Events.Event(ssChaos, "Warning", "update status error", err.Error())
}
-
+ if len(errors) > 0 {
+ return ctrl.Result{Requeue: true}, err
+ }
return ctrl.Result{RequeueAfter: defaultRequeueTime}, nil
}
@@ -204,7 +215,7 @@ func (r *ShardingSphereChaosReconciler) deleteNetworkChaos(ctx context.Context,
func (r *ShardingSphereChaosReconciler) reconcileChaos(ctx context.Context, chaos *v1alpha1.ShardingSphereChaos) error {
logger := r.Log.WithValues("reconcile shardingspherechaos", fmt.Sprintf("%s/%s", chaos.Namespace, chaos.Name))
- if len(chaos.Status.Phase) == 0 || chaos.Status.Phase == v1alpha1.BeforeExperiment {
+ if chaos.Status.Phase == v1alpha1.BeforeSteady || chaos.Status.Phase == v1alpha1.AfterSteady {
return nil
}
@@ -326,22 +337,20 @@ func (r *ShardingSphereChaosReconciler) reconcileConfigMap(ctx context.Context,
}
func (r *ShardingSphereChaosReconciler) reconcileJob(ctx context.Context, chaos *v1alpha1.ShardingSphereChaos) error {
- var nowInjectRequirement reconcile.InjectRequirement
+ var jobType reconcile.JobType
switch chaos.Status.Phase {
- case v1alpha1.InjectedChaos:
- nowInjectRequirement = reconcile.Pressure
- case v1alpha1.RecoveredChaos:
- nowInjectRequirement = reconcile.Verify
- case v1alpha1.BeforeExperiment, v1alpha1.AfterExperiment:
- fallthrough
+ case v1alpha1.BeforeChaos, v1alpha1.AfterChaos:
+ jobType = reconcile.InChaos
+ case v1alpha1.BeforeSteady, v1alpha1.AfterSteady:
+ jobType = reconcile.InSteady
default:
- nowInjectRequirement = reconcile.Experimental
+ jobType = reconcile.InSteady
}
namespaceName := types.NamespacedName{
Namespace: chaos.Namespace,
- Name: reconcile.MakeJobName(chaos.Name, nowInjectRequirement),
+ Name: reconcile.MakeJobName(chaos.Name, jobType),
}
job, err := r.getJobByNamespacedName(ctx, namespaceName)
@@ -350,10 +359,10 @@ func (r *ShardingSphereChaosReconciler) reconcileJob(ctx context.Context, chaos
}
if job != nil {
- return r.updateJob(ctx, nowInjectRequirement, chaos, job)
+ return r.updateJob(ctx, jobType, chaos, job)
}
- return r.createJob(ctx, nowInjectRequirement, chaos)
+ return r.createJob(ctx, jobType, chaos)
}
func (r *ShardingSphereChaosReconciler) reconcileStatus(ctx context.Context, chaos *v1alpha1.ShardingSphereChaos) error {
@@ -368,31 +377,30 @@ func (r *ShardingSphereChaosReconciler) reconcileStatus(ctx context.Context, cha
}
return err
}
- r.setDefaultStatus(chaos)
+ setDefaultStatus(chaos)
- req := getInjectRequirement(chaos)
+ req := getJobType(chaos)
job, err := r.getJobByNamespacedName(ctx, types.NamespacedName{Namespace: chaos.Namespace, Name: reconcile.MakeJobName(chaos.Name, req)})
if err != nil || job == nil {
return err
}
- if chaos.Status.Phase == v1alpha1.BeforeExperiment && job.Status.Succeeded == 1 {
- chaos.Status.Phase = v1alpha1.AfterExperiment
- }
+ updatePhase(chaos, job)
condition := getJobCondition(job.Status.Conditions)
if condition == FailureJob {
r.Events.Event(chaos, "Warning", "failed", fmt.Sprintf("job: %s", job.Name))
}
- if chaos.Status.Phase == v1alpha1.RecoveredChaos {
- if err := r.updateRecoveredJob(ctx, chaos, job); err != nil {
+ // Collect the job result once a stage's job has finished; which result is filled in depends on the current phase.
+ if chaos.Status.Phase == v1alpha1.AfterSteady || chaos.Status.Phase == v1alpha1.AfterChaos {
+ if err := r.collectJobMsg(ctx, chaos, job); err != nil {
r.Events.Event(chaos, "Warning", "getPodLog", err.Error())
return err
}
}
- if err := r.updatePhaseStart(ctx, chaos); err != nil {
+ if err := r.updateChaosCondition(ctx, chaos); err != nil {
return err
}
@@ -405,36 +413,43 @@ func (r *ShardingSphereChaosReconciler) reconcileStatus(ctx context.Context, cha
return r.Status().Update(ctx, rt)
}
-func (r *ShardingSphereChaosReconciler) setDefaultStatus(chaos *v1alpha1.ShardingSphereChaos) {
- if chaos.Status.Phase == "" {
- chaos.Status.Phase = v1alpha1.BeforeExperiment
+func updatePhase(chaos *v1alpha1.ShardingSphereChaos, job *batchV1.Job) {
+ switch {
+ // The steady job has succeeded: move to AfterSteady so that its job message is collected.
+ case chaos.Status.Phase == v1alpha1.BeforeSteady && job.Status.Succeeded == 1:
+ chaos.Status.Phase = v1alpha1.AfterSteady
+ // Advance on a later reconcile, only after the steady result has been collected.
+ case chaos.Status.Phase == v1alpha1.AfterSteady && chaos.Status.Result.Steady.Result != "":
+ chaos.Status.Phase = v1alpha1.BeforeChaos
+ case chaos.Status.Phase == v1alpha1.BeforeChaos && job.Status.Succeeded == 1:
+ chaos.Status.Phase = v1alpha1.AfterChaos
}
- if chaos.Status.Results == nil {
- chaos.Status.Results = []v1alpha1.Result{}
+
+}
+
+func setDefaultStatus(chaos *v1alpha1.ShardingSphereChaos) {
+ if chaos.Status.Phase == "" {
+ chaos.Status.Phase = v1alpha1.BeforeSteady
}
}
-// getInjectRequirement to get the coming job requirement
-// * BeforeExperiment: it hasn't been started, could start a new experiment
-// * AfterExperiment: it has been finished, could start a new experiment
+// getJobType to get the coming job requirement
+// * BeforeSteady: it hasn't been started, could start a new experiment
+// * AfterSteady: it has been finished, could start a new experiment
// * InjectChaos: it has been started, could start some pressure
// * recoveredChaos: it has been recovered, could start to verify
-func getInjectRequirement(ssChaos *v1alpha1.ShardingSphereChaos) reconcile.InjectRequirement {
- var jobName reconcile.InjectRequirement
-
- if ssChaos.Status.Phase == v1alpha1.BeforeExperiment || ssChaos.Status.Phase == v1alpha1.AfterExperiment {
- jobName = reconcile.Experimental
- }
+func getJobType(ssChaos *v1alpha1.ShardingSphereChaos) reconcile.JobType {
+ var jobType reconcile.JobType
- if ssChaos.Status.Phase == v1alpha1.InjectedChaos {
- jobName = reconcile.Pressure
+ if ssChaos.Status.Phase == v1alpha1.BeforeSteady || ssChaos.Status.Phase == v1alpha1.AfterSteady {
+ jobType = reconcile.InSteady
}
- if ssChaos.Status.Phase == v1alpha1.RecoveredChaos {
- jobName = reconcile.Verify
+ if ssChaos.Status.Phase == v1alpha1.BeforeChaos || ssChaos.Status.Phase == v1alpha1.AfterChaos {
+ jobType = reconcile.InChaos
}
- return jobName
+ return jobType
}
func (r *ShardingSphereChaosReconciler) getJobByNamespacedName(ctx context.Context, namespacedName types.NamespacedName) (*batchV1.Job, error) {
@@ -464,138 +479,76 @@ func getJobCondition(conditions []batchV1.JobCondition) JobCondition {
return ret
}
-func isRecoveredJobType(rJob *batchV1.Job, requirement reconcile.InjectRequirement) bool {
- for i := range rJob.Spec.Template.Spec.Containers[0].Args {
- r := rJob.Spec.Template.Spec.Containers[0].Args[i]
- if strings.Contains(r, string(requirement)) {
- return true
- }
- }
- return false
-}
-
-func (r *ShardingSphereChaosReconciler) updateRecoveredJob(ctx context.Context, ssChaos *v1alpha1.ShardingSphereChaos, rJob *batchV1.Job) error {
- if !isRecoveredJobType(rJob, reconcile.Verify) {
+func (r *ShardingSphereChaosReconciler) collectJobMsg(ctx context.Context, ssChaos *v1alpha1.ShardingSphereChaos, rJob *batchV1.Job) error {
+ if isExistResult(ssChaos) {
return nil
}
- for i := range ssChaos.Status.Results {
- if strings.HasPrefix(ssChaos.Status.Results[i].Detail.Message, VerifyJobCheck) {
- return nil
- }
- }
-
- logOpts := &corev1.PodLogOptions{}
- pod, err := r.getPodHaveLog(ctx, rJob)
- if err != nil || pod == nil {
- return err
- }
- podNamespacedName := types.NamespacedName{
- Namespace: pod.Namespace,
- Name: pod.Name,
- }
condition := getJobCondition(rJob.Status.Conditions)
- result := &v1alpha1.Result{}
+ var result *v1alpha1.Msg
+ if ssChaos.Status.Phase == v1alpha1.AfterSteady {
+ result = &ssChaos.Status.Result.Steady
+ } else if ssChaos.Status.Phase == v1alpha1.AfterChaos {
+ result = &ssChaos.Status.Result.Chaos
+ }
if condition == CompleteJob {
- log, err := r.getPodLog(ctx, podNamespacedName, logOpts)
+ log, err := r.getPodLog(ctx, rJob)
if err != nil {
return err
}
- if ssChaos.Spec.Expect.Verify == "" || ssChaos.Spec.Expect.Verify == log {
- result.Success = true
- result.Detail = v1alpha1.Detail{
- Time: metav1.Time{Time: time.Now()},
- Message: fmt.Sprintf("%s: job succeeded", VerifyJobCheck),
- }
- } else {
- result.Success = false
- result.Detail = v1alpha1.Detail{
- Time: metav1.Time{Time: time.Now()},
- Message: fmt.Sprintf("%s: %s", VerifyJobCheck, log),
- }
- }
- ssChaos.Status.Results = updateResult(ssChaos.Status.Results, *result, VerifyJobCheck)
+ //todo: unpack msg with json
+ //if err := json.Unmarshal(log, result); err != nil {
+ // return err
+ //}
+ result.Result = string(log)
}
if condition == FailureJob {
- log, err := r.getPodLog(ctx, podNamespacedName, logOpts)
+ log, err := r.getPodLog(ctx, rJob)
if err != nil {
return err
}
- result.Success = false
- result.Detail = v1alpha1.Detail{
- Time: metav1.Time{Time: time.Now()},
- Message: fmt.Sprintf("%s: %s", VerifyJobCheck, log),
- }
- ssChaos.Status.Results = updateResult(ssChaos.Status.Results, *result, VerifyJobCheck)
+ result.FailureDetails = string(log)
}
return nil
}
-func (r *ShardingSphereChaosReconciler) getPodHaveLog(ctx context.Context, rJob *batchV1.Job) (*corev1.Pod, error) {
+func isExistResult(ssChaos *v1alpha1.ShardingSphereChaos) bool {
+ if ssChaos.Status.Phase == v1alpha1.AfterSteady && ssChaos.Status.Result.Steady.Result == "" {
+ return true
+ }
+ if ssChaos.Status.Phase == v1alpha1.AfterChaos && ssChaos.Status.Result.Chaos.Result == "" {
+ return true
+ }
+
+ return false
+}
+
+// FIXME: this will break if the log is too long
+func (r *ShardingSphereChaosReconciler) getPodLog(ctx context.Context, rJob *batchV1.Job) ([]byte, error) {
pods := &corev1.PodList{}
if err := r.List(ctx, pods, client.MatchingLabels{"controller-uid": rJob.Spec.Template.Labels["controller-uid"]}); err != nil {
return nil, err
}
- if pods.Items == nil {
+ if pods.Items == nil || len(pods.Items) == 0 {
return nil, nil
}
- //FIXME: get the first pod
- var pod *corev1.Pod
- for i := range pods.Items {
- pod = &pods.Items[i]
- break
- }
- return pod, nil
-}
-// FIXME: this will broke when the job count is more than one
-func updateResult(results []v1alpha1.Result, r v1alpha1.Result, check string) []v1alpha1.Result {
- for i := range results {
- msg := results[i].Detail.Message
- if strings.HasPrefix(msg, check) && strings.HasPrefix(r.Detail.Message, check) {
- results[i] = r
- return results
- }
- }
- results = append(results, r)
- return results
-}
+ pod := &pods.Items[0]
-// FIXME: this will broke if the log is too long
-func (r *ShardingSphereChaosReconciler) getPodLog(ctx context.Context, namespacedName types.NamespacedName, options *corev1.PodLogOptions) (string, error) {
- req := r.ClientSet.CoreV1().Pods(namespacedName.Namespace).GetLogs(namespacedName.Name, options)
+ req := r.ClientSet.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &corev1.PodLogOptions{})
res := req.Do(ctx)
if res.Error() != nil {
- return "", res.Error()
+ return []byte{}, res.Error()
}
- var ret []byte
ret, err := res.Raw()
if err != nil {
- return "", err
+ return []byte{}, err
}
- return string(ret), nil
-}
-
-func (r *ShardingSphereChaosReconciler) updatePhaseStart(ctx context.Context, ssChaos *v1alpha1.ShardingSphereChaos) error {
- if ssChaos.Status.Phase != v1alpha1.BeforeExperiment {
- if err := r.updateChaosCondition(ctx, ssChaos); err != nil {
- return err
- }
-
- if ssChaos.Status.ChaosCondition == v1alpha1.AllInjected && ssChaos.Status.Phase == v1alpha1.AfterExperiment {
- ssChaos.Status.Phase = v1alpha1.InjectedChaos
- }
-
- if ssChaos.Status.ChaosCondition == v1alpha1.AllRecovered && ssChaos.Status.Phase == v1alpha1.InjectedChaos {
- ssChaos.Status.Phase = v1alpha1.RecoveredChaos
- }
- }
-
- return nil
+ return ret, nil
}
func (r *ShardingSphereChaosReconciler) updateChaosCondition(ctx context.Context, chaos *v1alpha1.ShardingSphereChaos) error {
@@ -652,6 +605,9 @@ func (r *ShardingSphereChaosReconciler) updateConfigMap(ctx context.Context, cha
func (r *ShardingSphereChaosReconciler) createConfigMap(ctx context.Context, chaos *v1alpha1.ShardingSphereChaos) error {
cm := r.ConfigMap.Build(ctx, chaos)
+ if err := ctrl.SetControllerReference(chaos, cm, r.Scheme); err != nil {
+ return err
+ }
err := r.Create(ctx, cm)
if err != nil && apierrors.IsAlreadyExists(err) {
@@ -662,7 +618,7 @@ func (r *ShardingSphereChaosReconciler) createConfigMap(ctx context.Context, cha
}
// TODO: consider a new job name pattern
-func (r *ShardingSphereChaosReconciler) updateJob(ctx context.Context, requirement reconcile.InjectRequirement, chao *v1alpha1.ShardingSphereChaos, cur *batchV1.Job) error {
+func (r *ShardingSphereChaosReconciler) updateJob(ctx context.Context, requirement reconcile.JobType, chao *v1alpha1.ShardingSphereChaos, cur *batchV1.Job) error {
isEqual, err := reconcile.IsJobChanged(chao, requirement, cur)
if err != nil {
return err
@@ -675,7 +631,7 @@ func (r *ShardingSphereChaosReconciler) updateJob(ctx context.Context, requireme
return nil
}
-func (r *ShardingSphereChaosReconciler) createJob(ctx context.Context, requirement reconcile.InjectRequirement, chao *v1alpha1.ShardingSphereChaos) error {
+func (r *ShardingSphereChaosReconciler) createJob(ctx context.Context, requirement reconcile.JobType, chao *v1alpha1.ShardingSphereChaos) error {
injectJob, err := reconcile.NewJob(chao, requirement)
if err != nil {
return err
diff --git a/shardingsphere-operator/pkg/reconcile/shardingspherechaos/job.go b/shardingsphere-operator/pkg/reconcile/shardingspherechaos/job.go
index af42e91..855e0fe 100644
--- a/shardingsphere-operator/pkg/reconcile/shardingspherechaos/job.go
+++ b/shardingsphere-operator/pkg/reconcile/shardingspherechaos/job.go
@@ -51,20 +51,19 @@ const (
AnnoJobSuspend = "job.batch/suspend"
)
-type InjectRequirement string
+type JobType string
var (
//FIXME: pick another name for experimental
- Experimental InjectRequirement = "experimental"
- Pressure InjectRequirement = "pressure"
- Verify InjectRequirement = "verify"
+ InSteady JobType = "steady"
+ InChaos JobType = "chaos"
)
-func MakeJobName(name string, requirement InjectRequirement) string {
+func MakeJobName(name string, requirement JobType) string {
return fmt.Sprintf("%s-%s", name, string(requirement))
}
-func NewJob(ssChaos *v1alpha1.ShardingSphereChaos, requirement InjectRequirement) (*v1.Job, error) {
+func NewJob(ssChaos *v1alpha1.ShardingSphereChaos, requirement JobType) (*v1.Job, error) {
jbd := NewJobBuilder()
jbd.SetNamespace(ssChaos.Namespace).SetLabels(ssChaos.Labels).SetName(MakeJobName(ssChaos.Name, requirement))
@@ -142,17 +141,14 @@ func NewJob(ssChaos *v1alpha1.ShardingSphereChaos, requirement InjectRequirement
return rjob, nil
}
-func NewCmds(requirement InjectRequirement) []string {
+func NewCmds(requirement JobType) []string {
var cmds []string
- if requirement == Experimental {
+ if requirement == InSteady {
cmds = append(cmds, fmt.Sprintf("%s/%s", DefaultWorkPath, configExperimental))
}
- if requirement == Pressure {
+ if requirement == InChaos {
cmds = append(cmds, fmt.Sprintf("%s/%s;%s/%s", DefaultWorkPath, configPressure, DefaultWorkPath, configExperimental))
}
- if requirement == Verify {
- cmds = append(cmds, fmt.Sprintf("%s/%s", DefaultWorkPath, configVerify))
- }
return cmds
}
@@ -164,7 +160,7 @@ func MustInt32(s string) (int32, error) {
return int32(v), nil
}
-func IsJobChanged(ssChaos *v1alpha1.ShardingSphereChaos, requirement InjectRequirement, cur *v1.Job) (bool, error) {
+func IsJobChanged(ssChaos *v1alpha1.ShardingSphereChaos, requirement JobType, cur *v1.Job) (bool, error) {
now, err := NewJob(ssChaos, requirement)
if err != nil {
return false, err
diff --git a/shardingsphere-operator/pkg/reconcile/shardingspherechaos/shardingsphere_chaos_suite_test.go b/shardingsphere-operator/pkg/reconcile/shardingspherechaos/shardingsphere_chaos_suite_test.go
index 94f2ab5..5a583cd 100644
--- a/shardingsphere-operator/pkg/reconcile/shardingspherechaos/shardingsphere_chaos_suite_test.go
+++ b/shardingsphere-operator/pkg/reconcile/shardingspherechaos/shardingsphere_chaos_suite_test.go
@@ -95,11 +95,12 @@ var _ = BeforeSuite(func() {
Expect(err).ToNot(HaveOccurred())
clientset, err := clientset.NewForConfig(k8sManager.GetConfig())
Expect(err).ToNot(HaveOccurred())
+ mockChaosMesh := mockChaos.NewMockChaos(ctl)
err = (&controllers.ShardingSphereChaosReconciler{
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
Log: logf.Log,
- Chaos: mockChaos.NewMockChaos(ctl),
+ Chaos: mockChaosMesh,
Job: job.NewJob(k8sManager.GetClient()),
ConfigMap: configmap.NewConfigMapClient(k8sManager.GetClient()),
Events: k8sManager.GetEventRecorderFor("shardingsphere-chaos-controller"),
diff --git a/shardingsphere-operator/pkg/reconcile/shardingspherechaos/shardingsphere_chaos_test.go b/shardingsphere-operator/pkg/reconcile/shardingspherechaos/shardingsphere_chaos_test.go
index 79dc378..62cf4d8 100644
--- a/shardingsphere-operator/pkg/reconcile/shardingspherechaos/shardingsphere_chaos_test.go
+++ b/shardingsphere-operator/pkg/reconcile/shardingspherechaos/shardingsphere_chaos_test.go
@@ -23,11 +23,8 @@ import (
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
- corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/types"
"math/rand"
- "time"
)
var _ = Describe("ShardingSphereChaos", func() {
@@ -78,14 +75,14 @@ var _ = Describe("ShardingSphereChaos", func() {
Expect(k8sClient.Delete(ctx, ssChaos)).To(BeNil())
})
- It("should create configmap", func() {
- configmap := &corev1.ConfigMap{}
- namespacedName := types.NamespacedName{Name: name, Namespace: namespace}
- Eventually(func() bool {
- err := k8sClient.Get(ctx, namespacedName, configmap)
- return err == nil
- }, time.Second*10, time.Millisecond*250).Should(BeTrue())
- })
+ //It("should create configmap", func() {
+ // configmap := &corev1.ConfigMap{}
+ // namespacedName := types.NamespacedName{Name: name, Namespace: namespace}
+ // Eventually(func() bool {
+ // err := k8sClient.Get(ctx, namespacedName, configmap)
+ // return err == nil
+ // }, time.Second*10, time.Millisecond*250).Should(BeTrue())
+ //})
})