You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@submarine.apache.org by ka...@apache.org on 2021/03/31 07:31:34 UTC

[submarine] branch master updated: SUBMARINE-771. Implement Controller Pattern

This is an automated email from the ASF dual-hosted git repository.

kaihsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c145f5  SUBMARINE-771. Implement Controller Pattern
8c145f5 is described below

commit 8c145f50c55742ba4a9456ee439abd279f4e1d23
Author: kevin85421 <b0...@ntu.edu.tw>
AuthorDate: Wed Mar 31 15:31:26 2021 +0800

    SUBMARINE-771. Implement Controller Pattern
---
 .gitignore                                         |   1 +
 submarine-cloud-v2/README.md                       |   6 +
 submarine-cloud-v2/controller.go                   | 185 +++++++++++++++++++++
 submarine-cloud-v2/go.sum                          |   1 +
 submarine-cloud-v2/main.go                         |  46 ++++-
 .../{main.go => pkg/signals/signal.go}             |  35 ++--
 .../{main.go => pkg/signals/signal_posix.go}       |  22 +--
 .../{main.go => pkg/signals/signal_windows.go}     |  21 +--
 8 files changed, 262 insertions(+), 55 deletions(-)

diff --git a/.gitignore b/.gitignore
index 633e62a..25e6577 100644
--- a/.gitignore
+++ b/.gitignore
@@ -87,6 +87,7 @@ submarine-security/spark-security/derby.log
 
 # submarine-cloud-v2
 submarine-cloud-v2/vendor/*
+submarine-cloud-v2/submarine-operator
 
 # vscode file
 .project
diff --git a/submarine-cloud-v2/README.md b/submarine-cloud-v2/README.md
index 070d71e..7490e4a 100644
--- a/submarine-cloud-v2/README.md
+++ b/submarine-cloud-v2/README.md
@@ -38,4 +38,10 @@ kubectl apply -f artifacts/examples/example-submarine.yaml
 
 # Step3: Run unit test
 go test
+```
+
+# Build Project
+```bash
+go build -o submarine-operator
+./submarine-operator
 ```
\ No newline at end of file
diff --git a/submarine-cloud-v2/controller.go b/submarine-cloud-v2/controller.go
new file mode 100644
index 0000000..fc5d25d
--- /dev/null
+++ b/submarine-cloud-v2/controller.go
@@ -0,0 +1,185 @@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package main
+
+import (
+	"fmt"
+	"time"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/kubernetes/scheme"
+	appsinformers "k8s.io/client-go/informers/apps/v1"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	clientset "submarine-cloud-v2/pkg/generated/clientset/versioned"
+	informers "submarine-cloud-v2/pkg/generated/informers/externalversions/submarine/v1alpha1"
+	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
+	"k8s.io/klog/v2"
+	"k8s.io/client-go/util/workqueue"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/record"
+	"k8s.io/apimachinery/pkg/util/wait"
+	submarinescheme "submarine-cloud-v2/pkg/generated/clientset/versioned/scheme"
+)
+
+const controllerAgentName = "submarine-controller"
+
+// Controller is the controller implementation for Submarine resources
+type Controller struct {
+	// kubeclientset is a standard kubernetes clientset
+	kubeclientset kubernetes.Interface
+	// submarineclientset is a clientset for our own API group
+	submarineclientset clientset.Interface
+	
+	submarinesSynced        cache.InformerSynced
+	// workqueue is a rate limited work queue. This is used to queue work to be
+	// processed instead of performing it as soon as a change happens. This
+	// means we can ensure we only process a fixed amount of resources at a
+	// time, and makes it easy to ensure we are never processing the same item
+	// simultaneously in two different workers.
+	workqueue workqueue.RateLimitingInterface
+	// recorder is an event recorder for recording Event resources to the
+	// Kubernetes API.
+	recorder record.EventRecorder
+}
+
+// NewController returns a new Submarine controller
+func NewController(
+	kubeclientset kubernetes.Interface,
+	submarineclientset clientset.Interface,
+	deploymentInformer appsinformers.DeploymentInformer,
+	submarineInformer informers.SubmarineInformer) *Controller {
+
+	// Create event broadcaster
+	// Add Submarine types to the default Kubernetes Scheme so Events can be
+	// logged for Submarine types.
+	utilruntime.Must(submarinescheme.AddToScheme(scheme.Scheme))
+	klog.V(4).Info("Creating event broadcaster")
+	eventBroadcaster := record.NewBroadcaster()
+	eventBroadcaster.StartStructuredLogging(0)
+	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
+	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
+
+
+	// Initialize controller
+	controller := &Controller{
+		kubeclientset:     kubeclientset,
+		submarineclientset:   submarineclientset,
+		submarinesSynced:        submarineInformer.Informer().HasSynced,
+		workqueue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Submarines"),
+		recorder:          recorder,
+	}
+
+	// Setting up event handler for Submarine
+	klog.Info("Setting up event handlers")
+	submarineInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: controller.enqueueSubmarine,
+		UpdateFunc: func(old, new interface{}) {
+			controller.enqueueSubmarine(new)
+		},
+	})
+
+	// TODO: Setting up event handler for other resources. E.g. namespace
+
+	return controller
+}
+
+func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
+	defer utilruntime.HandleCrash()
+	defer c.workqueue.ShutDown()
+
+	// The informer factories are started by the caller (main.go) to populate the informer caches
+	klog.Info("Starting Submarine controller")
+
+	// Wait for the caches to be synced before starting workers
+	klog.Info("Waiting for informer caches to sync")
+	if ok := cache.WaitForCacheSync(stopCh, c.submarinesSynced); !ok {
+		return fmt.Errorf("failed to wait for caches to sync")
+	}
+
+	klog.Info("Starting workers")
+	// Launch `threadiness` workers to process Submarine resources
+	for i := 0; i < threadiness; i++ {
+		go wait.Until(c.runWorker, time.Second, stopCh)
+	}
+
+	klog.Info("Started workers")
+	<-stopCh
+	klog.Info("Shutting down workers")
+
+	return nil
+}
+
+// runWorker is a long-running function that will continually call the
+// processNextWorkItem function in order to read and process a message on the
+// workqueue.
+func (c *Controller) runWorker() {
+	for c.processNextWorkItem() {
+	}
+}
+
+// processNextWorkItem will read a single work item off the workqueue and
+// attempt to process it, by calling the syncHandler.
+func (c *Controller) processNextWorkItem() bool {
+	obj, shutdown := c.workqueue.Get()
+	if shutdown {
+		return false
+	}
+
+	// We wrap this block in a func so we can defer c.workqueue.Done.
+	err := func(obj interface{}) error {
+		// TODO: Maintain workqueue
+		defer c.workqueue.Done(obj)
+		key, _ := obj.(string)
+		c.syncHandler(key)
+		c.workqueue.Forget(obj)
+		klog.Infof("Successfully synced '%s'", key)
+		return nil
+	}(obj)
+
+	if err != nil {
+		utilruntime.HandleError(err)
+		return true
+	}
+
+	return true
+}
+
+// syncHandler compares the actual state with the desired, and attempts to
+// converge the two. It then updates the Status block of the Submarine resource
+// with the current status of the resource.
+func (c *Controller) syncHandler(key string) error {
+	// TODO: business logic
+	klog.Info("syncHandler: ", key)
+	return nil
+}
+
+// enqueueSubmarine takes a Submarine resource and converts it into a namespace/name
+// string which is then put onto the work queue. This method should *not* be
+// passed resources of any type other than Submarine.
+func (c *Controller) enqueueSubmarine(obj interface{}) {
+	var key string
+	var err error
+	if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
+		utilruntime.HandleError(err)
+		return
+	}
+
+	// key: [namespace]/[CR name]
+	// Example: default/example-submarine 
+	c.workqueue.Add(key)
+}
diff --git a/submarine-cloud-v2/go.sum b/submarine-cloud-v2/go.sum
index 3433a21..6649c0e 100644
--- a/submarine-cloud-v2/go.sum
+++ b/submarine-cloud-v2/go.sum
@@ -83,6 +83,7 @@ github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXP
 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
 github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
 github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
+github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e h1:1r7pUrabqp18hOBcwBwiTsbnFeTZHV9eER/QT5JVZxY=
 github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
 github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
 github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
diff --git a/submarine-cloud-v2/main.go b/submarine-cloud-v2/main.go
index d59d924..b09ebea 100644
--- a/submarine-cloud-v2/main.go
+++ b/submarine-cloud-v2/main.go
@@ -19,8 +19,15 @@ package main
 
 import (
 	"flag"
+	"time"
+	kubeinformers "k8s.io/client-go/informers"
 	"k8s.io/klog/v2"
+	"k8s.io/client-go/tools/clientcmd"
+	"k8s.io/client-go/kubernetes"
 	"os"
+	"submarine-cloud-v2/pkg/signals"
+	clientset "submarine-cloud-v2/pkg/generated/clientset/versioned"
+	informers "submarine-cloud-v2/pkg/generated/informers/externalversions"
 )
 
 var (
@@ -32,7 +39,44 @@ func main() {
 	klog.InitFlags(nil)
 	flag.Parse()
 
-	// TODO: Create a Submarine operator	
+	// set up signals so we handle the first shutdown signal gracefully
+	stopCh := signals.SetupSignalHandler()
+
+	cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
+	if err != nil {
+		klog.Fatalf("Error building kubeconfig: %s", err.Error())
+	}
+
+	kubeClient, err := kubernetes.NewForConfig(cfg)
+	if err != nil {
+		klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
+	}
+
+	submarineClient, err := clientset.NewForConfig(cfg)
+	if err != nil {
+		klog.Fatalf("Error building submarine clientset: %s", err.Error())
+	}
+
+	kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
+	submarineInformerFactory := informers.NewSharedInformerFactory(submarineClient, time.Second*30)
+	
+	// TODO: Pass informers to NewController()
+	//       ex: namespace informer
+
+	// Create a Submarine operator
+	controller := NewController(kubeClient, submarineClient,
+		kubeInformerFactory.Apps().V1().Deployments(),
+		submarineInformerFactory.Submarine().V1alpha1().Submarines())
+
+	// Notice that there is no need to run the Start methods in a separate goroutine (i.e. go kubeInformerFactory.Start(stopCh)).
+	// Start method is non-blocking and runs all registered informers in a dedicated goroutine.
+	kubeInformerFactory.Start(stopCh)
+	submarineInformerFactory.Start(stopCh)
+
+	// Run controller
+	if err = controller.Run(2, stopCh); err != nil {
+		klog.Fatalf("Error running controller: %s", err.Error())
+	}
 }
 
 func init() {
diff --git a/submarine-cloud-v2/main.go b/submarine-cloud-v2/pkg/signals/signal.go
similarity index 58%
copy from submarine-cloud-v2/main.go
copy to submarine-cloud-v2/pkg/signals/signal.go
index d59d924..3e783c6 100644
--- a/submarine-cloud-v2/main.go
+++ b/submarine-cloud-v2/pkg/signals/signal.go
@@ -15,27 +15,30 @@
  * limitations under the License.
  */
 
