You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/11/10 13:06:20 UTC
[skywalking-banyandb] branch tsdb-closer created (now c2e6cdf)
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a change to branch tsdb-closer
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
at c2e6cdf Add a Closer to manage the closing phase
This branch includes the following new commits:
new c2e6cdf Add a Closer to manage the closing phase
The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[skywalking-banyandb] 01/01: Add a Closer to manage the closing phase
Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch tsdb-closer
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit c2e6cdf40a7a9e095e0b70a4759dcd7ec74efafb
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Thu Nov 10 12:44:50 2022 +0000
Add a Closer to manage the closing phase
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
.github/workflows/ci.yml | 10 +----
.github/workflows/dependency-review.yml | 34 ++++++++++++++
.gitignore | 6 +++
banyand/tsdb/block.go | 51 ++++-----------------
banyand/tsdb/bucket/bucket.go | 32 ++++++++-----
banyand/tsdb/bucket/queue.go | 10 +++--
banyand/tsdb/bucket/queue_test.go | 15 +++++--
banyand/tsdb/bucket/strategy.go | 36 +++++++++++----
banyand/tsdb/bucket/strategy_test.go | 4 +-
banyand/tsdb/metric.go | 3 +-
banyand/tsdb/retention.go | 27 ++++-------
banyand/tsdb/segment.go | 40 +++++++++--------
banyand/tsdb/shard.go | 25 +++++++----
pkg/index/index.go | 2 -
pkg/index/inverted/inverted.go | 5 ---
pkg/index/lsm/lsm.go | 4 --
pkg/run/closer.go | 79 +++++++++++++++++++++++++++++++++
17 files changed, 245 insertions(+), 138 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 1b75422..70d0a8a 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -161,17 +161,9 @@ jobs:
run: make test-ci
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
- dependency-review:
- runs-on: ubuntu-latest
- steps:
- - name: 'Checkout Repository'
- uses: actions/checkout@v3
- - name: 'Dependency Review'
- uses: actions/dependency-review-action@v2
result:
name: Continuous Integration
runs-on: ubuntu-20.04
- needs: [check, build, test, dependency-review]
+ needs: [check, build, test]
steps:
- run: echo 'success'
-
diff --git a/.github/workflows/dependency-review.yml b/.github/workflows/dependency-review.yml
new file mode 100644
index 0000000..09f5be9
--- /dev/null
+++ b/.github/workflows/dependency-review.yml
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Dependency Review Action
+#
+# This Action will scan dependency manifest files that change as part of a Pull Request, surfacing known-vulnerable versions of the packages declared or updated in the PR. Once installed, if the workflow run is marked as required, PRs introducing known-vulnerable packages will be blocked from merging.
+#
+name: 'Dependency Review'
+on: [pull_request]
+
+permissions:
+ contents: read
+
+jobs:
+ dependency-review:
+ runs-on: ubuntu-latest
+ steps:
+ - name: 'Checkout Repository'
+ uses: actions/checkout@v3
+ - name: 'Dependency Review'
+ uses: actions/dependency-review-action@v2
diff --git a/.gitignore b/.gitignore
index 32e3104..14ba169 100644
--- a/.gitignore
+++ b/.gitignore
@@ -30,6 +30,9 @@ target
# Test binary, build with `go test -c`
*.test
+# Ginkgo test report
+*.report
+
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
@@ -47,3 +50,6 @@ target
# mock files
*mock.go
*mock_test.go
+
+# snky cache
+.dccache
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 8d050f5..9a813c0 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -72,13 +72,12 @@ type block struct {
invertedIndex index.Store
lsmIndex index.Store
closableLst []io.Closer
+ clock timestamp.Clock
timestamp.TimeRange
bucket.Reporter
segID uint16
blockID uint16
encodingMethod EncodingMethod
- flushCh chan struct{}
- stopCh chan struct{}
}
type blockOpts struct {
@@ -104,7 +103,7 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
l: logger.Fetch(ctx, "block"),
TimeRange: opts.timeRange,
Reporter: bucket.NewTimeBasedReporter(opts.timeRange, clock),
- flushCh: make(chan struct{}, 1),
+ clock: clock,
ref: &atomic.Int32{},
closed: &atomic.Bool{},
deleted: &atomic.Bool{},
@@ -154,9 +153,6 @@ func (b *block) open() (err error) {
kv.TSSWithEncoding(b.encodingMethod.EncoderPool, b.encodingMethod.DecoderPool),
kv.TSSWithLogger(b.l.Named(componentMain)),
kv.TSSWithMemTableSize(b.memSize),
- kv.TSSWithFlushCallback(func() {
- b.flushCh <- struct{}{}
- }),
); err != nil {
return err
}
@@ -174,20 +170,9 @@ func (b *block) open() (err error) {
}); err != nil {
return err
}
+ b.Reporter = bucket.NewTimeBasedReporter(b.TimeRange, b.clock)
b.closableLst = append(b.closableLst, b.invertedIndex, b.lsmIndex)
b.ref.Store(0)
- stopCh := make(chan struct{})
- b.stopCh = stopCh
- go func() {
- for {
- select {
- case <-b.flushCh:
- b.flush()
- case <-stopCh:
- return
- }
- }
- }()
b.closed.Store(false)
return nil
}
@@ -268,55 +253,35 @@ func (b *block) waitDone(stopped *atomic.Bool) <-chan struct{} {
return ch
}
-func (b *block) flush() {
- for i := 0; i < 10; i++ {
- err := b.invertedIndex.Flush()
- if err == nil {
- break
- }
- time.Sleep(time.Second)
- b.l.Warn().Err(err).Int("retried", i).Msg("failed to flush inverted index")
- }
-}
-
func (b *block) close(ctx context.Context) (err error) {
b.lock.Lock()
defer b.lock.Unlock()
if b.closed.Load() {
return nil
}
- b.closed.Store(true)
stopWaiting := &atomic.Bool{}
ch := b.waitDone(stopWaiting)
select {
case <-ctx.Done():
- b.closed.Store(false)
stopWaiting.Store(true)
return errors.Wrapf(ErrBlockClosingInterrupted, "block:%s", b)
case <-ch:
}
+ b.closed.Store(true)
+ if b.Reporter != nil {
+ b.Stop()
+ }
for _, closer := range b.closableLst {
err = multierr.Append(err, closer.Close())
}
- close(b.stopCh)
return err
}
-func (b *block) stopThenClose(ctx context.Context) error {
- if b.Reporter != nil {
- b.Stop()
- }
- return b.close(ctx)
-}
-
func (b *block) delete(ctx context.Context) error {
if b.deleted.Load() {
return nil
}
b.deleted.Store(true)
- if b.Reporter != nil {
- b.Stop()
- }
b.close(ctx)
return os.RemoveAll(b.path)
}
@@ -326,7 +291,7 @@ func (b *block) Closed() bool {
}
func (b *block) String() string {
- return fmt.Sprintf("BlockID-%d-%d", parseSuffix(b.segID), parseSuffix(b.blockID))
+ return fmt.Sprintf("BlockID-%d-%s", parseSuffix(b.segID), b.suffix)
}
func (b *block) stats() (names []string, stats []observability.Statistics) {
diff --git a/banyand/tsdb/bucket/bucket.go b/banyand/tsdb/bucket/bucket.go
index 5838c26..d0fe08a 100644
--- a/banyand/tsdb/bucket/bucket.go
+++ b/banyand/tsdb/bucket/bucket.go
@@ -18,11 +18,15 @@
package bucket
import (
+ "errors"
"time"
+ "github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
+var ErrReporterClosed = errors.New("reporter is closed")
+
type Controller interface {
Current() (Reporter, error)
Next() (Reporter, error)
@@ -37,7 +41,7 @@ type Status struct {
type Channel chan Status
type Reporter interface {
- Report() Channel
+ Report() (Channel, error)
Stop()
String() string
}
@@ -46,28 +50,34 @@ var _ Reporter = (*timeBasedReporter)(nil)
type timeBasedReporter struct {
timestamp.TimeRange
- reporterStopCh chan struct{}
- clock timestamp.Clock
+ clock timestamp.Clock
+ closer *run.Closer
}
func NewTimeBasedReporter(timeRange timestamp.TimeRange, clock timestamp.Clock) Reporter {
if timeRange.End.Before(clock.Now()) {
return nil
}
- return &timeBasedReporter{
- TimeRange: timeRange,
- reporterStopCh: make(chan struct{}),
- clock: clock,
+ t := &timeBasedReporter{
+ TimeRange: timeRange,
+ clock: clock,
+ closer: run.NewCloser(0),
}
+ return t
}
-func (tr *timeBasedReporter) Report() Channel {
+func (tr *timeBasedReporter) Report() (Channel, error) {
+ if tr.closer.Closed() {
+ return nil, ErrReporterClosed
+ }
ch := make(Channel, 1)
interval := tr.Duration() >> 4
if interval < 100*time.Millisecond {
interval = 100 * time.Millisecond
}
go func() {
+ tr.closer.AddRunning()
+ defer tr.closer.Done()
defer close(ch)
ticker := tr.clock.Ticker(interval)
defer ticker.Stop()
@@ -82,14 +92,14 @@ func (tr *timeBasedReporter) Report() Channel {
if status.Volume >= status.Capacity {
return
}
- case <-tr.reporterStopCh:
+ case <-tr.closer.CloseNotify():
return
}
}
}()
- return ch
+ return ch, nil
}
func (tr *timeBasedReporter) Stop() {
- close(tr.reporterStopCh)
+ tr.closer.CloseThenWait()
}
diff --git a/banyand/tsdb/bucket/queue.go b/banyand/tsdb/bucket/queue.go
index 0bcf49a..04f782b 100644
--- a/banyand/tsdb/bucket/queue.go
+++ b/banyand/tsdb/bucket/queue.go
@@ -28,6 +28,7 @@ import (
"github.com/robfig/cron/v3"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
@@ -66,7 +67,7 @@ type lruQueue struct {
recentEvict simplelru.LRUCache
lock sync.RWMutex
- stopCh chan struct{}
+ closer *run.Closer
}
func NewQueue(logger *logger.Logger, size int, maxSize int, clock timestamp.Clock, evictFn EvictFn) (Queue, error) {
@@ -98,7 +99,7 @@ func NewQueue(logger *logger.Logger, size int, maxSize int, clock timestamp.Cloc
evictSize: evictSize,
evictFn: evictFn,
l: logger,
- stopCh: make(chan struct{}),
+ closer: run.NewCloser(1),
}
parser := cron.NewParser(cron.Second)
// every 60 seconds to clean up recentEvict
@@ -107,6 +108,7 @@ func NewQueue(logger *logger.Logger, size int, maxSize int, clock timestamp.Cloc
return nil, err
}
go func() {
+ defer c.closer.Done()
now := clock.Now()
for {
next := scheduler.Next(now)
@@ -133,7 +135,7 @@ func NewQueue(logger *logger.Logger, size int, maxSize int, clock timestamp.Cloc
cancel()
c.lock.Unlock()
}
- case <-c.stopCh:
+ case <-c.closer.CloseNotify():
c.l.Info().Msg("stop")
timer.Stop()
return
@@ -281,6 +283,6 @@ func (q *lruQueue) removeOldest(ctx context.Context, lst simplelru.LRUCache) err
}
func (q *lruQueue) Close() error {
- close(q.stopCh)
+ q.closer.CloseThenWait()
return nil
}
diff --git a/banyand/tsdb/bucket/queue_test.go b/banyand/tsdb/bucket/queue_test.go
index f256358..8e16306 100644
--- a/banyand/tsdb/bucket/queue_test.go
+++ b/banyand/tsdb/bucket/queue_test.go
@@ -19,6 +19,7 @@ package bucket_test
import (
"context"
"strconv"
+ "sync"
"time"
. "github.com/onsi/ginkgo/v2"
@@ -27,6 +28,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
@@ -47,6 +49,7 @@ func entryID(id uint16) queueEntryID {
}
var _ = Describe("Queue", func() {
+ var lock sync.Mutex
var evictLst []queueEntryID
var l bucket.Queue
var clock timestamp.MockClock
@@ -60,6 +63,8 @@ var _ = Describe("Queue", func() {
clock.Set(time.Date(1970, 0o1, 0o1, 0, 0, 0, 0, time.Local))
var err error
l, err = bucket.NewQueue(logger.GetLogger("test"), 128, 192, clock, func(_ context.Context, id interface{}) error {
+ lock.Lock()
+ defer lock.Unlock()
evictLst = append(evictLst, id.(queueEntryID))
return nil
})
@@ -126,8 +131,12 @@ var _ = Describe("Queue", func() {
Expect(enRecentSize).To(Equal(192))
Expect(l.Len()).To(Equal(128))
Expect(len(evictLst)).To(Equal(0))
- clock.Add(time.Minute)
- GinkgoWriter.Printf("evicted size:%d \n", len(evictLst))
- Expect(len(evictLst)).To(BeNumerically(">", 1))
+ Eventually(func() int {
+ clock.Add(time.Minute)
+ clock.TriggerTimer()
+ lock.Lock()
+ defer lock.Unlock()
+ return len(evictLst)
+ }).WithTimeout(flags.EventuallyTimeout).Should(BeNumerically(">", 1))
})
})
diff --git a/banyand/tsdb/bucket/strategy.go b/banyand/tsdb/bucket/strategy.go
index 6bf51f2..bc09583 100644
--- a/banyand/tsdb/bucket/strategy.go
+++ b/banyand/tsdb/bucket/strategy.go
@@ -26,6 +26,7 @@ import (
"go.uber.org/multierr"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/run"
)
var (
@@ -42,7 +43,8 @@ type Strategy struct {
current atomic.Value
currentRatio uint64
logger *logger.Logger
- stopCh chan struct{}
+
+ closer *run.Closer
}
type StrategyOptions func(*Strategy)
@@ -71,7 +73,7 @@ func NewStrategy(ctrl Controller, options ...StrategyOptions) (*Strategy, error)
strategy := &Strategy{
ctrl: ctrl,
ratio: 0.8,
- stopCh: make(chan struct{}),
+ closer: run.NewCloser(1),
}
for _, opt := range options {
opt(strategy)
@@ -82,18 +84,36 @@ func NewStrategy(ctrl Controller, options ...StrategyOptions) (*Strategy, error)
if strategy.logger == nil {
strategy.logger = logger.GetLogger("bucket-strategy")
}
- c, err := ctrl.Current()
- if err != nil {
+ if err := strategy.resetCurrent(); err != nil {
return nil, err
}
- strategy.current.Store(c)
return strategy, nil
}
+func (s *Strategy) resetCurrent() error {
+ c, err := s.ctrl.Current()
+ if err != nil {
+ return err
+ }
+ s.current.Store(c)
+ return nil
+}
+
func (s *Strategy) Run() {
go func(s *Strategy) {
+ defer s.closer.Done()
for {
- c := s.current.Load().(Reporter).Report()
+ c, err := s.current.Load().(Reporter).Report()
+ if errors.Is(err, ErrReporterClosed) {
+ return
+ }
+ if err != nil {
+ s.logger.Error().Err(err).Msg("failed to get reporter")
+ if err := s.resetCurrent(); err != nil {
+ panic(err)
+ }
+ continue
+ }
if !s.observe(c) {
return
}
@@ -138,12 +158,12 @@ func (s *Strategy) observe(c Channel) bool {
}
return moreBucket
}
- case <-s.stopCh:
+ case <-s.closer.CloseNotify():
return false
}
}
}
func (s *Strategy) Close() {
- close(s.stopCh)
+ s.closer.CloseThenWait()
}
diff --git a/banyand/tsdb/bucket/strategy_test.go b/banyand/tsdb/bucket/strategy_test.go
index 00f6ef5..64adbda 100644
--- a/banyand/tsdb/bucket/strategy_test.go
+++ b/banyand/tsdb/bucket/strategy_test.go
@@ -133,7 +133,7 @@ type reporter struct {
step int
}
-func (r *reporter) Report() bucket.Channel {
+func (r *reporter) Report() (bucket.Channel, error) {
ch := make(bucket.Channel, r.capacity)
go func() {
var volume int
@@ -146,7 +146,7 @@ func (r *reporter) Report() bucket.Channel {
}
close(ch)
}()
- return ch
+ return ch, nil
}
func (r *reporter) Stop() {
diff --git a/banyand/tsdb/metric.go b/banyand/tsdb/metric.go
index a97cddf..696e019 100644
--- a/banyand/tsdb/metric.go
+++ b/banyand/tsdb/metric.go
@@ -52,13 +52,14 @@ func init() {
func (s *shard) runStat() {
go func() {
+ defer s.closer.Done()
ticker := time.NewTicker(statInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
s.stat()
- case <-s.stopCh:
+ case <-s.closer.CloseNotify():
return
}
}
diff --git a/banyand/tsdb/retention.go b/banyand/tsdb/retention.go
index 755bf41..78f7747 100644
--- a/banyand/tsdb/retention.go
+++ b/banyand/tsdb/retention.go
@@ -19,22 +19,21 @@ package tsdb
import (
"context"
- "sync"
"time"
"github.com/robfig/cron/v3"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/run"
)
type retentionController struct {
segment *segmentController
scheduler cron.Schedule
- stopped bool
- stopMux sync.Mutex
- stopCh chan struct{}
duration time.Duration
- l *logger.Logger
+
+ closer *run.Closer
+ l *logger.Logger
}
func newRetentionController(segment *segmentController, ttl IntervalRule) (*retentionController, error) {
@@ -56,22 +55,18 @@ func newRetentionController(segment *segmentController, ttl IntervalRule) (*rete
return &retentionController{
segment: segment,
scheduler: scheduler,
- stopCh: make(chan struct{}),
l: segment.l.Named("retention-controller"),
duration: ttl.EstimatedDuration(),
+ closer: run.NewCloser(1),
}, nil
}
func (rc *retentionController) start() {
- rc.stopMux.Lock()
- if rc.stopped {
- return
- }
- rc.stopMux.Unlock()
go rc.run()
}
func (rc *retentionController) run() {
+ defer rc.closer.Done()
rc.l.Info().Msg("start")
now := rc.segment.clock.Now()
for {
@@ -85,7 +80,7 @@ func (rc *retentionController) run() {
rc.l.Error().Err(err)
}
cancel()
- case <-rc.stopCh:
+ case <-rc.closer.CloseNotify():
timer.Stop()
rc.l.Info().Msg("stop")
return
@@ -94,11 +89,5 @@ func (rc *retentionController) run() {
}
func (rc *retentionController) stop() {
- rc.stopMux.Lock()
- defer rc.stopMux.Unlock()
- if rc.stopped {
- return
- }
- rc.stopped = true
- close(rc.stopCh)
+ rc.closer.CloseThenWait()
}
diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go
index 6d08e2f..abfcd28 100644
--- a/banyand/tsdb/segment.go
+++ b/banyand/tsdb/segment.go
@@ -20,7 +20,6 @@ package tsdb
import (
"context"
"errors"
- "fmt"
"os"
"sort"
"strconv"
@@ -50,6 +49,7 @@ type segment struct {
bucket.Reporter
blockController *blockController
blockManageStrategy *bucket.Strategy
+ closeOnce sync.Once
}
func openSegment(ctx context.Context, startTime time.Time, path, suffix string,
@@ -111,21 +111,23 @@ func openSegment(ctx context.Context, startTime time.Time, path, suffix string,
return s, nil
}
-func (s *segment) close(ctx context.Context) error {
- if err := s.blockController.close(ctx); err != nil {
- return err
- }
- if s.globalIndex != nil {
- if err := s.globalIndex.Close(); err != nil {
- return err
+func (s *segment) close(ctx context.Context) (err error) {
+ s.closeOnce.Do(func() {
+ if err = s.blockController.close(ctx); err != nil {
+ return
}
- }
- if s.blockManageStrategy != nil {
- s.blockManageStrategy.Close()
- }
- if s.Reporter != nil {
- s.Stop()
- }
+ if s.globalIndex != nil {
+ if err = s.globalIndex.Close(); err != nil {
+ return
+ }
+ }
+ if s.blockManageStrategy != nil {
+ s.blockManageStrategy.Close()
+ }
+ if s.Reporter != nil {
+ s.Stop()
+ }
+ })
return nil
}
@@ -140,8 +142,8 @@ func (s *segment) delete(ctx context.Context) error {
return os.RemoveAll(s.path)
}
-func (s segment) String() string {
- return fmt.Sprintf("SegID-%d", parseSuffix(s.id))
+func (s *segment) String() string {
+ return "SegID-" + s.suffix
}
func (s *segment) Stats() observability.Statistics {
@@ -431,8 +433,10 @@ func (bc *blockController) sortLst() {
}
func (bc *blockController) close(ctx context.Context) (err error) {
+ bc.Lock()
+ defer bc.Unlock()
for _, s := range bc.lst {
- err = multierr.Append(err, s.stopThenClose(ctx))
+ err = multierr.Append(err, s.close(ctx))
}
return err
}
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index b77a232..93c9824 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -30,6 +30,7 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
@@ -51,7 +52,9 @@ type shard struct {
segmentController *segmentController
segmentManageStrategy *bucket.Strategy
retentionController *retentionController
- stopCh chan struct{}
+
+ closeOnce sync.Once
+ closer *run.Closer
}
func OpenShard(ctx context.Context, id common.ShardID,
@@ -79,7 +82,7 @@ func OpenShard(ctx context.Context, id common.ShardID,
id: id,
segmentController: sc,
l: l,
- stopCh: make(chan struct{}),
+ closer: run.NewCloser(1),
}
err = s.segmentController.open()
if err != nil {
@@ -162,13 +165,15 @@ func (s *shard) State() (shardState ShardState) {
return shardState
}
-func (s *shard) Close() error {
- s.retentionController.stop()
- s.segmentManageStrategy.Close()
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- err := multierr.Combine(s.segmentController.close(ctx), s.seriesDatabase.Close())
- close(s.stopCh)
+func (s *shard) Close() (err error) {
+ s.closeOnce.Do(func() {
+ s.closer.CloseThenWait()
+ s.retentionController.stop()
+ s.segmentManageStrategy.Close()
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ err = multierr.Combine(s.segmentController.close(ctx), s.seriesDatabase.Close())
+ })
return err
}
@@ -445,6 +450,8 @@ func (sc *segmentController) removeSeg(segID uint16) {
}
func (sc *segmentController) close(ctx context.Context) (err error) {
+ sc.Lock()
+ defer sc.Unlock()
for _, s := range sc.lst {
err = multierr.Append(err, s.close(ctx))
}
diff --git a/pkg/index/index.go b/pkg/index/index.go
index 6f490cd..6d06790 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -166,8 +166,6 @@ type Store interface {
io.Closer
Writer
Searcher
- // Flush flushed memory data to disk
- Flush() error
}
type GetSearcher func(location databasev1.IndexRule_Type) (Searcher, error)
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 5ce8878..4f680b8 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -204,11 +204,6 @@ func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posti
return
}
-// Flush flushed memory data to disk
-func (s *store) Flush() error {
- return nil
-}
-
type blugeMatchIterator struct {
delegated search.DocumentMatchIterator
fieldKey string
diff --git a/pkg/index/lsm/lsm.go b/pkg/index/lsm/lsm.go
index 8c366ad..0a26c26 100644
--- a/pkg/index/lsm/lsm.go
+++ b/pkg/index/lsm/lsm.go
@@ -35,10 +35,6 @@ type store struct {
l *logger.Logger
}
-func (*store) Flush() error {
- panic("do not call flush here. LSM index is using its own controller to flush memory data")
-}
-
func (s *store) Stats() observability.Statistics {
return s.lsm.Stats()
}
diff --git a/pkg/run/closer.go b/pkg/run/closer.go
new file mode 100644
index 0000000..cc70e53
--- /dev/null
+++ b/pkg/run/closer.go
@@ -0,0 +1,79 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package run
+
+import (
+ "context"
+ "sync"
+ "sync/atomic"
+)
+
+// Closer can close a goroutine then wait for it to stop.
+type Closer struct {
+ waiting sync.WaitGroup
+ closed *atomic.Bool
+
+ ctx context.Context
+ cancel context.CancelFunc
+}
+
+// NewCloser instances a new Closer.
+func NewCloser(initial int) *Closer {
+ c := &Closer{}
+ c.ctx, c.cancel = context.WithCancel(context.Background())
+ c.closed = &atomic.Bool{}
+ c.waiting.Add(initial)
+ return c
+}
+
+// AddRunning adds a running task.
+func (c *Closer) AddRunning() {
+ c.waiting.Add(1)
+}
+
+// Close sends a signal to the CloseNotify.
+func (c *Closer) Close() {
+ c.closed.Store(true)
+ c.cancel()
+}
+
+// CloseNotify receives a signal from Close.
+func (c *Closer) CloseNotify() <-chan struct{} {
+ return c.ctx.Done()
+}
+
+// Done notifies that one task is done.
+func (c *Closer) Done() {
+ c.waiting.Done()
+}
+
+// Wait waits until all tasks are done.
+func (c *Closer) Wait() {
+ c.waiting.Wait()
+}
+
+// CloseThenWait calls Close(), then Wait().
+func (c *Closer) CloseThenWait() {
+ c.Close()
+ c.Wait()
+}
+
+// Closed returns whether the Closer is closed
+func (c *Closer) Closed() bool {
+ return c.closed.Load()
+}