You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by GitBox <gi...@apache.org> on 2018/09/20 00:46:49 UTC

[GitHub] little-cui closed pull request #444: SCB-924 Etcd cacher should re-list etcd in fixed time interval

little-cui closed pull request #444: SCB-924 Etcd cacher should re-list etcd in fixed time interval
URL: https://github.com/apache/incubator-servicecomb-service-center/pull/444
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request
(one opened from a fork) once it has been merged, the diff is
reproduced below for the sake of provenance:

diff --git a/scripts/build/local.sh b/scripts/build/local.sh
index dec2147d..3e38b81f 100644
--- a/scripts/build/local.sh
+++ b/scripts/build/local.sh
@@ -19,6 +19,8 @@ set -e
 
 export GOOS=${1:-"linux"}
 export GOARCH=${4:-"amd64"}
+export CGO_ENABLED=${CGO_ENABLED:-0} # disable cgo so that cgo files are not compiled
+export GO_EXTLINK_ENABLED=${GO_EXTLINK_ENABLED:-0} # do not use host linker
 export GO_LDFLAGS=${GO_LDFLAGS:-"-linkmode 'external' -extldflags '-static' -s -w"}
 
 RELEASE=${2:-"0.0.1"}
diff --git a/scripts/release/make_release.sh b/scripts/release/make_release.sh
index fe9e3c3c..137b163c 100755
--- a/scripts/release/make_release.sh
+++ b/scripts/release/make_release.sh
@@ -57,6 +57,8 @@ esac
 
 ## Get the arch type
 export GOARCH=${4:-"amd64"}
+export CGO_ENABLED=${CGO_ENABLED:-0} # disable cgo so that cgo files are not compiled
+export GO_EXTLINK_ENABLED=${GO_EXTLINK_ENABLED:-0} # do not use host linker
 export GO_LDFLAGS=${GO_LDFLAGS:-"-linkmode 'external' -extldflags '-static' -s -w"}
 
 root_path=$(cd "$(dirname "$0")"; pwd)
diff --git a/server/infra/discovery/common.go b/server/infra/discovery/common.go
index 965b4c12..eccaaa5c 100644
--- a/server/infra/discovery/common.go
+++ b/server/infra/discovery/common.go
@@ -21,8 +21,6 @@ import (
 )
 
 const (
-	// re-list when there is no event coming in more than 1h(=120*30s)
-	DEFAULT_MAX_NO_EVENT_INTERVAL = 120
-	DEFAULT_TIMEOUT               = 30 * time.Second
-	DEFAULT_CACHE_INIT_SIZE       = 100
+	DEFAULT_TIMEOUT         = 30 * time.Second
+	DEFAULT_CACHE_INIT_SIZE = 100
 )
