You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/11/04 08:40:05 UTC
[skywalking-banyandb] branch main updated: Add load test and inject testing flags (#204)
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 28b5fd5 Add load test and inject testing flags (#204)
28b5fd5 is described below
commit 28b5fd5f23dee6bc4ea1ed729f7da1956ad257d0
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Fri Nov 4 16:40:00 2022 +0800
Add load test and inject testing flags (#204)
* Add a slow load test case to CI
* Update the max-opened-blocks strategy. A shard can open at most 64
blocks. A cleanup goroutine collects up to 10 of the oldest inactive
blocks every minute until only 4 blocks are left in the cache.
* Inject the logging level and the timeout of the tests' `Eventually` assertions through "ldflags"
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
.github/workflows/load.yml | 57 +++++++
Makefile | 7 +-
banyand/liaison/grpc/grpc_suite_test.go | 3 +-
banyand/measure/measure_suite_test.go | 3 +-
banyand/measure/measure_topn.go | 4 +-
banyand/measure/measure_write.go | 6 +-
banyand/metadata/metadata_test.go | 3 +-
banyand/metadata/schema/schema_suite_test.go | 3 +-
banyand/query/processor_topn.go | 4 +-
banyand/stream/stream_suite_test.go | 3 +-
banyand/stream/stream_write.go | 7 +-
banyand/tsdb/block.go | 81 ++++++----
banyand/tsdb/bucket/bucket_suite_test.go | 3 +-
banyand/tsdb/bucket/queue.go | 163 +++++++++++++++++----
banyand/tsdb/bucket/queue_test.go | 83 ++++++++++-
banyand/tsdb/bucket/strategy.go | 4 +-
banyand/tsdb/indexdb.go | 14 +-
banyand/tsdb/retention.go | 24 +--
banyand/tsdb/segment.go | 94 ++++++++----
banyand/tsdb/series.go | 20 +--
banyand/tsdb/seriesdb.go | 22 +--
banyand/tsdb/seriesdb_test.go | 5 +-
banyand/tsdb/shard.go | 53 ++++---
banyand/tsdb/shard_test.go | 53 +++----
banyand/tsdb/tsdb.go | 11 +-
banyand/tsdb/tsdb_suite_test.go | 6 +-
banyand/tsdb/tsdb_test.go | 3 +-
bydbctl/internal/cmd/cmd_suite_test.go | 3 +-
docs/crud/measure/query.md | 2 +
docs/crud/stream/query.md | 2 +
pkg/index/inverted/inverted_test.go | 3 +-
pkg/index/lsm/lsm_test.go | 3 +-
pkg/query/logical/common.go | 22 +--
.../measure/measure_plan_indexscan_local.go | 10 +-
.../logical/stream/stream_plan_indexscan_global.go | 5 +-
.../logical/stream/stream_plan_indexscan_local.go | 10 +-
.../test/flags/flags.go | 32 ++--
test/integration/cold_query/query_suite_test.go | 3 +-
.../load_suite_test.go} | 50 +++----
test/integration/other/other_suite_test.go | 3 +-
test/integration/query/query_suite_test.go | 3 +-
41 files changed, 630 insertions(+), 260 deletions(-)
diff --git a/.github/workflows/load.yml b/.github/workflows/load.yml
new file mode 100644
index 0000000..d3682d4
--- /dev/null
+++ b/.github/workflows/load.yml
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: Continuous Integration
+
+on:
+ schedule:
+ - cron: '0 20 * * *'
+
+jobs:
+ test:
+ name: Load several days data
+ runs-on: ubuntu-20.04
+ strategy:
+ matrix:
+ tz: ["UTC", "Asia/Shanghai", "America/Los_Angeles"]
+ steps:
+ - name: Set timezone
+ run: sudo timedatectl set-timezone ${{ matrix.tz }}
+ - uses: actions/setup-node@v3
+ with:
+ node-version: 16.15
+ - name: Install Go
+ uses: actions/setup-go@v2
+ with:
+ go-version: 1.19
+ - name: Check out code into the Go module directory
+ uses: actions/checkout@v2
+ - uses: actions/cache@v3
+ id: cache-go
+ with:
+ path: |
+ ~/.cache/go-build
+ ~/go/pkg/mod
+ key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
+ restore-keys: |
+ ${{ runner.os }}-go-
+ - name: Update dependencies
+ if: steps.cache-go.outputs.cache-hit != 'true'
+ run: GOPROXY=https://proxy.golang.org go mod download
+ - name: Generate mocks
+ run: make generate
+ - name: Test
+ run: TEST_EXTRA_OPTS="--label-filter slow" make -C test test
diff --git a/Makefile b/Makefile
index 37eee1c..099a0b8 100644
--- a/Makefile
+++ b/Makefile
@@ -65,7 +65,12 @@ test-coverage: default ## Run the unit tests in all projects with coverage analy
include scripts/build/ginkgo.mk
test-ci: $(GINKGO) ## Run the unit tests in CI
- $(GINKGO) --race --cover --covermode atomic --coverprofile=coverage.out ./...
+ $(GINKGO) --race \
+ -ldflags \
+ "-X github.com/apache/skywalking-banyandb/pkg/test/flags.eventuallyTimeout=30s -X github.com/apache/skywalking-banyandb/pkg/test/flags.LogLevel=warn" \
+ --cover --covermode atomic --coverprofile=coverage.out \
+ --label-filter !slow \
+ ./...
##@ Code quality targets
diff --git a/banyand/liaison/grpc/grpc_suite_test.go b/banyand/liaison/grpc/grpc_suite_test.go
index ef85011..352574a 100644
--- a/banyand/liaison/grpc/grpc_suite_test.go
+++ b/banyand/liaison/grpc/grpc_suite_test.go
@@ -24,6 +24,7 @@ import (
. "github.com/onsi/gomega"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
)
func TestGrpc(t *testing.T) {
@@ -34,6 +35,6 @@ func TestGrpc(t *testing.T) {
var _ = BeforeSuite(func() {
Expect(logger.Init(logger.Logging{
Env: "dev",
- Level: "warn",
+ Level: flags.LogLevel,
})).Should(Succeed())
})
diff --git a/banyand/measure/measure_suite_test.go b/banyand/measure/measure_suite_test.go
index 3f295d8..9f6cc38 100644
--- a/banyand/measure/measure_suite_test.go
+++ b/banyand/measure/measure_suite_test.go
@@ -33,6 +33,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/test"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
testmeasure "github.com/apache/skywalking-banyandb/pkg/test/measure"
)
@@ -45,7 +46,7 @@ func TestMeasure(t *testing.T) {
var _ = ginkgo.BeforeSuite(func() {
gomega.Expect(logger.Init(logger.Logging{
Env: "dev",
- Level: "warn",
+ Level: flags.LogLevel,
})).To(gomega.Succeed())
})
diff --git a/banyand/measure/measure_topn.go b/banyand/measure/measure_topn.go
index 29295c3..6a969f1 100644
--- a/banyand/measure/measure_topn.go
+++ b/banyand/measure/measure_topn.go
@@ -166,7 +166,9 @@ func (t *topNStreamingProcessor) writeData(eventTime time.Time, timeBucket strin
if err != nil {
return err
}
- span, err := series.Create(eventTime)
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ span, err := series.Create(ctx, eventTime)
if err != nil {
if span != nil {
_ = span.Close()
diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go
index c8a9aef..7cd8224 100644
--- a/banyand/measure/measure_write.go
+++ b/banyand/measure/measure_write.go
@@ -19,6 +19,8 @@ package measure
import (
"bytes"
+ "context"
+ "time"
"github.com/pkg/errors"
@@ -75,7 +77,9 @@ func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *mea
if err != nil {
return err
}
- wp, err := series.Create(t)
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ wp, err := series.Create(ctx, t)
if err != nil {
if wp != nil {
_ = wp.Close()
diff --git a/banyand/metadata/metadata_test.go b/banyand/metadata/metadata_test.go
index fc3c7e5..682c762 100644
--- a/banyand/metadata/metadata_test.go
+++ b/banyand/metadata/metadata_test.go
@@ -27,6 +27,7 @@ import (
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
test "github.com/apache/skywalking-banyandb/pkg/test/stream"
)
@@ -37,7 +38,7 @@ func Test_service_RulesBySubject(t *testing.T) {
is := assert.New(t)
is.NoError(logger.Init(logger.Logging{
Env: "dev",
- Level: "warn",
+ Level: flags.LogLevel,
}))
ctx := context.TODO()
s, _ := NewService(ctx)
diff --git a/banyand/metadata/schema/schema_suite_test.go b/banyand/metadata/schema/schema_suite_test.go
index aea8ea1..51afd65 100644
--- a/banyand/metadata/schema/schema_suite_test.go
+++ b/banyand/metadata/schema/schema_suite_test.go
@@ -24,6 +24,7 @@ import (
. "github.com/onsi/gomega"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
)
func TestSchema(t *testing.T) {
@@ -35,6 +36,6 @@ func TestSchema(t *testing.T) {
var _ = ginkgo.BeforeSuite(func() {
Expect(logger.Init(logger.Logging{
Env: "dev",
- Level: "warn",
+ Level: flags.LogLevel,
})).To(Succeed())
})
diff --git a/banyand/query/processor_topn.go b/banyand/query/processor_topn.go
index 91f1199..021c7eb 100644
--- a/banyand/query/processor_topn.go
+++ b/banyand/query/processor_topn.go
@@ -196,7 +196,9 @@ func familyIdentity(name string, flag []byte) []byte {
}
func (t *topNQueryProcessor) scanSeries(series tsdb.Series, request *measurev1.TopNRequest) ([]tsdb.Iterator, error) {
- seriesSpan, err := series.Span(timestamp.NewInclusiveTimeRange(
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ seriesSpan, err := series.Span(ctx, timestamp.NewInclusiveTimeRange(
request.GetTimeRange().GetBegin().AsTime(),
request.GetTimeRange().GetEnd().AsTime()),
)
diff --git a/banyand/stream/stream_suite_test.go b/banyand/stream/stream_suite_test.go
index 4f827eb..d3177ad 100644
--- a/banyand/stream/stream_suite_test.go
+++ b/banyand/stream/stream_suite_test.go
@@ -32,6 +32,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/test"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
teststream "github.com/apache/skywalking-banyandb/pkg/test/stream"
)
@@ -44,7 +45,7 @@ func TestStream(t *testing.T) {
var _ = BeforeSuite(func() {
Expect(logger.Init(logger.Logging{
Env: "dev",
- Level: "warn",
+ Level: flags.LogLevel,
})).To(Succeed())
})
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index 5dac816..af03580 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -18,6 +18,9 @@
package stream
import (
+ "context"
+ "time"
+
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@@ -87,7 +90,9 @@ func (s *stream) write(shardID common.ShardID, seriesHashKey []byte, value *stre
return err
}
t := timestamp.MToN(tp)
- wp, err := series.Create(t)
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ wp, err := series.Create(ctx, t)
if err != nil {
if wp != nil {
_ = wp.Close()
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 3bc87ea..8d050f5 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -30,6 +30,7 @@ import (
"time"
"github.com/pkg/errors"
+ "go.uber.org/multierr"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/kv"
@@ -49,8 +50,11 @@ const (
componentSecondLSMIdx = "lsm"
defaultMainMemorySize = 8 << 20
+ defaultEnqueueTimeout = 500 * time.Millisecond
)
+var ErrBlockClosingInterrupted = errors.New("interrupt to close the block")
+
type block struct {
path string
l *logger.Logger
@@ -106,13 +110,14 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
deleted: &atomic.Bool{},
queue: opts.queue,
}
+ b.closed.Store(true)
b.options(ctx)
position := ctx.Value(common.PositionKey)
if position != nil {
b.position = position.(common.Position)
}
- return b, b.open()
+ return b, nil
}
func (b *block) options(ctx context.Context) {
@@ -187,24 +192,28 @@ func (b *block) open() (err error) {
return nil
}
-func (b *block) delegate() (BlockDelegate, error) {
+func (b *block) delegate(ctx context.Context) (BlockDelegate, error) {
if b.deleted.Load() {
- return nil, errors.WithMessagef(ErrBlockAbsent, "block %d is deleted", b.blockID)
+ return nil, errors.WithMessagef(ErrBlockAbsent, "block %s is deleted", b)
+ }
+ blockID := BlockID{
+ BlockID: b.blockID,
+ SegID: b.segID,
}
if b.incRef() {
+ b.queue.Touch(blockID)
return &bDelegate{
delegate: b,
}, nil
}
b.lock.Lock()
defer b.lock.Unlock()
- b.queue.Push(BlockID{
- BlockID: b.blockID,
- SegID: b.segID,
- })
- // TODO: remove the block which fails to open from the queue
- err := b.open()
- if err != nil {
+ if err := b.queue.Push(ctx, blockID, func() error {
+ if !b.Closed() {
+ return nil
+ }
+ return b.open()
+ }); err != nil {
b.l.Error().Err(err).Stringer("block", b).Msg("fail to open block")
return nil, err
}
@@ -240,14 +249,23 @@ loop:
goto loop
}
-func (b *block) waitDone() {
-loop:
- if b.ref.Load() < 1 {
- b.ref.Store(0)
- return
- }
- runtime.Gosched()
- goto loop
+func (b *block) waitDone(stopped *atomic.Bool) <-chan struct{} {
+ ch := make(chan struct{})
+ go func() {
+ loop:
+ if b.ref.Load() < 1 {
+ b.ref.Store(0)
+ close(ch)
+ return
+ }
+ if stopped.Load() {
+ close(ch)
+ return
+ }
+ runtime.Gosched()
+ goto loop
+ }()
+ return ch
}
func (b *block) flush() {
@@ -261,28 +279,37 @@ func (b *block) flush() {
}
}
-func (b *block) close() {
+func (b *block) close(ctx context.Context) (err error) {
b.lock.Lock()
defer b.lock.Unlock()
if b.closed.Load() {
- return
+ return nil
}
b.closed.Store(true)
- b.waitDone()
+ stopWaiting := &atomic.Bool{}
+ ch := b.waitDone(stopWaiting)
+ select {
+ case <-ctx.Done():
+ b.closed.Store(false)
+ stopWaiting.Store(true)
+ return errors.Wrapf(ErrBlockClosingInterrupted, "block:%s", b)
+ case <-ch:
+ }
for _, closer := range b.closableLst {
- _ = closer.Close()
+ err = multierr.Append(err, closer.Close())
}
close(b.stopCh)
+ return err
}
-func (b *block) stopThenClose() {
+func (b *block) stopThenClose(ctx context.Context) error {
if b.Reporter != nil {
b.Stop()
}
- b.close()
+ return b.close(ctx)
}
-func (b *block) delete() error {
+func (b *block) delete(ctx context.Context) error {
if b.deleted.Load() {
return nil
}
@@ -290,7 +317,7 @@ func (b *block) delete() error {
if b.Reporter != nil {
b.Stop()
}
- b.close()
+ b.close(ctx)
return os.RemoveAll(b.path)
}
@@ -299,7 +326,7 @@ func (b *block) Closed() bool {
}
func (b *block) String() string {
- return fmt.Sprintf("BlockID-%d-%d", b.segID, b.blockID)
+ return fmt.Sprintf("BlockID-%d-%d", parseSuffix(b.segID), parseSuffix(b.blockID))
}
func (b *block) stats() (names []string, stats []observability.Statistics) {
diff --git a/banyand/tsdb/bucket/bucket_suite_test.go b/banyand/tsdb/bucket/bucket_suite_test.go
index 59ca1fe..289cd7d 100644
--- a/banyand/tsdb/bucket/bucket_suite_test.go
+++ b/banyand/tsdb/bucket/bucket_suite_test.go
@@ -23,6 +23,7 @@ import (
. "github.com/onsi/gomega"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
)
func TestBucket(t *testing.T) {
@@ -30,7 +31,7 @@ func TestBucket(t *testing.T) {
BeforeSuite(func() {
Expect(logger.Init(logger.Logging{
Env: "dev",
- Level: "warn",
+ Level: flags.LogLevel,
})).Should(Succeed())
})
RunSpecs(t, "Bucket Suite")
diff --git a/banyand/tsdb/bucket/queue.go b/banyand/tsdb/bucket/queue.go
index 2b3dced..0bcf49a 100644
--- a/banyand/tsdb/bucket/queue.go
+++ b/banyand/tsdb/bucket/queue.go
@@ -17,29 +17,45 @@
package bucket
import (
+ "context"
"errors"
+ "fmt"
+ "io"
"sync"
+ "time"
"github.com/hashicorp/golang-lru/simplelru"
+ "github.com/robfig/cron/v3"
+
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
)
-type EvictFn func(id interface{})
+type (
+ EvictFn func(ctx context.Context, id interface{}) error
+ OnAddRecentFn func() error
+)
type Queue interface {
- Push(id interface{})
- Remove(id interface{})
+ io.Closer
+ Touch(id fmt.Stringer) bool
+ Push(ctx context.Context, id fmt.Stringer, fn OnAddRecentFn) error
+ Remove(id fmt.Stringer)
Len() int
+ Volume() int
All() []interface{}
}
const (
- DefaultRecentRatio = 0.25
- DefaultGhostEntries = 0.50
+ DefaultRecentRatio = 0.25
+
+ defaultEvictBatchSize = 10
)
var ErrInvalidSize = errors.New("invalid size")
type lruQueue struct {
+ l *logger.Logger
size int
recentSize int
evictSize int
@@ -49,15 +65,17 @@ type lruQueue struct {
frequent simplelru.LRUCache
recentEvict simplelru.LRUCache
lock sync.RWMutex
+
+ stopCh chan struct{}
}
-func NewQueue(size int, evictFn EvictFn) (Queue, error) {
+func NewQueue(logger *logger.Logger, size int, maxSize int, clock timestamp.Clock, evictFn EvictFn) (Queue, error) {
if size <= 0 {
return nil, ErrInvalidSize
}
recentSize := int(float64(size) * DefaultRecentRatio)
- evictSize := int(float64(size) * DefaultGhostEntries)
+ evictSize := maxSize - size
recent, err := simplelru.NewLRU(size, nil)
if err != nil {
@@ -79,37 +97,108 @@ func NewQueue(size int, evictFn EvictFn) (Queue, error) {
recentEvict: recentEvict,
evictSize: evictSize,
evictFn: evictFn,
+ l: logger,
+ stopCh: make(chan struct{}),
}
+ parser := cron.NewParser(cron.Second)
+ // every 60 seconds to clean up recentEvict
+ scheduler, err := parser.Parse("59")
+ if err != nil {
+ return nil, err
+ }
+ go func() {
+ now := clock.Now()
+ for {
+ next := scheduler.Next(now)
+ timer := clock.Timer(next.Sub(now))
+ select {
+ case now = <-timer.C:
+ c.l.Info().Time("now", now).Msg("wakes")
+ var evictLen int
+ c.lock.RLock()
+ evictLen = c.recentEvict.Len()
+ c.lock.RUnlock()
+ if evictLen < 1 {
+ continue
+ }
+ for i := 0; i < defaultEvictBatchSize; i++ {
+ c.lock.Lock()
+ if evictLen < 1 {
+ continue
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+ if err := c.removeOldest(ctx, c.recentEvict); err != nil {
+ c.l.Error().Err(err).Msg("failed to remove oldest blocks")
+ }
+ cancel()
+ c.lock.Unlock()
+ }
+ case <-c.stopCh:
+ c.l.Info().Msg("stop")
+ timer.Stop()
+ return
+ }
+ }
+ }()
return c, nil
}
-func (q *lruQueue) Push(id interface{}) {
+func (q *lruQueue) Touch(id fmt.Stringer) bool {
q.lock.Lock()
defer q.lock.Unlock()
if q.frequent.Contains(id) {
+ q.l.Debug().Stringer("id", id).Msg("get from frequent")
+ return true
+ }
+
+ if q.recent.Contains(id) {
+ q.l.Debug().Stringer("id", id).Msg("promote from recent to frequent")
+ q.recent.Remove(id)
q.frequent.Add(id, nil)
- return
+ return true
+ }
+ return false
+}
+
+func (q *lruQueue) Push(ctx context.Context, id fmt.Stringer, fn OnAddRecentFn) error {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ if q.frequent.Contains(id) {
+ q.l.Debug().Stringer("id", id).Msg("push to frequent")
+ q.frequent.Add(id, nil)
+ return nil
}
if q.recent.Contains(id) {
+ q.l.Debug().Stringer("id", id).Msg("promote from recent to frequent")
q.recent.Remove(id)
q.frequent.Add(id, nil)
- return
+ return nil
}
if q.recentEvict.Contains(id) {
- q.ensureSpace(true)
+ q.l.Debug().Stringer("id", id).Msg("restore from recentEvict")
+ if err := q.ensureSpace(ctx, true); err != nil {
+ return err
+ }
q.recentEvict.Remove(id)
q.frequent.Add(id, nil)
- return
+ return nil
}
- q.ensureSpace(false)
+ if err := q.ensureSpace(ctx, false); err != nil {
+ return err
+ }
q.recent.Add(id, nil)
+ if fn == nil {
+ return nil
+ }
+ return fn()
}
-func (q *lruQueue) Remove(id interface{}) {
+func (q *lruQueue) Remove(id fmt.Stringer) {
q.lock.Lock()
defer q.lock.Unlock()
@@ -134,6 +223,10 @@ func (q *lruQueue) Len() int {
return q.recent.Len() + q.frequent.Len()
}
+func (q *lruQueue) Volume() int {
+ return q.size + q.recentSize + q.evictSize
+}
+
func (q *lruQueue) All() []interface{} {
q.lock.RLock()
defer q.lock.RUnlock()
@@ -144,32 +237,50 @@ func (q *lruQueue) All() []interface{} {
return all
}
-func (q *lruQueue) ensureSpace(recentEvict bool) {
+func (q *lruQueue) ensureSpace(ctx context.Context, recentEvict bool) error {
recentLen := q.recent.Len()
freqLen := q.frequent.Len()
if recentLen+freqLen < q.size {
- return
+ return nil
}
if recentLen > 0 && (recentLen > q.recentSize || (recentLen == q.recentSize && !recentEvict)) {
- k, _, _ := q.recent.RemoveOldest()
- q.addLst(q.recentEvict, q.evictSize, k)
- return
+ k, _, ok := q.recent.GetOldest()
+ if !ok {
+ return errors.New("failed to get oldest from recent queue")
+ }
+ if err := q.addLst(ctx, q.recentEvict, q.evictSize, k); err != nil {
+ return err
+ }
+ q.recent.Remove(k)
+ return nil
}
- q.removeOldest(q.frequent)
+ return q.removeOldest(ctx, q.frequent)
}
-func (q *lruQueue) addLst(lst simplelru.LRUCache, size int, id interface{}) {
+func (q *lruQueue) addLst(ctx context.Context, lst simplelru.LRUCache, size int, id interface{}) error {
if lst.Len() < size {
lst.Add(id, nil)
- return
+ return nil
+ }
+ if err := q.removeOldest(ctx, lst); err != nil {
+ return err
}
- q.removeOldest(lst)
lst.Add(id, nil)
+ return nil
}
-func (q *lruQueue) removeOldest(lst simplelru.LRUCache) {
- oldestID, _, ok := lst.RemoveOldest()
+func (q *lruQueue) removeOldest(ctx context.Context, lst simplelru.LRUCache) error {
+ oldestID, _, ok := lst.GetOldest()
if ok && q.evictFn != nil {
- q.evictFn(oldestID)
+ if err := q.evictFn(ctx, oldestID); err != nil {
+ return err
+ }
+ _ = lst.Remove(oldestID)
}
+ return nil
+}
+
+func (q *lruQueue) Close() error {
+ close(q.stopCh)
+ return nil
}
diff --git a/banyand/tsdb/bucket/queue_test.go b/banyand/tsdb/bucket/queue_test.go
index 0eb6fe0..f256358 100644
--- a/banyand/tsdb/bucket/queue_test.go
+++ b/banyand/tsdb/bucket/queue_test.go
@@ -17,11 +17,17 @@
package bucket_test
import (
+ "context"
+ "strconv"
+ "time"
+
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gleak"
"github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
)
type queueEntryID struct {
@@ -29,6 +35,10 @@ type queueEntryID struct {
second uint16
}
+func (q queueEntryID) String() string {
+ return strconv.Itoa(int(q.first))
+}
+
func entryID(id uint16) queueEntryID {
return queueEntryID{
first: id,
@@ -37,26 +47,87 @@ func entryID(id uint16) queueEntryID {
}
var _ = Describe("Queue", func() {
+ var evictLst []queueEntryID
+ var l bucket.Queue
+ var clock timestamp.MockClock
BeforeEach(func() {
goods := gleak.Goroutines()
DeferCleanup(func() {
Eventually(gleak.Goroutines).ShouldNot(gleak.HaveLeaked(goods))
})
- })
- It("pushes data", func() {
- evictLst := make([]queueEntryID, 0)
- l, err := bucket.NewQueue(128, func(id interface{}) {
+ evictLst = make([]queueEntryID, 0)
+ clock = timestamp.NewMockClock()
+ clock.Set(time.Date(1970, 0o1, 0o1, 0, 0, 0, 0, time.Local))
+ var err error
+ l, err = bucket.NewQueue(logger.GetLogger("test"), 128, 192, clock, func(_ context.Context, id interface{}) error {
evictLst = append(evictLst, id.(queueEntryID))
+ return nil
})
Expect(err).ShouldNot(HaveOccurred())
-
+ DeferCleanup(func() {
+ evictLst = evictLst[:0]
+ Expect(l.Close()).To(Succeed())
+ })
+ })
+ It("pushes to recent", func() {
+ enRecentSize := 0
for i := 0; i < 256; i++ {
- l.Push(entryID(uint16(i)))
+ Expect(l.Push(context.Background(), entryID(uint16(i)), func() error {
+ enRecentSize++
+ return nil
+ })).To(Succeed())
}
+ Expect(enRecentSize).To(Equal(256))
Expect(l.Len()).To(Equal(128))
Expect(len(evictLst)).To(Equal(64))
for i := 0; i < 64; i++ {
Expect(evictLst[i]).To(Equal(entryID(uint16(i))))
}
})
+
+ It("promotes to frequent", func() {
+ enRecentSize := 0
+ for i := 0; i < 128; i++ {
+ Expect(l.Push(context.Background(), entryID(uint16(i)), func() error {
+ enRecentSize++
+ return nil
+ })).To(Succeed())
+ }
+ Expect(enRecentSize).To(Equal(128))
+ Expect(l.Len()).To(Equal(128))
+ Expect(len(evictLst)).To(Equal(0))
+ for i := 0; i < 64; i++ {
+ Expect(l.Touch(entryID(uint16(i)))).To(BeTrue())
+ }
+ enRecentSize = 0
+ for i := 128; i < 256; i++ {
+ Expect(l.Push(context.Background(), entryID(uint16(i)), func() error {
+ enRecentSize++
+ return nil
+ })).To(Succeed())
+ }
+ Expect(enRecentSize).To(Equal(128))
+ Expect(l.Len()).To(Equal(128))
+ Expect(len(evictLst)).To(Equal(64))
+ for i := 0; i < 64; i++ {
+ Expect(evictLst[i]).To(Equal(entryID(uint16(i + 64))))
+ }
+ })
+
+ It("cleans up evict queue", func() {
+ enRecentSize := 0
+
+ for i := 0; i < 192; i++ {
+ Expect(l.Push(context.Background(), entryID(uint16(i)), func() error {
+ enRecentSize++
+ return nil
+ })).To(Succeed())
+ }
+ Expect(enRecentSize).To(Equal(192))
+ Expect(l.Len()).To(Equal(128))
+ Expect(len(evictLst)).To(Equal(0))
+ clock.Add(time.Minute)
+ GinkgoWriter.Printf("evicted size:%d \n", len(evictLst))
+ Expect(len(evictLst)).To(BeNumerically(">", 1))
+ })
})
diff --git a/banyand/tsdb/bucket/strategy.go b/banyand/tsdb/bucket/strategy.go
index 4a8d14e..6bf51f2 100644
--- a/banyand/tsdb/bucket/strategy.go
+++ b/banyand/tsdb/bucket/strategy.go
@@ -117,7 +117,7 @@ func (s *Strategy) observe(c Channel) bool {
select {
case status, more := <-c:
if !more {
- return true
+ return moreBucket
}
ratio := Ratio(status.Volume) / Ratio(status.Capacity)
atomic.StoreUint64(&s.currentRatio, math.Float64bits(float64(ratio)))
@@ -136,7 +136,7 @@ func (s *Strategy) observe(c Channel) bool {
if next != nil {
s.current.Store(next)
}
- return true
+ return moreBucket
}
case <-s.stopCh:
return false
diff --git a/banyand/tsdb/indexdb.go b/banyand/tsdb/indexdb.go
index e7ce863..c4281d0 100644
--- a/banyand/tsdb/indexdb.go
+++ b/banyand/tsdb/indexdb.go
@@ -28,7 +28,6 @@ import (
"github.com/apache/skywalking-banyandb/banyand/kv"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/index"
- "github.com/apache/skywalking-banyandb/pkg/timestamp"
)
type IndexDatabase interface {
@@ -97,7 +96,6 @@ type indexWriterBuilder struct {
scope Entry
segCtrl *segmentController
ts time.Time
- seg *segment
globalItemID *GlobalItemID
}
@@ -108,11 +106,6 @@ func (i *indexWriterBuilder) Scope(scope Entry) IndexWriterBuilder {
func (i *indexWriterBuilder) Time(ts time.Time) IndexWriterBuilder {
i.ts = ts
- segs := i.segCtrl.span(timestamp.NewTimeRangeDuration(ts, 0, true, false))
- if len(segs) != 1 {
- return i
- }
- i.seg = segs[0]
return i
}
@@ -122,15 +115,16 @@ func (i *indexWriterBuilder) GlobalItemID(itemID GlobalItemID) IndexWriterBuilde
}
func (i *indexWriterBuilder) Build() (IndexWriter, error) {
- if i.seg == nil {
- return nil, errors.WithStack(ErrNoTime)
+ seg, err := i.segCtrl.create(i.segCtrl.Format(i.ts), false)
+ if err != nil {
+ return nil, err
}
if i.globalItemID == nil {
return nil, errors.WithStack(ErrNoVal)
}
return &indexWriter{
scope: i.scope,
- seg: i.seg,
+ seg: seg,
ts: i.ts,
itemID: i.globalItemID,
}, nil
diff --git a/banyand/tsdb/retention.go b/banyand/tsdb/retention.go
index ab61a8d..755bf41 100644
--- a/banyand/tsdb/retention.go
+++ b/banyand/tsdb/retention.go
@@ -18,6 +18,7 @@
package tsdb
import (
+ "context"
"sync"
"time"
@@ -76,19 +77,18 @@ func (rc *retentionController) run() {
for {
next := rc.scheduler.Next(now)
timer := rc.segment.clock.Timer(next.Sub(now))
- for {
- select {
- case now = <-timer.C:
- rc.l.Info().Time("now", now).Msg("wake")
- if err := rc.segment.remove(now.Add(-rc.duration)); err != nil {
- rc.l.Error().Err(err)
- }
- case <-rc.stopCh:
- timer.Stop()
- rc.l.Info().Msg("stop")
- return
+ select {
+ case now = <-timer.C:
+ rc.l.Info().Time("now", now).Msg("wake")
+ ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
+ if err := rc.segment.remove(ctx, now.Add(-rc.duration)); err != nil {
+ rc.l.Error().Err(err)
}
- break
+ cancel()
+ case <-rc.stopCh:
+ timer.Stop()
+ rc.l.Info().Msg("stop")
+ return
}
}
}
diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go
index 3393f67..6d08e2f 100644
--- a/banyand/tsdb/segment.go
+++ b/banyand/tsdb/segment.go
@@ -111,10 +111,14 @@ func openSegment(ctx context.Context, startTime time.Time, path, suffix string,
return s, nil
}
-func (s *segment) close() {
- s.blockController.close()
+func (s *segment) close(ctx context.Context) error {
+ if err := s.blockController.close(ctx); err != nil {
+ return err
+ }
if s.globalIndex != nil {
- s.globalIndex.Close()
+ if err := s.globalIndex.Close(); err != nil {
+ return err
+ }
}
if s.blockManageStrategy != nil {
s.blockManageStrategy.Close()
@@ -122,19 +126,22 @@ func (s *segment) close() {
if s.Reporter != nil {
s.Stop()
}
+ return nil
}
-func (s *segment) closeBlock(id uint16) {
- s.blockController.closeBlock(id)
+func (s *segment) closeBlock(ctx context.Context, id uint16) error {
+ return s.blockController.closeBlock(ctx, id)
}
-func (s *segment) delete() error {
- s.close()
+func (s *segment) delete(ctx context.Context) error {
+ if err := s.close(ctx); err != nil {
+ return err
+ }
return os.RemoveAll(s.path)
}
func (s segment) String() string {
- return fmt.Sprintf("SegID-%d", s.id)
+ return fmt.Sprintf("SegID-%d", parseSuffix(s.id))
}
func (s *segment) Stats() observability.Statistics {
@@ -177,7 +184,7 @@ func newBlockController(segCtx context.Context, segID uint16, location string, s
func (bc *blockController) Current() (bucket.Reporter, error) {
now := bc.clock.Now()
ns := uint64(now.UnixNano())
- if b := func() bucket.Reporter {
+ if b := func() *block {
bc.RLock()
defer bc.RUnlock()
for _, s := range bc.lst {
@@ -187,9 +194,14 @@ func (bc *blockController) Current() (bucket.Reporter, error) {
}
return nil
}(); b != nil {
+ if b.Closed() {
+ if err := b.open(); err != nil {
+ return nil, err
+ }
+ }
return b, nil
}
- return bc.create(now)
+ return bc.newHeadBlock(now)
}
func (bc *blockController) Next() (bucket.Reporter, error) {
@@ -198,17 +210,34 @@ func (bc *blockController) Next() (bucket.Reporter, error) {
return nil, err
}
b := c.(*block)
- return bc.create(bc.blockSize.NextTime(b.Start))
+
+ return bc.newHeadBlock(bc.blockSize.NextTime(b.Start))
+}
+
+func (bc *blockController) newHeadBlock(now time.Time) (*block, error) {
+ b, err := bc.create(now)
+ if err != nil {
+ return nil, err
+ }
+ return b, nil
}
func (bc *blockController) OnMove(prev bucket.Reporter, next bucket.Reporter) {
event := bc.l.Info()
if prev != nil {
event.Stringer("prev", prev)
- bc.blockQueue.Push(BlockID{
+ b := prev.(*block)
+ ctx, cancel := context.WithTimeout(context.Background(), defaultEnqueueTimeout)
+ defer cancel()
+ if err := bc.blockQueue.Push(ctx, BlockID{
SegID: bc.segID,
- BlockID: prev.(*block).blockID,
- })
+ BlockID: b.blockID,
+ }, nil); err != nil {
+ bc.l.Debug().Err(err).Msg("failed to push a expired head block to the queue")
+ ctxClosing, cancelClosing := context.WithTimeout(context.Background(), defaultEnqueueTimeout)
+ defer cancelClosing()
+ b.close(ctxClosing)
+ }
}
if next != nil {
event.Stringer("next", next)
@@ -236,7 +265,7 @@ func (bc *blockController) Parse(value string) (time.Time, error) {
panic("invalid interval unit")
}
-func (bc *blockController) span(timeRange timestamp.TimeRange) ([]BlockDelegate, error) {
+func (bc *blockController) span(ctx context.Context, timeRange timestamp.TimeRange) ([]BlockDelegate, error) {
bb := bc.search(func(b *block) bool {
return b.Overlapping(timeRange)
})
@@ -245,7 +274,7 @@ func (bc *blockController) span(timeRange timestamp.TimeRange) ([]BlockDelegate,
}
dd := make([]BlockDelegate, len(bb))
for i, b := range bb {
- d, err := b.delegate()
+ d, err := b.delegate(ctx)
if err != nil {
return nil, err
}
@@ -254,10 +283,10 @@ func (bc *blockController) span(timeRange timestamp.TimeRange) ([]BlockDelegate,
return dd, nil
}
-func (bc *blockController) get(blockID uint16) (BlockDelegate, error) {
+func (bc *blockController) get(ctx context.Context, blockID uint16) (BlockDelegate, error) {
b := bc.getBlock(blockID)
if b != nil {
- return b.delegate()
+ return b.delegate(ctx)
}
return nil, nil
}
@@ -297,14 +326,14 @@ func (bc *blockController) search(matcher func(*block) bool) (bb []*block) {
return bb
}
-func (bc *blockController) closeBlock(blockID uint16) {
+func (bc *blockController) closeBlock(ctx context.Context, blockID uint16) error {
bc.RLock()
- defer bc.RUnlock()
b := bc.getBlock(blockID)
+ bc.RUnlock()
if b == nil {
- return
+ return nil
}
- b.close()
+ return b.close(ctx)
}
func (bc *blockController) startTime(suffix string) (time.Time, error) {
@@ -327,7 +356,7 @@ func (bc *blockController) startTime(suffix string) (time.Time, error) {
func (bc *blockController) open() error {
return WalkDir(
bc.location,
- segPathPrefix,
+ blockPathPrefix,
func(suffix, absolutePath string) error {
bc.Lock()
defer bc.Unlock()
@@ -355,7 +384,15 @@ func (bc *blockController) create(startTime time.Time) (*block, error) {
if err != nil {
return nil, err
}
- return bc.load(suffix, segPath)
+ b, err := bc.load(suffix, segPath)
+ if err != nil {
+ return nil, err
+ }
+ err = b.open()
+ if err != nil {
+ return nil, err
+ }
+ return b, nil
}
func (bc *blockController) load(suffix, path string) (b *block, err error) {
@@ -393,17 +430,18 @@ func (bc *blockController) sortLst() {
})
}
-func (bc *blockController) close() {
+func (bc *blockController) close(ctx context.Context) (err error) {
for _, s := range bc.lst {
- s.stopThenClose()
+ err = multierr.Append(err, s.stopThenClose(ctx))
}
+ return err
}
-func (bc *blockController) remove(deadline time.Time) (err error) {
+func (bc *blockController) remove(ctx context.Context, deadline time.Time) (err error) {
for _, b := range bc.blocks() {
if b.End.Before(deadline) {
bc.Lock()
- if errDel := b.delete(); errDel != nil {
+ if errDel := b.delete(ctx); errDel != nil {
err = multierr.Append(err, errDel)
} else {
b.queue.Remove(BlockID{
diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go
index c0956de..09d62aa 100644
--- a/banyand/tsdb/series.go
+++ b/banyand/tsdb/series.go
@@ -75,9 +75,9 @@ func (i *GlobalItemID) UnMarshal(data []byte) error {
type Series interface {
ID() common.SeriesID
- Span(timeRange timestamp.TimeRange) (SeriesSpan, error)
- Create(t time.Time) (SeriesSpan, error)
- Get(id GlobalItemID) (Item, io.Closer, error)
+ Span(ctx context.Context, timeRange timestamp.TimeRange) (SeriesSpan, error)
+ Create(ctx context.Context, t time.Time) (SeriesSpan, error)
+ Get(ctx context.Context, id GlobalItemID) (Item, io.Closer, error)
}
type SeriesSpan interface {
@@ -95,8 +95,8 @@ type series struct {
l *logger.Logger
}
-func (s *series) Get(id GlobalItemID) (Item, io.Closer, error) {
- b, err := s.blockDB.block(id)
+func (s *series) Get(ctx context.Context, id GlobalItemID) (Item, io.Closer, error) {
+ b, err := s.blockDB.block(ctx, id)
if err != nil {
return nil, nil, err
}
@@ -114,8 +114,8 @@ func (s *series) ID() common.SeriesID {
return s.id
}
-func (s *series) Span(timeRange timestamp.TimeRange) (SeriesSpan, error) {
- blocks, err := s.blockDB.span(timeRange)
+func (s *series) Span(ctx context.Context, timeRange timestamp.TimeRange) (SeriesSpan, error) {
+ blocks, err := s.blockDB.span(ctx, timeRange)
if err != nil {
return nil, err
}
@@ -128,9 +128,9 @@ func (s *series) Span(timeRange timestamp.TimeRange) (SeriesSpan, error) {
return newSeriesSpan(context.WithValue(context.Background(), logger.ContextKey, s.l), timeRange, blocks, s.id, s.shardID), nil
}
-func (s *series) Create(t time.Time) (SeriesSpan, error) {
+func (s *series) Create(ctx context.Context, t time.Time) (SeriesSpan, error) {
tr := timestamp.NewInclusiveTimeRange(t, t)
- blocks, err := s.blockDB.span(tr)
+ blocks, err := s.blockDB.span(ctx, tr)
if err != nil {
return nil, err
}
@@ -140,7 +140,7 @@ func (s *series) Create(t time.Time) (SeriesSpan, error) {
Msg("load a series span")
return newSeriesSpan(context.WithValue(context.Background(), logger.ContextKey, s.l), tr, blocks, s.id, s.shardID), nil
}
- b, err := s.blockDB.create(t)
+ b, err := s.blockDB.create(ctx, t)
if err != nil {
return nil, err
}
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index 7825338..b878b38 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -148,9 +148,9 @@ type SeriesDatabase interface {
type blockDatabase interface {
shardID() common.ShardID
- span(timeRange timestamp.TimeRange) ([]BlockDelegate, error)
- create(ts time.Time) (BlockDelegate, error)
- block(id GlobalItemID) (BlockDelegate, error)
+ span(ctx context.Context, timeRange timestamp.TimeRange) ([]BlockDelegate, error)
+ create(ctx context.Context, ts time.Time) (BlockDelegate, error)
+ block(ctx context.Context, id GlobalItemID) (BlockDelegate, error)
}
var (
@@ -189,12 +189,12 @@ func (s *seriesDB) GetByID(id common.SeriesID) (Series, error) {
return newSeries(s.context(), id, s), nil
}
-func (s *seriesDB) block(id GlobalItemID) (BlockDelegate, error) {
+func (s *seriesDB) block(ctx context.Context, id GlobalItemID) (BlockDelegate, error) {
seg := s.segCtrl.get(id.segID)
if seg == nil {
return nil, nil
}
- return seg.blockController.get(id.blockID)
+ return seg.blockController.get(ctx, id.blockID)
}
func (s *seriesDB) shardID() common.ShardID {
@@ -251,11 +251,11 @@ func (s *seriesDB) List(path Path) (SeriesList, error) {
return result, err
}
-func (s *seriesDB) span(timeRange timestamp.TimeRange) ([]BlockDelegate, error) {
+func (s *seriesDB) span(ctx context.Context, timeRange timestamp.TimeRange) ([]BlockDelegate, error) {
// TODO: return correct blocks
result := make([]BlockDelegate, 0)
for _, s := range s.segCtrl.span(timeRange) {
- dd, err := s.blockController.span(timeRange)
+ dd, err := s.blockController.span(ctx, timeRange)
if err != nil {
return nil, err
}
@@ -267,14 +267,14 @@ func (s *seriesDB) span(timeRange timestamp.TimeRange) ([]BlockDelegate, error)
return result, nil
}
-func (s *seriesDB) create(ts time.Time) (BlockDelegate, error) {
+func (s *seriesDB) create(ctx context.Context, ts time.Time) (BlockDelegate, error) {
s.Lock()
defer s.Unlock()
timeRange := timestamp.NewInclusiveTimeRange(ts, ts)
ss := s.segCtrl.span(timeRange)
if len(ss) > 0 {
s := ss[0]
- dd, err := s.blockController.span(timeRange)
+ dd, err := s.blockController.span(ctx, timeRange)
if err != nil {
return nil, err
}
@@ -285,7 +285,7 @@ func (s *seriesDB) create(ts time.Time) (BlockDelegate, error) {
if err != nil {
return nil, err
}
- return block.delegate()
+ return block.delegate(ctx)
}
seg, err := s.segCtrl.create(s.segCtrl.Format(timeRange.Start), false)
if err != nil {
@@ -295,7 +295,7 @@ func (s *seriesDB) create(ts time.Time) (BlockDelegate, error) {
if err != nil {
return nil, err
}
- return block.delegate()
+ return block.delegate(ctx)
}
func (s *seriesDB) context() context.Context {
diff --git a/banyand/tsdb/seriesdb_test.go b/banyand/tsdb/seriesdb_test.go
index f5b1875..3c4623c 100644
--- a/banyand/tsdb/seriesdb_test.go
+++ b/banyand/tsdb/seriesdb_test.go
@@ -30,6 +30,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/test"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
)
func TestEntity(t *testing.T) {
@@ -355,7 +356,7 @@ func Test_SeriesDatabase_Get(t *testing.T) {
tester := assert.New(t)
tester.NoError(logger.Init(logger.Logging{
Env: "dev",
- Level: "warn",
+ Level: flags.LogLevel,
}))
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -376,7 +377,7 @@ func Test_SeriesDatabase_List(t *testing.T) {
tester := assert.New(t)
tester.NoError(logger.Init(logger.Logging{
Env: "dev",
- Level: "warn",
+ Level: flags.LogLevel,
}))
dir, deferFunc := test.Space(require.New(t))
defer deferFunc()
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index 277ee03..b77a232 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -34,8 +34,9 @@ import (
)
const (
- defaultBlockQueueSize = 1 << 4
- defaultKVMemorySize = 1 << 20
+ defaultBlockQueueSize = 4
+ defaultMaxBlockQueueSize = 64
+ defaultKVMemorySize = 1 << 20
)
var _ Shard = (*shard)(nil)
@@ -54,7 +55,7 @@ type shard struct {
}
func OpenShard(ctx context.Context, id common.ShardID,
- root string, segmentSize, blockSize, ttl IntervalRule, openedBlockSize int,
+ root string, segmentSize, blockSize, ttl IntervalRule, openedBlockSize, maxOpenedBlockSize int,
) (Shard, error) {
path, err := mkdir(shardTemplate, root, int(id))
if err != nil {
@@ -70,7 +71,7 @@ func OpenShard(ctx context.Context, id common.ShardID,
p.Shard = strconv.Itoa(int(id))
return p
})
- sc, err := newSegmentController(shardCtx, path, segmentSize, blockSize, openedBlockSize, l)
+ sc, err := newSegmentController(shardCtx, path, segmentSize, blockSize, openedBlockSize, maxOpenedBlockSize, l)
if err != nil {
return nil, errors.Wrapf(err, "create the segment controller of the shard %d", int(id))
}
@@ -164,8 +165,9 @@ func (s *shard) State() (shardState ShardState) {
func (s *shard) Close() error {
s.retentionController.stop()
s.segmentManageStrategy.Close()
- s.segmentController.close()
- err := s.seriesDatabase.Close()
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ err := multierr.Combine(s.segmentController.close(ctx), s.seriesDatabase.Close())
close(s.stopCh)
return err
}
@@ -236,7 +238,7 @@ type segmentController struct {
}
func newSegmentController(shardCtx context.Context, location string, segmentSize, blockSize IntervalRule,
- openedBlockSize int, l *logger.Logger,
+ openedBlockSize, maxOpenedBlockSize int, l *logger.Logger,
) (*segmentController, error) {
clock, _ := timestamp.GetClock(shardCtx)
sc := &segmentController{
@@ -248,16 +250,21 @@ func newSegmentController(shardCtx context.Context, location string, segmentSize
clock: clock,
}
var err error
- sc.blockQueue, err = bucket.NewQueue(openedBlockSize, func(id interface{}) {
- bsID := id.(BlockID)
- seg := sc.get(bsID.SegID)
- if seg == nil {
- l.Warn().Uint16("segID", bsID.SegID).Msg("segment is absent")
- return
- }
- l.Info().Uint16("blockID", bsID.BlockID).Uint16("segID", bsID.SegID).Msg("closing the block")
- seg.closeBlock(bsID.BlockID)
- })
+ sc.blockQueue, err = bucket.NewQueue(
+ l.Named("block-queue"),
+ openedBlockSize,
+ maxOpenedBlockSize,
+ clock,
+ func(ctx context.Context, id interface{}) error {
+ bsID := id.(BlockID)
+ seg := sc.get(bsID.SegID)
+ if seg == nil {
+ l.Warn().Int("segID", parseSuffix(bsID.SegID)).Msg("segment is absent")
+ return nil
+ }
+ l.Info().Uint16("blockID", bsID.BlockID).Int("segID", parseSuffix(bsID.SegID)).Msg("closing the block")
+ return seg.closeBlock(ctx, bsID.BlockID)
+ })
return sc, err
}
@@ -409,14 +416,14 @@ func (sc *segmentController) load(suffix, path string, createBlockIfEmpty bool)
return seg, nil
}
-func (sc *segmentController) remove(deadline time.Time) (err error) {
+func (sc *segmentController) remove(ctx context.Context, deadline time.Time) (err error) {
sc.l.Info().Time("deadline", deadline).Msg("start to remove before deadline")
for _, s := range sc.segments() {
if s.End.Before(deadline) || s.Contains(uint64(deadline.UnixNano())) {
- err = multierr.Append(err, s.blockController.remove(deadline))
+ err = multierr.Append(err, s.blockController.remove(ctx, deadline))
if s.End.Before(deadline) {
sc.Lock()
- if errDel := s.delete(); errDel != nil {
+ if errDel := s.delete(ctx); errDel != nil {
err = multierr.Append(err, errDel)
} else {
sc.removeSeg(s.id)
@@ -437,8 +444,10 @@ func (sc *segmentController) removeSeg(segID uint16) {
}
}
-func (sc *segmentController) close() {
+func (sc *segmentController) close(ctx context.Context) (err error) {
for _, s := range sc.lst {
- s.close()
+ err = multierr.Append(err, s.close(ctx))
}
+ err = multierr.Append(err, sc.blockQueue.Close())
+ return err
}
diff --git a/banyand/tsdb/shard_test.go b/banyand/tsdb/shard_test.go
index 1e1197e..6b66ea4 100644
--- a/banyand/tsdb/shard_test.go
+++ b/banyand/tsdb/shard_test.go
@@ -27,6 +27,7 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/test"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
@@ -70,13 +71,14 @@ var _ = Describe("Shard", func() {
Num: 7,
},
2,
+ 3,
)
Expect(err).NotTo(HaveOccurred())
By("01/01 00:00 1st block is opened")
t1 := clock.Now()
Eventually(func() []tsdb.BlockState {
return shard.State().Blocks
- }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+ }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{
{
ID: tsdb.BlockID{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -93,7 +95,7 @@ var _ = Describe("Shard", func() {
GinkgoWriter.Println("01/01 10:00 has been triggered")
}
return shard.State().Blocks
- }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+ }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{
{
ID: tsdb.BlockID{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -111,7 +113,7 @@ var _ = Describe("Shard", func() {
}))
Eventually(func() []tsdb.BlockID {
return shard.State().OpenBlocks
- }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{}))
+ }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockID{}))
By("01/01 13:00 moves to the 2nd block")
clock.Add(3 * time.Hour)
Eventually(func() []tsdb.BlockID {
@@ -119,7 +121,7 @@ var _ = Describe("Shard", func() {
GinkgoWriter.Println("01/01 13:00 has been triggered")
}
return shard.State().OpenBlocks
- }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{
+ }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockID{
{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
@@ -133,7 +135,7 @@ var _ = Describe("Shard", func() {
GinkgoWriter.Println("01/01 22:00 has been triggered")
}
return shard.State().Blocks
- }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+ }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{
{
ID: tsdb.BlockID{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -163,7 +165,7 @@ var _ = Describe("Shard", func() {
GinkgoWriter.Println("01/02 01:00 has been triggered")
}
return shard.State().OpenBlocks
- }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{
+ }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockID{
{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
@@ -181,7 +183,7 @@ var _ = Describe("Shard", func() {
GinkgoWriter.Println("01/02 10:00 has been triggered")
}
return shard.State().Blocks
- }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+ }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{
{
ID: tsdb.BlockID{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -218,7 +220,7 @@ var _ = Describe("Shard", func() {
GinkgoWriter.Println("01/02 13:00 has been triggered")
}
return shard.State().OpenBlocks
- }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{
+ }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockID{
{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
@@ -240,7 +242,7 @@ var _ = Describe("Shard", func() {
GinkgoWriter.Println("01/02 22:00 has been triggered")
}
return shard.State().Blocks
- }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+ }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{
{
ID: tsdb.BlockID{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -284,11 +286,7 @@ var _ = Describe("Shard", func() {
GinkgoWriter.Println("01/03 01:00 has been triggered")
}
return shard.State().OpenBlocks
- }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{
- {
- SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
- BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
- },
+ }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockID{
{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102),
BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
@@ -300,7 +298,7 @@ var _ = Describe("Shard", func() {
}))
Eventually(func() []tsdb.BlockState {
return shard.State().Blocks
- }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+ }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{
{
ID: tsdb.BlockID{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -315,6 +313,7 @@ var _ = Describe("Shard", func() {
BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
},
TimeRange: timestamp.NewTimeRangeDuration(t2, 12*time.Hour, true, false),
+ Closed: true,
},
{
ID: tsdb.BlockID{
@@ -342,7 +341,7 @@ var _ = Describe("Shard", func() {
series, err := shard.Series().GetByID(common.SeriesID(11))
Expect(err).NotTo(HaveOccurred())
t1Range := timestamp.NewInclusiveTimeRangeDuration(t1, 1*time.Hour)
- span, err := series.Span(t1Range)
+ span, err := series.Span(context.Background(), t1Range)
Expect(err).NotTo(HaveOccurred())
defer span.Close()
writer, err := span.WriterBuilder().Family([]byte("test"), []byte("test")).Time(t1Range.End).Build()
@@ -351,14 +350,14 @@ var _ = Describe("Shard", func() {
Expect(err).NotTo(HaveOccurred())
Eventually(func() []tsdb.BlockID {
return shard.State().OpenBlocks
- }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{
+ }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockID{
{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
},
{
- SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
- BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
},
{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102),
@@ -367,7 +366,7 @@ var _ = Describe("Shard", func() {
}))
Eventually(func() []tsdb.BlockState {
return shard.State().Blocks
- }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+ }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{
{
ID: tsdb.BlockID{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -381,6 +380,7 @@ var _ = Describe("Shard", func() {
BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
},
TimeRange: timestamp.NewTimeRangeDuration(t2, 12*time.Hour, true, false),
+ Closed: true,
},
{
ID: tsdb.BlockID{
@@ -388,7 +388,6 @@ var _ = Describe("Shard", func() {
BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
},
TimeRange: timestamp.NewTimeRangeDuration(t3, 12*time.Hour, true, false),
- Closed: true,
},
{
ID: tsdb.BlockID{
@@ -422,6 +421,7 @@ var _ = Describe("Shard", func() {
Num: 1,
},
10,
+ 15,
)
Expect(err).NotTo(HaveOccurred())
By("open 4 blocks")
@@ -451,7 +451,7 @@ var _ = Describe("Shard", func() {
GinkgoWriter.Println("01/02 13:00 has been triggered")
}
return shard.State().Blocks
- }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+ }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{
{
ID: tsdb.BlockID{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -497,7 +497,7 @@ var _ = Describe("Shard", func() {
GinkgoWriter.Println("01/03 01:00 has been triggered")
}
return shard.State().Blocks
- }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+ }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{
{
ID: tsdb.BlockID{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102),
@@ -558,13 +558,14 @@ var _ = Describe("Shard", func() {
Num: 7,
},
2,
+ 3,
)
Expect(err).NotTo(HaveOccurred())
By("01/01 00:01 1st block is opened")
t1 := clock.Now()
Eventually(func() []tsdb.BlockState {
return shard.State().Blocks
- }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+ }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{
{
ID: tsdb.BlockID{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -581,7 +582,7 @@ var _ = Describe("Shard", func() {
GinkgoWriter.Println("01/01 10:00 has been triggered")
}
return shard.State().Blocks
- }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+ }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{
{
ID: tsdb.BlockID{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -600,7 +601,7 @@ var _ = Describe("Shard", func() {
}))
Eventually(func() []tsdb.BlockID {
return shard.State().OpenBlocks
- }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{}))
+ }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockID{}))
})
})
})
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index 5710837..42bb4f5 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -107,10 +107,18 @@ type BlockID struct {
BlockID uint16
}
+func (b BlockID) String() string {
+ return fmt.Sprintf("BlockID-%d-%d", parseSuffix(b.SegID), parseSuffix(b.BlockID))
+}
+
func GenerateInternalID(unit IntervalUnit, suffix int) uint16 {
return uint16(unit)<<12 | ((uint16(suffix) << 4) >> 4)
}
+func parseSuffix(id uint16) int {
+ return int((id << 12) >> 12)
+}
+
type BlockState struct {
ID BlockID
TimeRange timestamp.TimeRange
@@ -211,7 +219,7 @@ func createDatabase(ctx context.Context, db *database, startID int) (Database, e
for i := startID; i < int(db.shardNum); i++ {
db.logger.Info().Int("shard_id", i).Msg("creating a shard")
so, errNewShard := OpenShard(ctx, common.ShardID(i),
- db.location, db.segmentSize, db.blockSize, db.ttl, defaultBlockQueueSize)
+ db.location, db.segmentSize, db.blockSize, db.ttl, defaultBlockQueueSize, defaultMaxBlockQueueSize)
if errNewShard != nil {
err = multierr.Append(err, errNewShard)
continue
@@ -243,6 +251,7 @@ func loadDatabase(ctx context.Context, db *database) (Database, error) {
db.blockSize,
db.ttl,
defaultBlockQueueSize,
+ defaultMaxBlockQueueSize,
)
if errOpenShard != nil {
return errOpenShard
diff --git a/banyand/tsdb/tsdb_suite_test.go b/banyand/tsdb/tsdb_suite_test.go
index cd11765..52e1157 100644
--- a/banyand/tsdb/tsdb_suite_test.go
+++ b/banyand/tsdb/tsdb_suite_test.go
@@ -18,16 +18,14 @@ package tsdb_test
import (
"testing"
- "time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
)
-var defaultEventuallyTimeout = 30 * time.Second
-
func TestTsdb(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Tsdb Suite")
@@ -36,6 +34,6 @@ func TestTsdb(t *testing.T) {
var _ = BeforeSuite(func() {
Expect(logger.Init(logger.Logging{
Env: "dev",
- Level: "warn",
+ Level: flags.LogLevel,
})).Should(Succeed())
})
diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go
index 684269f..5a482a8 100644
--- a/banyand/tsdb/tsdb_test.go
+++ b/banyand/tsdb/tsdb_test.go
@@ -30,6 +30,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/test"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
@@ -100,7 +101,7 @@ func verifyDatabaseStructure(tester *assert.Assertions, tempDir string, now time
func openDatabase(ctx context.Context, t *require.Assertions, path string) (db Database) {
t.NoError(logger.Init(logger.Logging{
Env: "dev",
- Level: "warn",
+ Level: flags.LogLevel,
}))
db, err := OpenDatabase(
context.WithValue(ctx, logger.ContextKey, logger.GetLogger("test")),
diff --git a/bydbctl/internal/cmd/cmd_suite_test.go b/bydbctl/internal/cmd/cmd_suite_test.go
index 6dc56ae..dc563e3 100644
--- a/bydbctl/internal/cmd/cmd_suite_test.go
+++ b/bydbctl/internal/cmd/cmd_suite_test.go
@@ -24,6 +24,7 @@ import (
. "github.com/onsi/gomega"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
)
func TestCmd(t *testing.T) {
@@ -34,6 +35,6 @@ func TestCmd(t *testing.T) {
var _ = BeforeSuite(func() {
Expect(logger.Init(logger.Logging{
Env: "dev",
- Level: "warn",
+ Level: flags.LogLevel,
})).To(Succeed())
})
diff --git a/docs/crud/measure/query.md b/docs/crud/measure/query.md
index 5dd8c1d..40639a7 100644
--- a/docs/crud/measure/query.md
+++ b/docs/crud/measure/query.md
@@ -43,6 +43,7 @@ fieldProjection:
timeRange:
begin: 2022-10-15T22:32:48Z
end: 2022-10-15T23:32:48Z
+EOF
```
The below command could query data in the last 30 minutes using relative time duration :
@@ -58,6 +59,7 @@ tagProjection:
tags: ["id", "entity_id"]
fieldProjection:
names: ["total", "value"]
+EOF
```
## API Reference
diff --git a/docs/crud/stream/query.md b/docs/crud/stream/query.md
index cc7dea3..620f572 100644
--- a/docs/crud/stream/query.md
+++ b/docs/crud/stream/query.md
@@ -43,6 +43,7 @@ projection:
timeRange:
begin: 2022-10-15T22:32:48+08:00
end: 2022-10-15T23:32:48+08:00
+EOF
```
The below command could query data in the last 30 minutes using relative time duration :
@@ -58,6 +59,7 @@ projection:
tags: ["trace_id"]
- name: "data"
tags: ["data_binary"]
+EOF
```
## API Reference
diff --git a/pkg/index/inverted/inverted_test.go b/pkg/index/inverted/inverted_test.go
index 4d889cd..fda5215 100644
--- a/pkg/index/inverted/inverted_test.go
+++ b/pkg/index/inverted/inverted_test.go
@@ -32,6 +32,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/index/testcases"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/test"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
)
var serviceName = index.FieldKey{
@@ -173,7 +174,7 @@ func TestStore_Iterator(t *testing.T) {
func setUp(t *require.Assertions) (tempDir string, deferFunc func()) {
t.NoError(logger.Init(logger.Logging{
Env: "dev",
- Level: "warn",
+ Level: flags.LogLevel,
}))
tempDir, deferFunc = test.Space(t)
return tempDir, deferFunc
diff --git a/pkg/index/lsm/lsm_test.go b/pkg/index/lsm/lsm_test.go
index c0de013..4af5eb5 100644
--- a/pkg/index/lsm/lsm_test.go
+++ b/pkg/index/lsm/lsm_test.go
@@ -26,6 +26,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/index/testcases"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/test"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
)
func TestStore_MatchTerm(t *testing.T) {
@@ -63,7 +64,7 @@ func TestStore_Iterator(t *testing.T) {
func setUp(t *require.Assertions) (tempDir string, deferFunc func()) {
t.NoError(logger.Init(logger.Logging{
Env: "dev",
- Level: "warn",
+ Level: flags.LogLevel,
}))
tempDir, deferFunc = test.Space(t)
return tempDir, deferFunc
diff --git a/pkg/query/logical/common.go b/pkg/query/logical/common.go
index 3537ce7..716be97 100644
--- a/pkg/query/logical/common.go
+++ b/pkg/query/logical/common.go
@@ -19,6 +19,9 @@ package logical
import (
"bytes"
+ "context"
+ "io"
+ "time"
"github.com/pkg/errors"
@@ -101,16 +104,15 @@ func ProjectItem(ec executor.ExecutionContext, item tsdb.Item, projectionFieldRe
// This method is used by the underlying tableScan and localIndexScan plans.
func ExecuteForShard(series tsdb.SeriesList, timeRange timestamp.TimeRange,
builders ...SeekerBuilder,
-) ([]tsdb.Iterator, error) {
+) ([]tsdb.Iterator, []io.Closer, error) {
var itersInShard []tsdb.Iterator
+ var closers []io.Closer
for _, seriesFound := range series {
itersInSeries, err := func() ([]tsdb.Iterator, error) {
- sp, errInner := seriesFound.Span(timeRange)
- defer func(sp tsdb.SeriesSpan) {
- if sp != nil {
- _ = sp.Close()
- }
- }(sp)
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ sp, errInner := seriesFound.Span(ctx, timeRange)
+ closers = append(closers, sp)
if errInner != nil {
return nil, errInner
}
@@ -129,16 +131,16 @@ func ExecuteForShard(series tsdb.SeriesList, timeRange timestamp.TimeRange,
return iters, nil
}()
if err != nil {
- return nil, err
+ return nil, nil, err
}
if len(itersInSeries) > 0 {
itersInShard = append(itersInShard, itersInSeries...)
}
}
- return itersInShard, nil
+ return itersInShard, closers, nil
}
-var DefaultLimit uint32 = 100
+var DefaultLimit uint32 = 20
type Tag struct {
familyName, name string
diff --git a/pkg/query/logical/measure/measure_plan_indexscan_local.go b/pkg/query/logical/measure/measure_plan_indexscan_local.go
index c9147d4..69df5b3 100644
--- a/pkg/query/logical/measure/measure_plan_indexscan_local.go
+++ b/pkg/query/logical/measure/measure_plan_indexscan_local.go
@@ -19,6 +19,7 @@ package measure
import (
"fmt"
+ "io"
"time"
"go.uber.org/multierr"
@@ -132,7 +133,14 @@ func (i *localIndexScan) Execute(ec executor.MeasureExecutionContext) (executor.
b.Filter(i.filter)
})
}
- iters, innerErr := logical.ExecuteForShard(seriesList, i.timeRange, builders...)
+ iters, closers, innerErr := logical.ExecuteForShard(seriesList, i.timeRange, builders...)
+ if len(closers) > 0 {
+ defer func(closers []io.Closer) {
+ for _, c := range closers {
+ _ = c.Close()
+ }
+ }(closers)
+ }
if innerErr != nil {
return nil, innerErr
}
diff --git a/pkg/query/logical/stream/stream_plan_indexscan_global.go b/pkg/query/logical/stream/stream_plan_indexscan_global.go
index 75549e1..549a83d 100644
--- a/pkg/query/logical/stream/stream_plan_indexscan_global.go
+++ b/pkg/query/logical/stream/stream_plan_indexscan_global.go
@@ -18,6 +18,7 @@
package stream
import (
+ "context"
"fmt"
"io"
"time"
@@ -96,7 +97,9 @@ func (t *globalIndexScan) executeForShard(ec executor.StreamExecutionContext, sh
return elementsInShard, errors.WithStack(err)
}
err = func() error {
- item, closer, errInner := series.Get(itemID)
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ item, closer, errInner := series.Get(ctx, itemID)
defer func(closer io.Closer) {
if closer != nil {
_ = closer.Close()
diff --git a/pkg/query/logical/stream/stream_plan_indexscan_local.go b/pkg/query/logical/stream/stream_plan_indexscan_local.go
index 6cc4403..b33e4ff 100644
--- a/pkg/query/logical/stream/stream_plan_indexscan_local.go
+++ b/pkg/query/logical/stream/stream_plan_indexscan_local.go
@@ -19,6 +19,7 @@ package stream
import (
"fmt"
+ "io"
"time"
"google.golang.org/protobuf/types/known/timestamppb"
@@ -77,7 +78,14 @@ func (i *localIndexScan) Execute(ec executor.StreamExecutionContext) ([]*streamv
b.Filter(i.filter)
})
}
- iters, innerErr := logical.ExecuteForShard(seriesList, i.timeRange, builders...)
+ iters, closers, innerErr := logical.ExecuteForShard(seriesList, i.timeRange, builders...)
+ if len(closers) > 0 {
+ defer func(closers []io.Closer) {
+ for _, c := range closers {
+ _ = c.Close()
+ }
+ }(closers)
+ }
if innerErr != nil {
return nil, innerErr
}
diff --git a/banyand/tsdb/bucket/bucket_suite_test.go b/pkg/test/flags/flags.go
similarity index 70%
copy from banyand/tsdb/bucket/bucket_suite_test.go
copy to pkg/test/flags/flags.go
index 59ca1fe..b2f83f5 100644
--- a/banyand/tsdb/bucket/bucket_suite_test.go
+++ b/pkg/test/flags/flags.go
@@ -14,24 +14,26 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-package bucket_test
+package flags
import (
- "testing"
-
- . "github.com/onsi/ginkgo/v2"
- . "github.com/onsi/gomega"
+ "time"
+)
- "github.com/apache/skywalking-banyandb/pkg/logger"
+var (
+ eventuallyTimeout string
+ EventuallyTimeout time.Duration
+ LogLevel = "debug"
)
-func TestBucket(t *testing.T) {
- RegisterFailHandler(Fail)
- BeforeSuite(func() {
- Expect(logger.Init(logger.Logging{
- Env: "dev",
- Level: "warn",
- })).Should(Succeed())
- })
- RunSpecs(t, "Bucket Suite")
+func init() {
+ if eventuallyTimeout == "" {
+ EventuallyTimeout = time.Second * 3
+ return
+ }
+ d, err := time.ParseDuration(eventuallyTimeout)
+ if err != nil {
+ panic(err)
+ }
+ EventuallyTimeout = d
}
diff --git a/test/integration/cold_query/query_suite_test.go b/test/integration/cold_query/query_suite_test.go
index 9b3a190..849a059 100644
--- a/test/integration/cold_query/query_suite_test.go
+++ b/test/integration/cold_query/query_suite_test.go
@@ -27,6 +27,7 @@ import (
"google.golang.org/grpc/credentials/insecure"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
"github.com/apache/skywalking-banyandb/pkg/test/helpers"
"github.com/apache/skywalking-banyandb/pkg/test/setup"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -51,7 +52,7 @@ var (
var _ = SynchronizedBeforeSuite(func() []byte {
Expect(logger.Init(logger.Logging{
Env: "dev",
- Level: "warn",
+ Level: flags.LogLevel,
})).To(Succeed())
var addr string
addr, _, deferFunc = setup.SetUp()
diff --git a/test/integration/query/query_suite_test.go b/test/integration/load/load_suite_test.go
similarity index 61%
copy from test/integration/query/query_suite_test.go
copy to test/integration/load/load_suite_test.go
index 0bb67be..b7430f2 100644
--- a/test/integration/query/query_suite_test.go
+++ b/test/integration/load/load_suite_test.go
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package integration_query_test
+package integration_load_test
import (
"testing"
@@ -29,17 +29,13 @@ import (
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/test/helpers"
"github.com/apache/skywalking-banyandb/pkg/test/setup"
- "github.com/apache/skywalking-banyandb/pkg/timestamp"
- cases_measure "github.com/apache/skywalking-banyandb/test/cases/measure"
- cases_measure_data "github.com/apache/skywalking-banyandb/test/cases/measure/data"
cases_stream "github.com/apache/skywalking-banyandb/test/cases/stream"
cases_stream_data "github.com/apache/skywalking-banyandb/test/cases/stream/data"
- cases_topn "github.com/apache/skywalking-banyandb/test/cases/topn"
)
-func TestIntegrationQuery(t *testing.T) {
+func TestIntegrationLoad(t *testing.T) {
RegisterFailHandler(Fail)
- RunSpecs(t, "Integration Query Suite")
+ RunSpecs(t, "Integration Load Suite", Label("integration", "slow"))
}
var (
@@ -60,18 +56,26 @@ var _ = SynchronizedBeforeSuite(func() []byte {
grpclib.WithTransportCredentials(insecure.NewCredentials()),
)
Expect(err).NotTo(HaveOccurred())
- ns := timestamp.NowMilli().UnixNano()
- now = time.Unix(0, ns-ns%int64(time.Minute))
- interval := 500 * time.Millisecond
- // stream
- cases_stream_data.Write(conn, "data.json", now, interval)
- // measure
- interval = time.Minute
- cases_measure_data.Write(conn, "service_traffic", "sw_metric", "service_traffic_data.json", now, interval)
- cases_measure_data.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval)
- cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval)
- cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data1.json", now.Add(10*time.Second), interval)
- cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data2.json", now.Add(10*time.Minute), interval)
+ days := 7
+ hours := 24
+ minutes := 60
+ interval := 10 * time.Second
+ c := time.Now()
+ for i := 0; i < days; i++ {
+ date := c.Add(-time.Hour * time.Duration((days-i)*24))
+ for h := 0; h < hours; h++ {
+ hour := date.Add(time.Hour * time.Duration(h))
+ start := time.Now()
+ for j := 0; j < minutes; j++ {
+ n := hour.Add(time.Minute * time.Duration(j))
+ ns := n.UnixNano()
+ now = time.Unix(0, ns-ns%int64(time.Minute))
+ // stream
+ cases_stream_data.Write(conn, "data.json", now, interval)
+ }
+ GinkgoWriter.Printf("written stream in %s took %s \n", hour, time.Since(start))
+ }
+ }
Expect(conn.Close()).To(Succeed())
return []byte(addr)
}, func(address []byte) {
@@ -85,14 +89,6 @@ var _ = SynchronizedBeforeSuite(func() []byte {
Connection: connection,
BaseTime: now,
}
- cases_measure.SharedContext = helpers.SharedContext{
- Connection: connection,
- BaseTime: now,
- }
- cases_topn.SharedContext = helpers.SharedContext{
- Connection: connection,
- BaseTime: now,
- }
Expect(err).NotTo(HaveOccurred())
})
diff --git a/test/integration/other/other_suite_test.go b/test/integration/other/other_suite_test.go
index 4eaee84..eaf2474 100644
--- a/test/integration/other/other_suite_test.go
+++ b/test/integration/other/other_suite_test.go
@@ -24,6 +24,7 @@ import (
gm "github.com/onsi/gomega"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
)
func TestIntegrationOther(t *testing.T) {
@@ -34,6 +35,6 @@ func TestIntegrationOther(t *testing.T) {
var _ = g.BeforeSuite(func() {
gm.Expect(logger.Init(logger.Logging{
Env: "dev",
- Level: "warn",
+ Level: flags.LogLevel,
})).To(gm.Succeed())
})
diff --git a/test/integration/query/query_suite_test.go b/test/integration/query/query_suite_test.go
index 0bb67be..69be763 100644
--- a/test/integration/query/query_suite_test.go
+++ b/test/integration/query/query_suite_test.go
@@ -27,6 +27,7 @@ import (
"google.golang.org/grpc/credentials/insecure"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
"github.com/apache/skywalking-banyandb/pkg/test/helpers"
"github.com/apache/skywalking-banyandb/pkg/test/setup"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -51,7 +52,7 @@ var (
var _ = SynchronizedBeforeSuite(func() []byte {
Expect(logger.Init(logger.Logging{
Env: "dev",
- Level: "warn",
+ Level: flags.LogLevel,
})).To(Succeed())
var addr string
addr, _, deferFunc = setup.SetUp()