You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ca...@apache.org on 2022/11/03 11:34:11 UTC

[rocketmq-operator] branch master updated: [ISSUE #135]Support Broker clusters in Conntroller mode (#136)

This is an automated email from the ASF dual-hosted git repository.

caigy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-operator.git


The following commit(s) were added to refs/heads/master by this push:
     new bad939d  [ISSUE #135]Support Broker clusters in Conntroller mode (#136)
bad939d is described below

commit bad939d3003c7de2a261d3a733ef3fdb55216169
Author: caigy <ca...@apache.org>
AuthorDate: Thu Nov 3 19:34:06 2022 +0800

    [ISSUE #135]Support Broker clusters in Conntroller mode (#136)
    
    * Support Conntroller mode cluster
    
    * add description for ClusterMode of Broker
---
 Makefile                                           |    2 +
 deploy/crds/rocketmq.apache.org_brokers.yaml       |    8 +-
 ...s.yaml => rocketmq.apache.org_controllers.yaml} | 1548 +-------------------
 deploy/role.yaml                                   |   38 +
 deploy/storage/hostpath/prepare-host-path.sh       |    5 +-
 example/rocketmq_v1alpha1_controller_cr.yaml       |   47 +
 images/broker/alpine/brokerGenConfig.sh            |    5 +
 images/broker/alpine/build-broker-image.sh         |    4 +-
 images/controller/alpine/Dockerfile                |   66 +
 .../alpine/build-controller-image.sh}              |    6 +-
 .../alpine/runserver-customize.sh                  |   28 +-
 images/namesrv/alpine/build-namesrv-image.sh       |    4 +-
 images/namesrv/alpine/runserver-customize.sh       |    8 +
 main.go                                            |    9 +-
 pkg/apis/rocketmq/v1alpha1/broker_types.go         |    6 +
 .../{broker_types.go => controller_types.go}       |   54 +-
 pkg/constants/constants.go                         |   19 +
 pkg/controller/broker/broker_controller.go         |   53 +-
 pkg/controller/controller/dledger_controller.go    |  447 ++++++
 pkg/share/share.go                                 |    3 +
 pkg/{share/share.go => tool/resource_name.go}      |   24 +-
 21 files changed, 763 insertions(+), 1621 deletions(-)

diff --git a/Makefile b/Makefile
index 8eb6254..c727305 100644
--- a/Makefile
+++ b/Makefile
@@ -124,6 +124,7 @@ endif
 
 .PHONY: install
 install: manifests kustomize ## Install CRDs into the K8s cluster specified in ~/.kube/config.
+	kubectl apply -f deploy/crds/rocketmq.apache.org_controllers.yaml
 	kubectl create -f deploy/crds/rocketmq.apache.org_brokers.yaml
 	kubectl create -f deploy/crds/rocketmq.apache.org_nameservices.yaml
 	kubectl create -f deploy/crds/rocketmq.apache.org_consoles.yaml
@@ -132,6 +133,7 @@ install: manifests kustomize ## Install CRDs into the K8s cluster specified in ~
 .PHONY: uninstall
 uninstall: manifests kustomize ## Uninstall CRDs from the K8s cluster specified in ~/.kube/config. Call with ignore-not-found=true to ignore resource not found errors during deletion.
 	kubectl delete --ignore-not-found=$(ignore-not-found) -f deploy/crds/rocketmq.apache.org_brokers.yaml
+	kubectl delete --ignore-not-found=$(ignore-not-found) -f deploy/crds/rocketmq.apache.org_controllers.yaml
 	kubectl delete --ignore-not-found=$(ignore-not-found) -f deploy/crds/rocketmq.apache.org_nameservices.yaml
 	kubectl delete --ignore-not-found=$(ignore-not-found) -f deploy/crds/rocketmq.apache.org_consoles.yaml
 	kubectl delete --ignore-not-found=$(ignore-not-found) -f deploy/crds/rocketmq.apache.org_topictransfers.yaml
diff --git a/deploy/crds/rocketmq.apache.org_brokers.yaml b/deploy/crds/rocketmq.apache.org_brokers.yaml
index 6e80d29..76c25cf 100644
--- a/deploy/crds/rocketmq.apache.org_brokers.yaml
+++ b/deploy/crds/rocketmq.apache.org_brokers.yaml
@@ -880,6 +880,11 @@ spec:
               brokerImage:
                 description: BaseImage is the broker image to use for the Pods
                 type: string
+              clusterMode:
+                description: 'ClusterMode defines the way to be a broker cluster,
+                  valid values can be one of the following: STATIC(default), CONTROLLER,
+                  CONTAINER'
+                type: string
               containerSecurityContext:
                 description: Container Security Context
                 properties:
@@ -1360,7 +1365,8 @@ spec:
                 description: 'INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
                   Important: Run "operator-sdk generate k8s" to regenerate code after
                   modifying this file Add custom validation using kubebuilder tags:
-                  https://book-v1.book.kubebuilder.io/beyond_basics/generating_crd.html'
+                  https://book-v1.book.kubebuilder.io/beyond_basics/generating_crd.html
+                  Size of broker clusters'
                 type: integer
               storageMode:
                 description: StorageMode can be EmptyDir, HostPath, StorageClass
diff --git a/deploy/crds/rocketmq.apache.org_brokers.yaml b/deploy/crds/rocketmq.apache.org_controllers.yaml
similarity index 51%
copy from deploy/crds/rocketmq.apache.org_brokers.yaml
copy to deploy/crds/rocketmq.apache.org_controllers.yaml
index 6e80d29..4f98506 100644
--- a/deploy/crds/rocketmq.apache.org_brokers.yaml
+++ b/deploy/crds/rocketmq.apache.org_controllers.yaml
@@ -6,33 +6,27 @@ metadata:
   annotations:
     controller-gen.kubebuilder.io/version: v0.7.0
   creationTimestamp: null
-  name: brokers.rocketmq.apache.org
+  name: controllers.rocketmq.apache.org
 spec:
   group: rocketmq.apache.org
   names:
-    kind: Broker
-    listKind: BrokerList
-    plural: brokers
-    singular: broker
+    kind: Controller
+    listKind: ControllerList
+    plural: controllers
+    singular: controller
   scope: Namespaced
   versions:
   - additionalPrinterColumns:
     - jsonPath: .spec.size
       name: Size
       type: integer
-    - jsonPath: .spec.replicaPerGroup
-      name: Replica-Per-Group
-      type: integer
-    - jsonPath: .spec.allowRestart
-      name: Allow-Restart
-      type: boolean
     - jsonPath: .metadata.creationTimestamp
       name: AGE
       type: date
     name: v1alpha1
     schema:
       openAPIV3Schema:
-        description: Broker is the Schema for the brokers API
+        description: Controller is the Schema for the Controllers API
         properties:
           apiVersion:
             description: 'APIVersion defines the versioned schema of this representation
@@ -47,7 +41,7 @@ spec:
           metadata:
             type: object
           spec:
-            description: BrokerSpec defines the desired state of Broker
+            description: ControllerSpec defines the desired state of Controller
             properties:
               affinity:
                 description: Affinity the pod's scheduling constraints
@@ -874,12 +868,6 @@ spec:
                         type: array
                     type: object
                 type: object
-              allowRestart:
-                description: AllowRestart defines whether allow pod restart
-                type: boolean
-              brokerImage:
-                description: BaseImage is the broker image to use for the Pods
-                type: string
               containerSecurityContext:
                 description: Container Security Context
                 properties:
@@ -1029,8 +1017,11 @@ spec:
                         type: string
                     type: object
                 type: object
+              controllerImage:
+                description: BaseImage is the controller image to use for the Pods
+                type: string
               env:
-                description: Env defines custom env, e.g. BROKER_MEM
+                description: Env defines custom env
                 items:
                   description: EnvVar represents an environment variable present in
                     a Container.
@@ -1134,9 +1125,6 @@ spec:
                   - name
                   type: object
                 type: array
-              hostNetwork:
-                description: HostNetwork can be true or false
-                type: boolean
               hostPath:
                 description: HostPath is the local path to store data
                 type: string
@@ -1155,9 +1143,6 @@ spec:
                       type: string
                   type: object
                 type: array
-              nameServers:
-                description: NameServers defines the name service list e.g. 192.168.1.1:9876;192.168.1.2:9876
-                type: string
               nodeSelector:
                 additionalProperties:
                   type: string
@@ -1167,9 +1152,6 @@ spec:
               priorityClassName:
                 description: PriorityClassName indicates the pod's priority
                 type: string
-              replicaPerGroup:
-                description: ReplicaPerGroup each broker cluster's replica number
-                type: integer
               resources:
                 description: Resources describes the compute resource requirements
                 properties:
@@ -1196,9 +1178,6 @@ spec:
                       to an implementation-defined value. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/'
                     type: object
                 type: object
-              scalePodName:
-                description: The name of pod where the metadata from
-                type: string
               securityContext:
                 description: Pod Security Context
                 properties:
@@ -1357,10 +1336,7 @@ spec:
                 description: ServiceAccountName
                 type: string
               size:
-                description: 'INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
-                  Important: Run "operator-sdk generate k8s" to regenerate code after
-                  modifying this file Add custom validation using kubebuilder tags:
-                  https://book-v1.book.kubebuilder.io/beyond_basics/generating_crd.html'
+                description: size of controller
                 type: integer
               storageMode:
                 description: StorageMode can be EmptyDir, HostPath, StorageClass
@@ -1670,1512 +1646,16 @@ spec:
                       type: object
                   type: object
                 type: array
-              volumes:
-                description: Volumes define the broker.conf
-                items:
-                  description: Volume represents a named volume in a pod that may
-                    be accessed by any container in the pod.
-                  properties:
-                    awsElasticBlockStore:
-                      description: 'AWSElasticBlockStore represents an AWS Disk resource
-                        that is attached to a kubelet''s host machine and then exposed
-                        to the pod. More info: https://kubernetes.io/docs/concepts/storage/volumes#awselasticblockstore'
-                      properties:
-                        fsType:
-                          description: 'Filesystem type of the volume that you want
-                            to mount. Tip: Ensure that the filesystem type is supported
-                            by the host operating system. Examples: "ext4", "xfs",
-                            "ntfs". Implicitly inferred to be "ext4" if unspecified.
-                            More info: https://kubernetes.io/docs/concepts/storage/volumes#awselasticblockstore
-                            TODO: how do we prevent errors in the filesystem from
-                            compromising the machine'
-                          type: string
-                        partition:
-                          description: 'The partition in the volume that you want
-                            to mount. If omitted, the default is to mount by volume
-                            name. Examples: For volume /dev/sda1, you specify the
-                            partition as "1". Similarly, the volume partition for
-                            /dev/sda is "0" (or you can leave the property empty).'
-                          format: int32
-                          type: integer
-                        readOnly:
-                          description: 'Specify "true" to force and set the ReadOnly
-                            property in VolumeMounts to "true". If omitted, the default
-                            is "false". More info: https://kubernetes.io/docs/concepts/storage/volumes#awselasticblockstore'
-                          type: boolean
-                        volumeID:
-                          description: 'Unique ID of the persistent disk resource
-                            in AWS (Amazon EBS volume). More info: https://kubernetes.io/docs/concepts/storage/volumes#awselasticblockstore'
-                          type: string
-                      required:
-                      - volumeID
-                      type: object
-                    azureDisk:
-                      description: AzureDisk represents an Azure Data Disk mount on
-                        the host and bind mount to the pod.
-                      properties:
-                        cachingMode:
-                          description: 'Host Caching mode: None, Read Only, Read Write.'
-                          type: string
-                        diskName:
-                          description: The Name of the data disk in the blob storage
-                          type: string
-                        diskURI:
-                          description: The URI the data disk in the blob storage
-                          type: string
-                        fsType:
-                          description: Filesystem type to mount. Must be a filesystem
-                            type supported by the host operating system. Ex. "ext4",
-                            "xfs", "ntfs". Implicitly inferred to be "ext4" if unspecified.
-                          type: string
-                        kind:
-                          description: 'Expected values Shared: multiple blob disks
-                            per storage account  Dedicated: single blob disk per storage
-                            account  Managed: azure managed data disk (only in managed
-                            availability set). defaults to shared'
-                          type: string
-                        readOnly:
-                          description: Defaults to false (read/write). ReadOnly here
-                            will force the ReadOnly setting in VolumeMounts.
-                          type: boolean
-                      required:
-                      - diskName
-                      - diskURI
-                      type: object
-                    azureFile:
-                      description: AzureFile represents an Azure File Service mount
-                        on the host and bind mount to the pod.
-                      properties:
-                        readOnly:
-                          description: Defaults to false (read/write). ReadOnly here
-                            will force the ReadOnly setting in VolumeMounts.
-                          type: boolean
-                        secretName:
-                          description: the name of secret that contains Azure Storage
-                            Account Name and Key
-                          type: string
-                        shareName:
-                          description: Share Name
-                          type: string
-                      required:
-                      - secretName
-                      - shareName
-                      type: object
-                    cephfs:
-                      description: CephFS represents a Ceph FS mount on the host that
-                        shares a pod's lifetime
-                      properties:
-                        monitors:
-                          description: 'Required: Monitors is a collection of Ceph
-                            monitors More info: https://examples.k8s.io/volumes/cephfs/README.md#how-to-use-it'
-                          items:
-                            type: string
-                          type: array
-                        path:
-                          description: 'Optional: Used as the mounted root, rather
-                            than the full Ceph tree, default is /'
-                          type: string
-                        readOnly:
-                          description: 'Optional: Defaults to false (read/write).
-                            ReadOnly here will force the ReadOnly setting in VolumeMounts.
-                            More info: https://examples.k8s.io/volumes/cephfs/README.md#how-to-use-it'
-                          type: boolean
-                        secretFile:
-                          description: 'Optional: SecretFile is the path to key ring
-                            for User, default is /etc/ceph/user.secret More info:
-                            https://examples.k8s.io/volumes/cephfs/README.md#how-to-use-it'
-                          type: string
-                        secretRef:
-                          description: 'Optional: SecretRef is reference to the authentication
-                            secret for User, default is empty. More info: https://examples.k8s.io/volumes/cephfs/README.md#how-to-use-it'
-                          properties:
-                            name:
-                              description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
-                                TODO: Add other useful fields. apiVersion, kind, uid?'
-                              type: string
-                          type: object
-                        user:
-                          description: 'Optional: User is the rados user name, default
-                            is admin More info: https://examples.k8s.io/volumes/cephfs/README.md#how-to-use-it'
-                          type: string
-                      required:
-                      - monitors
-                      type: object
-                    cinder:
-                      description: 'Cinder represents a cinder volume attached and
-                        mounted on kubelets host machine. More info: https://examples.k8s.io/mysql-cinder-pd/README.md'
-                      properties:
-                        fsType:
-                          description: 'Filesystem type to mount. Must be a filesystem
-                            type supported by the host operating system. Examples:
-                            "ext4", "xfs", "ntfs". Implicitly inferred to be "ext4"
-                            if unspecified. More info: https://examples.k8s.io/mysql-cinder-pd/README.md'
-                          type: string
-                        readOnly:
-                          description: 'Optional: Defaults to false (read/write).
-                            ReadOnly here will force the ReadOnly setting in VolumeMounts.
-                            More info: https://examples.k8s.io/mysql-cinder-pd/README.md'
-                          type: boolean
-                        secretRef:
-                          description: 'Optional: points to a secret object containing
-                            parameters used to connect to OpenStack.'
-                          properties:
-                            name:
-                              description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
-                                TODO: Add other useful fields. apiVersion, kind, uid?'
-                              type: string
-                          type: object
-                        volumeID:
-                          description: 'volume id used to identify the volume in cinder.
-                            More info: https://examples.k8s.io/mysql-cinder-pd/README.md'
-                          type: string
-                      required:
-                      - volumeID
-                      type: object
-                    configMap:
-                      description: ConfigMap represents a configMap that should populate
-                        this volume
-                      properties:
-                        defaultMode:
-                          description: 'Optional: mode bits used to set permissions
-                            on created files by default. Must be an octal value between
-                            0000 and 0777 or a decimal value between 0 and 511. YAML
-                            accepts both octal and decimal values, JSON requires decimal
-                            values for mode bits. Defaults to 0644. Directories within
-                            the path are not affected by this setting. This might
-                            be in conflict with other options that affect the file
-                            mode, like fsGroup, and the result can be other mode bits
-                            set.'
-                          format: int32
-                          type: integer
-                        items:
-                          description: If unspecified, each key-value pair in the
-                            Data field of the referenced ConfigMap will be projected
-                            into the volume as a file whose name is the key and content
-                            is the value. If specified, the listed keys will be projected
-                            into the specified paths, and unlisted keys will not be
-                            present. If a key is specified which is not present in
-                            the ConfigMap, the volume setup will error unless it is
-                            marked optional. Paths must be relative and may not contain
-                            the '..' path or start with '..'.
-                          items:
-                            description: Maps a string key to a path within a volume.
-                            properties:
-                              key:
-                                description: The key to project.
-                                type: string
-                              mode:
-                                description: 'Optional: mode bits used to set permissions
-                                  on this file. Must be an octal value between 0000
-                                  and 0777 or a decimal value between 0 and 511. YAML
-                                  accepts both octal and decimal values, JSON requires
-                                  decimal values for mode bits. If not specified,
-                                  the volume defaultMode will be used. This might
-                                  be in conflict with other options that affect the
-                                  file mode, like fsGroup, and the result can be other
-                                  mode bits set.'
-                                format: int32
-                                type: integer
-                              path:
-                                description: The relative path of the file to map
-                                  the key to. May not be an absolute path. May not
-                                  contain the path element '..'. May not start with
-                                  the string '..'.
-                                type: string
-                            required:
-                            - key
-                            - path
-                            type: object
-                          type: array
-                        name:
-                          description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
-                            TODO: Add other useful fields. apiVersion, kind, uid?'
-                          type: string
-                        optional:
-                          description: Specify whether the ConfigMap or its keys must
-                            be defined
-                          type: boolean
-                      type: object
-                    csi:
-                      description: CSI (Container Storage Interface) represents ephemeral
-                        storage that is handled by certain external CSI drivers (Beta
-                        feature).
-                      properties:
-                        driver:
-                          description: Driver is the name of the CSI driver that handles
-                            this volume. Consult with your admin for the correct name
-                            as registered in the cluster.
-                          type: string
-                        fsType:
-                          description: Filesystem type to mount. Ex. "ext4", "xfs",
-                            "ntfs". If not provided, the empty value is passed to
-                            the associated CSI driver which will determine the default
-                            filesystem to apply.
-                          type: string
-                        nodePublishSecretRef:
-                          description: NodePublishSecretRef is a reference to the
-                            secret object containing sensitive information to pass
-                            to the CSI driver to complete the CSI NodePublishVolume
-                            and NodeUnpublishVolume calls. This field is optional,
-                            and  may be empty if no secret is required. If the secret
-                            object contains more than one secret, all secret references
-                            are passed.
-                          properties:
-                            name:
-                              description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
-                                TODO: Add other useful fields. apiVersion, kind, uid?'
-                              type: string
-                          type: object
-                        readOnly:
-                          description: Specifies a read-only configuration for the
-                            volume. Defaults to false (read/write).
-                          type: boolean
-                        volumeAttributes:
-                          additionalProperties:
-                            type: string
-                          description: VolumeAttributes stores driver-specific properties
-                            that are passed to the CSI driver. Consult your driver's
-                            documentation for supported values.
-                          type: object
-                      required:
-                      - driver
-                      type: object
-                    downwardAPI:
-                      description: DownwardAPI represents downward API about the pod
-                        that should populate this volume
-                      properties:
-                        defaultMode:
-                          description: 'Optional: mode bits to use on created files
-                            by default. Must be a Optional: mode bits used to set
-                            permissions on created files by default. Must be an octal
-                            value between 0000 and 0777 or a decimal value between
-                            0 and 511. YAML accepts both octal and decimal values,
-                            JSON requires decimal values for mode bits. Defaults to
-                            0644. Directories within the path are not affected by
-                            this setting. This might be in conflict with other options
-                            that affect the file mode, like fsGroup, and the result
-                            can be other mode bits set.'
-                          format: int32
-                          type: integer
-                        items:
-                          description: Items is a list of downward API volume file
-                          items:
-                            description: DownwardAPIVolumeFile represents information
-                              to create the file containing the pod field
-                            properties:
-                              fieldRef:
-                                description: 'Required: Selects a field of the pod:
-                                  only annotations, labels, name and namespace are
-                                  supported.'
-                                properties:
-                                  apiVersion:
-                                    description: Version of the schema the FieldPath
-                                      is written in terms of, defaults to "v1".
-                                    type: string
-                                  fieldPath:
-                                    description: Path of the field to select in the
-                                      specified API version.
-                                    type: string
-                                required:
-                                - fieldPath
-                                type: object
-                              mode:
-                                description: 'Optional: mode bits used to set permissions
-                                  on this file, must be an octal value between 0000
-                                  and 0777 or a decimal value between 0 and 511. YAML
-                                  accepts both octal and decimal values, JSON requires
-                                  decimal values for mode bits. If not specified,
-                                  the volume defaultMode will be used. This might
-                                  be in conflict with other options that affect the
-                                  file mode, like fsGroup, and the result can be other
-                                  mode bits set.'
-                                format: int32
-                                type: integer
-                              path:
-                                description: 'Required: Path is  the relative path
-                                  name of the file to be created. Must not be absolute
-                                  or contain the ''..'' path. Must be utf-8 encoded.
-                                  The first item of the relative path must not start
-                                  with ''..'''
-                                type: string
-                              resourceFieldRef:
-                                description: 'Selects a resource of the container:
-                                  only resources limits and requests (limits.cpu,
-                                  limits.memory, requests.cpu and requests.memory)
-                                  are currently supported.'
-                                properties:
-                                  containerName:
-                                    description: 'Container name: required for volumes,
-                                      optional for env vars'
-                                    type: string
-                                  divisor:
-                                    anyOf:
-                                    - type: integer
-                                    - type: string
-                                    description: Specifies the output format of the
-                                      exposed resources, defaults to "1"
-                                    pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
-                                    x-kubernetes-int-or-string: true
-                                  resource:
-                                    description: 'Required: resource to select'
-                                    type: string
-                                required:
-                                - resource
-                                type: object
-                            required:
-                            - path
-                            type: object
-                          type: array
-                      type: object
-                    emptyDir:
-                      description: 'EmptyDir represents a temporary directory that
-                        shares a pod''s lifetime. More info: https://kubernetes.io/docs/concepts/storage/volumes#emptydir'
-                      properties:
-                        medium:
-                          description: 'What type of storage medium should back this
-                            directory. The default is "" which means to use the node''s
-                            default medium. Must be an empty string (default) or Memory.
-                            More info: https://kubernetes.io/docs/concepts/storage/volumes#emptydir'
-                          type: string
-                        sizeLimit:
-                          anyOf:
-                          - type: integer
-                          - type: string
-                          description: 'Total amount of local storage required for
-                            this EmptyDir volume. The size limit is also applicable
-                            for memory medium. The maximum usage on memory medium
-                            EmptyDir would be the minimum value between the SizeLimit
-                            specified here and the sum of memory limits of all containers
-                            in a pod. The default is nil which means that the limit
-                            is undefined. More info: http://kubernetes.io/docs/user-guide/volumes#emptydir'
-                          pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
-                          x-kubernetes-int-or-string: true
-                      type: object
-                    ephemeral:
-                      description: "Ephemeral represents a volume that is handled
-                        by a cluster storage driver. The volume's lifecycle is tied
-                        to the pod that defines it - it will be created before the
-                        pod starts, and deleted when the pod is removed. \n Use this
-                        if: a) the volume is only needed while the pod runs, b) features
-                        of normal volumes like restoring from snapshot or capacity
-                        \   tracking are needed, c) the storage driver is specified
-                        through a storage class, and d) the storage driver supports
-                        dynamic volume provisioning through    a PersistentVolumeClaim
-                        (see EphemeralVolumeSource for more    information on the
-                        connection between this volume type    and PersistentVolumeClaim).
-                        \n Use PersistentVolumeClaim or one of the vendor-specific
-                        APIs for volumes that persist for longer than the lifecycle
-                        of an individual pod. \n Use CSI for light-weight local ephemeral
-                        volumes if the CSI driver is meant to be used that way - see
-                        the documentation of the driver for more information. \n A
-                        pod can use both types of ephemeral volumes and persistent
-                        volumes at the same time. \n This is a beta feature and only
-                        available when the GenericEphemeralVolume feature gate is
-                        enabled."
-                      properties:
-                        volumeClaimTemplate:
-                          description: "Will be used to create a stand-alone PVC to
-                            provision the volume. The pod in which this EphemeralVolumeSource
-                            is embedded will be the owner of the PVC, i.e. the PVC
-                            will be deleted together with the pod.  The name of the
-                            PVC will be `<pod name>-<volume name>` where `<volume
-                            name>` is the name from the `PodSpec.Volumes` array entry.
-                            Pod validation will reject the pod if the concatenated
-                            name is not valid for a PVC (for example, too long). \n
-                            An existing PVC with that name that is not owned by the
-                            pod will *not* be used for the pod to avoid using an unrelated
-                            volume by mistake. Starting the pod is then blocked until
-                            the unrelated PVC is removed. If such a pre-created PVC
-                            is meant to be used by the pod, the PVC has to updated
-                            with an owner reference to the pod once the pod exists.
-                            Normally this should not be necessary, but it may be useful
-                            when manually reconstructing a broken cluster. \n This
-                            field is read-only and no changes will be made by Kubernetes
-                            to the PVC after it has been created. \n Required, must
-                            not be nil."
-                          properties:
-                            metadata:
-                              description: May contain labels and annotations that
-                                will be copied into the PVC when creating it. No other
-                                fields are allowed and will be rejected during validation.
-                              properties:
-                                annotations:
-                                  additionalProperties:
-                                    type: string
-                                  type: object
-                                finalizers:
-                                  items:
-                                    type: string
-                                  type: array
-                                labels:
-                                  additionalProperties:
-                                    type: string
-                                  type: object
-                                name:
-                                  type: string
-                                namespace:
-                                  type: string
-                              type: object
-                            spec:
-                              description: The specification for the PersistentVolumeClaim.
-                                The entire content is copied unchanged into the PVC
-                                that gets created from this template. The same fields
-                                as in a PersistentVolumeClaim are also valid here.
-                              properties:
-                                accessModes:
-                                  description: 'AccessModes contains the desired access
-                                    modes the volume should have. More info: https://kubernetes.io/docs/concepts/storage/persistent-volumes#access-modes-1'
-                                  items:
-                                    type: string
-                                  type: array
-                                dataSource:
-                                  description: 'This field can be used to specify
-                                    either: * An existing VolumeSnapshot object (snapshot.storage.k8s.io/VolumeSnapshot)
-                                    * An existing PVC (PersistentVolumeClaim) If the
-                                    provisioner or an external controller can support
-                                    the specified data source, it will create a new
-                                    volume based on the contents of the specified
-                                    data source. If the AnyVolumeDataSource feature
-                                    gate is enabled, this field will always have the
-                                    same contents as the DataSourceRef field.'
-                                  properties:
-                                    apiGroup:
-                                      description: APIGroup is the group for the resource
-                                        being referenced. If APIGroup is not specified,
-                                        the specified Kind must be in the core API
-                                        group. For any other third-party types, APIGroup
-                                        is required.
-                                      type: string
-                                    kind:
-                                      description: Kind is the type of resource being
-                                        referenced
-                                      type: string
-                                    name:
-                                      description: Name is the name of resource being
-                                        referenced
-                                      type: string
-                                  required:
-                                  - kind
-                                  - name
-                                  type: object
-                                dataSourceRef:
-                                  description: 'Specifies the object from which to
-                                    populate the volume with data, if a non-empty
-                                    volume is desired. This may be any local object
-                                    from a non-empty API group (non core object) or
-                                    a PersistentVolumeClaim object. When this field
-                                    is specified, volume binding will only succeed
-                                    if the type of the specified object matches some
-                                    installed volume populator or dynamic provisioner.
-                                    This field will replace the functionality of the
-                                    DataSource field and as such if both fields are
-                                    non-empty, they must have the same value. For
-                                    backwards compatibility, both fields (DataSource
-                                    and DataSourceRef) will be set to the same value
-                                    automatically if one of them is empty and the
-                                    other is non-empty. There are two important differences
-                                    between DataSource and DataSourceRef: * While
-                                    DataSource only allows two specific types of objects,
-                                    DataSourceRef   allows any non-core object, as
-                                    well as PersistentVolumeClaim objects. * While
-                                    DataSource ignores disallowed values (dropping
-                                    them), DataSourceRef   preserves all values, and
-                                    generates an error if a disallowed value is   specified.
-                                    (Alpha) Using this field requires the AnyVolumeDataSource
-                                    feature gate to be enabled.'
-                                  properties:
-                                    apiGroup:
-                                      description: APIGroup is the group for the resource
-                                        being referenced. If APIGroup is not specified,
-                                        the specified Kind must be in the core API
-                                        group. For any other third-party types, APIGroup
-                                        is required.
-                                      type: string
-                                    kind:
-                                      description: Kind is the type of resource being
-                                        referenced
-                                      type: string
-                                    name:
-                                      description: Name is the name of resource being
-                                        referenced
-                                      type: string
-                                  required:
-                                  - kind
-                                  - name
-                                  type: object
-                                resources:
-                                  description: 'Resources represents the minimum resources
-                                    the volume should have. More info: https://kubernetes.io/docs/concepts/storage/persistent-volumes#resources'
-                                  properties:
-                                    limits:
-                                      additionalProperties:
-                                        anyOf:
-                                        - type: integer
-                                        - type: string
-                                        pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
-                                        x-kubernetes-int-or-string: true
-                                      description: 'Limits describes the maximum amount
-                                        of compute resources allowed. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/'
-                                      type: object
-                                    requests:
-                                      additionalProperties:
-                                        anyOf:
-                                        - type: integer
-                                        - type: string
-                                        pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
-                                        x-kubernetes-int-or-string: true
-                                      description: 'Requests describes the minimum
-                                        amount of compute resources required. If Requests
-                                        is omitted for a container, it defaults to
-                                        Limits if that is explicitly specified, otherwise
-                                        to an implementation-defined value. More info:
-                                        https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/'
-                                      type: object
-                                  type: object
-                                selector:
-                                  description: A label query over volumes to consider
-                                    for binding.
-                                  properties:
-                                    matchExpressions:
-                                      description: matchExpressions is a list of label
-                                        selector requirements. The requirements are
-                                        ANDed.
-                                      items:
-                                        description: A label selector requirement
-                                          is a selector that contains values, a key,
-                                          and an operator that relates the key and
-                                          values.
-                                        properties:
-                                          key:
-                                            description: key is the label key that
-                                              the selector applies to.
-                                            type: string
-                                          operator:
-                                            description: operator represents a key's
-                                              relationship to a set of values. Valid
-                                              operators are In, NotIn, Exists and
-                                              DoesNotExist.
-                                            type: string
-                                          values:
-                                            description: values is an array of string
-                                              values. If the operator is In or NotIn,
-                                              the values array must be non-empty.
-                                              If the operator is Exists or DoesNotExist,
-                                              the values array must be empty. This
-                                              array is replaced during a strategic
-                                              merge patch.
-                                            items:
-                                              type: string
-                                            type: array
-                                        required:
-                                        - key
-                                        - operator
-                                        type: object
-                                      type: array
-                                    matchLabels:
-                                      additionalProperties:
-                                        type: string
-                                      description: matchLabels is a map of {key,value}
-                                        pairs. A single {key,value} in the matchLabels
-                                        map is equivalent to an element of matchExpressions,
-                                        whose key field is "key", the operator is
-                                        "In", and the values array contains only "value".
-                                        The requirements are ANDed.
-                                      type: object
-                                  type: object
-                                storageClassName:
-                                  description: 'Name of the StorageClass required
-                                    by the claim. More info: https://kubernetes.io/docs/concepts/storage/persistent-volumes#class-1'
-                                  type: string
-                                volumeMode:
-                                  description: volumeMode defines what type of volume
-                                    is required by the claim. Value of Filesystem
-                                    is implied when not included in claim spec.
-                                  type: string
-                                volumeName:
-                                  description: VolumeName is the binding reference
-                                    to the PersistentVolume backing this claim.
-                                  type: string
-                              type: object
-                          required:
-                          - spec
-                          type: object
-                      type: object
-                    fc:
-                      description: FC represents a Fibre Channel resource that is
-                        attached to a kubelet's host machine and then exposed to the
-                        pod.
-                      properties:
-                        fsType:
-                          description: 'Filesystem type to mount. Must be a filesystem
-                            type supported by the host operating system. Ex. "ext4",
-                            "xfs", "ntfs". Implicitly inferred to be "ext4" if unspecified.
-                            TODO: how do we prevent errors in the filesystem from
-                            compromising the machine'
-                          type: string
-                        lun:
-                          description: 'Optional: FC target lun number'
-                          format: int32
-                          type: integer
-                        readOnly:
-                          description: 'Optional: Defaults to false (read/write).
-                            ReadOnly here will force the ReadOnly setting in VolumeMounts.'
-                          type: boolean
-                        targetWWNs:
-                          description: 'Optional: FC target worldwide names (WWNs)'
-                          items:
-                            type: string
-                          type: array
-                        wwids:
-                          description: 'Optional: FC volume world wide identifiers
-                            (wwids) Either wwids or combination of targetWWNs and
-                            lun must be set, but not both simultaneously.'
-                          items:
-                            type: string
-                          type: array
-                      type: object
-                    flexVolume:
-                      description: FlexVolume represents a generic volume resource
-                        that is provisioned/attached using an exec based plugin.
-                      properties:
-                        driver:
-                          description: Driver is the name of the driver to use for
-                            this volume.
-                          type: string
-                        fsType:
-                          description: Filesystem type to mount. Must be a filesystem
-                            type supported by the host operating system. Ex. "ext4",
-                            "xfs", "ntfs". The default filesystem depends on FlexVolume
-                            script.
-                          type: string
-                        options:
-                          additionalProperties:
-                            type: string
-                          description: 'Optional: Extra command options if any.'
-                          type: object
-                        readOnly:
-                          description: 'Optional: Defaults to false (read/write).
-                            ReadOnly here will force the ReadOnly setting in VolumeMounts.'
-                          type: boolean
-                        secretRef:
-                          description: 'Optional: SecretRef is reference to the secret
-                            object containing sensitive information to pass to the
-                            plugin scripts. This may be empty if no secret object
-                            is specified. If the secret object contains more than
-                            one secret, all secrets are passed to the plugin scripts.'
-                          properties:
-                            name:
-                              description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
-                                TODO: Add other useful fields. apiVersion, kind, uid?'
-                              type: string
-                          type: object
-                      required:
-                      - driver
-                      type: object
-                    flocker:
-                      description: Flocker represents a Flocker volume attached to
-                        a kubelet's host machine. This depends on the Flocker control
-                        service being running
-                      properties:
-                        datasetName:
-                          description: Name of the dataset stored as metadata -> name
-                            on the dataset for Flocker should be considered as deprecated
-                          type: string
-                        datasetUUID:
-                          description: UUID of the dataset. This is unique identifier
-                            of a Flocker dataset
-                          type: string
-                      type: object
-                    gcePersistentDisk:
-                      description: 'GCEPersistentDisk represents a GCE Disk resource
-                        that is attached to a kubelet''s host machine and then exposed
-                        to the pod. More info: https://kubernetes.io/docs/concepts/storage/volumes#gcepersistentdisk'
-                      properties:
-                        fsType:
-                          description: 'Filesystem type of the volume that you want
-                            to mount. Tip: Ensure that the filesystem type is supported
-                            by the host operating system. Examples: "ext4", "xfs",
-                            "ntfs". Implicitly inferred to be "ext4" if unspecified.
-                            More info: https://kubernetes.io/docs/concepts/storage/volumes#gcepersistentdisk
-                            TODO: how do we prevent errors in the filesystem from
-                            compromising the machine'
-                          type: string
-                        partition:
-                          description: 'The partition in the volume that you want
-                            to mount. If omitted, the default is to mount by volume
-                            name. Examples: For volume /dev/sda1, you specify the
-                            partition as "1". Similarly, the volume partition for
-                            /dev/sda is "0" (or you can leave the property empty).
-                            More info: https://kubernetes.io/docs/concepts/storage/volumes#gcepersistentdisk'
-                          format: int32
-                          type: integer
-                        pdName:
-                          description: 'Unique name of the PD resource in GCE. Used
-                            to identify the disk in GCE. More info: https://kubernetes.io/docs/concepts/storage/volumes#gcepersistentdisk'
-                          type: string
-                        readOnly:
-                          description: 'ReadOnly here will force the ReadOnly setting
-                            in VolumeMounts. Defaults to false. More info: https://kubernetes.io/docs/concepts/storage/volumes#gcepersistentdisk'
-                          type: boolean
-                      required:
-                      - pdName
-                      type: object
-                    gitRepo:
-                      description: 'GitRepo represents a git repository at a particular
-                        revision. DEPRECATED: GitRepo is deprecated. To provision
-                        a container with a git repo, mount an EmptyDir into an InitContainer
-                        that clones the repo using git, then mount the EmptyDir into
-                        the Pod''s container.'
-                      properties:
-                        directory:
-                          description: Target directory name. Must not contain or
-                            start with '..'.  If '.' is supplied, the volume directory
-                            will be the git repository.  Otherwise, if specified,
-                            the volume will contain the git repository in the subdirectory
-                            with the given name.
-                          type: string
-                        repository:
-                          description: Repository URL
-                          type: string
-                        revision:
-                          description: Commit hash for the specified revision.
-                          type: string
-                      required:
-                      - repository
-                      type: object
-                    glusterfs:
-                      description: 'Glusterfs represents a Glusterfs mount on the
-                        host that shares a pod''s lifetime. More info: https://examples.k8s.io/volumes/glusterfs/README.md'
-                      properties:
-                        endpoints:
-                          description: 'EndpointsName is the endpoint name that details
-                            Glusterfs topology. More info: https://examples.k8s.io/volumes/glusterfs/README.md#create-a-pod'
-                          type: string
-                        path:
-                          description: 'Path is the Glusterfs volume path. More info:
-                            https://examples.k8s.io/volumes/glusterfs/README.md#create-a-pod'
-                          type: string
-                        readOnly:
-                          description: 'ReadOnly here will force the Glusterfs volume
-                            to be mounted with read-only permissions. Defaults to
-                            false. More info: https://examples.k8s.io/volumes/glusterfs/README.md#create-a-pod'
-                          type: boolean
-                      required:
-                      - endpoints
-                      - path
-                      type: object
-                    hostPath:
-                      description: 'HostPath represents a pre-existing file or directory
-                        on the host machine that is directly exposed to the container.
-                        This is generally used for system agents or other privileged
-                        things that are allowed to see the host machine. Most containers
-                        will NOT need this. More info: https://kubernetes.io/docs/concepts/storage/volumes#hostpath
-                        --- TODO(jonesdl) We need to restrict who can use host directory
-                        mounts and who can/can not mount host directories as read/write.'
-                      properties:
-                        path:
-                          description: 'Path of the directory on the host. If the
-                            path is a symlink, it will follow the link to the real
-                            path. More info: https://kubernetes.io/docs/concepts/storage/volumes#hostpath'
-                          type: string
-                        type:
-                          description: 'Type for HostPath Volume Defaults to "" More
-                            info: https://kubernetes.io/docs/concepts/storage/volumes#hostpath'
-                          type: string
-                      required:
-                      - path
-                      type: object
-                    iscsi:
-                      description: 'ISCSI represents an ISCSI Disk resource that is
-                        attached to a kubelet''s host machine and then exposed to
-                        the pod. More info: https://examples.k8s.io/volumes/iscsi/README.md'
-                      properties:
-                        chapAuthDiscovery:
-                          description: whether support iSCSI Discovery CHAP authentication
-                          type: boolean
-                        chapAuthSession:
-                          description: whether support iSCSI Session CHAP authentication
-                          type: boolean
-                        fsType:
-                          description: 'Filesystem type of the volume that you want
-                            to mount. Tip: Ensure that the filesystem type is supported
-                            by the host operating system. Examples: "ext4", "xfs",
-                            "ntfs". Implicitly inferred to be "ext4" if unspecified.
-                            More info: https://kubernetes.io/docs/concepts/storage/volumes#iscsi
-                            TODO: how do we prevent errors in the filesystem from
-                            compromising the machine'
-                          type: string
-                        initiatorName:
-                          description: Custom iSCSI Initiator Name. If initiatorName
-                            is specified with iscsiInterface simultaneously, new iSCSI
-                            interface <target portal>:<volume name> will be created
-                            for the connection.
-                          type: string
-                        iqn:
-                          description: Target iSCSI Qualified Name.
-                          type: string
-                        iscsiInterface:
-                          description: iSCSI Interface Name that uses an iSCSI transport.
-                            Defaults to 'default' (tcp).
-                          type: string
-                        lun:
-                          description: iSCSI Target Lun number.
-                          format: int32
-                          type: integer
-                        portals:
-                          description: iSCSI Target Portal List. The portal is either
-                            an IP or ip_addr:port if the port is other than default
-                            (typically TCP ports 860 and 3260).
-                          items:
-                            type: string
-                          type: array
-                        readOnly:
-                          description: ReadOnly here will force the ReadOnly setting
-                            in VolumeMounts. Defaults to false.
-                          type: boolean
-                        secretRef:
-                          description: CHAP Secret for iSCSI target and initiator
-                            authentication
-                          properties:
-                            name:
-                              description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
-                                TODO: Add other useful fields. apiVersion, kind, uid?'
-                              type: string
-                          type: object
-                        targetPortal:
-                          description: iSCSI Target Portal. The Portal is either an
-                            IP or ip_addr:port if the port is other than default (typically
-                            TCP ports 860 and 3260).
-                          type: string
-                      required:
-                      - iqn
-                      - lun
-                      - targetPortal
-                      type: object
-                    name:
-                      description: 'Volume''s name. Must be a DNS_LABEL and unique
-                        within the pod. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names'
-                      type: string
-                    nfs:
-                      description: 'NFS represents an NFS mount on the host that shares
-                        a pod''s lifetime More info: https://kubernetes.io/docs/concepts/storage/volumes#nfs'
-                      properties:
-                        path:
-                          description: 'Path that is exported by the NFS server. More
-                            info: https://kubernetes.io/docs/concepts/storage/volumes#nfs'
-                          type: string
-                        readOnly:
-                          description: 'ReadOnly here will force the NFS export to
-                            be mounted with read-only permissions. Defaults to false.
-                            More info: https://kubernetes.io/docs/concepts/storage/volumes#nfs'
-                          type: boolean
-                        server:
-                          description: 'Server is the hostname or IP address of the
-                            NFS server. More info: https://kubernetes.io/docs/concepts/storage/volumes#nfs'
-                          type: string
-                      required:
-                      - path
-                      - server
-                      type: object
-                    persistentVolumeClaim:
-                      description: 'PersistentVolumeClaimVolumeSource represents a
-                        reference to a PersistentVolumeClaim in the same namespace.
-                        More info: https://kubernetes.io/docs/concepts/storage/persistent-volumes#persistentvolumeclaims'
-                      properties:
-                        claimName:
-                          description: 'ClaimName is the name of a PersistentVolumeClaim
-                            in the same namespace as the pod using this volume. More
-                            info: https://kubernetes.io/docs/concepts/storage/persistent-volumes#persistentvolumeclaims'
-                          type: string
-                        readOnly:
-                          description: Will force the ReadOnly setting in VolumeMounts.
-                            Default false.
-                          type: boolean
-                      required:
-                      - claimName
-                      type: object
-                    photonPersistentDisk:
-                      description: PhotonPersistentDisk represents a PhotonController
-                        persistent disk attached and mounted on kubelets host machine
-                      properties:
-                        fsType:
-                          description: Filesystem type to mount. Must be a filesystem
-                            type supported by the host operating system. Ex. "ext4",
-                            "xfs", "ntfs". Implicitly inferred to be "ext4" if unspecified.
-                          type: string
-                        pdID:
-                          description: ID that identifies Photon Controller persistent
-                            disk
-                          type: string
-                      required:
-                      - pdID
-                      type: object
-                    portworxVolume:
-                      description: PortworxVolume represents a portworx volume attached
-                        and mounted on kubelets host machine
-                      properties:
-                        fsType:
-                          description: FSType represents the filesystem type to mount
-                            Must be a filesystem type supported by the host operating
-                            system. Ex. "ext4", "xfs". Implicitly inferred to be "ext4"
-                            if unspecified.
-                          type: string
-                        readOnly:
-                          description: Defaults to false (read/write). ReadOnly here
-                            will force the ReadOnly setting in VolumeMounts.
-                          type: boolean
-                        volumeID:
-                          description: VolumeID uniquely identifies a Portworx volume
-                          type: string
-                      required:
-                      - volumeID
-                      type: object
-                    projected:
-                      description: Items for all in one resources secrets, configmaps,
-                        and downward API
-                      properties:
-                        defaultMode:
-                          description: Mode bits used to set permissions on created
-                            files by default. Must be an octal value between 0000
-                            and 0777 or a decimal value between 0 and 511. YAML accepts
-                            both octal and decimal values, JSON requires decimal values
-                            for mode bits. Directories within the path are not affected
-                            by this setting. This might be in conflict with other
-                            options that affect the file mode, like fsGroup, and the
-                            result can be other mode bits set.
-                          format: int32
-                          type: integer
-                        sources:
-                          description: list of volume projections
-                          items:
-                            description: Projection that may be projected along with
-                              other supported volume types
-                            properties:
-                              configMap:
-                                description: information about the configMap data
-                                  to project
-                                properties:
-                                  items:
-                                    description: If unspecified, each key-value pair
-                                      in the Data field of the referenced ConfigMap
-                                      will be projected into the volume as a file
-                                      whose name is the key and content is the value.
-                                      If specified, the listed keys will be projected
-                                      into the specified paths, and unlisted keys
-                                      will not be present. If a key is specified which
-                                      is not present in the ConfigMap, the volume
-                                      setup will error unless it is marked optional.
-                                      Paths must be relative and may not contain the
-                                      '..' path or start with '..'.
-                                    items:
-                                      description: Maps a string key to a path within
-                                        a volume.
-                                      properties:
-                                        key:
-                                          description: The key to project.
-                                          type: string
-                                        mode:
-                                          description: 'Optional: mode bits used to
-                                            set permissions on this file. Must be
-                                            an octal value between 0000 and 0777 or
-                                            a decimal value between 0 and 511. YAML
-                                            accepts both octal and decimal values,
-                                            JSON requires decimal values for mode
-                                            bits. If not specified, the volume defaultMode
-                                            will be used. This might be in conflict
-                                            with other options that affect the file
-                                            mode, like fsGroup, and the result can
-                                            be other mode bits set.'
-                                          format: int32
-                                          type: integer
-                                        path:
-                                          description: The relative path of the file
-                                            to map the key to. May not be an absolute
-                                            path. May not contain the path element
-                                            '..'. May not start with the string '..'.
-                                          type: string
-                                      required:
-                                      - key
-                                      - path
-                                      type: object
-                                    type: array
-                                  name:
-                                    description: 'Name of the referent. More info:
-                                      https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
-                                      TODO: Add other useful fields. apiVersion, kind,
-                                      uid?'
-                                    type: string
-                                  optional:
-                                    description: Specify whether the ConfigMap or
-                                      its keys must be defined
-                                    type: boolean
-                                type: object
-                              downwardAPI:
-                                description: information about the downwardAPI data
-                                  to project
-                                properties:
-                                  items:
-                                    description: Items is a list of DownwardAPIVolume
-                                      file
-                                    items:
-                                      description: DownwardAPIVolumeFile represents
-                                        information to create the file containing
-                                        the pod field
-                                      properties:
-                                        fieldRef:
-                                          description: 'Required: Selects a field
-                                            of the pod: only annotations, labels,
-                                            name and namespace are supported.'
-                                          properties:
-                                            apiVersion:
-                                              description: Version of the schema the
-                                                FieldPath is written in terms of,
-                                                defaults to "v1".
-                                              type: string
-                                            fieldPath:
-                                              description: Path of the field to select
-                                                in the specified API version.
-                                              type: string
-                                          required:
-                                          - fieldPath
-                                          type: object
-                                        mode:
-                                          description: 'Optional: mode bits used to
-                                            set permissions on this file, must be
-                                            an octal value between 0000 and 0777 or
-                                            a decimal value between 0 and 511. YAML
-                                            accepts both octal and decimal values,
-                                            JSON requires decimal values for mode
-                                            bits. If not specified, the volume defaultMode
-                                            will be used. This might be in conflict
-                                            with other options that affect the file
-                                            mode, like fsGroup, and the result can
-                                            be other mode bits set.'
-                                          format: int32
-                                          type: integer
-                                        path:
-                                          description: 'Required: Path is  the relative
-                                            path name of the file to be created. Must
-                                            not be absolute or contain the ''..''
-                                            path. Must be utf-8 encoded. The first
-                                            item of the relative path must not start
-                                            with ''..'''
-                                          type: string
-                                        resourceFieldRef:
-                                          description: 'Selects a resource of the
-                                            container: only resources limits and requests
-                                            (limits.cpu, limits.memory, requests.cpu
-                                            and requests.memory) are currently supported.'
-                                          properties:
-                                            containerName:
-                                              description: 'Container name: required
-                                                for volumes, optional for env vars'
-                                              type: string
-                                            divisor:
-                                              anyOf:
-                                              - type: integer
-                                              - type: string
-                                              description: Specifies the output format
-                                                of the exposed resources, defaults
-                                                to "1"
-                                              pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
-                                              x-kubernetes-int-or-string: true
-                                            resource:
-                                              description: 'Required: resource to
-                                                select'
-                                              type: string
-                                          required:
-                                          - resource
-                                          type: object
-                                      required:
-                                      - path
-                                      type: object
-                                    type: array
-                                type: object
-                              secret:
-                                description: information about the secret data to
-                                  project
-                                properties:
-                                  items:
-                                    description: If unspecified, each key-value pair
-                                      in the Data field of the referenced Secret will
-                                      be projected into the volume as a file whose
-                                      name is the key and content is the value. If
-                                      specified, the listed keys will be projected
-                                      into the specified paths, and unlisted keys
-                                      will not be present. If a key is specified which
-                                      is not present in the Secret, the volume setup
-                                      will error unless it is marked optional. Paths
-                                      must be relative and may not contain the '..'
-                                      path or start with '..'.
-                                    items:
-                                      description: Maps a string key to a path within
-                                        a volume.
-                                      properties:
-                                        key:
-                                          description: The key to project.
-                                          type: string
-                                        mode:
-                                          description: 'Optional: mode bits used to
-                                            set permissions on this file. Must be
-                                            an octal value between 0000 and 0777 or
-                                            a decimal value between 0 and 511. YAML
-                                            accepts both octal and decimal values,
-                                            JSON requires decimal values for mode
-                                            bits. If not specified, the volume defaultMode
-                                            will be used. This might be in conflict
-                                            with other options that affect the file
-                                            mode, like fsGroup, and the result can
-                                            be other mode bits set.'
-                                          format: int32
-                                          type: integer
-                                        path:
-                                          description: The relative path of the file
-                                            to map the key to. May not be an absolute
-                                            path. May not contain the path element
-                                            '..'. May not start with the string '..'.
-                                          type: string
-                                      required:
-                                      - key
-                                      - path
-                                      type: object
-                                    type: array
-                                  name:
-                                    description: 'Name of the referent. More info:
-                                      https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
-                                      TODO: Add other useful fields. apiVersion, kind,
-                                      uid?'
-                                    type: string
-                                  optional:
-                                    description: Specify whether the Secret or its
-                                      key must be defined
-                                    type: boolean
-                                type: object
-                              serviceAccountToken:
-                                description: information about the serviceAccountToken
-                                  data to project
-                                properties:
-                                  audience:
-                                    description: Audience is the intended audience
-                                      of the token. A recipient of a token must identify
-                                      itself with an identifier specified in the audience
-                                      of the token, and otherwise should reject the
-                                      token. The audience defaults to the identifier
-                                      of the apiserver.
-                                    type: string
-                                  expirationSeconds:
-                                    description: ExpirationSeconds is the requested
-                                      duration of validity of the service account
-                                      token. As the token approaches expiration, the
-                                      kubelet volume plugin will proactively rotate
-                                      the service account token. The kubelet will
-                                      start trying to rotate the token if the token
-                                      is older than 80 percent of its time to live
-                                      or if the token is older than 24 hours.Defaults
-                                      to 1 hour and must be at least 10 minutes.
-                                    format: int64
-                                    type: integer
-                                  path:
-                                    description: Path is the path relative to the
-                                      mount point of the file to project the token
-                                      into.
-                                    type: string
-                                required:
-                                - path
-                                type: object
-                            type: object
-                          type: array
-                      type: object
-                    quobyte:
-                      description: Quobyte represents a Quobyte mount on the host
-                        that shares a pod's lifetime
-                      properties:
-                        group:
-                          description: Group to map volume access to Default is no
-                            group
-                          type: string
-                        readOnly:
-                          description: ReadOnly here will force the Quobyte volume
-                            to be mounted with read-only permissions. Defaults to
-                            false.
-                          type: boolean
-                        registry:
-                          description: Registry represents a single or multiple Quobyte
-                            Registry services specified as a string as host:port pair
-                            (multiple entries are separated with commas) which acts
-                            as the central registry for volumes
-                          type: string
-                        tenant:
-                          description: Tenant owning the given Quobyte volume in the
-                            Backend Used with dynamically provisioned Quobyte volumes,
-                            value is set by the plugin
-                          type: string
-                        user:
-                          description: User to map volume access to Defaults to serivceaccount
-                            user
-                          type: string
-                        volume:
-                          description: Volume is a string that references an already
-                            created Quobyte volume by name.
-                          type: string
-                      required:
-                      - registry
-                      - volume
-                      type: object
-                    rbd:
-                      description: 'RBD represents a Rados Block Device mount on the
-                        host that shares a pod''s lifetime. More info: https://examples.k8s.io/volumes/rbd/README.md'
-                      properties:
-                        fsType:
-                          description: 'Filesystem type of the volume that you want
-                            to mount. Tip: Ensure that the filesystem type is supported
-                            by the host operating system. Examples: "ext4", "xfs",
-                            "ntfs". Implicitly inferred to be "ext4" if unspecified.
-                            More info: https://kubernetes.io/docs/concepts/storage/volumes#rbd
-                            TODO: how do we prevent errors in the filesystem from
-                            compromising the machine'
-                          type: string
-                        image:
-                          description: 'The rados image name. More info: https://examples.k8s.io/volumes/rbd/README.md#how-to-use-it'
-                          type: string
-                        keyring:
-                          description: 'Keyring is the path to key ring for RBDUser.
-                            Default is /etc/ceph/keyring. More info: https://examples.k8s.io/volumes/rbd/README.md#how-to-use-it'
-                          type: string
-                        monitors:
-                          description: 'A collection of Ceph monitors. More info:
-                            https://examples.k8s.io/volumes/rbd/README.md#how-to-use-it'
-                          items:
-                            type: string
-                          type: array
-                        pool:
-                          description: 'The rados pool name. Default is rbd. More
-                            info: https://examples.k8s.io/volumes/rbd/README.md#how-to-use-it'
-                          type: string
-                        readOnly:
-                          description: 'ReadOnly here will force the ReadOnly setting
-                            in VolumeMounts. Defaults to false. More info: https://examples.k8s.io/volumes/rbd/README.md#how-to-use-it'
-                          type: boolean
-                        secretRef:
-                          description: 'SecretRef is name of the authentication secret
-                            for RBDUser. If provided overrides keyring. Default is
-                            nil. More info: https://examples.k8s.io/volumes/rbd/README.md#how-to-use-it'
-                          properties:
-                            name:
-                              description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
-                                TODO: Add other useful fields. apiVersion, kind, uid?'
-                              type: string
-                          type: object
-                        user:
-                          description: 'The rados user name. Default is admin. More
-                            info: https://examples.k8s.io/volumes/rbd/README.md#how-to-use-it'
-                          type: string
-                      required:
-                      - image
-                      - monitors
-                      type: object
-                    scaleIO:
-                      description: ScaleIO represents a ScaleIO persistent volume
-                        attached and mounted on Kubernetes nodes.
-                      properties:
-                        fsType:
-                          description: Filesystem type to mount. Must be a filesystem
-                            type supported by the host operating system. Ex. "ext4",
-                            "xfs", "ntfs". Default is "xfs".
-                          type: string
-                        gateway:
-                          description: The host address of the ScaleIO API Gateway.
-                          type: string
-                        protectionDomain:
-                          description: The name of the ScaleIO Protection Domain for
-                            the configured storage.
-                          type: string
-                        readOnly:
-                          description: Defaults to false (read/write). ReadOnly here
-                            will force the ReadOnly setting in VolumeMounts.
-                          type: boolean
-                        secretRef:
-                          description: SecretRef references to the secret for ScaleIO
-                            user and other sensitive information. If this is not provided,
-                            Login operation will fail.
-                          properties:
-                            name:
-                              description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
-                                TODO: Add other useful fields. apiVersion, kind, uid?'
-                              type: string
-                          type: object
-                        sslEnabled:
-                          description: Flag to enable/disable SSL communication with
-                            Gateway, default false
-                          type: boolean
-                        storageMode:
-                          description: Indicates whether the storage for a volume
-                            should be ThickProvisioned or ThinProvisioned. Default
-                            is ThinProvisioned.
-                          type: string
-                        storagePool:
-                          description: The ScaleIO Storage Pool associated with the
-                            protection domain.
-                          type: string
-                        system:
-                          description: The name of the storage system as configured
-                            in ScaleIO.
-                          type: string
-                        volumeName:
-                          description: The name of a volume already created in the
-                            ScaleIO system that is associated with this volume source.
-                          type: string
-                      required:
-                      - gateway
-                      - secretRef
-                      - system
-                      type: object
-                    secret:
-                      description: 'Secret represents a secret that should populate
-                        this volume. More info: https://kubernetes.io/docs/concepts/storage/volumes#secret'
-                      properties:
-                        defaultMode:
-                          description: 'Optional: mode bits used to set permissions
-                            on created files by default. Must be an octal value between
-                            0000 and 0777 or a decimal value between 0 and 511. YAML
-                            accepts both octal and decimal values, JSON requires decimal
-                            values for mode bits. Defaults to 0644. Directories within
-                            the path are not affected by this setting. This might
-                            be in conflict with other options that affect the file
-                            mode, like fsGroup, and the result can be other mode bits
-                            set.'
-                          format: int32
-                          type: integer
-                        items:
-                          description: If unspecified, each key-value pair in the
-                            Data field of the referenced Secret will be projected
-                            into the volume as a file whose name is the key and content
-                            is the value. If specified, the listed keys will be projected
-                            into the specified paths, and unlisted keys will not be
-                            present. If a key is specified which is not present in
-                            the Secret, the volume setup will error unless it is marked
-                            optional. Paths must be relative and may not contain the
-                            '..' path or start with '..'.
-                          items:
-                            description: Maps a string key to a path within a volume.
-                            properties:
-                              key:
-                                description: The key to project.
-                                type: string
-                              mode:
-                                description: 'Optional: mode bits used to set permissions
-                                  on this file. Must be an octal value between 0000
-                                  and 0777 or a decimal value between 0 and 511. YAML
-                                  accepts both octal and decimal values, JSON requires
-                                  decimal values for mode bits. If not specified,
-                                  the volume defaultMode will be used. This might
-                                  be in conflict with other options that affect the
-                                  file mode, like fsGroup, and the result can be other
-                                  mode bits set.'
-                                format: int32
-                                type: integer
-                              path:
-                                description: The relative path of the file to map
-                                  the key to. May not be an absolute path. May not
-                                  contain the path element '..'. May not start with
-                                  the string '..'.
-                                type: string
-                            required:
-                            - key
-                            - path
-                            type: object
-                          type: array
-                        optional:
-                          description: Specify whether the Secret or its keys must
-                            be defined
-                          type: boolean
-                        secretName:
-                          description: 'Name of the secret in the pod''s namespace
-                            to use. More info: https://kubernetes.io/docs/concepts/storage/volumes#secret'
-                          type: string
-                      type: object
-                    storageos:
-                      description: StorageOS represents a StorageOS volume attached
-                        and mounted on Kubernetes nodes.
-                      properties:
-                        fsType:
-                          description: Filesystem type to mount. Must be a filesystem
-                            type supported by the host operating system. Ex. "ext4",
-                            "xfs", "ntfs". Implicitly inferred to be "ext4" if unspecified.
-                          type: string
-                        readOnly:
-                          description: Defaults to false (read/write). ReadOnly here
-                            will force the ReadOnly setting in VolumeMounts.
-                          type: boolean
-                        secretRef:
-                          description: SecretRef specifies the secret to use for obtaining
-                            the StorageOS API credentials.  If not specified, default
-                            values will be attempted.
-                          properties:
-                            name:
-                              description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
-                                TODO: Add other useful fields. apiVersion, kind, uid?'
-                              type: string
-                          type: object
-                        volumeName:
-                          description: VolumeName is the human-readable name of the
-                            StorageOS volume.  Volume names are only unique within
-                            a namespace.
-                          type: string
-                        volumeNamespace:
-                          description: VolumeNamespace specifies the scope of the
-                            volume within StorageOS.  If no namespace is specified
-                            then the Pod's namespace will be used.  This allows the
-                            Kubernetes name scoping to be mirrored within StorageOS
-                            for tighter integration. Set VolumeName to any name to
-                            override the default behaviour. Set to "default" if you
-                            are not using namespaces within StorageOS. Namespaces
-                            that do not pre-exist within StorageOS will be created.
-                          type: string
-                      type: object
-                    vsphereVolume:
-                      description: VsphereVolume represents a vSphere volume attached
-                        and mounted on kubelets host machine
-                      properties:
-                        fsType:
-                          description: Filesystem type to mount. Must be a filesystem
-                            type supported by the host operating system. Ex. "ext4",
-                            "xfs", "ntfs". Implicitly inferred to be "ext4" if unspecified.
-                          type: string
-                        storagePolicyID:
-                          description: Storage Policy Based Management (SPBM) profile
-                            ID associated with the StoragePolicyName.
-                          type: string
-                        storagePolicyName:
-                          description: Storage Policy Based Management (SPBM) profile
-                            name.
-                          type: string
-                        volumePath:
-                          description: Path that identifies vSphere volume vmdk
-                          type: string
-                      required:
-                      - volumePath
-                      type: object
-                  required:
-                  - name
-                  type: object
-                type: array
             required:
-            - allowRestart
-            - brokerImage
-            - env
+            - controllerImage
             - hostPath
-            - imagePullPolicy
-            - replicaPerGroup
             - resources
-            - scalePodName
             - size
             - storageMode
             - volumeClaimTemplates
-            - volumes
             type: object
           status:
-            description: BrokerStatus defines the observed state of Broker
+            description: ControllerStatus defines the observed state of Controller
             properties:
               nodes:
                 description: 'INSERT ADDITIONAL STATUS FIELD - define observed state
diff --git a/deploy/role.yaml b/deploy/role.yaml
index eaaca0c..f9531a7 100644
--- a/deploy/role.yaml
+++ b/deploy/role.yaml
@@ -63,6 +63,18 @@ rules:
   - patch
   - update
   - watch
+- apiGroups:
+  - ""
+  resources:
+  - services
+  verbs:
+  - create
+  - delete
+  - get
+  - list
+  - patch
+  - update
+  - watch
 - apiGroups:
   - apps
   resources:
@@ -151,6 +163,32 @@ rules:
   - get
   - patch
   - update
+- apiGroups:
+  - rocketmq.apache.org
+  resources:
+  - controllers
+  verbs:
+  - create
+  - delete
+  - get
+  - list
+  - patch
+  - update
+  - watch
+- apiGroups:
+  - rocketmq.apache.org
+  resources:
+  - controllers/finalizers
+  verbs:
+  - update
+- apiGroups:
+  - rocketmq.apache.org
+  resources:
+  - controllers/status
+  verbs:
+  - get
+  - patch
+  - update
 - apiGroups:
   - rocketmq.apache.org
   resources:
diff --git a/deploy/storage/hostpath/prepare-host-path.sh b/deploy/storage/hostpath/prepare-host-path.sh
index 8071dd6..e41cdcb 100755
--- a/deploy/storage/hostpath/prepare-host-path.sh
+++ b/deploy/storage/hostpath/prepare-host-path.sh
@@ -24,6 +24,8 @@
 NAME_SERVER_DATA_PATH=/data/rocketmq/nameserver
 # BROKER_DATA_PATH should be the same with the hostPath field in rocketmq_v1alpha1_broker_cr.yaml
 BROKER_DATA_PATH=/data/rocketmq/broker
+# CONTROLLER_DATA_PATH should be the same with the hostPath field in rocketmq_v1alpha1_controller_cr.yaml
+CONTROLLER_DATA_PATH=/data/rocketmq/controller
 # ROCKETMQ_UID and ROCKETMQ_GID should be the same with docker image settings
 ROCKETMQ_UID=3000
 ROCKETMQ_GID=3000
@@ -40,7 +42,8 @@ prepare_dir()
 
 prepare_dir $BROKER_DATA_PATH $ROCKETMQ_UID $ROCKETMQ_GID
 prepare_dir $NAME_SERVER_DATA_PATH $ROCKETMQ_UID $ROCKETMQ_GID
+prepare_dir $CONTROLLER_DATA_PATH $ROCKETMQ_UID $ROCKETMQ_GID
 
 echo "Changed hostPath $NAME_SERVER_DATA_PATH uid to $ROCKETMQ_UID, gid to $ROCKETMQ_GID"
 echo "Changed hostPath $BROKER_DATA_PATH uid to $ROCKETMQ_UID, gid to $ROCKETMQ_GID"
-
+echo "Changed hostPath $CONTROLLER_DATA_PATH uid to $ROCKETMQ_UID, gid to $ROCKETMQ_GID"
diff --git a/example/rocketmq_v1alpha1_controller_cr.yaml b/example/rocketmq_v1alpha1_controller_cr.yaml
new file mode 100644
index 0000000..f1fec82
--- /dev/null
+++ b/example/rocketmq_v1alpha1_controller_cr.yaml
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+apiVersion: rocketmq.apache.org/v1alpha1
+kind: Controller
+metadata:
+  # name of controller cluster
+  name: controller
+spec:
+  # size is the number of controllers.
+  size: 1
+  # controllerImage is the customized docker image repo of the RocketMQ Controller
+  controllerImage: apacherocketmq/rocketmq-nameserver:5.0.0-alpine-operator-0.4.0
+  # imagePullPolicy is the image pull policy
+  imagePullPolicy: IfNotPresent
+  # resources describes the compute resource requirements and limits
+  resources:
+    requests:
+      memory: "2Gi"
+    limits:
+      memory: "2Gi"
+  # storageMode can be EmptyDir, HostPath, StorageClass
+  storageMode: HostPath
+  # hostPath is the local path to store data
+  hostPath: /data/rocketmq/controller
+  # volumeClaimTemplates defines the storageClass
+  volumeClaimTemplates:
+    - metadata:
+        name: controller-storage
+      spec:
+        accessModes: [ "ReadWriteOnce" ]
+        resources:
+          requests:
+            storage: 8Gi
diff --git a/images/broker/alpine/brokerGenConfig.sh b/images/broker/alpine/brokerGenConfig.sh
index a4ba21b..033dda7 100755
--- a/images/broker/alpine/brokerGenConfig.sh
+++ b/images/broker/alpine/brokerGenConfig.sh
@@ -31,6 +31,11 @@ function create_config() {
     if [ $BROKER_ID != 0 ]; then
         sed -i 's/brokerRole=.*/brokerRole=SLAVE/g' $BROKER_CONFIG_FILE
     fi
+
+    if [ "${enableControllerMode}" = "true" ]; then
+        echo "enableControllerMode=true" >> $BROKER_CONFIG_FILE
+        echo "controllerAddr=${controllerAddr}" >> $BROKER_CONFIG_FILE
+    fi
 }
 
 create_config
diff --git a/images/broker/alpine/build-broker-image.sh b/images/broker/alpine/build-broker-image.sh
index 7d424cd..7a20063 100755
--- a/images/broker/alpine/build-broker-image.sh
+++ b/images/broker/alpine/build-broker-image.sh
@@ -34,10 +34,10 @@ fi
 
 ROCKETMQ_VERSION=$1
 DOCKERHUB_REPO=apacherocketmq/rocketmq-broker
-IMAGE_NAME=${DOCKERHUB_REPO}:${ROCKETMQ_VERSION}-alpine-operator-0.3.0
+IMAGE_NAME=${DOCKERHUB_REPO}:${ROCKETMQ_VERSION}-alpine-operator-0.4.0
 
 checkVersion $ROCKETMQ_VERSION
 
 docker build -t $IMAGE_NAME --build-arg version=${ROCKETMQ_VERSION} .
 
-docker push $IMAGE_NAME
+#docker push $IMAGE_NAME
diff --git a/images/controller/alpine/Dockerfile b/images/controller/alpine/Dockerfile
new file mode 100644
index 0000000..dadae8d
--- /dev/null
+++ b/images/controller/alpine/Dockerfile
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM openjdk:8-alpine
+
+RUN apk add --no-cache bash gettext nmap-ncat openssl busybox-extras
+
+ARG version
+
+# Rocketmq version
+ENV ROCKETMQ_VERSION ${version}
+
+# Rocketmq home
+ENV ROCKETMQ_HOME  /root/rocketmq/controller
+
+WORKDIR  ${ROCKETMQ_HOME}
+
+# Install
+RUN set -eux; \
+    apk add --virtual .build-deps curl gnupg unzip; \
+    curl https://archive.apache.org/dist/rocketmq/${ROCKETMQ_VERSION}/rocketmq-all-${ROCKETMQ_VERSION}-bin-release.zip -o rocketmq.zip; \
+    curl https://archive.apache.org/dist/rocketmq/${ROCKETMQ_VERSION}/rocketmq-all-${ROCKETMQ_VERSION}-bin-release.zip.asc -o rocketmq.zip.asc; \
+	curl -L https://www.apache.org/dist/rocketmq/KEYS -o KEYS; \
+	gpg --import KEYS; \
+    gpg --batch --verify rocketmq.zip.asc rocketmq.zip; \
+    unzip rocketmq.zip; \
+	mv rocketmq-all*/* . ; \
+	rmdir rocketmq-all* ; \
+	rm rocketmq.zip ; \
+    rm rocketmq.zip.asc KEYS; \
+	apk del .build-deps ; \
+    rm -rf /var/cache/apk/* ; \
+    rm -rf /tmp/*
+
+# Copy customized scripts
+COPY runserver-customize.sh ${ROCKETMQ_HOME}/bin/
+
+# Expose controller port
+EXPOSE 9878
+
+# Override customized scripts for controller
+# Export Java options
+# Add ${JAVA_HOME}/lib/ext as java.ext.dirs
+RUN mv ${ROCKETMQ_HOME}/bin/runserver-customize.sh ${ROCKETMQ_HOME}/bin/runserver.sh \
+ && chmod a+x ${ROCKETMQ_HOME}/bin/runserver.sh \
+ && chmod a+x ${ROCKETMQ_HOME}/bin/mqnamesrv \
+ && export JAVA_OPT=" -Duser.home=/opt" \
+ && sed -i 's/${JAVA_HOME}\/jre\/lib\/ext/${JAVA_HOME}\/jre\/lib\/ext:${JAVA_HOME}\/lib\/ext/' ${ROCKETMQ_HOME}/bin/tools.sh
+
+WORKDIR ${ROCKETMQ_HOME}/bin
+
+CMD /bin/bash ./mqcontroller -c ${ROCKETMQ_HOME}/conf/controller/controller.conf
diff --git a/images/broker/alpine/build-broker-image.sh b/images/controller/alpine/build-controller-image.sh
similarity index 94%
copy from images/broker/alpine/build-broker-image.sh
copy to images/controller/alpine/build-controller-image.sh
index 7d424cd..e843e2f 100755
--- a/images/broker/alpine/build-broker-image.sh
+++ b/images/controller/alpine/build-controller-image.sh
@@ -33,11 +33,11 @@ if [ $# -lt 1 ]; then
 fi
 
 ROCKETMQ_VERSION=$1
-DOCKERHUB_REPO=apacherocketmq/rocketmq-broker
-IMAGE_NAME=${DOCKERHUB_REPO}:${ROCKETMQ_VERSION}-alpine-operator-0.3.0
+DOCKERHUB_REPO=apacherocketmq/rocketmq-controller
+IMAGE_NAME=${DOCKERHUB_REPO}:${ROCKETMQ_VERSION}-alpine-operator-0.4.0
 
 checkVersion $ROCKETMQ_VERSION
 
 docker build -t $IMAGE_NAME --build-arg version=${ROCKETMQ_VERSION} .
 
-docker push $IMAGE_NAME
+#docker push $IMAGE_NAME
\ No newline at end of file
diff --git a/images/namesrv/alpine/runserver-customize.sh b/images/controller/alpine/runserver-customize.sh
similarity index 82%
copy from images/namesrv/alpine/runserver-customize.sh
copy to images/controller/alpine/runserver-customize.sh
index 1716407..38e2526 100755
--- a/images/namesrv/alpine/runserver-customize.sh
+++ b/images/controller/alpine/runserver-customize.sh
@@ -136,6 +136,14 @@ calculate_heap_sizes
 Xms=$MAX_HEAP_SIZE
 Xmx=$MAX_HEAP_SIZE
 Xmn=$HEAP_NEWSIZE
+
+# For controller deployed in K8s
+if test -n "$MY_POD_NAME" ;then
+    echo "MY_POD_NAME=$MY_POD_NAME"
+    controllerDLegerSelfIdValue=$(echo "$MY_POD_NAME" | tr -d -)
+    export controllerDLegerSelfId=$controllerDLegerSelfIdValue
+fi
+
 # Set for `JAVA_OPT`.
 JAVA_OPT="${JAVA_OPT} -server -Xms${Xms} -Xmx${Xmx} -Xmn${Xmn}"
 JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8  -XX:-UseParNewGC"
@@ -143,8 +151,26 @@ JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:/dev/shm/rmq_srv_gc.log -XX:+PrintGCDe
 JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
 JAVA_OPT="${JAVA_OPT}  -XX:-UseLargePages"
 JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib"
-#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
 JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
 JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
 
+echo "=============env beg============="
+env
+echo "=============env end============="
+
+#TODO cp from config map
+cp ${ROCKETMQ_HOME}/conf/controller/controller-standalone.conf ${ROCKETMQ_HOME}/conf/controller/controller.conf
+
+
+sed -i "s/^controllerDLegerGroup.*$/controllerDLegerGroup=${controllerDLegerGroup}/" ${ROCKETMQ_HOME}/conf/controller/controller.conf
+
+sed -i "s/^controllerDLegerPeers.*$/controllerDLegerPeers=${controllerDLegerPeers}/" ${ROCKETMQ_HOME}/conf/controller/controller.conf
+
+sed -i "s/^controllerDLegerSelfId.*$/controllerDLegerSelfId=${controllerDLegerSelfId}/" ${ROCKETMQ_HOME}/conf/controller/controller.conf
+
+#perl not exists, sed can't process 
+#perl -pi -e "s|controllerStorePath.*$|controllerStorePath=${controllerStorePath}|g" ${ROCKETMQ_HOME}/conf/controller/controller.conf
+echo "" >> ${ROCKETMQ_HOME}/conf/controller/controller.conf
+echo "controllerStorePath=${controllerStorePath}" >> ${ROCKETMQ_HOME}/conf/controller/controller.conf
+
 $JAVA ${JAVA_OPT} $@
diff --git a/images/namesrv/alpine/build-namesrv-image.sh b/images/namesrv/alpine/build-namesrv-image.sh
index c8dce11..9cc58cf 100755
--- a/images/namesrv/alpine/build-namesrv-image.sh
+++ b/images/namesrv/alpine/build-namesrv-image.sh
@@ -34,10 +34,10 @@ fi
 
 ROCKETMQ_VERSION=$1
 DOCKERHUB_REPO=apacherocketmq/rocketmq-nameserver
-IMAGE_NAME=${DOCKERHUB_REPO}:${ROCKETMQ_VERSION}-alpine-operator-0.3.0
+IMAGE_NAME=${DOCKERHUB_REPO}:${ROCKETMQ_VERSION}-alpine-operator-0.4.0
 
 checkVersion $ROCKETMQ_VERSION
 
 docker build -t $IMAGE_NAME --build-arg version=${ROCKETMQ_VERSION} .
 
-docker push $IMAGE_NAME
\ No newline at end of file
+#docker push $IMAGE_NAME
\ No newline at end of file
diff --git a/images/namesrv/alpine/runserver-customize.sh b/images/namesrv/alpine/runserver-customize.sh
index 1716407..eaf21ff 100755
--- a/images/namesrv/alpine/runserver-customize.sh
+++ b/images/namesrv/alpine/runserver-customize.sh
@@ -136,6 +136,14 @@ calculate_heap_sizes
 Xms=$MAX_HEAP_SIZE
 Xmx=$MAX_HEAP_SIZE
 Xmn=$HEAP_NEWSIZE
+
+# For controller deployed in K8s
+if test -n "$MY_POD_NAME" ;then
+    echo "MY_POD_NAME=$MY_POD_NAME"
+    controllerDLegerSelfIdValue=$(echo "$MY_POD_NAME" | tr -d -)
+    export controllerDLegerSelfId=$controllerDLegerSelfIdValue
+fi
+
 # Set for `JAVA_OPT`.
 JAVA_OPT="${JAVA_OPT} -server -Xms${Xms} -Xmx${Xmx} -Xmn${Xmn}"
 JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8  -XX:-UseParNewGC"
diff --git a/main.go b/main.go
index 9e97f4c..7aae21a 100644
--- a/main.go
+++ b/main.go
@@ -19,12 +19,14 @@ package main
 
 import (
 	"flag"
+	"os"
+
 	rocketmqv1alpha1 "github.com/apache/rocketmq-operator/pkg/apis/rocketmq/v1alpha1"
 	"github.com/apache/rocketmq-operator/pkg/controller/broker"
 	"github.com/apache/rocketmq-operator/pkg/controller/console"
+	rmqcontroller "github.com/apache/rocketmq-operator/pkg/controller/controller"
 	"github.com/apache/rocketmq-operator/pkg/controller/nameservice"
 	"github.com/apache/rocketmq-operator/pkg/controller/topictransfer"
-	"os"
 
 	// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
 	// to ensure that exec-entrypoint and run can make use of them.
@@ -106,6 +108,11 @@ func main() {
 		os.Exit(1)
 	}
 
+	if err := rmqcontroller.SetupWithManager(mgr); err != nil {
+		setupLog.Error(err, "unable to add dledger controller to manager")
+		os.Exit(1)
+	}
+
 	//+kubebuilder:scaffold:builder
 
 	if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
diff --git a/pkg/apis/rocketmq/v1alpha1/broker_types.go b/pkg/apis/rocketmq/v1alpha1/broker_types.go
index 12c47a0..1647f9a 100644
--- a/pkg/apis/rocketmq/v1alpha1/broker_types.go
+++ b/pkg/apis/rocketmq/v1alpha1/broker_types.go
@@ -31,9 +31,15 @@ type BrokerSpec struct {
 	// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
 	// Important: Run "operator-sdk generate k8s" to regenerate code after modifying this file
 	// Add custom validation using kubebuilder tags: https://book-v1.book.kubebuilder.io/beyond_basics/generating_crd.html
+	// Size of broker clusters
 	Size int `json:"size"`
 	// NameServers defines the name service list e.g. 192.168.1.1:9876;192.168.1.2:9876
 	NameServers string `json:"nameServers,omitempty"`
+	// ClusterMode defines the way to be a broker cluster, valid values can be one of the following:
+	// - STATIC: default clusters with static broker roles
+	// - CONTROLLER: clusters with DLedger Controller since RocketMQ 5.0
+	// - CONTAINER: [NOT implemented yet] enabling broker containers since RocketMQ 5.0
+	ClusterMode string `json:"clusterMode,omitempty"`
 	// ReplicaPerGroup each broker cluster's replica number
 	ReplicaPerGroup int `json:"replicaPerGroup"`
 	// BaseImage is the broker image to use for the Pods
diff --git a/pkg/apis/rocketmq/v1alpha1/broker_types.go b/pkg/apis/rocketmq/v1alpha1/controller_types.go
similarity index 71%
copy from pkg/apis/rocketmq/v1alpha1/broker_types.go
copy to pkg/apis/rocketmq/v1alpha1/controller_types.go
index 12c47a0..5e8152c 100644
--- a/pkg/apis/rocketmq/v1alpha1/broker_types.go
+++ b/pkg/apis/rocketmq/v1alpha1/controller_types.go
@@ -25,39 +25,33 @@ import (
 // EDIT THIS FILE!  THIS IS SCAFFOLDING FOR YOU TO OWN!
 // NOTE: json tags are required.  Any new fields you add must have json tags for the fields to be serialized.
 
-// BrokerSpec defines the desired state of Broker
+// ControllerSpec defines the desired state of Controller
 // +k8s:openapi-gen=true
-type BrokerSpec struct {
+type ControllerSpec struct {
 	// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
 	// Important: Run "operator-sdk generate k8s" to regenerate code after modifying this file
 	// Add custom validation using kubebuilder tags: https://book-v1.book.kubebuilder.io/beyond_basics/generating_crd.html
+
+	// size of controller
 	Size int `json:"size"`
-	// NameServers defines the name service list e.g. 192.168.1.1:9876;192.168.1.2:9876
-	NameServers string `json:"nameServers,omitempty"`
-	// ReplicaPerGroup each broker cluster's replica number
-	ReplicaPerGroup int `json:"replicaPerGroup"`
-	// BaseImage is the broker image to use for the Pods
-	BrokerImage string `json:"brokerImage"`
+
+	// BaseImage is the controller image to use for the Pods
+	ControllerImage string `json:"controllerImage"`
 	// ImagePullPolicy defines how the image is pulled
-	ImagePullPolicy corev1.PullPolicy `json:"imagePullPolicy"`
-	// HostNetwork can be true or false
-	HostNetwork bool `json:"hostNetwork,omitempty"`
-	// AllowRestart defines whether allow pod restart
-	AllowRestart bool `json:"allowRestart"`
+	ImagePullPolicy corev1.PullPolicy `json:"imagePullPolicy,omitempty"`
+
 	// Resources describes the compute resource requirements
 	Resources corev1.ResourceRequirements `json:"resources"`
 	// StorageMode can be EmptyDir, HostPath, StorageClass
 	StorageMode string `json:"storageMode"`
 	// HostPath is the local path to store data
 	HostPath string `json:"hostPath"`
-	// Env defines custom env, e.g. BROKER_MEM
-	Env []corev1.EnvVar `json:"env"`
-	// Volumes define the broker.conf
-	Volumes []corev1.Volume `json:"volumes"`
+	// Env defines custom env
+	Env []corev1.EnvVar `json:"env,omitempty"`
+
 	// VolumeClaimTemplates defines the StorageClass
 	VolumeClaimTemplates []corev1.PersistentVolumeClaim `json:"volumeClaimTemplates"`
-	// The name of pod where the metadata from
-	ScalePodName string `json:"scalePodName"`
+
 	// Pod Security Context
 	PodSecurityContext *corev1.PodSecurityContext `json:"securityContext,omitempty"`
 	// Container Security Context
@@ -76,9 +70,9 @@ type BrokerSpec struct {
 	ServiceAccountName string `json:"serviceAccountName,omitempty"`
 }
 
-// BrokerStatus defines the observed state of Broker
+// ControllerStatus defines the observed state of Controller
 // +k8s:openapi-gen=true
-type BrokerStatus struct {
+type ControllerStatus struct {
 	// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
 	// Important: Run "operator-sdk generate k8s" to regenerate code after modifying this file
 	// Add custom validation using kubebuilder tags: https://book-v1.book.kubebuilder.io/beyond_basics/generating_crd.html
@@ -88,30 +82,28 @@ type BrokerStatus struct {
 
 // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
 
-// Broker is the Schema for the brokers API
+// Controller is the Schema for the Controllers API
 // +k8s:openapi-gen=true
 // +kubebuilder:printcolumn:name="Size",type="integer",JSONPath=".spec.size"
-// +kubebuilder:printcolumn:name="Replica-Per-Group",type="integer",JSONPath=".spec.replicaPerGroup"
-// +kubebuilder:printcolumn:name="Allow-Restart",type="boolean",JSONPath=".spec.allowRestart"
 // +kubebuilder:printcolumn:name="AGE",type="date",JSONPath=".metadata.creationTimestamp"
 // +kubebuilder:subresource:status
-type Broker struct {
+type Controller struct {
 	metav1.TypeMeta   `json:",inline"`
 	metav1.ObjectMeta `json:"metadata,omitempty"`
 
-	Spec   BrokerSpec   `json:"spec,omitempty"`
-	Status BrokerStatus `json:"status,omitempty"`
+	Spec   ControllerSpec   `json:"spec,omitempty"`
+	Status ControllerStatus `json:"status,omitempty"`
 }
 
 // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
 
-// BrokerList contains a list of Broker
-type BrokerList struct {
+// ControllerList contains a list of Controller
+type ControllerList struct {
 	metav1.TypeMeta `json:",inline"`
 	metav1.ListMeta `json:"metadata,omitempty"`
-	Items           []Broker `json:"items"`
+	Items           []Controller `json:"items"`
 }
 
 func init() {
-	SchemeBuilder.Register(&Broker{}, &BrokerList{})
+	SchemeBuilder.Register(&Controller{}, &ControllerList{})
 }
diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go
index 947edce..3c3999c 100644
--- a/pkg/constants/constants.go
+++ b/pkg/constants/constants.go
@@ -147,4 +147,23 @@ const (
 
 	// TopicListConsumerGroup is the consumer group field index of the output when using command check topic list
 	TopicListConsumerGroup = 2
+
+	// ControllerContainerName is the name of Controller container
+	ControllerContainerName = "controller"
+
+	// EnvControllerDLegerPeers is the container environment variable name of DLeger peers
+	// Format: {unique id}-{IP of that controller}
+	EnvControllerDLegerPeers = "controllerDLegerPeers"
+
+	// the container environment variable name of persistent storage directory
+	EnvControllerStorePath = "controllerStorePath"
+
+	// the container environment variable name of DLeger group
+	EnvControllerDLegerGroup = "controllerDLegerGroup"
+
+	// the container environment variable name of in Broker
+	EnvEnableControllerMode = "enableControllerMode"
+
+	// EnvControllerAddr is the container environment variable name of Controller address in Broker
+	EnvControllerAddr = "controllerAddr"
 )
diff --git a/pkg/controller/broker/broker_controller.go b/pkg/controller/broker/broker_controller.go
index fb3e76d..030a3d6 100644
--- a/pkg/controller/broker/broker_controller.go
+++ b/pkg/controller/broker/broker_controller.go
@@ -20,12 +20,13 @@ package broker
 
 import (
 	"context"
-	"github.com/google/uuid"
 	"reflect"
 	"strconv"
 	"strings"
 	"time"
 
+	"github.com/google/uuid"
+
 	rocketmqv1alpha1 "github.com/apache/rocketmq-operator/pkg/apis/rocketmq/v1alpha1"
 	cons "github.com/apache/rocketmq-operator/pkg/constants"
 	"github.com/apache/rocketmq-operator/pkg/share"
@@ -48,7 +49,6 @@ import (
 )
 
 var log = logf.Log.WithName("controller_broker")
-var isInitial = true
 var cmd = []string{"/bin/bash", "-c", "echo Initial broker"}
 
 /**
@@ -156,6 +156,15 @@ func (r *ReconcileBroker) Reconcile(ctx context.Context, request reconcile.Reque
 		share.NameServersStr = broker.Spec.NameServers
 	}
 
+	if broker.Spec.ClusterMode == "" {
+		broker.Spec.ClusterMode = "STATIC"
+	}
+
+	if broker.Spec.ClusterMode == "CONTROLLER" && share.ControllerAccessPoint == "" {
+		log.Info("Broker Waiting for Controller ready...")
+		return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(cons.RequeueIntervalInSecond) * time.Second}, nil
+	}
+
 	share.BrokerClusterName = broker.Name
 	replicaPerGroup := broker.Spec.ReplicaPerGroup
 	reqLogger.Info("brokerGroupNum=" + strconv.Itoa(share.GroupNum) + ", replicaPerGroup=" + strconv.Itoa(replicaPerGroup))
@@ -395,10 +404,14 @@ func (r *ReconcileBroker) getBrokerStatefulSet(broker *rocketmqv1alpha1.Broker,
 	var a int32 = 1
 	var c = &a
 	var statefulSetName string
-	if replicaIndex == 0 {
-		statefulSetName = broker.Name + "-" + strconv.Itoa(brokerGroupIndex) + "-master"
-	} else {
-		statefulSetName = broker.Name + "-" + strconv.Itoa(brokerGroupIndex) + "-replica-" + strconv.Itoa(replicaIndex)
+	if broker.Spec.ClusterMode == "STATIC" {
+		if replicaIndex == 0 {
+			statefulSetName = broker.Name + "-" + strconv.Itoa(brokerGroupIndex) + "-master"
+		} else {
+			statefulSetName = broker.Name + "-" + strconv.Itoa(brokerGroupIndex) + "-replica-" + strconv.Itoa(replicaIndex)
+		}
+	} else if broker.Spec.ClusterMode == "CONTROLLER" {
+		statefulSetName = broker.Name + "-" + strconv.Itoa(brokerGroupIndex) + "-" + strconv.Itoa(replicaIndex)
 	}
 
 	// After CustomResourceDefinition version upgraded from v1beta1 to v1
@@ -428,6 +441,7 @@ func (r *ReconcileBroker) getBrokerStatefulSet(broker *rocketmqv1alpha1.Broker,
 				Spec: corev1.PodSpec{
 					ServiceAccountName: broker.Spec.ServiceAccountName,
 					HostNetwork:        broker.Spec.HostNetwork,
+					DNSPolicy:          corev1.DNSClusterFirstWithHostNet,
 					Affinity:           broker.Spec.Affinity,
 					Tolerations:        broker.Spec.Tolerations,
 					NodeSelector:       broker.Spec.NodeSelector,
@@ -499,6 +513,10 @@ func getENV(broker *rocketmqv1alpha1.Broker, replicaIndex int, brokerGroupIndex
 		Name:  cons.EnvBrokerName,
 		Value: broker.Name + "-" + strconv.Itoa(brokerGroupIndex),
 	}}
+	if broker.Spec.ClusterMode == "CONTROLLER" {
+		envs = append(envs, corev1.EnvVar{Name: cons.EnvEnableControllerMode, Value: "true"})
+		envs = append(envs, corev1.EnvVar{Name: cons.EnvControllerAddr, Value: share.ControllerAccessPoint})
+	}
 	envs = append(envs, broker.Spec.Env...)
 	return envs
 }
@@ -578,26 +596,3 @@ func getPodNames(pods []corev1.Pod) []string {
 	}
 	return podNames
 }
-
-func contains(item string, arr []string) bool {
-	for _, value := range arr {
-		if reflect.DeepEqual(value, item) {
-			return true
-		}
-	}
-	return false
-}
-
-func checkAndCopyMetadata(newPodNames []string, dir string, sourcePodName string, namespace string, k8s *tool.K8sClient) {
-	cmdOpts := buildInputCommand(dir)
-	jsonStr, _ := exec(cmdOpts, sourcePodName, k8s, namespace) // TODO handler error
-	if len(jsonStr) < cons.MinMetadataJsonFileSize {
-		log.Info("The file " + dir + " is abnormally too short to execute metadata transmission, please check whether the source broker pod " + sourcePodName + " is correct")
-	} else {
-		// for each new pod, copy the metadata from the scale source pod
-		for _, newPodName := range newPodNames {
-			cmdOpts = buildOutputCommand(jsonStr, dir)
-			exec(cmdOpts, newPodName, k8s, namespace)
-		}
-	}
-}
diff --git a/pkg/controller/controller/dledger_controller.go b/pkg/controller/controller/dledger_controller.go
new file mode 100644
index 0000000..1056f24
--- /dev/null
+++ b/pkg/controller/controller/dledger_controller.go
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package controller contains the implementation of the Controller CRD reconcile function
+package controller
+
+import (
+	"context"
+	"reflect"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/google/uuid"
+
+	rocketmqv1alpha1 "github.com/apache/rocketmq-operator/pkg/apis/rocketmq/v1alpha1"
+	cons "github.com/apache/rocketmq-operator/pkg/constants"
+	"github.com/apache/rocketmq-operator/pkg/share"
+	"github.com/apache/rocketmq-operator/pkg/tool"
+	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/intstr"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/controller"
+	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
+	"sigs.k8s.io/controller-runtime/pkg/handler"
+	logf "sigs.k8s.io/controller-runtime/pkg/log"
+	"sigs.k8s.io/controller-runtime/pkg/manager"
+	"sigs.k8s.io/controller-runtime/pkg/reconcile"
+	"sigs.k8s.io/controller-runtime/pkg/source"
+)
+
+var log = logf.Log.WithName("dledger_controller")
+
+// SetupWithManager creates a new Controller and adds it to the Manager. The Manager will set fields on the Controller
+// and Start it when the Manager is Started.
+func SetupWithManager(mgr manager.Manager) error {
+	return add(mgr, newReconciler(mgr))
+}
+
+// newReconciler returns a new reconcile.Reconciler
+func newReconciler(mgr manager.Manager) reconcile.Reconciler {
+	return &ReconcileController{client: mgr.GetClient(), scheme: mgr.GetScheme()}
+}
+
+// add adds a new Controller to mgr with r as the reconcile.Reconciler
+func add(mgr manager.Manager, r reconcile.Reconciler) error {
+	// Create a new controller
+	c, err := controller.New("dledger-controller", mgr, controller.Options{Reconciler: r})
+	if err != nil {
+		return err
+	}
+
+	// Watch for changes to primary resource Controller
+	err = c.Watch(&source.Kind{Type: &rocketmqv1alpha1.Controller{}}, &handler.EnqueueRequestForObject{})
+	if err != nil {
+		return err
+	}
+
+	// TODO(user): Modify this to be the types you create that are owned by the primary resource
+	// Watch for changes to secondary resource Pods and requeue the owner Controller
+	err = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForOwner{
+		IsController: true,
+		OwnerType:    &rocketmqv1alpha1.Controller{},
+	})
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+//+kubebuilder:rbac:groups=rocketmq.apache.org,resources=controllers,verbs=get;list;watch;create;update;patch;delete
+//+kubebuilder:rbac:groups=rocketmq.apache.org,resources=controllers/status,verbs=get;update;patch
+//+kubebuilder:rbac:groups=rocketmq.apache.org,resources=controllers/finalizers,verbs=update
+//+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;create;update;patch;delete
+//+kubebuilder:rbac:groups="",resources=pods/exec,verbs=get;list;watch;create;update;patch;delete
+//+kubebuilder:rbac:groups="apps",resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
+//+kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete
+
+// ReconcileController reconciles a Controller object
+type ReconcileController struct {
+	// This client, initialized using mgr.Client() above, is a split client
+	// that reads objects from the cache and writes to the apiserver
+	client client.Client
+	scheme *runtime.Scheme
+}
+
+// Reconcile reads that state of the cluster for a Controller object and makes changes based on the state read
+// and what is in the Controller.Spec
+// Note:
+// The Controller will requeue the Request to be processed again if the returned error is non-nil or
+// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
+func (r *ReconcileController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
+	reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
+	reqLogger.Info("Reconciling Controller.")
+
+	// Fetch the Controller instance
+	controller := &rocketmqv1alpha1.Controller{}
+	err := r.client.Get(context.TODO(), request.NamespacedName, controller)
+	if err != nil {
+		if errors.IsNotFound(err) {
+			// Request object not found, could have been deleted after reconcile request.
+			// Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
+			// Return and don't requeue
+			reqLogger.Info("Controller resource not found. Ignoring since object must be deleted.")
+			return reconcile.Result{}, nil
+		}
+		// Error reading the object - requeue the request.
+		reqLogger.Error(err, "Failed to get Controller.")
+		return reconcile.Result{RequeueAfter: time.Duration(cons.RequeueIntervalInSecond) * time.Second}, err
+	}
+
+	//create headless svc
+	headlessSvc := &corev1.Service{}
+	err = r.client.Get(context.TODO(), types.NamespacedName{Name: tool.BuildHeadlessSvcResourceName(request.Name), Namespace: request.Namespace}, headlessSvc)
+	if err != nil {
+		if errors.IsNotFound(err) {
+			// create;
+			consoleSvc := r.generateHeadlessSvc(controller)
+			err = r.client.Create(context.TODO(), consoleSvc)
+			if err != nil {
+				reqLogger.Error(err, "Failed to create controller headless svc")
+				return reconcile.Result{}, err
+			} else {
+				reqLogger.Info("Successfully create controller headless svc")
+			}
+		} else {
+			return reconcile.Result{}, err
+		}
+	}
+
+	sts := r.getControllerStatefulSet(controller)
+	// Check if the statefulSet already exists, if not create a new one
+	found := &appsv1.StatefulSet{}
+	err = r.client.Get(context.TODO(), types.NamespacedName{Name: sts.Name, Namespace: sts.Namespace}, found)
+	if err != nil && errors.IsNotFound(err) {
+		reqLogger.Info("Creating a new Controller StatefulSet.", "StatefulSet.Namespace", sts.Namespace, "StatefulSet.Name", sts.Name)
+		err = r.client.Create(context.TODO(), sts)
+		if err != nil {
+			reqLogger.Error(err, "Failed to create new Controller StatefulSet", "StatefulSet.Namespace", sts.Namespace, "StatefulSet.Name", sts.Name)
+		}
+	} else if err != nil {
+		reqLogger.Error(err, "Failed to list Controller StatefulSet.")
+	}
+
+	// List the pods for this controller's statefulSet
+	podList := &corev1.PodList{}
+	labelSelector := labels.SelectorFromSet(labelsForController(controller.Name))
+	listOps := &client.ListOptions{
+		Namespace:     controller.Namespace,
+		LabelSelector: labelSelector,
+	}
+	err = r.client.List(context.TODO(), podList, listOps)
+	if err != nil {
+		reqLogger.Error(err, "Failed to list pods.", "Controller.Namespace", controller.Namespace, "Controller.Name", controller.Name)
+		return reconcile.Result{}, err
+	}
+	podNames := getPodNames(podList.Items)
+	log.Info("controller.Status.Nodes length = " + strconv.Itoa(len(controller.Status.Nodes)))
+	log.Info("podNames length = " + strconv.Itoa(len(podNames)))
+	// Ensure every pod is in running phase
+	for _, pod := range podList.Items {
+		if !reflect.DeepEqual(pod.Status.Phase, corev1.PodRunning) {
+			log.Info("pod " + pod.Name + " phase is " + string(pod.Status.Phase) + ", wait for a moment...")
+		}
+	}
+
+	// Update status.Size if needed
+	if controller.Spec.Size != controller.Status.Size {
+		log.Info("controller.Status.Size = " + strconv.Itoa(controller.Status.Size))
+		log.Info("controller.Spec.Size = " + strconv.Itoa(controller.Spec.Size))
+		controller.Status.Size = controller.Spec.Size
+		err = r.client.Status().Update(context.TODO(), controller)
+		if err != nil {
+			reqLogger.Error(err, "Failed to update Controller Size status.")
+		}
+	}
+
+	// Update status.Nodes if needed
+	if !reflect.DeepEqual(podNames, controller.Status.Nodes) {
+		controller.Status.Nodes = podNames
+		err = r.client.Status().Update(context.TODO(), controller)
+		if err != nil {
+			reqLogger.Error(err, "Failed to update Controller Nodes status.")
+		}
+	}
+
+	//create svc
+	controllerSvc := &corev1.Service{}
+	controllerSvcName := tool.BuildSvcResourceName(request.Name)
+	err = r.client.Get(context.TODO(), types.NamespacedName{Name: controllerSvcName, Namespace: request.Namespace}, controllerSvc)
+	if err != nil {
+		if errors.IsNotFound(err) {
+			// create;
+			svcToCreate := r.generateSvc(controller)
+			err = r.client.Create(context.TODO(), svcToCreate)
+			if err != nil {
+				reqLogger.Error(err, "Failed to create controller svc")
+				return reconcile.Result{}, err
+			} else {
+				reqLogger.Info("Successfully create controller svc")
+			}
+		} else {
+			return reconcile.Result{}, err
+		}
+	}
+	share.ControllerAccessPoint = controllerSvcName + ":9878"
+
+	return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(cons.RequeueIntervalInSecond) * time.Second}, nil
+}
+
+// returns a controller StatefulSet object
+func (r *ReconcileController) getControllerStatefulSet(controller *rocketmqv1alpha1.Controller) *appsv1.StatefulSet {
+	ls := labelsForController(controller.Name)
+
+	// After CustomResourceDefinition version upgraded from v1beta1 to v1
+	// `controller.spec.VolumeClaimTemplates.metadata` declared in yaml will not be stored by kubernetes.
+	// Here is a temporary repair method: to generate a random name
+	if strings.EqualFold(controller.Spec.VolumeClaimTemplates[0].Name, "") {
+		controller.Spec.VolumeClaimTemplates[0].Name = uuid.New().String()
+	}
+
+	var replica = int32(controller.Spec.Size)
+	dep := &appsv1.StatefulSet{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      controller.Name,
+			Namespace: controller.Namespace,
+		},
+		Spec: appsv1.StatefulSetSpec{
+			ServiceName: tool.BuildHeadlessSvcResourceName(controller.Name),
+			Replicas:    &replica,
+			Selector: &metav1.LabelSelector{
+				MatchLabels: ls,
+			},
+			UpdateStrategy: appsv1.StatefulSetUpdateStrategy{
+				Type: appsv1.RollingUpdateStatefulSetStrategyType,
+			},
+			Template: corev1.PodTemplateSpec{
+				ObjectMeta: metav1.ObjectMeta{
+					Labels: ls,
+				},
+				Spec: corev1.PodSpec{
+
+					ServiceAccountName: controller.Spec.ServiceAccountName,
+					Affinity:           controller.Spec.Affinity,
+					Tolerations:        controller.Spec.Tolerations,
+					NodeSelector:       controller.Spec.NodeSelector,
+					PriorityClassName:  controller.Spec.PriorityClassName,
+					ImagePullSecrets:   controller.Spec.ImagePullSecrets,
+					Containers: []corev1.Container{{
+						Resources:       controller.Spec.Resources,
+						Image:           controller.Spec.ControllerImage,
+						Name:            cons.ControllerContainerName,
+						SecurityContext: getContainerSecurityContext(controller),
+						ImagePullPolicy: controller.Spec.ImagePullPolicy,
+						Env:             getENV(controller),
+						VolumeMounts: []corev1.VolumeMount{{
+							MountPath: cons.LogMountPath,
+							Name:      controller.Spec.VolumeClaimTemplates[0].Name,
+							SubPath:   cons.LogSubPathName,
+						}, {
+							MountPath: cons.StoreMountPath,
+							Name:      controller.Spec.VolumeClaimTemplates[0].Name,
+							SubPath:   cons.StoreSubPathName,
+						}},
+						// Command: []string{"sh", "mqcontroller"},
+					}},
+					Volumes:         getVolumes(controller),
+					SecurityContext: getPodSecurityContext(controller),
+				},
+			},
+			VolumeClaimTemplates: getVolumeClaimTemplates(controller),
+		},
+	}
+	// Set Controller instance as the owner and controller
+	controllerutil.SetControllerReference(controller, dep, r.scheme)
+
+	return dep
+
+}
+
+func getENV(controller *rocketmqv1alpha1.Controller) []corev1.EnvVar {
+	var controllerDLegerPeersStr string
+	for controllerIndex := 0; controllerIndex < int(controller.Spec.Size); controllerIndex++ {
+		controllerDLegerPeersStr += controller.Name + strconv.Itoa(controllerIndex) + "-" + controller.Name + "-" + strconv.Itoa(controllerIndex) + "." + tool.BuildHeadlessSvcResourceName(controller.Name) + ":9878"
+		if controllerIndex < int(controller.Spec.Size)-1 {
+			controllerDLegerPeersStr += ";"
+		}
+	}
+	log.Info("controllerDLegerPeersStr=" + controllerDLegerPeersStr)
+	envs := []corev1.EnvVar{{
+		Name:      "MY_POD_NAME",
+		ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.name"}},
+	}, {
+		Name:  cons.EnvControllerDLegerGroup,
+		Value: "ControllerGroup-" + controller.Name,
+	}, {
+		Name:  cons.EnvControllerDLegerPeers,
+		Value: controllerDLegerPeersStr,
+	}, {
+		Name:  cons.EnvControllerStorePath,
+		Value: cons.StoreMountPath,
+	}}
+	envs = append(envs, controller.Spec.Env...)
+	return envs
+}
+
+func getVolumeClaimTemplates(controller *rocketmqv1alpha1.Controller) []corev1.PersistentVolumeClaim {
+	switch controller.Spec.StorageMode {
+	case cons.StorageModeStorageClass:
+		return controller.Spec.VolumeClaimTemplates
+	case cons.StorageModeEmptyDir, cons.StorageModeHostPath:
+		fallthrough
+	default:
+		return nil
+	}
+}
+
+func getPodSecurityContext(controller *rocketmqv1alpha1.Controller) *corev1.PodSecurityContext {
+	var securityContext = corev1.PodSecurityContext{}
+	if controller.Spec.PodSecurityContext != nil {
+		securityContext = *controller.Spec.PodSecurityContext
+	}
+	return &securityContext
+}
+
+func getContainerSecurityContext(controller *rocketmqv1alpha1.Controller) *corev1.SecurityContext {
+	var securityContext = corev1.SecurityContext{}
+	if controller.Spec.ContainerSecurityContext != nil {
+		securityContext = *controller.Spec.ContainerSecurityContext
+	}
+	return &securityContext
+}
+
+func getVolumes(controller *rocketmqv1alpha1.Controller) []corev1.Volume {
+	switch controller.Spec.StorageMode {
+	case cons.StorageModeStorageClass:
+		return nil
+	case cons.StorageModeEmptyDir:
+		volumes := []corev1.Volume{{
+			Name: controller.Spec.VolumeClaimTemplates[0].Name,
+			VolumeSource: corev1.VolumeSource{
+				EmptyDir: &corev1.EmptyDirVolumeSource{}},
+		}}
+		return volumes
+	case cons.StorageModeHostPath:
+		fallthrough
+	default:
+
+		volumes := []corev1.Volume{{
+			Name: controller.Spec.VolumeClaimTemplates[0].Name,
+			VolumeSource: corev1.VolumeSource{
+				HostPath: &corev1.HostPathVolumeSource{
+					Path: controller.Spec.HostPath,
+				}},
+		}}
+		return volumes
+	}
+}
+
+// labelsForController returns the labels for selecting the resources
+// belonging to the given controller CR name.
+func labelsForController(name string) map[string]string {
+	return map[string]string{"app": "controller", "controller_cr": name}
+}
+
+// getPodNames returns the pod names of the array of pods passed in
+func getPodNames(pods []corev1.Pod) []string {
+	var podNames []string
+	for _, pod := range pods {
+		podNames = append(podNames, pod.Name)
+	}
+	return podNames
+}
+
+func (r *ReconcileController) generateHeadlessSvc(cr *rocketmqv1alpha1.Controller) *corev1.Service {
+	controllerSvc := &corev1.Service{
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace:   cr.Namespace,
+			Name:        tool.BuildHeadlessSvcResourceName(cr.Name),
+			Annotations: map[string]string{"service.alpha.kubernetes.io/tolerate-unready-endpoints": "true"},
+			Labels:      cr.Labels,
+			//Finalizers:  []string{metav1.FinalizerOrphanDependents},
+		},
+		Spec: corev1.ServiceSpec{
+			ClusterIP:                "None",
+			PublishNotReadyAddresses: true,
+			Selector:                 labelsForController(cr.Name),
+			Ports: []corev1.ServicePort{
+				{
+					Name:       "controller",
+					Port:       9878,
+					Protocol:   corev1.ProtocolTCP,
+					TargetPort: intstr.FromInt(9878),
+				},
+			},
+		},
+	}
+
+	controllerutil.SetControllerReference(cr, controllerSvc, r.scheme)
+	return controllerSvc
+}
+
+func (r *ReconcileController) generateSvc(cr *rocketmqv1alpha1.Controller) *corev1.Service {
+	controllerSvc := &corev1.Service{
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace:  cr.Namespace,
+			Name:       tool.BuildSvcResourceName(cr.Name),
+			Labels:     labelsForController(cr.Name),
+			Finalizers: []string{metav1.FinalizerOrphanDependents},
+		},
+		Spec: corev1.ServiceSpec{
+			Selector: labelsForController(cr.Name),
+			Ports: []corev1.ServicePort{
+				{
+					Name:       "controller",
+					Port:       9878,
+					Protocol:   corev1.ProtocolTCP,
+					TargetPort: intstr.FromInt(9878),
+				},
+			},
+		},
+	}
+
+	controllerutil.SetControllerReference(cr, controllerSvc, r.scheme)
+	return controllerSvc
+}
diff --git a/pkg/share/share.go b/pkg/share/share.go
index bd1304b..0656bc2 100644
--- a/pkg/share/share.go
+++ b/pkg/share/share.go
@@ -33,4 +33,7 @@ var (
 
 	// BrokerClusterName is the broker cluster name
 	BrokerClusterName = ""
+
+	// svc of controller for brokers
+	ControllerAccessPoint = ""
 )
diff --git a/pkg/share/share.go b/pkg/tool/resource_name.go
similarity index 61%
copy from pkg/share/share.go
copy to pkg/tool/resource_name.go
index bd1304b..7862e62 100644
--- a/pkg/share/share.go
+++ b/pkg/tool/resource_name.go
@@ -15,22 +15,14 @@
  * limitations under the License.
  */
 
-// Package share defines some variables shared by different packages
-package share
+package tool
 
-var (
-	// GroupNum is the number of broker group
-	GroupNum = 0
+import "fmt"
 
-	// NameServersStr is the name server list
-	NameServersStr = ""
+func BuildHeadlessSvcResourceName(name string) string {
+	return fmt.Sprintf("%s-svc-headless", name)
+}
 
-	// IsNameServersStrUpdated is whether the name server list is updated
-	IsNameServersStrUpdated = false
-
-	// IsNameServersStrInitialized is whether the name server list is initialized
-	IsNameServersStrInitialized = false
-
-	// BrokerClusterName is the broker cluster name
-	BrokerClusterName = ""
-)
+func BuildSvcResourceName(name string) string {
+	return fmt.Sprintf("%s-svc", name)
+}