You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by GitBox <gi...@apache.org> on 2018/09/20 00:46:49 UTC
[GitHub] little-cui closed pull request #444: SCB-924 Etcd cacher should re-list etcd in fixed time interval
little-cui closed pull request #444: SCB-924 Etcd cacher should re-list etcd in fixed time interval
URL: https://github.com/apache/incubator-servicecomb-service-center/pull/444
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/scripts/build/local.sh b/scripts/build/local.sh
index dec2147d..3e38b81f 100644
--- a/scripts/build/local.sh
+++ b/scripts/build/local.sh
@@ -19,6 +19,8 @@ set -e
export GOOS=${1:-"linux"}
export GOARCH=${4:-"amd64"}
+export CGO_ENABLED=${CGO_ENABLED:-0} # prevent to compile cgo file
+export GO_EXTLINK_ENABLED=${GO_EXTLINK_ENABLED:-0} # do not use host linker
export GO_LDFLAGS=${GO_LDFLAGS:-"-linkmode 'external' -extldflags '-static' -s -w"}
RELEASE=${2:-"0.0.1"}
diff --git a/scripts/release/make_release.sh b/scripts/release/make_release.sh
index fe9e3c3c..137b163c 100755
--- a/scripts/release/make_release.sh
+++ b/scripts/release/make_release.sh
@@ -57,6 +57,8 @@ esac
## Get the arch type
export GOARCH=${4:-"amd64"}
+export CGO_ENABLED=${CGO_ENABLED:-0} # prevent to compile cgo file
+export GO_EXTLINK_ENABLED=${GO_EXTLINK_ENABLED:-0} # do not use host linker
export GO_LDFLAGS=${GO_LDFLAGS:-"-linkmode 'external' -extldflags '-static' -s -w"}
root_path=$(cd "$(dirname "$0")"; pwd)
diff --git a/server/infra/discovery/common.go b/server/infra/discovery/common.go
index 965b4c12..eccaaa5c 100644
--- a/server/infra/discovery/common.go
+++ b/server/infra/discovery/common.go
@@ -21,8 +21,6 @@ import (
)
const (
- // re-list when there is no event coming in more than 1h(=120*30s)
- DEFAULT_MAX_NO_EVENT_INTERVAL = 120
- DEFAULT_TIMEOUT = 30 * time.Second
- DEFAULT_CACHE_INIT_SIZE = 100
+ DEFAULT_TIMEOUT = 30 * time.Second
+ DEFAULT_CACHE_INIT_SIZE = 100
)
diff --git a/server/infra/discovery/config.go b/server/infra/discovery/config.go
index f9bdc2ac..076053d3 100644
--- a/server/infra/discovery/config.go
+++ b/server/infra/discovery/config.go
@@ -24,14 +24,13 @@ import (
type Config struct {
// Key is the prefix to unique specify resource type
- Key string
- InitSize int
- NoEventPeriods int
- Timeout time.Duration
- Period time.Duration
- DeferHandler DeferHandler
- OnEvent KvEventFunc
- Parser pb.Parser
+ Key string
+ InitSize int
+ Timeout time.Duration
+ Period time.Duration
+ DeferHandler DeferHandler
+ OnEvent KvEventFunc
+ Parser pb.Parser
}
func (cfg *Config) String() string {
@@ -69,11 +68,6 @@ func (cfg *Config) WithEventFunc(f KvEventFunc) *Config {
return cfg
}
-func (cfg *Config) WithNoEventPeriods(p int) *Config {
- cfg.NoEventPeriods = p
- return cfg
-}
-
func (cfg *Config) AppendEventFunc(f KvEventFunc) *Config {
if prev := cfg.OnEvent; prev != nil {
next := f
@@ -93,11 +87,10 @@ func (cfg *Config) WithParser(parser pb.Parser) *Config {
func Configure() *Config {
return &Config{
- Key: "/",
- Timeout: DEFAULT_TIMEOUT,
- Period: time.Second,
- NoEventPeriods: DEFAULT_MAX_NO_EVENT_INTERVAL,
- InitSize: DEFAULT_CACHE_INIT_SIZE,
- Parser: pb.BytesParser,
+ Key: "/",
+ Timeout: DEFAULT_TIMEOUT,
+ Period: time.Second,
+ InitSize: DEFAULT_CACHE_INIT_SIZE,
+ Parser: pb.BytesParser,
}
}
diff --git a/server/infra/discovery/config_test.go b/server/infra/discovery/config_test.go
index 202537d7..0adfa0b6 100644
--- a/server/infra/discovery/config_test.go
+++ b/server/infra/discovery/config_test.go
@@ -55,10 +55,6 @@ func TestConfigure(t *testing.T) {
if cfg.Period != 3*time.Second {
t.Fatalf("TestConfigure failed")
}
- cfg.WithNoEventPeriods(1)
- if cfg.NoEventPeriods != 1 {
- t.Fatalf("TestConfigure failed")
- }
cfg.WithDeferHandler(&mockDeferHandler{})
if cfg.DeferHandler == nil {
t.Fatalf("TestConfigure failed")
diff --git a/server/plugin/infra/discovery/etcd/cacher_kv.go b/server/plugin/infra/discovery/etcd/cacher_kv.go
index 31c42751..0cade627 100644
--- a/server/plugin/infra/discovery/etcd/cacher_kv.go
+++ b/server/plugin/infra/discovery/etcd/cacher_kv.go
@@ -36,8 +36,8 @@ import (
type KvCacher struct {
Cfg *discovery.Config
- latestListRev int64
- noEventPeriods int
+ latestListRev int64
+ reListCount int
ready chan struct{}
lw ListWatch
@@ -53,22 +53,16 @@ func (c *KvCacher) Config() *discovery.Config {
func (c *KvCacher) needList() bool {
rev := c.lw.Revision()
+ // init stage or there is a backend error
if rev == 0 {
- c.noEventPeriods = 0
+ c.reListCount = 0
return true
}
- if c.latestListRev != rev {
- c.noEventPeriods = 0
+ c.reListCount++
+ if c.reListCount < DEFAULT_FORCE_LIST_INTERVAL {
return false
}
- c.noEventPeriods++
- if c.Cfg.NoEventPeriods == 0 || c.noEventPeriods < c.Cfg.NoEventPeriods {
- return false
- }
-
- log.Debugf("no events come in more then %s, need to list key %s, rev: %d",
- time.Duration(c.noEventPeriods)*c.Cfg.Timeout, c.Cfg.Key, rev)
- c.noEventPeriods = 0
+ c.reListCount = 0
return true
}
diff --git a/server/plugin/infra/discovery/etcd/cacher_kv_test.go b/server/plugin/infra/discovery/etcd/cacher_kv_test.go
index 1ee4650d..5ac42f84 100644
--- a/server/plugin/infra/discovery/etcd/cacher_kv_test.go
+++ b/server/plugin/infra/discovery/etcd/cacher_kv_test.go
@@ -97,7 +97,6 @@ func TestNewKvCacher(t *testing.T) {
var evt discovery.KvEvent
cr = &KvCacher{
Cfg: discovery.Configure().
- WithNoEventPeriods(0).
WithEventFunc(func(e discovery.KvEvent) {
evt = e
}),
@@ -151,11 +150,13 @@ func TestNewKvCacher(t *testing.T) {
}
// case re-list and over no event times
- lw.Bus <- nil
+ for i := 0; i < DEFAULT_FORCE_LIST_INTERVAL; i++ {
+ lw.Bus <- nil
+ }
evt.KV = nil
- old := *cr.Cfg
- cr.Cfg.WithNoEventPeriods(1)
- cr.refresh(ctx)
+ for i := 0; i < DEFAULT_FORCE_LIST_INTERVAL; i++ {
+ cr.refresh(ctx)
+ }
// check event
if evt.Type != pb.EVT_UPDATE || evt.Revision != 4 || evt.KV.ModRevision != 3 || string(evt.KV.Key) != "ka" || string(evt.KV.Value.([]byte)) != "va" {
t.Fatalf("TestNewKvCacher failed, %v", evt)
@@ -167,10 +168,13 @@ func TestNewKvCacher(t *testing.T) {
}
lw.ListResponse = &registry.PluginResponse{Revision: 5}
- lw.Bus <- nil
+ for i := 0; i < DEFAULT_FORCE_LIST_INTERVAL; i++ {
+ lw.Bus <- nil
+ }
evt.KV = nil
- cr.refresh(ctx)
- *cr.Cfg = old
+ for i := 0; i < DEFAULT_FORCE_LIST_INTERVAL; i++ {
+ cr.refresh(ctx)
+ }
// check event
if evt.Type != pb.EVT_DELETE || evt.Revision != 5 || evt.KV.ModRevision != 3 || string(evt.KV.Key) != "ka" || string(evt.KV.Value.([]byte)) != "va" {
t.Fatalf("TestNewKvCacher failed, %v", evt)
@@ -281,7 +285,7 @@ func TestNewKvCacher(t *testing.T) {
lw.ListResponse = test
lw.Bus <- nil
evt.KV = nil
- old = *cr.Cfg
+ old := *cr.Cfg
cr.Cfg.WithParser(pb.MapParser)
cr.refresh(ctx)
// check event
diff --git a/server/plugin/infra/discovery/etcd/common.go b/server/plugin/infra/discovery/etcd/common.go
index fe1663db..d7891674 100644
--- a/server/plugin/infra/discovery/etcd/common.go
+++ b/server/plugin/infra/discovery/etcd/common.go
@@ -23,9 +23,11 @@ import (
)
const (
- DEFAULT_METRICS_INTERVAL = 30 * time.Second
- DEFAULT_COMPACT_TIMES = 2
- DEFAULT_COMPACT_TIMEOUT = 5 * time.Minute
+ // force re-list
+ DEFAULT_FORCE_LIST_INTERVAL = 4
+ DEFAULT_METRICS_INTERVAL = 30 * time.Second
+ DEFAULT_COMPACT_TIMES = 2
+ DEFAULT_COMPACT_TIMEOUT = 5 * time.Minute
minWaitInterval = 1 * time.Second
eventBlockSize = 1000
diff --git a/server/service/notification/processor_test.go b/server/service/notification/processor_test.go
index ed5f2702..7c22096c 100644
--- a/server/service/notification/processor_test.go
+++ b/server/service/notification/processor_test.go
@@ -19,11 +19,24 @@ package notification
import (
"github.com/apache/incubator-servicecomb-service-center/pkg/gopool"
"testing"
+ "time"
)
+type mockSubscriberChan struct {
+ *BaseSubscriber
+ job chan NotifyJob
+}
+
+func (s *mockSubscriberChan) OnMessage(job NotifyJob) {
+ s.job <- job
+}
+
func TestProcessor_Do(t *testing.T) {
- mock1 := &mockSubscriber{BaseSubscriber: NewSubscriber(INSTANCE, "s1", "g1")}
- mock2 := &mockSubscriber{BaseSubscriber: NewSubscriber(INSTANCE, "s1", "g2")}
+ delay := 50 * time.Millisecond
+ mock1 := &mockSubscriberChan{BaseSubscriber: NewSubscriber(INSTANCE, "s1", "g1"),
+ job: make(chan NotifyJob, 1)}
+ mock2 := &mockSubscriberChan{BaseSubscriber: NewSubscriber(INSTANCE, "s1", "g2"),
+ job: make(chan NotifyJob, 1)}
p := NewProcessor("p1", 0)
gopool.Go(p.Do)
if p.Name() != "p1" {
@@ -48,27 +61,55 @@ func TestProcessor_Do(t *testing.T) {
t.Fatalf("TestProcessor_Do")
}
p.AddSubscriber(mock1)
+ p.AddSubscriber(mock2)
job := &BaseNotifyJob{group: "g1"}
p.Accept(job)
- if mock1.job != nil {
+ select {
+ case <-mock1.job:
t.Fatalf("TestProcessor_Do")
+ case <-time.After(delay):
}
job.subject = "s1"
job.group = "g3"
p.Accept(job)
- if mock1.job != nil {
+ select {
+ case <-mock1.job:
t.Fatalf("TestProcessor_Do")
+ case <-time.After(delay):
}
job.subject = "s1"
job.group = "g1"
p.Accept(job)
- if mock1.job != job || mock2.job != nil {
+ select {
+ case j := <-mock1.job:
+ if j != job {
+ t.Fatalf("TestProcessor_Do")
+ }
+ case <-time.After(delay):
t.Fatalf("TestProcessor_Do")
}
+ select {
+ case <-mock2.job:
+ t.Fatalf("TestProcessor_Do")
+ case <-time.After(delay):
+ }
job.subject = "s1"
job.group = ""
p.Accept(job)
- if mock1.job != job && mock2.job != job {
+ select {
+ case j := <-mock1.job:
+ if j != job {
+ t.Fatalf("TestProcessor_Do")
+ }
+ case <-time.After(delay):
+ t.Fatalf("TestProcessor_Do")
+ }
+ select {
+ case j := <-mock2.job:
+ if j != job {
+ t.Fatalf("TestProcessor_Do")
+ }
+ case <-time.After(delay):
t.Fatalf("TestProcessor_Do")
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services