You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by mi...@apache.org on 2023/04/24 08:59:55 UTC
[shardingsphere-on-cloud] branch main updated: add reconcile logic and verfity Spec
This is an automated email from the ASF dual-hosted git repository.
miaoliyao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git
The following commit(s) were added to refs/heads/main by this push:
new 0632954 add reconcile logic and verfity Spec
new ef582e2 Merge pull request #316 from moomman/main
0632954 is described below
commit 063295453bcc3f3c629f5e02c598bdcfba5a87ab
Author: moonman <ag...@163.com>
AuthorDate: Fri Apr 21 15:25:54 2023 +0800
add reconcile logic and verfity Spec
---
.../api/v1alpha1/shardingsphere_chaos_types.go | 20 ++
.../api/v1alpha1/zz_generated.deepcopy.go | 57 ++-
shardingsphere-operator/build/tools/Dockerfile | 2 +
.../cmd/shardingsphere-operator/manager/manager.go | 1 -
.../cmd/shardingsphere-operator/manager/option.go | 7 +
.../controllers/shardingsphere_chaos_controller.go | 391 +++++++++++++++++----
.../pkg/reconcile/shardingspherechaos/configmap.go | 9 +-
.../pkg/reconcile/shardingspherechaos/job.go | 47 ++-
.../shardingspherechaos_suite_test.go | 7 +
9 files changed, 460 insertions(+), 81 deletions(-)
diff --git a/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go b/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go
index 32c2126..50ac996 100644
--- a/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go
+++ b/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go
@@ -44,6 +44,11 @@ type ShardingSphereChaos struct {
type ShardingSphereChaosSpec struct {
InjectJob JobSpec `json:"injectJob,omitempty"`
EmbedChaos `json:",inline"`
+ Expect Expect `json:"expect,omitempty"`
+}
+
+type Expect struct {
+ Verify string `json:"verify,omitempty"`
}
// JobSpec Specifies the config of job to create
@@ -54,6 +59,8 @@ type JobSpec struct {
Pressure string `json:"pressure,omitempty"`
// +optional
Position string `json:"position,omitempty"`
+ // +optional
+ Verify string `json:"verify,omitempty"`
}
type EmbedChaos struct {
@@ -78,6 +85,18 @@ const (
type ShardingSphereChaosStatus struct {
ChaosCondition ChaosCondition `json:"chaosCondition"`
Phase Phase `json:"phase"`
+ Result []Result `json:"result"`
+}
+
+// Result represents the result of the ShardingSphereChaos
+type Result struct {
+ Success bool `json:"success"`
+ Detail Detail `json:"details"`
+}
+
+type Detail struct {
+ Time metav1.Time `json:"time"`
+ Msg string `json:"message"`
}
type Phase string
@@ -85,6 +104,7 @@ type Phase string
var (
PhaseBeforeExperiment Phase = "BeforeReq"
PhaseAfterExperiment Phase = "AfterReq"
+ PhaseCreatingChaos Phase = "Creating"
PhaseInChaos Phase = "Injected"
PhaseRecoveredChaos Phase = "Recovered"
)
diff --git a/shardingsphere-operator/api/v1alpha1/zz_generated.deepcopy.go b/shardingsphere-operator/api/v1alpha1/zz_generated.deepcopy.go
index 9f4063b..5f9622f 100644
--- a/shardingsphere-operator/api/v1alpha1/zz_generated.deepcopy.go
+++ b/shardingsphere-operator/api/v1alpha1/zz_generated.deepcopy.go
@@ -491,6 +491,22 @@ func (in *DelayActionParams) DeepCopy() *DelayActionParams {
return out
}
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *Detail) DeepCopyInto(out *Detail) {
+ *out = *in
+ in.Time.DeepCopyInto(&out.Time)
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Detail.
+func (in *Detail) DeepCopy() *Detail {
+ if in == nil {
+ return nil
+ }
+ out := new(Detail)
+ in.DeepCopyInto(out)
+ return out
+}
+
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *DuplicateActionParams) DeepCopyInto(out *DuplicateActionParams) {
*out = *in
@@ -531,6 +547,21 @@ func (in *EmbedChaos) DeepCopy() *EmbedChaos {
return out
}
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *Expect) DeepCopyInto(out *Expect) {
+ *out = *in
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Expect.
+func (in *Expect) DeepCopy() *Expect {
+ if in == nil {
+ return nil
+ }
+ out := new(Expect)
+ in.DeepCopyInto(out)
+ return out
+}
+
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *JobSpec) DeepCopyInto(out *JobSpec) {
*out = *in
@@ -1158,6 +1189,22 @@ func (in *RepositoryConfig) DeepCopy() *RepositoryConfig {
return out
}
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *Result) DeepCopyInto(out *Result) {
+ *out = *in
+ in.Detail.DeepCopyInto(&out.Detail)
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Result.
+func (in *Result) DeepCopy() *Result {
+ if in == nil {
+ return nil
+ }
+ out := new(Result)
+ in.DeepCopyInto(out)
+ return out
+}
+
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ServerConfig) DeepCopyInto(out *ServerConfig) {
*out = *in
@@ -1225,7 +1272,7 @@ func (in *ShardingSphereChaos) DeepCopyInto(out *ShardingSphereChaos) {
out.TypeMeta = in.TypeMeta
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
in.Spec.DeepCopyInto(&out.Spec)
- out.Status = in.Status
+ in.Status.DeepCopyInto(&out.Status)
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ShardingSphereChaos.
@@ -1283,6 +1330,7 @@ func (in *ShardingSphereChaosSpec) DeepCopyInto(out *ShardingSphereChaosSpec) {
*out = *in
out.InjectJob = in.InjectJob
in.EmbedChaos.DeepCopyInto(&out.EmbedChaos)
+ out.Expect = in.Expect
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ShardingSphereChaosSpec.
@@ -1298,6 +1346,13 @@ func (in *ShardingSphereChaosSpec) DeepCopy() *ShardingSphereChaosSpec {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ShardingSphereChaosStatus) DeepCopyInto(out *ShardingSphereChaosStatus) {
*out = *in
+ if in.Result != nil {
+ in, out := &in.Result, &out.Result
+ *out = make([]Result, len(*in))
+ for i := range *in {
+ (*in)[i].DeepCopyInto(&(*out)[i])
+ }
+ }
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ShardingSphereChaosStatus.
diff --git a/shardingsphere-operator/build/tools/Dockerfile b/shardingsphere-operator/build/tools/Dockerfile
index d73511b..fc3ad0f 100644
--- a/shardingsphere-operator/build/tools/Dockerfile
+++ b/shardingsphere-operator/build/tools/Dockerfile
@@ -33,6 +33,8 @@ RUN set -eux; \
apt-get install -y --no-install-recommends libncursesw6 libreadline8 libssh-4; \
apt-get install -y --no-install-recommends openjdk-11-jre-headless; \
echo "export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-11-amd64" >> /etc/profile; \
+ echo "alias mysql=\"mysqlsh\"" >> /etc/profile; \
+ . /etc/profile; \
\
rm -rf /var/lib/apt/lists/*; \
\
diff --git a/shardingsphere-operator/cmd/shardingsphere-operator/manager/manager.go b/shardingsphere-operator/cmd/shardingsphere-operator/manager/manager.go
index 06166a9..05681f0 100644
--- a/shardingsphere-operator/cmd/shardingsphere-operator/manager/manager.go
+++ b/shardingsphere-operator/cmd/shardingsphere-operator/manager/manager.go
@@ -49,7 +49,6 @@ func SetupWithOptions(opts *Options) *Manager {
logger.Error(err, "unable to start manager")
os.Exit(1)
}
-
if err = (&controllers.ProxyReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
diff --git a/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go b/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
index 10ff9f5..e1e1fe5 100644
--- a/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
+++ b/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
@@ -21,6 +21,8 @@ import (
"flag"
"strings"
+ clientset "k8s.io/client-go/kubernetes"
+
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/job"
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/chaos"
@@ -139,6 +141,10 @@ var featureGatesHandlers = map[string]FeatureGateHandler{
return nil
},
"ShardingSphereChaos": func(mgr manager.Manager) error {
+ clientset, err := clientset.NewForConfig(mgr.GetConfig())
+ if err != nil {
+ return err
+ }
if err := (&controllers.ShardingSphereChaosReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
@@ -147,6 +153,7 @@ var featureGatesHandlers = map[string]FeatureGateHandler{
Job: job.NewJob(mgr.GetClient()),
ConfigMap: configmap.NewConfigMap(mgr.GetClient()),
Events: mgr.GetEventRecorderFor("shardingsphere-chaos-controller"),
+ ClientSet: clientset,
}).SetupWithManager(mgr); err != nil {
logger.Error(err, "unable to create controller", "controller", "ShardingSphereChaos")
return err
diff --git a/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go b/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go
index 95dfa97..b84b0be 100644
--- a/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go
+++ b/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go
@@ -19,9 +19,18 @@ package controllers
import (
"context"
+ "errors"
"fmt"
+ "strings"
"time"
+ "k8s.io/apimachinery/pkg/util/wait"
+ "k8s.io/client-go/util/retry"
+
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+ clientset "k8s.io/client-go/kubernetes"
+
"k8s.io/client-go/tools/record"
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/configmap"
@@ -44,6 +53,20 @@ import (
const (
ShardingSphereChaosControllerName = "shardingsphere-chaos-controller"
ssChaosDefaultEnqueueTime = 10 * time.Second
+ VerifyJobCheck = "Verify"
+)
+
+var (
+ ErrNoPod = errors.New("no pod in list")
+)
+
+type JobCondition string
+
+var (
+ CompleteJob JobCondition = "complete"
+ FailureJob JobCondition = "failure"
+ SuspendJob JobCondition = "suspend"
+ ActiveJob JobCondition = "active"
)
// ShardingSphereChaosReconciler is a controller for the ShardingSphereChaos
@@ -55,6 +78,7 @@ type ShardingSphereChaosReconciler struct { //
Job job.Job
ConfigMap configmap.ConfigMap
Events record.EventRecorder
+ ClientSet *clientset.Clientset
}
// Reconcile handles main function of this controller
@@ -120,72 +144,92 @@ func (r *ShardingSphereChaosReconciler) getRuntimeSSChaos(ctx context.Context, n
func (r *ShardingSphereChaosReconciler) reconcileChaos(ctx context.Context, ssChao *sschaosv1alpha1.ShardingSphereChaos) error {
logger := r.Log.WithValues("reconcile chaos", ssChao.Name)
+
if ssChao.Status.Phase == sschaosv1alpha1.PhaseBeforeExperiment || ssChao.Status.Phase == "" {
- fmt.Println("reach here")
return nil
}
- fmt.Println("reach here after")
namespaceName := types.NamespacedName{Namespace: ssChao.Namespace, Name: ssChao.Name}
+
if ssChao.Spec.EmbedChaos.PodChaos != nil {
- chao, isExist, err := r.getPodChaosByNamespacedName(ctx, namespaceName)
+ chao, err := r.getPodChaosByNamespacedName(ctx, namespaceName)
if err != nil {
logger.Error(err, "pod chaos err")
return err
}
- if isExist {
+ if chao != nil {
return r.updatePodChaos(ctx, ssChao, chao)
}
return r.CreatePodChaos(ctx, ssChao)
} else if ssChao.Spec.EmbedChaos.NetworkChaos != nil {
- chao, isExist, err := r.getNetworkChaosByNamespacedName(ctx, namespaceName)
+ chao, err := r.getNetworkChaosByNamespacedName(ctx, namespaceName)
if err != nil {
logger.Error(err, "network chao err")
return err
}
- if isExist {
+ if chao != nil {
return r.updateNetWorkChaos(ctx, ssChao, chao)
}
return r.CreateNetworkChaos(ctx, ssChao)
}
+
return nil
}
func (r *ShardingSphereChaosReconciler) reconcileConfigMap(ctx context.Context, ssChaos *sschaosv1alpha1.ShardingSphereChaos) error {
logger := r.Log.WithValues("reconcile configmap", ssChaos.Name)
namespaceName := types.NamespacedName{Namespace: ssChaos.Namespace, Name: ssChaos.Name}
- rConfigmap, isExist, err := r.getConfigMapByNamespacedName(ctx, namespaceName)
+ rConfigmap, err := r.getConfigMapByNamespacedName(ctx, namespaceName)
if err != nil {
logger.Error(err, "get configmap error")
return err
}
- if isExist {
+ if rConfigmap != nil {
return r.updateConfigMap(ctx, ssChaos, rConfigmap)
}
- return r.CreateConfigMap(ctx, ssChaos)
+ err = r.CreateConfigMap(ctx, ssChaos)
+ if err != nil {
+ r.Events.Event(ssChaos, "Warning", "Created", fmt.Sprintf("configmap created fail %s", err))
+ return err
+ }
+
+ r.Events.Event(ssChaos, "Normal", "Created", "configmap created successfully")
+ return nil
}
func (r *ShardingSphereChaosReconciler) reconcileJob(ctx context.Context, ssChaos *sschaosv1alpha1.ShardingSphereChaos) error {
logger := r.Log.WithValues("reconcile job", ssChaos.Name)
- namespaceName := types.NamespacedName{Namespace: ssChaos.Namespace, Name: ssChaos.Name}
- rJob, isExist, err := r.getJobByNamespacedName(ctx, namespaceName)
- if err != nil {
- logger.Error(err, "get job err")
- return err
- }
+
var nowInjectRequirement reconcile.InjectRequirement
- if ssChaos.Status.Phase == "" || ssChaos.Status.Phase == sschaosv1alpha1.PhaseBeforeExperiment || ssChaos.Status.Phase == sschaosv1alpha1.PhaseAfterExperiment {
+ switch {
+ case ssChaos.Status.Phase == "" || ssChaos.Status.Phase == sschaosv1alpha1.PhaseBeforeExperiment || ssChaos.Status.Phase == sschaosv1alpha1.PhaseAfterExperiment:
nowInjectRequirement = reconcile.Experimental
- }
- if ssChaos.Status.Phase == sschaosv1alpha1.PhaseInChaos || ssChaos.Status.Phase == sschaosv1alpha1.PhaseRecoveredChaos {
+ case ssChaos.Status.Phase == sschaosv1alpha1.PhaseInChaos:
nowInjectRequirement = reconcile.Pressure
+ case ssChaos.Status.Phase == sschaosv1alpha1.PhaseRecoveredChaos:
+ nowInjectRequirement = reconcile.Verify
+ }
+
+ namespaceName := types.NamespacedName{Namespace: ssChaos.Namespace, Name: reconcile.SetJobNamespaceName(ssChaos.Name, nowInjectRequirement)}
+
+ rJob, err := r.getJobByNamespacedName(ctx, namespaceName)
+ if err != nil {
+ logger.Error(err, "get job err")
+ return err
}
- if isExist {
+
+ if rJob != nil {
return r.updateJob(ctx, nowInjectRequirement, ssChaos, rJob)
}
- return r.createJob(ctx, nowInjectRequirement, ssChaos)
+ err = r.createJob(ctx, nowInjectRequirement, ssChaos)
+ if err != nil {
+ return err
+ }
+
+ r.Events.Event(ssChaos, "Normal", "Created", fmt.Sprintf("%s job created successfully", string(nowInjectRequirement)))
+ return nil
}
func (r *ShardingSphereChaosReconciler) reconcileStatus(ctx context.Context, namespacedName types.NamespacedName) error {
@@ -193,17 +237,30 @@ func (r *ShardingSphereChaosReconciler) reconcileStatus(ctx context.Context, nam
if err != nil {
return err
}
- if ssChaos.Status.Phase == "" {
- ssChaos.Status.Phase = sschaosv1alpha1.PhaseBeforeExperiment
- }
- rJob := &batchV1.Job{}
- if err := r.Get(ctx, namespacedName, rJob); err != nil {
+
+ setDefault(ssChaos)
+
+ jobName := getRequirement(ssChaos)
+ rJob, err := r.getJobByNamespacedName(ctx, types.NamespacedName{Namespace: ssChaos.Namespace, Name: reconcile.SetJobNamespaceName(ssChaos.Name, jobName)})
+ if err != nil || rJob == nil {
return err
}
if ssChaos.Status.Phase == sschaosv1alpha1.PhaseBeforeExperiment && rJob.Status.Succeeded == 1 {
ssChaos.Status.Phase = sschaosv1alpha1.PhaseAfterExperiment
}
+ jobConditions := rJob.Status.Conditions
+ condition := getJobCondition(jobConditions)
+
+ if condition == FailureJob {
+ r.Events.Event(ssChaos, "Warning", "failed", fmt.Sprintf("job: %s", rJob.Name))
+ }
+ if ssChaos.Status.Phase == sschaosv1alpha1.PhaseRecoveredChaos {
+ if err := r.updateRecoveredJob(ctx, ssChaos, rJob); err != nil {
+ r.Events.Event(ssChaos, "Warning", "getPodLog", err.Error())
+ return err
+ }
+ }
if err := r.updatePhaseStart(ctx, ssChaos); err != nil {
return err
@@ -213,10 +270,180 @@ func (r *ShardingSphereChaosReconciler) reconcileStatus(ctx context.Context, nam
if err != nil {
return err
}
- rt.Status = ssChaos.Status
+ setRtStatus(rt, ssChaos)
return r.Status().Update(ctx, rt)
}
+func getRequirement(ssChaos *sschaosv1alpha1.ShardingSphereChaos) reconcile.InjectRequirement {
+ var jobName reconcile.InjectRequirement
+ if ssChaos.Status.Phase == sschaosv1alpha1.PhaseBeforeExperiment || ssChaos.Status.Phase == sschaosv1alpha1.PhaseAfterExperiment {
+ jobName = reconcile.Experimental
+ }
+ if ssChaos.Status.Phase == sschaosv1alpha1.PhaseInChaos {
+ jobName = reconcile.Pressure
+ }
+ if ssChaos.Status.Phase == sschaosv1alpha1.PhaseRecoveredChaos {
+ jobName = reconcile.Verify
+ }
+ return jobName
+}
+
+func getJobCondition(conditions []batchV1.JobCondition) JobCondition {
+ var ret = ActiveJob
+ for i := range conditions {
+ p := &conditions[i]
+ switch {
+ case p.Type == batchV1.JobComplete && p.Status == v1.ConditionTrue:
+ ret = CompleteJob
+ case p.Type == batchV1.JobFailed && p.Status == v1.ConditionTrue:
+ ret = FailureJob
+ case p.Type == batchV1.JobSuspended && p.Status == v1.ConditionTrue:
+ ret = SuspendJob
+ case p.Type == batchV1.JobFailureTarget:
+ ret = FailureJob
+ }
+ }
+ return ret
+}
+
+func setDefault(ssChaos *sschaosv1alpha1.ShardingSphereChaos) {
+ if ssChaos.Status.Phase == "" {
+ ssChaos.Status.Phase = sschaosv1alpha1.PhaseBeforeExperiment
+ }
+ if ssChaos.Status.Result == nil {
+ ssChaos.Status.Result = []sschaosv1alpha1.Result{}
+ }
+}
+
+func setRtStatus(rt *sschaosv1alpha1.ShardingSphereChaos, ssChaos *sschaosv1alpha1.ShardingSphereChaos) {
+ rt.Status.Result = []sschaosv1alpha1.Result{}
+ for i := range ssChaos.Status.Result {
+ r := &ssChaos.Status.Result[i]
+ rt.Status.Result = append(rt.Status.Result, sschaosv1alpha1.Result{
+ Success: r.Success,
+ Detail: sschaosv1alpha1.Detail{
+ Time: metav1.Time{Time: time.Now()},
+ Msg: r.Detail.Msg,
+ },
+ })
+ }
+
+ rt.Status.Phase = ssChaos.Status.Phase
+ rt.Status.ChaosCondition = ssChaos.Status.ChaosCondition
+}
+
+func (r *ShardingSphereChaosReconciler) updateRecoveredJob(ctx context.Context, ssChaos *sschaosv1alpha1.ShardingSphereChaos, rJob *batchV1.Job) error {
+ if isResultExist(rJob) {
+ return nil
+ }
+
+ for i := range ssChaos.Status.Result {
+ r := &ssChaos.Status.Result[i]
+ if strings.HasPrefix(r.Detail.Msg, VerifyJobCheck) {
+ return nil
+ }
+ }
+
+ logOpts := &v1.PodLogOptions{}
+ pod, err := r.getPodHaveLog(ctx, rJob)
+ if err != nil || pod == nil {
+ return err
+ }
+ podNamespacedName := types.NamespacedName{
+ Namespace: pod.Namespace,
+ Name: pod.Name,
+ }
+ condition := getJobCondition(rJob.Status.Conditions)
+ result := &sschaosv1alpha1.Result{}
+
+ if condition == CompleteJob {
+ log, err := r.getPodLog(ctx, podNamespacedName, logOpts)
+ if err != nil {
+ return err
+ }
+ if ssChaos.Spec.Expect.Verify == "" || ssChaos.Spec.Expect.Verify == log {
+ result.Success = true
+ result.Detail = sschaosv1alpha1.Detail{
+ Time: metav1.Time{Time: time.Now()},
+ Msg: fmt.Sprintf("%s: job succeeded", VerifyJobCheck),
+ }
+ } else {
+ result.Success = false
+ result.Detail = sschaosv1alpha1.Detail{
+ Time: metav1.Time{Time: time.Now()},
+ Msg: fmt.Sprintf("%s: %s", VerifyJobCheck, log),
+ }
+ }
+ }
+
+ if condition == FailureJob {
+ log, err := r.getPodLog(ctx, podNamespacedName, logOpts)
+ if err != nil {
+ return err
+ }
+ result.Success = false
+ result.Detail = sschaosv1alpha1.Detail{
+ Time: metav1.Time{Time: time.Now()},
+ Msg: fmt.Sprintf("%s: %s", VerifyJobCheck, log),
+ }
+ }
+
+ ssChaos.Status.Result = updateResult(ssChaos.Status.Result, *result, VerifyJobCheck)
+
+ return nil
+}
+
+func (r *ShardingSphereChaosReconciler) getPodHaveLog(ctx context.Context, rJob *batchV1.Job) (*v1.Pod, error) {
+ pods := &v1.PodList{}
+ if err := r.List(ctx, pods, client.MatchingLabels{"controller-uid": rJob.Spec.Template.Labels["controller-uid"]}); err != nil {
+ return nil, err
+ }
+ if pods.Items == nil {
+ return nil, nil
+ }
+ var pod *v1.Pod
+ for i := range pods.Items {
+ pod = &pods.Items[i]
+ break
+ }
+ return pod, nil
+}
+
+func isResultExist(rJob *batchV1.Job) bool {
+ for _, cmd := range rJob.Spec.Template.Spec.Containers[0].Args {
+ if strings.Contains(cmd, string(reconcile.Verify)) {
+ return true
+ }
+ }
+ return false
+}
+
+func updateResult(results []sschaosv1alpha1.Result, r sschaosv1alpha1.Result, check string) []sschaosv1alpha1.Result {
+ for i := range results {
+ msg := results[i].Detail.Msg
+ if strings.HasPrefix(msg, check) && strings.HasPrefix(r.Detail.Msg, check) {
+ results[i] = r
+ return results
+ }
+ }
+ results = append(results, r)
+ return results
+}
+
+func (r *ShardingSphereChaosReconciler) getPodLog(ctx context.Context, namespacedName types.NamespacedName, options *v1.PodLogOptions) (string, error) {
+ req := r.ClientSet.CoreV1().Pods(namespacedName.Namespace).GetLogs(namespacedName.Name, options)
+ res := req.Do(ctx)
+ if res.Error() != nil {
+ return "", res.Error()
+ }
+ var ret []byte
+ ret, err := res.Raw()
+ if err != nil {
+ return "", err
+ }
+ return string(ret), nil
+}
+
func (r *ShardingSphereChaosReconciler) updatePhaseStart(ctx context.Context, ssChaos *sschaosv1alpha1.ShardingSphereChaos) error {
if ssChaos.Status.Phase != sschaosv1alpha1.PhaseBeforeExperiment {
if err := r.updateChaosCondition(ctx, ssChaos); err != nil {
@@ -257,50 +484,37 @@ func (r *ShardingSphereChaosReconciler) updateChaosCondition(ctx context.Context
return nil
}
-func (r *ShardingSphereChaosReconciler) getNetworkChaosByNamespacedName(ctx context.Context, namespacedName types.NamespacedName) (reconcile.NetworkChaos, bool, error) {
+func (r *ShardingSphereChaosReconciler) getNetworkChaosByNamespacedName(ctx context.Context, namespacedName types.NamespacedName) (reconcile.NetworkChaos, error) {
nc, err := r.Chaos.GetNetworkChaosByNamespacedName(ctx, namespacedName)
if err != nil {
- return nil, false, err
- }
- if nc == nil {
- return nil, false, nil
+ return nil, err
}
- return nc, true, nil
+ return nc, nil
}
-func (r *ShardingSphereChaosReconciler) getPodChaosByNamespacedName(ctx context.Context, namespacedName types.NamespacedName) (reconcile.PodChaos, bool, error) {
+func (r *ShardingSphereChaosReconciler) getPodChaosByNamespacedName(ctx context.Context, namespacedName types.NamespacedName) (reconcile.PodChaos, error) {
pc, err := r.Chaos.GetPodChaosByNamespacedName(ctx, namespacedName)
if err != nil {
- return nil, false, err
- }
- if pc == nil {
- return nil, false, nil
+ return nil, err
}
- return pc, true, nil
+ return pc, nil
}
-func (r *ShardingSphereChaosReconciler) getConfigMapByNamespacedName(ctx context.Context, namespacedName types.NamespacedName) (*v1.ConfigMap, bool, error) {
+func (r *ShardingSphereChaosReconciler) getConfigMapByNamespacedName(ctx context.Context, namespacedName types.NamespacedName) (*v1.ConfigMap, error) {
config, err := r.ConfigMap.GetByNamespacedName(ctx, namespacedName)
if err != nil {
- return nil, false, err
- }
- if config == nil {
- return nil, false, nil
+ return nil, err
}
- return config, true, nil
+ return config, nil
}
-func (r *ShardingSphereChaosReconciler) getJobByNamespacedName(ctx context.Context, namespacedName types.NamespacedName) (*batchV1.Job, bool, error) {
+func (r *ShardingSphereChaosReconciler) getJobByNamespacedName(ctx context.Context, namespacedName types.NamespacedName) (*batchV1.Job, error) {
injectJob, err := r.Job.GetByNamespacedName(ctx, namespacedName)
if err != nil {
- return nil, false, err
- }
- if injectJob == nil {
- return nil, false, nil
+ return nil, err
}
-
- return injectJob, true, nil
+ return injectJob, nil
}
func (r *ShardingSphereChaosReconciler) updateConfigMap(ctx context.Context, chao *sschaosv1alpha1.ShardingSphereChaos, cur *v1.ConfigMap) error {
@@ -325,39 +539,87 @@ func (r *ShardingSphereChaosReconciler) CreateConfigMap(ctx context.Context, cha
}
func (r *ShardingSphereChaosReconciler) updateJob(ctx context.Context, requirement reconcile.InjectRequirement, chao *sschaosv1alpha1.ShardingSphereChaos, cur *batchV1.Job) error {
- exp, err := reconcile.UpdateJob(chao, requirement, cur)
+ isEqual, err := reconcile.IsJobChanged(chao, requirement, cur)
if err != nil {
return err
}
- if exp != nil {
- if err := r.Delete(ctx, cur); err != nil {
- return err
- }
- if err := ctrl.SetControllerReference(chao, exp, r.Scheme); err != nil {
- return err
- }
- if err := r.Create(ctx, exp); err != nil {
+ if !isEqual {
+ if err := r.Delete(ctx, cur); err != nil && !apierrors.IsNotFound(err) {
return err
}
+ r.Events.Event(chao, "Normal", "Updated", "job Updated")
}
return nil
}
-// todo:
func (r *ShardingSphereChaosReconciler) createJob(ctx context.Context, requirement reconcile.InjectRequirement, chao *sschaosv1alpha1.ShardingSphereChaos) error {
injectJob, err := reconcile.NewJob(chao, requirement)
+ if err != nil {
+ return err
+ }
if err := ctrl.SetControllerReference(chao, injectJob, r.Scheme); err != nil {
return err
}
+
+ err = r.Create(ctx, injectJob)
if err != nil {
+ return client.IgnoreAlreadyExists(err)
+ }
+
+ rJob := &batchV1.Job{}
+ backoff := wait.Backoff{
+ Steps: 6,
+ Duration: 500 * time.Millisecond,
+ Factor: 5.0,
+ Jitter: 0.1,
+ }
+
+ if err := retry.OnError(backoff, func(e error) bool {
+ return true
+ }, func() error {
+ return r.Get(ctx, types.NamespacedName{Namespace: chao.Namespace, Name: reconcile.SetJobNamespaceName(chao.Name, requirement)}, rJob)
+ }); err != nil {
return err
}
- err = r.Create(ctx, injectJob)
- if err == nil && apierrors.IsAlreadyExists(err) {
+
+ podList := &v1.PodList{}
+ if err := retry.OnError(backoff, func(e error) bool {
+ return e != nil
+ }, func() error {
+ if err := r.List(ctx, podList, client.MatchingLabels{"controller-uid": rJob.Spec.Template.Labels["controller-uid"]}); err != nil {
+ return err
+ }
+ if len(podList.Items) == 0 {
+ return ErrNoPod
+ }
return nil
+ }); err != nil {
+ return err
}
- return err
+ for i := range podList.Items {
+ rPod := &podList.Items[i]
+ if err := ctrl.SetControllerReference(rJob, rPod, r.Scheme); err != nil {
+ return err
+ }
+
+ exp := rPod.DeepCopy()
+ updateBackoff := wait.Backoff{
+ Steps: 5,
+ Duration: 30 * time.Millisecond,
+ Factor: 5.0,
+ Jitter: 0.1,
+ }
+ if err := retry.RetryOnConflict(updateBackoff, func() error {
+ if err := r.Update(ctx, exp); err != nil {
+ return err
+ }
+ return nil
+ }); err != nil {
+ return err
+ }
+ }
+ return nil
}
func (r *ShardingSphereChaosReconciler) updatePodChaos(ctx context.Context, chao *sschaosv1alpha1.ShardingSphereChaos, podChaos reconcile.PodChaos) error {
@@ -417,6 +679,7 @@ func (r *ShardingSphereChaosReconciler) SetupWithManager(mgr ctrl.Manager) error
For(&sschaosv1alpha1.ShardingSphereChaos{}).
Owns(&chaosv1alpha1.PodChaos{}).
Owns(&chaosv1alpha1.NetworkChaos{}).
+ Owns(&v1.ConfigMap{}).
Owns(&batchV1.Job{}).
Complete(r)
}
diff --git a/shardingsphere-operator/pkg/reconcile/shardingspherechaos/configmap.go b/shardingsphere-operator/pkg/reconcile/shardingspherechaos/configmap.go
index f3054be..48d2f20 100644
--- a/shardingsphere-operator/pkg/reconcile/shardingspherechaos/configmap.go
+++ b/shardingsphere-operator/pkg/reconcile/shardingspherechaos/configmap.go
@@ -29,6 +29,7 @@ import (
const (
configExperimental = "experimental.sh"
configPressure = "pressure.sh"
+ configVerify = "verify.sh"
)
const (
@@ -40,7 +41,7 @@ func NewSSConfigMap(chaos *v1alpha1.ShardingSphereChaos) *v1.ConfigMap {
cmb.SetName(chaos.Name).SetNamespace(chaos.Namespace).SetLabels(chaos.Labels)
- cmb.SetExperimental(chaos.Spec.InjectJob.Experimental).SetPressure(chaos.Spec.InjectJob.Pressure)
+ cmb.SetExperimental(chaos.Spec.InjectJob.Experimental).SetPressure(chaos.Spec.InjectJob.Pressure).SetVerify(chaos.Spec.InjectJob.Verify)
return cmb.Build()
}
@@ -50,6 +51,7 @@ type SSConfigMapBuilder interface {
common.ConfigMapBuilder
SetExperimental(string) SSConfigMapBuilder
SetPressure(string) SSConfigMapBuilder
+ SetVerify(string) SSConfigMapBuilder
}
type configmapBuilder struct {
@@ -75,6 +77,11 @@ func (c *configmapBuilder) SetPressure(cmd string) SSConfigMapBuilder {
return c
}
+func (c *configmapBuilder) SetVerify(cmd string) SSConfigMapBuilder {
+ c.configmap.Data[configVerify] = cmd
+ return c
+}
+
// defaultConfigMap returns a ConfigMap filling with default expected values
func defaultConfigMap() *v1.ConfigMap {
return &v1.ConfigMap{
diff --git a/shardingsphere-operator/pkg/reconcile/shardingspherechaos/job.go b/shardingsphere-operator/pkg/reconcile/shardingspherechaos/job.go
index 9f0ac74..273a8a9 100644
--- a/shardingsphere-operator/pkg/reconcile/shardingspherechaos/job.go
+++ b/shardingsphere-operator/pkg/reconcile/shardingspherechaos/job.go
@@ -30,12 +30,16 @@ import (
)
const (
- DefaultImageName = "agoiyanzsa/tools-runtime:1.0"
+ DefaultImageName = "agoiyanzsa/tools-runtime:2.0"
DefaultContainerName = "tools-runtime"
DefaultWorkPath = "/app/start"
DefaultConfigName = "cmd-conf"
)
+var (
+ DefaultTTLSecondsAfterFinished int32 = 300
+)
+
var DefaultFileMode int32 = 493
const (
@@ -52,11 +56,16 @@ type InjectRequirement string
var (
Experimental InjectRequirement = "experimental"
Pressure InjectRequirement = "pressure"
+ Verify InjectRequirement = "verify"
)
+func SetJobNamespaceName(name string, requirement InjectRequirement) string {
+ return fmt.Sprintf("%s-%s", name, string(requirement))
+}
+
func NewJob(ssChaos *v1alpha1.ShardingSphereChaos, requirement InjectRequirement) (*v1.Job, error) {
jbd := NewJobBuilder()
- jbd.SetNamespace(ssChaos.Namespace).SetLabels(ssChaos.Labels).SetName(ssChaos.Name)
+ jbd.SetNamespace(ssChaos.Namespace).SetLabels(ssChaos.Labels).SetName(SetJobNamespaceName(ssChaos.Name, requirement))
if v, ok := ssChaos.Annotations[completions]; ok {
value, err := MustInt32(v)
@@ -117,11 +126,10 @@ func NewJob(ssChaos *v1alpha1.ShardingSphereChaos, requirement InjectRequirement
vm := &corev1.VolumeMount{Name: DefaultConfigName, MountPath: DefaultWorkPath}
cbd := common.NewContainerBuilder()
- //todo: replace as DefaultImageName
cbd.SetImage(DefaultImageName)
cbd.SetName(DefaultContainerName)
cbd.SetVolumeMount(vm)
- cbd.SetCommand([]string{"sh"})
+ cbd.SetCommand([]string{"sh", "-c"})
container := cbd.Build()
container.Args = NewCmds(requirement)
jbd.SetContainers(container)
@@ -131,12 +139,14 @@ func NewJob(ssChaos *v1alpha1.ShardingSphereChaos, requirement InjectRequirement
func NewCmds(requirement InjectRequirement) []string {
var cmds []string
- cmds = append(cmds, "-c")
if requirement == Experimental {
cmds = append(cmds, fmt.Sprintf("%s/%s", DefaultWorkPath, configExperimental))
}
if requirement == Pressure {
- cmds = append(cmds, fmt.Sprintf("%s/%s", DefaultWorkPath, configExperimental), fmt.Sprintf("%s/%s", DefaultWorkPath, configPressure))
+ cmds = append(cmds, fmt.Sprintf("%s/%s;%s/%s", DefaultWorkPath, configPressure, DefaultWorkPath, configExperimental))
+ }
+ if requirement == Verify {
+ cmds = append(cmds, fmt.Sprintf("%s/%s", DefaultWorkPath, configVerify))
}
return cmds
}
@@ -149,17 +159,17 @@ func MustInt32(s string) (int32, error) {
return int32(v), nil
}
-func UpdateJob(ssChaos *v1alpha1.ShardingSphereChaos, requirement InjectRequirement, cur *v1.Job) (*v1.Job, error) {
+func IsJobChanged(ssChaos *v1alpha1.ShardingSphereChaos, requirement InjectRequirement, cur *v1.Job) (bool, error) {
now, err := NewJob(ssChaos, requirement)
if err != nil {
- return nil, err
+ return false, err
}
isEqual := judgeJobEqual(cur, now)
if isEqual {
- return nil, nil
+ return true, nil
}
- return now, nil
+ return false, nil
}
func judgeJobEqual(now *v1.Job, exp *v1.Job) bool {
@@ -194,10 +204,15 @@ func judgeJobConfigEqual(now *v1.Job, exp *v1.Job) bool {
return true
}
func judgeTTLSecondsAfterFinished(cur *int32, exp *int32) bool {
- if exp != nil && *cur != *exp {
- return false
+ if cur == nil && exp == nil {
+ return true
}
- return true
+ if cur != nil && exp != nil {
+ if *cur == *exp {
+ return true
+ }
+ }
+ return false
}
func judgeActiveDeadlineSeconds(cur *int64, exp *int64) bool {
if exp != nil && *cur != *exp {
@@ -212,6 +227,9 @@ func judgeContainerEqual(now *corev1.Container, exp *corev1.Container) bool {
if !reflect.DeepEqual(now.Command, exp.Command) {
return false
}
+ if !reflect.DeepEqual(now.Args, exp.Args) {
+ return false
+ }
if now.Image != exp.Image {
return false
}
@@ -300,7 +318,8 @@ func (j *jobBuilder) SetContainers(container *corev1.Container) JobBuilder {
}
func (j *jobBuilder) SetTTLSecondsAfterFinished(i int32) JobBuilder {
- j.job.Spec.TTLSecondsAfterFinished = &i
+ ret := i
+ j.job.Spec.TTLSecondsAfterFinished = &ret
return j
}
diff --git a/shardingsphere-operator/pkg/reconcile/shardingspherechaos/shardingspherechaos_suite_test.go b/shardingsphere-operator/pkg/reconcile/shardingspherechaos/shardingspherechaos_suite_test.go
index 33a6a42..cd10e4f 100644
--- a/shardingsphere-operator/pkg/reconcile/shardingspherechaos/shardingspherechaos_suite_test.go
+++ b/shardingsphere-operator/pkg/reconcile/shardingspherechaos/shardingspherechaos_suite_test.go
@@ -26,3 +26,10 @@ func TestShardingSphereChaos(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "ShardingSphereChaos Suite")
}
+
+//func installChaosMesh() {
+// var (
+// installCmd = "curl -sSL https://mirrors.chaos-mesh.org/v2.5.1/install.sh | bash -s -- --docker-mirror"
+// )
+//
+//}