You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@submarine.apache.org by ka...@apache.org on 2021/03/31 07:31:34 UTC
[submarine] branch master updated: SUBMARINE-771. Implement
Controller Pattern
This is an automated email from the ASF dual-hosted git repository.
kaihsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git
The following commit(s) were added to refs/heads/master by this push:
new 8c145f5 SUBMARINE-771. Implement Controller Pattern
8c145f5 is described below
commit 8c145f50c55742ba4a9456ee439abd279f4e1d23
Author: kevin85421 <b0...@ntu.edu.tw>
AuthorDate: Wed Mar 31 15:31:26 2021 +0800
SUBMARINE-771. Implement Controller Pattern
---
.gitignore | 1 +
submarine-cloud-v2/README.md | 6 +
submarine-cloud-v2/controller.go | 185 +++++++++++++++++++++
submarine-cloud-v2/go.sum | 1 +
submarine-cloud-v2/main.go | 46 ++++-
.../{main.go => pkg/signals/signal.go} | 35 ++--
.../{main.go => pkg/signals/signal_posix.go} | 22 +--
.../{main.go => pkg/signals/signal_windows.go} | 21 +--
8 files changed, 262 insertions(+), 55 deletions(-)
diff --git a/.gitignore b/.gitignore
index 633e62a..25e6577 100644
--- a/.gitignore
+++ b/.gitignore
@@ -87,6 +87,7 @@ submarine-security/spark-security/derby.log
# submarine-cloud-v2
submarine-cloud-v2/vendor/*
+submarine-cloud-v2/submarine-operator
# vscode file
.project
diff --git a/submarine-cloud-v2/README.md b/submarine-cloud-v2/README.md
index 070d71e..7490e4a 100644
--- a/submarine-cloud-v2/README.md
+++ b/submarine-cloud-v2/README.md
@@ -38,4 +38,10 @@ kubectl apply -f artifacts/examples/example-submarine.yaml
# Step3: Run unit test
go test
+```
+
+# Build Project
+```bash
+go build -o submarine-operator
+./submarine-operator
```
\ No newline at end of file
diff --git a/submarine-cloud-v2/controller.go b/submarine-cloud-v2/controller.go
new file mode 100644
index 0000000..fc5d25d
--- /dev/null
+++ b/submarine-cloud-v2/controller.go
@@ -0,0 +1,185 @@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+ "fmt"
+ "time"
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/kubernetes/scheme"
+ appsinformers "k8s.io/client-go/informers/apps/v1"
+ utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+ clientset "submarine-cloud-v2/pkg/generated/clientset/versioned"
+ informers "submarine-cloud-v2/pkg/generated/informers/externalversions/submarine/v1alpha1"
+ typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
+ "k8s.io/klog/v2"
+ "k8s.io/client-go/util/workqueue"
+ "k8s.io/client-go/tools/cache"
+ "k8s.io/client-go/tools/record"
+ "k8s.io/apimachinery/pkg/util/wait"
+ submarinescheme "submarine-cloud-v2/pkg/generated/clientset/versioned/scheme"
+)
+
+const controllerAgentName = "submarine-controller"
+
+// Controller is the controller implementation for Foo resources
+type Controller struct {
+ // kubeclientset is a standard kubernetes clientset
+ kubeclientset kubernetes.Interface
+ // sampleclientset is a clientset for our own API group
+ submarineclientset clientset.Interface
+
+ submarinesSynced cache.InformerSynced
+ // workqueue is a rate limited work queue. This is used to queue work to be
+ // processed instead of performing it as soon as a change happens. This
+ // means we can ensure we only process a fixed amount of resources at a
+ // time, and makes it easy to ensure we are never processing the same item
+ // simultaneously in two different workers.
+ workqueue workqueue.RateLimitingInterface
+ // recorder is an event recorder for recording Event resources to the
+ // Kubernetes API.
+ recorder record.EventRecorder
+}
+
+// NewController returns a new sample controller
+func NewController(
+ kubeclientset kubernetes.Interface,
+ submarineclientset clientset.Interface,
+ deploymentInformer appsinformers.DeploymentInformer,
+ submarineInformer informers.SubmarineInformer) *Controller {
+
+ // TODO: Create event broadcaster
+ // Add Submarine types to the default Kubernetes Scheme so Events can be
+ // logged for Submarine types.
+ utilruntime.Must(submarinescheme.AddToScheme(scheme.Scheme))
+ klog.V(4).Info("Creating event broadcaster")
+ eventBroadcaster := record.NewBroadcaster()
+ eventBroadcaster.StartStructuredLogging(0)
+ eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
+ recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
+
+
+ // Initialize controller
+ controller := &Controller{
+ kubeclientset: kubeclientset,
+ submarineclientset: submarineclientset,
+ submarinesSynced: submarineInformer.Informer().HasSynced,
+ workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Submarines"),
+ recorder: recorder,
+ }
+
+ // Setting up event handler for Submarine
+ klog.Info("Setting up event handlers")
+ submarineInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+ AddFunc: controller.enqueueSubmarine,
+ UpdateFunc: func(old, new interface{}) {
+ controller.enqueueSubmarine(new)
+ },
+ })
+
+ // TODO: Setting up event handler for other resources. E.g. namespace
+
+ return controller
+}
+
+func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
+ defer utilruntime.HandleCrash()
+ defer c.workqueue.ShutDown()
+
+ // Start the informer factories to begin populating the informer caches
+ klog.Info("Starting Submarine controller")
+
+ // Wait for the caches to be synced before starting workers
+ klog.Info("Waiting for informer caches to sync")
+ if ok := cache.WaitForCacheSync(stopCh, c.submarinesSynced); !ok {
+ return fmt.Errorf("failed to wait for caches to sync")
+ }
+
+ klog.Info("Starting workers")
+ // Launch two workers to process Submarine resources
+ for i := 0; i < threadiness; i++ {
+ go wait.Until(c.runWorker, time.Second, stopCh)
+ }
+
+ klog.Info("Started workers")
+ <-stopCh
+ klog.Info("Shutting down workers")
+
+ return nil
+}
+
+// runWorker is a long-running function that will continually call the
+// processNextWorkItem function in order to read and process a message on the
+// workqueue.
+func (c *Controller) runWorker() {
+ for c.processNextWorkItem() {
+ }
+}
+
+// processNextWorkItem will read a single work item off the workqueue and
+// attempt to process it, by calling the syncHandler.
+func (c *Controller) processNextWorkItem() bool {
+ obj, shutdown := c.workqueue.Get()
+ if shutdown {
+ return false
+ }
+
+ // We wrap this block in a func so we can defer c.workqueue.Done.
+ err := func(obj interface{}) error {
+ // TODO: Maintain workqueue
+ defer c.workqueue.Done(obj)
+ key, _ := obj.(string)
+ c.syncHandler(key)
+ c.workqueue.Forget(obj)
+ klog.Infof("Successfully synced '%s'", key)
+ return nil
+ }(obj)
+
+ if err != nil {
+ utilruntime.HandleError(err)
+ return true
+ }
+
+ return true
+}
+
+// syncHandler compares the actual state with the desired, and attempts to
+// converge the two. It then updates the Status block of the Foo resource
+// with the current status of the resource.
+func (c *Controller) syncHandler(key string) error {
+ // TODO: business logic
+ klog.Info("syncHandler: ", key)
+ return nil
+}
+
+// enqueueFoo takes a Submarine resource and converts it into a namespace/name
+// string which is then put onto the work queue. This method should *not* be
+// passed resources of any type other than Submarine.
+func (c *Controller) enqueueSubmarine(obj interface{}) {
+ var key string
+ var err error
+ if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
+ utilruntime.HandleError(err)
+ return
+ }
+
+ // key: [namespace]/[CR name]
+ // Example: default/example-submarine
+ c.workqueue.Add(key)
+}
diff --git a/submarine-cloud-v2/go.sum b/submarine-cloud-v2/go.sum
index 3433a21..6649c0e 100644
--- a/submarine-cloud-v2/go.sum
+++ b/submarine-cloud-v2/go.sum
@@ -83,6 +83,7 @@ github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXP
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
+github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e h1:1r7pUrabqp18hOBcwBwiTsbnFeTZHV9eER/QT5JVZxY=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
diff --git a/submarine-cloud-v2/main.go b/submarine-cloud-v2/main.go
index d59d924..b09ebea 100644
--- a/submarine-cloud-v2/main.go
+++ b/submarine-cloud-v2/main.go
@@ -19,8 +19,15 @@ package main
import (
"flag"
+ "time"
+ kubeinformers "k8s.io/client-go/informers"
"k8s.io/klog/v2"
+ "k8s.io/client-go/tools/clientcmd"
+ "k8s.io/client-go/kubernetes"
"os"
+ "submarine-cloud-v2/pkg/signals"
+ clientset "submarine-cloud-v2/pkg/generated/clientset/versioned"
+ informers "submarine-cloud-v2/pkg/generated/informers/externalversions"
)
var (
@@ -32,7 +39,44 @@ func main() {
klog.InitFlags(nil)
flag.Parse()
- // TODO: Create a Submarine operator
+ // set up signals so we handle the first shutdown signal gracefully
+ stopCh := signals.SetupSignalHandler()
+
+ cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
+ if err != nil {
+ klog.Fatalf("Error building kubeconfig: %s", err.Error())
+ }
+
+ kubeClient, err := kubernetes.NewForConfig(cfg)
+ if err != nil {
+ klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
+ }
+
+ submarineClient, err := clientset.NewForConfig(cfg)
+ if err != nil {
+ klog.Fatalf("Error building submarine clientset: %s", err.Error())
+ }
+
+ kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
+ submarineInformerFactory := informers.NewSharedInformerFactory(submarineClient, time.Second*30)
+
+ // TODO: Pass informers to NewController()
+ // ex: namespace informer
+
+ // Create a Submarine operator
+ controller := NewController(kubeClient, submarineClient,
+ kubeInformerFactory.Apps().V1().Deployments(),
+ submarineInformerFactory.Submarine().V1alpha1().Submarines())
+
+ // notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(stopCh)
+ // Start method is non-blocking and runs all registered informers in a dedicated goroutine.
+ kubeInformerFactory.Start(stopCh)
+ submarineInformerFactory.Start(stopCh)
+
+ // Run controller
+ if err = controller.Run(2, stopCh); err != nil {
+ klog.Fatalf("Error running controller: %s", err.Error())
+ }
}
func init() {
diff --git a/submarine-cloud-v2/main.go b/submarine-cloud-v2/pkg/signals/signal.go
similarity index 58%
copy from submarine-cloud-v2/main.go
copy to submarine-cloud-v2/pkg/signals/signal.go
index d59d924..3e783c6 100644
--- a/submarine-cloud-v2/main.go
+++ b/submarine-cloud-v2/pkg/signals/signal.go
@@ -15,27 +15,30 @@
* limitations under the License.
*/
-package main
+package signals
import (
- "flag"
- "k8s.io/klog/v2"
"os"
+ "os/signal"
)
-var (
- masterURL string
- kubeconfig string
-)
+var onlyOneSignalHandler = make(chan struct{})
-func main() {
- klog.InitFlags(nil)
- flag.Parse()
+// SetupSignalHandler registered for SIGTERM and SIGINT. A stop channel is returned
+// which is closed on one of these signals. If a second signal is caught, the program
+// is terminated with exit code 1.
+func SetupSignalHandler() (stopCh <-chan struct{}) {
+ close(onlyOneSignalHandler) // panics when called twice
- // TODO: Create a Submarine operator
-}
+ stop := make(chan struct{})
+ c := make(chan os.Signal, 2)
+ signal.Notify(c, shutdownSignals...)
+ go func() {
+ <-c
+ close(stop)
+ <-c
+ os.Exit(1) // second signal. Exit directly.
+ }()
-func init() {
- flag.StringVar(&kubeconfig, "kubeconfig", os.Getenv("HOME") + "/.kube/config", "Path to a kubeconfig. Only required if out-of-cluster.")
- flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
-}
\ No newline at end of file
+ return stop
+}
diff --git a/submarine-cloud-v2/main.go b/submarine-cloud-v2/pkg/signals/signal_posix.go
similarity index 62%
copy from submarine-cloud-v2/main.go
copy to submarine-cloud-v2/pkg/signals/signal_posix.go
index d59d924..763f80c 100644
--- a/submarine-cloud-v2/main.go
+++ b/submarine-cloud-v2/pkg/signals/signal_posix.go
@@ -15,27 +15,11 @@
* limitations under the License.
*/
-package main
+package signals
import (
- "flag"
- "k8s.io/klog/v2"
"os"
+ "syscall"
)
-var (
- masterURL string
- kubeconfig string
-)
-
-func main() {
- klog.InitFlags(nil)
- flag.Parse()
-
- // TODO: Create a Submarine operator
-}
-
-func init() {
- flag.StringVar(&kubeconfig, "kubeconfig", os.Getenv("HOME") + "/.kube/config", "Path to a kubeconfig. Only required if out-of-cluster.")
- flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
-}
\ No newline at end of file
+var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
diff --git a/submarine-cloud-v2/main.go b/submarine-cloud-v2/pkg/signals/signal_windows.go
similarity index 62%
copy from submarine-cloud-v2/main.go
copy to submarine-cloud-v2/pkg/signals/signal_windows.go
index d59d924..08d95fc 100644
--- a/submarine-cloud-v2/main.go
+++ b/submarine-cloud-v2/pkg/signals/signal_windows.go
@@ -15,27 +15,10 @@
* limitations under the License.
*/
-package main
+package signals
import (
- "flag"
- "k8s.io/klog/v2"
"os"
)
-var (
- masterURL string
- kubeconfig string
-)
-
-func main() {
- klog.InitFlags(nil)
- flag.Parse()
-
- // TODO: Create a Submarine operator
-}
-
-func init() {
- flag.StringVar(&kubeconfig, "kubeconfig", os.Getenv("HOME") + "/.kube/config", "Path to a kubeconfig. Only required if out-of-cluster.")
- flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
-}
\ No newline at end of file
+var shutdownSignals = []os.Signal{os.Interrupt}
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@submarine.apache.org
For additional commands, e-mail: dev-help@submarine.apache.org