diff --git a/server/infra/discovery/config.go b/server/infra/discovery/config.go
index f9bdc2ac..076053d3 100644
--- a/server/infra/discovery/config.go
+++ b/server/infra/discovery/config.go
@@ -24,14 +24,13 @@ import (
 
 type Config struct {
 	// Key is the prefix to unique specify resource type
-	Key            string
-	InitSize       int
-	NoEventPeriods int
-	Timeout        time.Duration
-	Period         time.Duration
-	DeferHandler   DeferHandler
-	OnEvent        KvEventFunc
-	Parser         pb.Parser
+	Key          string
+	InitSize     int
+	Timeout      time.Duration
+	Period       time.Duration
+	DeferHandler DeferHandler
+	OnEvent      KvEventFunc
+	Parser       pb.Parser
 }
 
 func (cfg *Config) String() string {
@@ -69,11 +68,6 @@ func (cfg *Config) WithEventFunc(f KvEventFunc) *Config {
 	return cfg
 }
 
-func (cfg *Config) WithNoEventPeriods(p int) *Config {
-	cfg.NoEventPeriods = p
-	return cfg
-}
-
 func (cfg *Config) AppendEventFunc(f KvEventFunc) *Config {
 	if prev := cfg.OnEvent; prev != nil {
 		next := f
@@ -93,11 +87,10 @@ func (cfg *Config) WithParser(parser pb.Parser) *Config {
 
 func Configure() *Config {
 	return &Config{
-		Key:            "/",
-		Timeout:        DEFAULT_TIMEOUT,
-		Period:         time.Second,
-		NoEventPeriods: DEFAULT_MAX_NO_EVENT_INTERVAL,
-		InitSize:       DEFAULT_CACHE_INIT_SIZE,
-		Parser:         pb.BytesParser,
+		Key:      "/",
+		Timeout:  DEFAULT_TIMEOUT,
+		Period:   time.Second,
+		InitSize: DEFAULT_CACHE_INIT_SIZE,
+		Parser:   pb.BytesParser,
 	}
 }
diff --git a/server/infra/discovery/config_test.go b/server/infra/discovery/config_test.go
index 202537d7..0adfa0b6 100644
--- a/server/infra/discovery/config_test.go
+++ b/server/infra/discovery/config_test.go
@@ -55,10 +55,6 @@ func TestConfigure(t *testing.T) {
 	if cfg.Period != 3*time.Second {
 		t.Fatalf("TestConfigure failed")
 	}
-	cfg.WithNoEventPeriods(1)
-	if cfg.NoEventPeriods != 1 {
-		t.Fatalf("TestConfigure failed")
-	}
 	cfg.WithDeferHandler(&mockDeferHandler{})
 	if cfg.DeferHandler == nil {
 		t.Fatalf("TestConfigure failed")
diff --git a/server/plugin/infra/discovery/etcd/cacher_kv.go b/server/plugin/infra/discovery/etcd/cacher_kv.go
index 31c42751..0cade627 100644
--- a/server/plugin/infra/discovery/etcd/cacher_kv.go
+++ b/server/plugin/infra/discovery/etcd/cacher_kv.go
@@ -36,8 +36,8 @@ import (
 type KvCacher struct {
 	Cfg *discovery.Config
 
-	latestListRev  int64
-	noEventPeriods int
+	latestListRev int64
+	reListCount   int
 
 	ready     chan struct{}
 	lw        ListWatch
@@ -53,22 +53,16 @@ func (c *KvCacher) Config() *discovery.Config {
 
 func (c *KvCacher) needList() bool {
 	rev := c.lw.Revision()
+	// init stage or there is a backend error
 	if rev == 0 {
-		c.noEventPeriods = 0
+		c.reListCount = 0
 		return true
 	}
-	if c.latestListRev != rev {
-		c.noEventPeriods = 0
+	c.reListCount++
+	if c.reListCount < DEFAULT_FORCE_LIST_INTERVAL {
 		return false
 	}
-	c.noEventPeriods++
-	if c.Cfg.NoEventPeriods == 0 || c.noEventPeriods < c.Cfg.NoEventPeriods {
-		return false
-	}
-
-	log.Debugf("no events come in more then %s, need to list key %s, rev: %d",
-		time.Duration(c.noEventPeriods)*c.Cfg.Timeout, c.Cfg.Key, rev)
-	c.noEventPeriods = 0
+	c.reListCount = 0
 	return true
 }
 
diff --git a/server/plugin/infra/discovery/etcd/cacher_kv_test.go b/server/plugin/infra/discovery/etcd/cacher_kv_test.go
index 1ee4650d..5ac42f84 100644
--- a/server/plugin/infra/discovery/etcd/cacher_kv_test.go
+++ b/server/plugin/infra/discovery/etcd/cacher_kv_test.go
@@ -97,7 +97,6 @@ func TestNewKvCacher(t *testing.T) {
 	var evt discovery.KvEvent
 	cr = &KvCacher{
 		Cfg: discovery.Configure().
-			WithNoEventPeriods(0).
 			WithEventFunc(func(e discovery.KvEvent) {
 				evt = e
 			}),
@@ -151,11 +150,13 @@ func TestNewKvCacher(t *testing.T) {
 	}
 
 	// case re-list and over no event times
-	lw.Bus <- nil
+	for i := 0; i < DEFAULT_FORCE_LIST_INTERVAL; i++ {
+		lw.Bus <- nil
+	}
 	evt.KV = nil
-	old := *cr.Cfg
-	cr.Cfg.WithNoEventPeriods(1)
-	cr.refresh(ctx)
+	for i := 0; i < DEFAULT_FORCE_LIST_INTERVAL; i++ {
+		cr.refresh(ctx)
+	}
 	// check event
 	if evt.Type != pb.EVT_UPDATE || evt.Revision != 4 || evt.KV.ModRevision != 3 || string(evt.KV.Key) != "ka" || string(evt.KV.Value.([]byte)) != "va" {
 		t.Fatalf("TestNewKvCacher failed, %v", evt)
@@ -167,10 +168,13 @@ func TestNewKvCacher(t *testing.T) {
 	}
 
 	lw.ListResponse = &registry.PluginResponse{Revision: 5}
-	lw.Bus <- nil
+	for i := 0; i < DEFAULT_FORCE_LIST_INTERVAL; i++ {
+		lw.Bus <- nil
+	}
 	evt.KV = nil
-	cr.refresh(ctx)
-	*cr.Cfg = old
+	for i := 0; i < DEFAULT_FORCE_LIST_INTERVAL; i++ {
+		cr.refresh(ctx)
+	}
 	// check event
 	if evt.Type != pb.EVT_DELETE || evt.Revision != 5 || evt.KV.ModRevision != 3 || string(evt.KV.Key) != "ka" || string(evt.KV.Value.([]byte)) != "va" {
 		t.Fatalf("TestNewKvCacher failed, %v", evt)
@@ -281,7 +285,7 @@ func TestNewKvCacher(t *testing.T) {
 	lw.ListResponse = test
 	lw.Bus <- nil
 	evt.KV = nil
-	old = *cr.Cfg
+	old := *cr.Cfg
 	cr.Cfg.WithParser(pb.MapParser)
 	cr.refresh(ctx)
 	// check event
diff --git a/server/plugin/infra/discovery/etcd/common.go b/server/plugin/infra/discovery/etcd/common.go
index fe1663db..d7891674 100644
--- a/server/plugin/infra/discovery/etcd/common.go
+++ b/server/plugin/infra/discovery/etcd/common.go
@@ -23,9 +23,11 @@ import (
 )
 
 const (
-	DEFAULT_METRICS_INTERVAL = 30 * time.Second
-	DEFAULT_COMPACT_TIMES    = 2
-	DEFAULT_COMPACT_TIMEOUT  = 5 * time.Minute
+	// needList forces a re-list once per this many calls
+	DEFAULT_FORCE_LIST_INTERVAL = 4
+	DEFAULT_METRICS_INTERVAL    = 30 * time.Second
+	DEFAULT_COMPACT_TIMES       = 2
+	DEFAULT_COMPACT_TIMEOUT     = 5 * time.Minute
 
 	minWaitInterval = 1 * time.Second
 	eventBlockSize  = 1000
diff --git a/server/service/notification/processor_test.go b/server/service/notification/processor_test.go
index ed5f2702..7c22096c 100644
--- a/server/service/notification/processor_test.go
+++ b/server/service/notification/processor_test.go
@@ -19,11 +19,24 @@ package notification
 import (
 	"github.com/apache/incubator-servicecomb-service-center/pkg/gopool"
 	"testing"
+	"time"
 )
 
+type mockSubscriberChan struct {
+	*BaseSubscriber
+	job chan NotifyJob
+}
+
+func (s *mockSubscriberChan) OnMessage(job NotifyJob) {
+	s.job <- job
+}
+
 func TestProcessor_Do(t *testing.T) {
-	mock1 := &mockSubscriber{BaseSubscriber: NewSubscriber(INSTANCE, "s1", "g1")}
-	mock2 := &mockSubscriber{BaseSubscriber: NewSubscriber(INSTANCE, "s1", "g2")}
+	delay := 50 * time.Millisecond
+	mock1 := &mockSubscriberChan{BaseSubscriber: NewSubscriber(INSTANCE, "s1", "g1"),
+		job: make(chan NotifyJob, 1)}
+	mock2 := &mockSubscriberChan{BaseSubscriber: NewSubscriber(INSTANCE, "s1", "g2"),
+		job: make(chan NotifyJob, 1)}
 	p := NewProcessor("p1", 0)
 	gopool.Go(p.Do)
 	if p.Name() != "p1" {
@@ -48,27 +61,55 @@ func TestProcessor_Do(t *testing.T) {
 		t.Fatalf("TestProcessor_Do")
 	}
 	p.AddSubscriber(mock1)
+	p.AddSubscriber(mock2)
 	job := &BaseNotifyJob{group: "g1"}
 	p.Accept(job)
-	if mock1.job != nil {
+	select {
+	case <-mock1.job:
 		t.Fatalf("TestProcessor_Do")
+	case <-time.After(delay):
 	}
 	job.subject = "s1"
 	job.group = "g3"
 	p.Accept(job)
-	if mock1.job != nil {
+	select {
+	case <-mock1.job:
 		t.Fatalf("TestProcessor_Do")
+	case <-time.After(delay):
 	}
 	job.subject = "s1"
 	job.group = "g1"
 	p.Accept(job)
-	if mock1.job != job || mock2.job != nil {
+	select {
+	case j := <-mock1.job:
+		if j != job {
+			t.Fatalf("TestProcessor_Do")
+		}
+	case <-time.After(delay):
 		t.Fatalf("TestProcessor_Do")
 	}
+	select {
+	case <-mock2.job:
+		t.Fatalf("TestProcessor_Do")
+	case <-time.After(delay):
+	}
 	job.subject = "s1"
 	job.group = ""
 	p.Accept(job)
-	if mock1.job != job && mock2.job != job {
+	select {
+	case j := <-mock1.job:
+		if j != job {
+			t.Fatalf("TestProcessor_Do")
+		}
+	case <-time.After(delay):
+		t.Fatalf("TestProcessor_Do")
+	}
+	select {
+	case j := <-mock2.job:
+		if j != job {
+			t.Fatalf("TestProcessor_Do")
+		}
+	case <-time.After(delay):
 		t.Fatalf("TestProcessor_Do")
 	}
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services