You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by mi...@apache.org on 2023/04/17 09:51:34 UTC
[shardingsphere-on-cloud] branch main updated: add event and status.phase to control resource create check (#312)
This is an automated email from the ASF dual-hosted git repository.
miaoliyao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git
The following commit(s) were added to refs/heads/main by this push:
new 4c825d6 add event and status.phase to control resource create check (#312)
4c825d6 is described below
commit 4c825d6f85c57672fc59f013c6c164a64703059d
Author: moomman <85...@users.noreply.github.com>
AuthorDate: Mon Apr 17 17:51:26 2023 +0800
add event and status.phase to control resource create check (#312)
* add phase status, controller chaos
* fix phase logic, handle job and chaos create
* fix phase logic, handle job and chaos create
* fix phase logic, handle job and chaos create
---------
Co-authored-by: moonman <ag...@163.com>
---
.../api/v1alpha1/shardingsphere_chaos_types.go | 28 ++--
shardingsphere-operator/build/tools/Dockerfile | 6 +-
.../cmd/shardingsphere-operator/manager/option.go | 1 +
.../controllers/shardingsphere_chaos_controller.go | 175 +++++++++++++++++----
.../reconcile/shardingspherechaos/chaos-mesh.go | 14 +-
.../pkg/reconcile/shardingspherechaos/job.go | 9 +-
6 files changed, 169 insertions(+), 64 deletions(-)
diff --git a/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go b/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go
index 5b99a64..32c2126 100644
--- a/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go
+++ b/shardingsphere-operator/api/v1alpha1/shardingsphere_chaos_types.go
@@ -44,8 +44,6 @@ type ShardingSphereChaos struct {
type ShardingSphereChaosSpec struct {
InjectJob JobSpec `json:"injectJob,omitempty"`
EmbedChaos `json:",inline"`
- //todo
- //Verify batchV1Beta1.JobTemplateSpec `json:"Verify,omitempty"`
}
// JobSpec Specifies the config of job to create
@@ -69,32 +67,28 @@ type EmbedChaos struct {
type ChaosCondition string
const (
- Creating ChaosCondition = "Creating"
AllRecovered ChaosCondition = "AllRecovered"
Paused ChaosCondition = "Paused"
AllInjected ChaosCondition = "AllInjected"
NoTarget ChaosCondition = "NoTarget"
- UnKnown ChaosCondition = "UnKnown"
-)
-
-// Jobschedule Show current job progress
-type Jobschedule string
-
-const (
- JobCreating Jobschedule = "JobCreating"
- JobFailed Jobschedule = "JobFailed"
- JobFinish Jobschedule = "JobFinish"
+ Unknown ChaosCondition = "Unknown"
)
// ShardingSphereChaosStatus defines the actual state of ShardingSphereChaos
type ShardingSphereChaosStatus struct {
ChaosCondition ChaosCondition `json:"chaosCondition"`
- //todo
- //InjectStatus Jobschedule `json:"InjectStatus"`
- //todo
- //VerifyStatus Jobschedule `json:"VerifyStatus"`
+ Phase Phase `json:"phase"`
}
+type Phase string
+
+var (
+ PhaseBeforeExperiment Phase = "BeforeReq"
+ PhaseAfterExperiment Phase = "AfterReq"
+ PhaseInChaos Phase = "Injected"
+ PhaseRecoveredChaos Phase = "Recovered"
+)
+
// PodChaosAction Specify the action type of pod Chaos
type PodChaosAction string
diff --git a/shardingsphere-operator/build/tools/Dockerfile b/shardingsphere-operator/build/tools/Dockerfile
index 17f3e4d..d73511b 100644
--- a/shardingsphere-operator/build/tools/Dockerfile
+++ b/shardingsphere-operator/build/tools/Dockerfile
@@ -25,7 +25,6 @@ ENV ZOOKEEPER_DIR /app/zookeeper
WORKDIR /app
RUN mkdir -p "/app/start" && chmod -R 777 /app/start
CMD ["tail -f /dev/null"]
-ENTRYPOINT ["sh","-c"]
RUN set -eux; \
\
apt-get update; \
@@ -43,7 +42,8 @@ RUN set -eux; \
wget -O zookeeper.tar.gz "${ZOOKEEPER_DOWNLOAD_URL}"; \
mkdir -p ${ZOOKEEPER_DIR}; \
tar -zxf zookeeper.tar.gz -C ${ZOOKEEPER_DIR} --strip-components 1; \
- rm zookeeper.tar.gz; \
- \
+ rm zookeeper.tar.gz;
+
+ENTRYPOINT ["sh","-c"]
diff --git a/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go b/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
index 53f4d16..10ff9f5 100644
--- a/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
+++ b/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
@@ -146,6 +146,7 @@ var featureGatesHandlers = map[string]FeatureGateHandler{
Chaos: chaos.NewChaos(mgr.GetClient()),
Job: job.NewJob(mgr.GetClient()),
ConfigMap: configmap.NewConfigMap(mgr.GetClient()),
+ Events: mgr.GetEventRecorderFor("shardingsphere-chaos-controller"),
}).SetupWithManager(mgr); err != nil {
logger.Error(err, "unable to create controller", "controller", "ShardingSphereChaos")
return err
diff --git a/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go b/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go
index e44d97d..95dfa97 100644
--- a/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go
+++ b/shardingsphere-operator/pkg/controllers/shardingsphere_chaos_controller.go
@@ -19,8 +19,11 @@ package controllers
import (
"context"
+ "fmt"
"time"
+ "k8s.io/client-go/tools/record"
+
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/configmap"
v1 "k8s.io/api/core/v1"
@@ -40,7 +43,7 @@ import (
const (
ShardingSphereChaosControllerName = "shardingsphere-chaos-controller"
- ssChaosDefaultEnqueueTime = 5 * time.Second
+ ssChaosDefaultEnqueueTime = 10 * time.Second
)
// ShardingSphereChaosReconciler is a controller for the ShardingSphereChaos
@@ -51,44 +54,77 @@ type ShardingSphereChaosReconciler struct { //
Chaos chaos.Chaos
Job job.Job
ConfigMap configmap.ConfigMap
+ Events record.EventRecorder
}
// Reconcile handles main function of this controller
func (r *ShardingSphereChaosReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := r.Log.WithValues(ShardingSphereChaosControllerName, req.NamespacedName)
- var ssChaos sschaosv1alpha1.ShardingSphereChaos
- if err := r.Get(ctx, req.NamespacedName, &ssChaos); err != nil {
- logger.Error(err, "unable to fetch ShardingSphereChaos source")
- return ctrl.Result{}, client.IgnoreNotFound(err)
+ ssChaos, err := r.getRuntimeSSChaos(ctx, req.NamespacedName)
+ if err != nil {
+ return ctrl.Result{}, err
}
if !ssChaos.ObjectMeta.DeletionTimestamp.IsZero() {
return ctrl.Result{}, nil
}
-
logger.Info("start reconcile chaos")
- if err := r.reconcileChaos(ctx, &ssChaos); err != nil {
+ if err := r.reconcileChaos(ctx, ssChaos); err != nil {
+ if err == reconcile.ErrChangedSpec {
+ errHandle := r.handleChaosChange(ctx, req.NamespacedName)
+ return ctrl.Result{}, errHandle
+ }
logger.Error(err, " unable to reconcile chaos")
+ r.Events.Event(ssChaos, "Warning", "chaos err", err.Error())
return ctrl.Result{}, err
}
- if err := r.reconcileConfigMap(ctx, &ssChaos); err != nil {
+ if err := r.reconcileConfigMap(ctx, ssChaos); err != nil {
logger.Error(err, "unable to reconcile configmap")
+ r.Events.Event(ssChaos, "Warning", "configmap err", err.Error())
return ctrl.Result{}, err
}
- if err := r.reconcileJob(ctx, &ssChaos); err != nil {
+ if err := r.reconcileJob(ctx, ssChaos); err != nil {
logger.Error(err, "unable to reconcile job")
+ r.Events.Event(ssChaos, "Warning", "job err", err.Error())
return ctrl.Result{}, err
}
- if err := r.reconcileStatus(ctx, &ssChaos); err != nil {
+ if err := r.reconcileStatus(ctx, req.NamespacedName); err != nil {
+ r.Events.Event(ssChaos, "Warning", "update status error", err.Error())
logger.Error(err, "failed to update status")
}
return ctrl.Result{RequeueAfter: ssChaosDefaultEnqueueTime}, nil
}
+func (r *ShardingSphereChaosReconciler) handleChaosChange(ctx context.Context, name types.NamespacedName) error {
+
+ ssChaos, err := r.getRuntimeSSChaos(ctx, name)
+ if err != nil {
+ return err
+ }
+ if ssChaos.Status.Phase != sschaosv1alpha1.PhaseBeforeExperiment {
+ ssChaos.Status.Phase = sschaosv1alpha1.PhaseAfterExperiment
+ if err := r.Status().Update(ctx, ssChaos); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (r *ShardingSphereChaosReconciler) getRuntimeSSChaos(ctx context.Context, name types.NamespacedName) (*sschaosv1alpha1.ShardingSphereChaos, error) {
+ var rt = &sschaosv1alpha1.ShardingSphereChaos{}
+ err := r.Get(ctx, name, rt)
+ return rt, client.IgnoreNotFound(err)
+}
+
func (r *ShardingSphereChaosReconciler) reconcileChaos(ctx context.Context, ssChao *sschaosv1alpha1.ShardingSphereChaos) error {
logger := r.Log.WithValues("reconcile chaos", ssChao.Name)
+ if ssChao.Status.Phase == sschaosv1alpha1.PhaseBeforeExperiment || ssChao.Status.Phase == "" {
+ fmt.Println("reach here")
+ return nil
+ }
+ fmt.Println("reach here after")
namespaceName := types.NamespacedName{Namespace: ssChao.Namespace, Name: ssChao.Name}
if ssChao.Spec.EmbedChaos.PodChaos != nil {
chao, isExist, err := r.getPodChaosByNamespacedName(ctx, namespaceName)
@@ -133,49 +169,92 @@ func (r *ShardingSphereChaosReconciler) reconcileConfigMap(ctx context.Context,
func (r *ShardingSphereChaosReconciler) reconcileJob(ctx context.Context, ssChaos *sschaosv1alpha1.ShardingSphereChaos) error {
logger := r.Log.WithValues("reconcile job", ssChaos.Name)
namespaceName := types.NamespacedName{Namespace: ssChaos.Namespace, Name: ssChaos.Name}
-
rJob, isExist, err := r.getJobByNamespacedName(ctx, namespaceName)
if err != nil {
logger.Error(err, "get job err")
return err
}
- //todo:update InjectRequirement by chaos status
+ var nowInjectRequirement reconcile.InjectRequirement
+ if ssChaos.Status.Phase == "" || ssChaos.Status.Phase == sschaosv1alpha1.PhaseBeforeExperiment || ssChaos.Status.Phase == sschaosv1alpha1.PhaseAfterExperiment {
+ nowInjectRequirement = reconcile.Experimental
+ }
+ if ssChaos.Status.Phase == sschaosv1alpha1.PhaseInChaos || ssChaos.Status.Phase == sschaosv1alpha1.PhaseRecoveredChaos {
+ nowInjectRequirement = reconcile.Pressure
+ }
if isExist {
- return r.updateJob(ctx, reconcile.Experimental, ssChaos, rJob)
+ return r.updateJob(ctx, nowInjectRequirement, ssChaos, rJob)
}
- return r.createJob(ctx, reconcile.Experimental, ssChaos)
+ return r.createJob(ctx, nowInjectRequirement, ssChaos)
}
-func (r *ShardingSphereChaosReconciler) reconcileStatus(ctx context.Context, ssChaos *sschaosv1alpha1.ShardingSphereChaos) error {
- var (
- chaoCondition sschaosv1alpha1.ChaosCondition
- namespacedName = types.NamespacedName{
- Namespace: ssChaos.Namespace,
- Name: ssChaos.Name,
+func (r *ShardingSphereChaosReconciler) reconcileStatus(ctx context.Context, namespacedName types.NamespacedName) error {
+ ssChaos, err := r.getRuntimeSSChaos(ctx, namespacedName)
+ if err != nil {
+ return err
+ }
+ if ssChaos.Status.Phase == "" {
+ ssChaos.Status.Phase = sschaosv1alpha1.PhaseBeforeExperiment
+ }
+ rJob := &batchV1.Job{}
+ if err := r.Get(ctx, namespacedName, rJob); err != nil {
+ return err
+ }
+
+ if ssChaos.Status.Phase == sschaosv1alpha1.PhaseBeforeExperiment && rJob.Status.Succeeded == 1 {
+ ssChaos.Status.Phase = sschaosv1alpha1.PhaseAfterExperiment
+ }
+
+ if err := r.updatePhaseStart(ctx, ssChaos); err != nil {
+ return err
+ }
+
+ rt, err := r.getRuntimeSSChaos(ctx, namespacedName)
+ if err != nil {
+ return err
+ }
+ rt.Status = ssChaos.Status
+ return r.Status().Update(ctx, rt)
+}
+
+func (r *ShardingSphereChaosReconciler) updatePhaseStart(ctx context.Context, ssChaos *sschaosv1alpha1.ShardingSphereChaos) error {
+ if ssChaos.Status.Phase != sschaosv1alpha1.PhaseBeforeExperiment {
+ if err := r.updateChaosCondition(ctx, ssChaos); err != nil {
+ return err
}
- )
+
+ if ssChaos.Status.ChaosCondition == sschaosv1alpha1.AllInjected && ssChaos.Status.Phase == sschaosv1alpha1.PhaseAfterExperiment {
+ ssChaos.Status.Phase = sschaosv1alpha1.PhaseInChaos
+ }
+
+ if ssChaos.Status.ChaosCondition == sschaosv1alpha1.AllRecovered && ssChaos.Status.Phase == sschaosv1alpha1.PhaseInChaos {
+ ssChaos.Status.Phase = sschaosv1alpha1.PhaseRecoveredChaos
+ }
+ }
+
+ return nil
+}
+
+func (r *ShardingSphereChaosReconciler) updateChaosCondition(ctx context.Context, ssChaos *sschaosv1alpha1.ShardingSphereChaos) error {
+ namespacedName := types.NamespacedName{
+ Namespace: ssChaos.Namespace,
+ Name: ssChaos.Name,
+ }
if ssChaos.Spec.EmbedChaos.PodChaos != nil {
chao, err := r.Chaos.GetPodChaosByNamespacedName(ctx, namespacedName)
if err != nil {
return err
}
- chaoCondition = r.Chaos.ConvertChaosStatus(ctx, ssChaos, chao)
+ ssChaos.Status.ChaosCondition = r.Chaos.ConvertChaosStatus(ctx, ssChaos, chao)
} else if ssChaos.Spec.EmbedChaos.NetworkChaos != nil {
chao, err := r.Chaos.GetNetworkChaosByNamespacedName(ctx, namespacedName)
if err != nil {
return err
}
- chaoCondition = r.Chaos.ConvertChaosStatus(ctx, ssChaos, chao)
+ ssChaos.Status.ChaosCondition = r.Chaos.ConvertChaosStatus(ctx, ssChaos, chao)
}
- var rt sschaosv1alpha1.ShardingSphereChaos
- if err := r.Get(ctx, namespacedName, &rt); err != nil {
- return err
- }
- ssChaos.Status.ChaosCondition = chaoCondition
- rt.Status = ssChaos.Status
- return r.Status().Update(ctx, &rt)
+ return nil
}
func (r *ShardingSphereChaosReconciler) getNetworkChaosByNamespacedName(ctx context.Context, namespacedName types.NamespacedName) (reconcile.NetworkChaos, bool, error) {
@@ -277,11 +356,20 @@ func (r *ShardingSphereChaosReconciler) createJob(ctx context.Context, requireme
if err == nil && apierrors.IsAlreadyExists(err) {
return nil
}
+
return err
}
func (r *ShardingSphereChaosReconciler) updatePodChaos(ctx context.Context, chao *sschaosv1alpha1.ShardingSphereChaos, podChaos reconcile.PodChaos) error {
- return r.Chaos.UpdatePodChaos(ctx, chao, podChaos)
+ err := r.Chaos.UpdatePodChaos(ctx, chao, podChaos)
+ if err != nil {
+ if err == reconcile.ErrNotChanged {
+ return nil
+ }
+ return err
+ }
+ r.Events.Event(chao, "Normal", "applied", fmt.Sprintf("podChaos %s", "new changes updated"))
+ return reconcile.ErrChangedSpec
}
func (r *ShardingSphereChaosReconciler) CreatePodChaos(ctx context.Context, chao *sschaosv1alpha1.ShardingSphereChaos) error {
@@ -289,11 +377,24 @@ func (r *ShardingSphereChaosReconciler) CreatePodChaos(ctx context.Context, chao
if err != nil {
return err
}
- return r.Chaos.CreatePodChaos(ctx, podChaos)
+ err = r.Chaos.CreatePodChaos(ctx, podChaos)
+ if err != nil {
+ return err
+ }
+ r.Events.Event(chao, "Normal", "created", fmt.Sprintf("podChaos %s", " is created successfully"))
+ return nil
}
func (r *ShardingSphereChaosReconciler) updateNetWorkChaos(ctx context.Context, chao *sschaosv1alpha1.ShardingSphereChaos, netWorkChaos reconcile.NetworkChaos) error {
- return r.Chaos.UpdateNetworkChaos(ctx, chao, netWorkChaos)
+ err := r.Chaos.UpdateNetworkChaos(ctx, chao, netWorkChaos)
+ if err != nil {
+ if err == reconcile.ErrNotChanged {
+ return nil
+ }
+ return err
+ }
+ r.Events.Event(chao, "Normal", "applied", fmt.Sprintf("networkChaos %s", "new changes updated"))
+ return reconcile.ErrChangedSpec
}
func (r *ShardingSphereChaosReconciler) CreateNetworkChaos(ctx context.Context, chao *sschaosv1alpha1.ShardingSphereChaos) error {
@@ -301,7 +402,13 @@ func (r *ShardingSphereChaosReconciler) CreateNetworkChaos(ctx context.Context,
if err != nil {
return err
}
- return r.Chaos.CreateNetworkChaos(ctx, networkChaos)
+ err = r.Chaos.CreateNetworkChaos(ctx, networkChaos)
+ if err != nil {
+ return err
+ }
+
+ r.Events.Event(chao, "Normal", "created", fmt.Sprintf("networkChaos %s", " is created successfully"))
+ return nil
}
// SetupWithManager sets up the controller with the Manager.
diff --git a/shardingsphere-operator/pkg/reconcile/shardingspherechaos/chaos-mesh.go b/shardingsphere-operator/pkg/reconcile/shardingspherechaos/chaos-mesh.go
index 459a744..21bc8f6 100644
--- a/shardingsphere-operator/pkg/reconcile/shardingspherechaos/chaos-mesh.go
+++ b/shardingsphere-operator/pkg/reconcile/shardingspherechaos/chaos-mesh.go
@@ -50,7 +50,9 @@ const (
)
var (
- ErrConvert = errors.New("can not convert chaos interface to specify struct")
+ ErrConvert = errors.New("can not convert chaos interface to specify struct")
+ ErrNotChanged = errors.New("object not changed")
+ ErrChangedSpec = errors.New("change spec")
)
type chaosMeshHandler struct {
@@ -67,13 +69,13 @@ func (c *chaosMeshHandler) ConvertChaosStatus(ctx context.Context, ssChaos *v1al
if podChao, ok := chaos.(*chaosv1alpha1.PodChaos); ok && podChao != nil {
status = *podChao.GetStatus()
} else {
- return v1alpha1.UnKnown
+ return v1alpha1.Unknown
}
} else if ssChaos.Spec.EmbedChaos.NetworkChaos != nil {
if networkChaos, ok := chaos.(*chaosv1alpha1.NetworkChaos); ok && networkChaos != nil {
status = *networkChaos.GetStatus()
}
- return v1alpha1.UnKnown
+ return v1alpha1.Unknown
}
var conditions = map[chaosv1alpha1.ChaosConditionType]bool{}
for i := range status.Conditions {
@@ -102,7 +104,7 @@ func judgeCondition(condition map[chaosv1alpha1.ChaosConditionType]bool, phase c
}
}
- return v1alpha1.UnKnown
+ return v1alpha1.Unknown
}
func (c *chaosMeshHandler) CreatePodChaos(ctx context.Context, chao PodChaos) error {
@@ -302,7 +304,7 @@ func (c *chaosMeshHandler) UpdateNetworkChaos(ctx context.Context, ssChaos *v1al
}
isEqual := reflect.DeepEqual(reExp.Spec, reCur.Spec)
if isEqual {
- return nil
+ return ErrNotChanged
}
if err := c.r.Create(ctx, reCur); err != nil {
@@ -331,7 +333,7 @@ func (c *chaosMeshHandler) UpdatePodChaos(ctx context.Context, ssChaos *v1alpha1
}
isEqual := reflect.DeepEqual(reExp.Spec, reCur.Spec)
if isEqual {
- return nil
+ return ErrNotChanged
}
if err := c.r.Delete(ctx, reCur); err != nil {
diff --git a/shardingsphere-operator/pkg/reconcile/shardingspherechaos/job.go b/shardingsphere-operator/pkg/reconcile/shardingspherechaos/job.go
index 06a3bfc..9f0ac74 100644
--- a/shardingsphere-operator/pkg/reconcile/shardingspherechaos/job.go
+++ b/shardingsphere-operator/pkg/reconcile/shardingspherechaos/job.go
@@ -121,7 +121,7 @@ func NewJob(ssChaos *v1alpha1.ShardingSphereChaos, requirement InjectRequirement
cbd.SetImage(DefaultImageName)
cbd.SetName(DefaultContainerName)
cbd.SetVolumeMount(vm)
- cbd.SetCommand([]string{"sh", "-c"})
+ cbd.SetCommand([]string{"sh"})
container := cbd.Build()
container.Args = NewCmds(requirement)
jbd.SetContainers(container)
@@ -129,15 +129,16 @@ func NewJob(ssChaos *v1alpha1.ShardingSphereChaos, requirement InjectRequirement
return rjob, nil
}
-func NewCmds(requirement InjectRequirement) (cmds []string) {
-
+func NewCmds(requirement InjectRequirement) []string {
+ var cmds []string
+ cmds = append(cmds, "-c")
if requirement == Experimental {
cmds = append(cmds, fmt.Sprintf("%s/%s", DefaultWorkPath, configExperimental))
}
if requirement == Pressure {
cmds = append(cmds, fmt.Sprintf("%s/%s", DefaultWorkPath, configExperimental), fmt.Sprintf("%s/%s", DefaultWorkPath, configPressure))
}
- return
+ return cmds
}
func MustInt32(s string) (int32, error) {