-package main
+package signals
 
 import (
-	"flag"
-	"k8s.io/klog/v2"
 	"os"
+	"os/signal"
 )
 
-var (
-	masterURL  string
-	kubeconfig string
-)
+var onlyOneSignalHandler = make(chan struct{})
 
-func main() {
-	klog.InitFlags(nil)
-	flag.Parse()
+// SetupSignalHandler registers for SIGTERM and SIGINT. A stop channel is returned
+// which is closed on one of these signals. If a second signal is caught, the program
+// is terminated with exit code 1.
+func SetupSignalHandler() (stopCh <-chan struct{}) {
+	close(onlyOneSignalHandler) // panics when called twice
 
-	// TODO: Create a Submarine operator	
-}
+	stop := make(chan struct{})
+	c := make(chan os.Signal, 2)
+	signal.Notify(c, shutdownSignals...)
+	go func() {
+		<-c
+		close(stop)
+		<-c
+		os.Exit(1) // second signal. Exit directly.
+	}()
 
-func init() {
-	flag.StringVar(&kubeconfig, "kubeconfig", os.Getenv("HOME") + "/.kube/config", "Path to a kubeconfig. Only required if out-of-cluster.")
-	flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
-}
\ No newline at end of file
+	return stop
+}
diff --git a/submarine-cloud-v2/main.go b/submarine-cloud-v2/pkg/signals/signal_posix.go
similarity index 62%
copy from submarine-cloud-v2/main.go
copy to submarine-cloud-v2/pkg/signals/signal_posix.go
index d59d924..763f80c 100644
--- a/submarine-cloud-v2/main.go
+++ b/submarine-cloud-v2/pkg/signals/signal_posix.go
@@ -15,27 +15,11 @@
  * limitations under the License.
  */
 
-package main
+package signals
 
 import (
-	"flag"
-	"k8s.io/klog/v2"
 	"os"
+	"syscall"
 )
 
-var (
-	masterURL  string
-	kubeconfig string
-)
-
-func main() {
-	klog.InitFlags(nil)
-	flag.Parse()
-
-	// TODO: Create a Submarine operator	
-}
-
-func init() {
-	flag.StringVar(&kubeconfig, "kubeconfig", os.Getenv("HOME") + "/.kube/config", "Path to a kubeconfig. Only required if out-of-cluster.")
-	flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
-}
\ No newline at end of file
+var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
diff --git a/submarine-cloud-v2/main.go b/submarine-cloud-v2/pkg/signals/signal_windows.go
similarity index 62%
copy from submarine-cloud-v2/main.go
copy to submarine-cloud-v2/pkg/signals/signal_windows.go
index d59d924..08d95fc 100644
--- a/submarine-cloud-v2/main.go
+++ b/submarine-cloud-v2/pkg/signals/signal_windows.go
@@ -15,27 +15,10 @@
  * limitations under the License.
  */
 
-package main
+package signals
 
 import (
-	"flag"
-	"k8s.io/klog/v2"
 	"os"
 )
 
-var (
-	masterURL  string
-	kubeconfig string
-)
-
-func main() {
-	klog.InitFlags(nil)
-	flag.Parse()
-
-	// TODO: Create a Submarine operator	
-}
-
-func init() {
-	flag.StringVar(&kubeconfig, "kubeconfig", os.Getenv("HOME") + "/.kube/config", "Path to a kubeconfig. Only required if out-of-cluster.")
-	flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
-}
\ No newline at end of file
+var shutdownSignals = []os.Signal{os.Interrupt}

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@submarine.apache.org
For additional commands, e-mail: dev-help@submarine.apache.org