You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/11/04 08:40:05 UTC

[skywalking-banyandb] branch main updated: Add load test and inject testing flags (#204)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 28b5fd5  Add load test and inject testing flags (#204)
28b5fd5 is described below

commit 28b5fd5f23dee6bc4ea1ed729f7da1956ad257d0
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Fri Nov 4 16:40:00 2022 +0800

    Add load test and inject testing flags (#204)
    
    * Add a slow load test case to CI
    * Update the max opened blocks strategy. A shard can open at most 64
      blocks. A cleanup goroutine collects the 10 oldest inactive blocks
      every minute until only 4 blocks are left in the cache.
    * Inject the logging level and the "Eventually" timeout for tests through "ldflags"
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 .github/workflows/load.yml                         |  57 +++++++
 Makefile                                           |   7 +-
 banyand/liaison/grpc/grpc_suite_test.go            |   3 +-
 banyand/measure/measure_suite_test.go              |   3 +-
 banyand/measure/measure_topn.go                    |   4 +-
 banyand/measure/measure_write.go                   |   6 +-
 banyand/metadata/metadata_test.go                  |   3 +-
 banyand/metadata/schema/schema_suite_test.go       |   3 +-
 banyand/query/processor_topn.go                    |   4 +-
 banyand/stream/stream_suite_test.go                |   3 +-
 banyand/stream/stream_write.go                     |   7 +-
 banyand/tsdb/block.go                              |  81 ++++++----
 banyand/tsdb/bucket/bucket_suite_test.go           |   3 +-
 banyand/tsdb/bucket/queue.go                       | 163 +++++++++++++++++----
 banyand/tsdb/bucket/queue_test.go                  |  83 ++++++++++-
 banyand/tsdb/bucket/strategy.go                    |   4 +-
 banyand/tsdb/indexdb.go                            |  14 +-
 banyand/tsdb/retention.go                          |  24 +--
 banyand/tsdb/segment.go                            |  94 ++++++++----
 banyand/tsdb/series.go                             |  20 +--
 banyand/tsdb/seriesdb.go                           |  22 +--
 banyand/tsdb/seriesdb_test.go                      |   5 +-
 banyand/tsdb/shard.go                              |  53 ++++---
 banyand/tsdb/shard_test.go                         |  53 +++----
 banyand/tsdb/tsdb.go                               |  11 +-
 banyand/tsdb/tsdb_suite_test.go                    |   6 +-
 banyand/tsdb/tsdb_test.go                          |   3 +-
 bydbctl/internal/cmd/cmd_suite_test.go             |   3 +-
 docs/crud/measure/query.md                         |   2 +
 docs/crud/stream/query.md                          |   2 +
 pkg/index/inverted/inverted_test.go                |   3 +-
 pkg/index/lsm/lsm_test.go                          |   3 +-
 pkg/query/logical/common.go                        |  22 +--
 .../measure/measure_plan_indexscan_local.go        |  10 +-
 .../logical/stream/stream_plan_indexscan_global.go |   5 +-
 .../logical/stream/stream_plan_indexscan_local.go  |  10 +-
 .../test/flags/flags.go                            |  32 ++--
 test/integration/cold_query/query_suite_test.go    |   3 +-
 .../load_suite_test.go}                            |  50 +++----
 test/integration/other/other_suite_test.go         |   3 +-
 test/integration/query/query_suite_test.go         |   3 +-
 41 files changed, 630 insertions(+), 260 deletions(-)

diff --git a/.github/workflows/load.yml b/.github/workflows/load.yml
new file mode 100644
index 0000000..d3682d4
--- /dev/null
+++ b/.github/workflows/load.yml
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: Continuous Integration
+
+on:
+  schedule:
+    - cron: '0 20 * * *'
+
+jobs:
+  test:
+    name: Load several days data
+    runs-on: ubuntu-20.04
+    strategy:
+      matrix:
+        tz: ["UTC", "Asia/Shanghai", "America/Los_Angeles"]
+    steps:
+      - name: Set timezone
+        run: sudo timedatectl set-timezone ${{ matrix.tz }}
+      - uses: actions/setup-node@v3
+        with:
+          node-version: 16.15
+      - name: Install Go
+        uses: actions/setup-go@v2
+        with:
+          go-version: 1.19
+      - name: Check out code into the Go module directory
+        uses: actions/checkout@v2
+      - uses: actions/cache@v3
+        id: cache-go
+        with:
+          path: |
+            ~/.cache/go-build
+            ~/go/pkg/mod
+          key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
+          restore-keys: |
+            ${{ runner.os }}-go-
+      - name: Update dependencies 
+        if: steps.cache-go.outputs.cache-hit != 'true'
+        run: GOPROXY=https://proxy.golang.org go mod download
+      - name: Generate mocks
+        run: make generate
+      - name: Test
+        run: TEST_EXTRA_OPTS="--label-filter slow" make -C test test
diff --git a/Makefile b/Makefile
index 37eee1c..099a0b8 100644
--- a/Makefile
+++ b/Makefile
@@ -65,7 +65,12 @@ test-coverage: default ## Run the unit tests in all projects with coverage analy
 include scripts/build/ginkgo.mk
 
 test-ci: $(GINKGO) ## Run the unit tests in CI
-	$(GINKGO) --race --cover --covermode atomic --coverprofile=coverage.out ./... 
+	$(GINKGO) --race \
+	  -ldflags \
+	  "-X github.com/apache/skywalking-banyandb/pkg/test/flags.eventuallyTimeout=30s -X github.com/apache/skywalking-banyandb/pkg/test/flags.LogLevel=warn" \
+	  --cover --covermode atomic --coverprofile=coverage.out \
+	  --label-filter !slow \
+	  ./... 
 
 ##@ Code quality targets
 
diff --git a/banyand/liaison/grpc/grpc_suite_test.go b/banyand/liaison/grpc/grpc_suite_test.go
index ef85011..352574a 100644
--- a/banyand/liaison/grpc/grpc_suite_test.go
+++ b/banyand/liaison/grpc/grpc_suite_test.go
@@ -24,6 +24,7 @@ import (
 	. "github.com/onsi/gomega"
 
 	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/test/flags"
 )
 
 func TestGrpc(t *testing.T) {
@@ -34,6 +35,6 @@ func TestGrpc(t *testing.T) {
 var _ = BeforeSuite(func() {
 	Expect(logger.Init(logger.Logging{
 		Env:   "dev",
-		Level: "warn",
+		Level: flags.LogLevel,
 	})).Should(Succeed())
 })
diff --git a/banyand/measure/measure_suite_test.go b/banyand/measure/measure_suite_test.go
index 3f295d8..9f6cc38 100644
--- a/banyand/measure/measure_suite_test.go
+++ b/banyand/measure/measure_suite_test.go
@@ -33,6 +33,7 @@ import (
 	"github.com/apache/skywalking-banyandb/banyand/queue"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/test"
+	"github.com/apache/skywalking-banyandb/pkg/test/flags"
 	testmeasure "github.com/apache/skywalking-banyandb/pkg/test/measure"
 )
 
@@ -45,7 +46,7 @@ func TestMeasure(t *testing.T) {
 var _ = ginkgo.BeforeSuite(func() {
 	gomega.Expect(logger.Init(logger.Logging{
 		Env:   "dev",
-		Level: "warn",
+		Level: flags.LogLevel,
 	})).To(gomega.Succeed())
 })
 
diff --git a/banyand/measure/measure_topn.go b/banyand/measure/measure_topn.go
index 29295c3..6a969f1 100644
--- a/banyand/measure/measure_topn.go
+++ b/banyand/measure/measure_topn.go
@@ -166,7 +166,9 @@ func (t *topNStreamingProcessor) writeData(eventTime time.Time, timeBucket strin
 	if err != nil {
 		return err
 	}
-	span, err := series.Create(eventTime)
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	span, err := series.Create(ctx, eventTime)
 	if err != nil {
 		if span != nil {
 			_ = span.Close()
diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go
index c8a9aef..7cd8224 100644
--- a/banyand/measure/measure_write.go
+++ b/banyand/measure/measure_write.go
@@ -19,6 +19,8 @@ package measure
 
 import (
 	"bytes"
+	"context"
+	"time"
 
 	"github.com/pkg/errors"
 
@@ -75,7 +77,9 @@ func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *mea
 	if err != nil {
 		return err
 	}
-	wp, err := series.Create(t)
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	wp, err := series.Create(ctx, t)
 	if err != nil {
 		if wp != nil {
 			_ = wp.Close()
diff --git a/banyand/metadata/metadata_test.go b/banyand/metadata/metadata_test.go
index fc3c7e5..682c762 100644
--- a/banyand/metadata/metadata_test.go
+++ b/banyand/metadata/metadata_test.go
@@ -27,6 +27,7 @@ import (
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
 	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/test/flags"
 	test "github.com/apache/skywalking-banyandb/pkg/test/stream"
 )
 
@@ -37,7 +38,7 @@ func Test_service_RulesBySubject(t *testing.T) {
 	is := assert.New(t)
 	is.NoError(logger.Init(logger.Logging{
 		Env:   "dev",
-		Level: "warn",
+		Level: flags.LogLevel,
 	}))
 	ctx := context.TODO()
 	s, _ := NewService(ctx)
diff --git a/banyand/metadata/schema/schema_suite_test.go b/banyand/metadata/schema/schema_suite_test.go
index aea8ea1..51afd65 100644
--- a/banyand/metadata/schema/schema_suite_test.go
+++ b/banyand/metadata/schema/schema_suite_test.go
@@ -24,6 +24,7 @@ import (
 	. "github.com/onsi/gomega"
 
 	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/test/flags"
 )
 
 func TestSchema(t *testing.T) {
@@ -35,6 +36,6 @@ func TestSchema(t *testing.T) {
 var _ = ginkgo.BeforeSuite(func() {
 	Expect(logger.Init(logger.Logging{
 		Env:   "dev",
-		Level: "warn",
+		Level: flags.LogLevel,
 	})).To(Succeed())
 })
diff --git a/banyand/query/processor_topn.go b/banyand/query/processor_topn.go
index 91f1199..021c7eb 100644
--- a/banyand/query/processor_topn.go
+++ b/banyand/query/processor_topn.go
@@ -196,7 +196,9 @@ func familyIdentity(name string, flag []byte) []byte {
 }
 
 func (t *topNQueryProcessor) scanSeries(series tsdb.Series, request *measurev1.TopNRequest) ([]tsdb.Iterator, error) {
-	seriesSpan, err := series.Span(timestamp.NewInclusiveTimeRange(
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	seriesSpan, err := series.Span(ctx, timestamp.NewInclusiveTimeRange(
 		request.GetTimeRange().GetBegin().AsTime(),
 		request.GetTimeRange().GetEnd().AsTime()),
 	)
diff --git a/banyand/stream/stream_suite_test.go b/banyand/stream/stream_suite_test.go
index 4f827eb..d3177ad 100644
--- a/banyand/stream/stream_suite_test.go
+++ b/banyand/stream/stream_suite_test.go
@@ -32,6 +32,7 @@ import (
 	"github.com/apache/skywalking-banyandb/banyand/queue"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/test"
+	"github.com/apache/skywalking-banyandb/pkg/test/flags"
 	teststream "github.com/apache/skywalking-banyandb/pkg/test/stream"
 )
 
@@ -44,7 +45,7 @@ func TestStream(t *testing.T) {
 var _ = BeforeSuite(func() {
 	Expect(logger.Init(logger.Logging{
 		Env:   "dev",
-		Level: "warn",
+		Level: flags.LogLevel,
 	})).To(Succeed())
 })
 
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index 5dac816..af03580 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -18,6 +18,9 @@
 package stream
 
 import (
+	"context"
+	"time"
+
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
@@ -87,7 +90,9 @@ func (s *stream) write(shardID common.ShardID, seriesHashKey []byte, value *stre
 		return err
 	}
 	t := timestamp.MToN(tp)
-	wp, err := series.Create(t)
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	wp, err := series.Create(ctx, t)
 	if err != nil {
 		if wp != nil {
 			_ = wp.Close()
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 3bc87ea..8d050f5 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -30,6 +30,7 @@ import (
 	"time"
 
 	"github.com/pkg/errors"
+	"go.uber.org/multierr"
 
 	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/banyand/kv"
@@ -49,8 +50,11 @@ const (
 	componentSecondLSMIdx      = "lsm"
 
 	defaultMainMemorySize = 8 << 20
+	defaultEnqueueTimeout = 500 * time.Millisecond
 )
 
+var ErrBlockClosingInterrupted = errors.New("interrupt to close the block")
+
 type block struct {
 	path       string
 	l          *logger.Logger
@@ -106,13 +110,14 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
 		deleted:   &atomic.Bool{},
 		queue:     opts.queue,
 	}
+	b.closed.Store(true)
 	b.options(ctx)
 	position := ctx.Value(common.PositionKey)
 	if position != nil {
 		b.position = position.(common.Position)
 	}
 
-	return b, b.open()
+	return b, nil
 }
 
 func (b *block) options(ctx context.Context) {
@@ -187,24 +192,28 @@ func (b *block) open() (err error) {
 	return nil
 }
 
-func (b *block) delegate() (BlockDelegate, error) {
+func (b *block) delegate(ctx context.Context) (BlockDelegate, error) {
 	if b.deleted.Load() {
-		return nil, errors.WithMessagef(ErrBlockAbsent, "block %d is deleted", b.blockID)
+		return nil, errors.WithMessagef(ErrBlockAbsent, "block %s is deleted", b)
+	}
+	blockID := BlockID{
+		BlockID: b.blockID,
+		SegID:   b.segID,
 	}
 	if b.incRef() {
+		b.queue.Touch(blockID)
 		return &bDelegate{
 			delegate: b,
 		}, nil
 	}
 	b.lock.Lock()
 	defer b.lock.Unlock()
-	b.queue.Push(BlockID{
-		BlockID: b.blockID,
-		SegID:   b.segID,
-	})
-	// TODO: remove the block which fails to open from the queue
-	err := b.open()
-	if err != nil {
+	if err := b.queue.Push(ctx, blockID, func() error {
+		if !b.Closed() {
+			return nil
+		}
+		return b.open()
+	}); err != nil {
 		b.l.Error().Err(err).Stringer("block", b).Msg("fail to open block")
 		return nil, err
 	}
@@ -240,14 +249,23 @@ loop:
 	goto loop
 }
 
-func (b *block) waitDone() {
-loop:
-	if b.ref.Load() < 1 {
-		b.ref.Store(0)
-		return
-	}
-	runtime.Gosched()
-	goto loop
+func (b *block) waitDone(stopped *atomic.Bool) <-chan struct{} {
+	ch := make(chan struct{})
+	go func() {
+	loop:
+		if b.ref.Load() < 1 {
+			b.ref.Store(0)
+			close(ch)
+			return
+		}
+		if stopped.Load() {
+			close(ch)
+			return
+		}
+		runtime.Gosched()
+		goto loop
+	}()
+	return ch
 }
 
 func (b *block) flush() {
@@ -261,28 +279,37 @@ func (b *block) flush() {
 	}
 }
 
-func (b *block) close() {
+func (b *block) close(ctx context.Context) (err error) {
 	b.lock.Lock()
 	defer b.lock.Unlock()
 	if b.closed.Load() {
-		return
+		return nil
 	}
 	b.closed.Store(true)
-	b.waitDone()
+	stopWaiting := &atomic.Bool{}
+	ch := b.waitDone(stopWaiting)
+	select {
+	case <-ctx.Done():
+		b.closed.Store(false)
+		stopWaiting.Store(true)
+		return errors.Wrapf(ErrBlockClosingInterrupted, "block:%s", b)
+	case <-ch:
+	}
 	for _, closer := range b.closableLst {
-		_ = closer.Close()
+		err = multierr.Append(err, closer.Close())
 	}
 	close(b.stopCh)
+	return err
 }
 
-func (b *block) stopThenClose() {
+func (b *block) stopThenClose(ctx context.Context) error {
 	if b.Reporter != nil {
 		b.Stop()
 	}
-	b.close()
+	return b.close(ctx)
 }
 
-func (b *block) delete() error {
+func (b *block) delete(ctx context.Context) error {
 	if b.deleted.Load() {
 		return nil
 	}
@@ -290,7 +317,7 @@ func (b *block) delete() error {
 	if b.Reporter != nil {
 		b.Stop()
 	}
-	b.close()
+	b.close(ctx)
 	return os.RemoveAll(b.path)
 }
 
@@ -299,7 +326,7 @@ func (b *block) Closed() bool {
 }
 
 func (b *block) String() string {
-	return fmt.Sprintf("BlockID-%d-%d", b.segID, b.blockID)
+	return fmt.Sprintf("BlockID-%d-%d", parseSuffix(b.segID), parseSuffix(b.blockID))
 }
 
 func (b *block) stats() (names []string, stats []observability.Statistics) {
diff --git a/banyand/tsdb/bucket/bucket_suite_test.go b/banyand/tsdb/bucket/bucket_suite_test.go
index 59ca1fe..289cd7d 100644
--- a/banyand/tsdb/bucket/bucket_suite_test.go
+++ b/banyand/tsdb/bucket/bucket_suite_test.go
@@ -23,6 +23,7 @@ import (
 	. "github.com/onsi/gomega"
 
 	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/test/flags"
 )
 
 func TestBucket(t *testing.T) {
@@ -30,7 +31,7 @@ func TestBucket(t *testing.T) {
 	BeforeSuite(func() {
 		Expect(logger.Init(logger.Logging{
 			Env:   "dev",
-			Level: "warn",
+			Level: flags.LogLevel,
 		})).Should(Succeed())
 	})
 	RunSpecs(t, "Bucket Suite")
diff --git a/banyand/tsdb/bucket/queue.go b/banyand/tsdb/bucket/queue.go
index 2b3dced..0bcf49a 100644
--- a/banyand/tsdb/bucket/queue.go
+++ b/banyand/tsdb/bucket/queue.go
@@ -17,29 +17,45 @@
 package bucket
 
 import (
+	"context"
 	"errors"
+	"fmt"
+	"io"
 	"sync"
+	"time"
 
 	"github.com/hashicorp/golang-lru/simplelru"
+	"github.com/robfig/cron/v3"
+
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
-type EvictFn func(id interface{})
+type (
+	EvictFn       func(ctx context.Context, id interface{}) error
+	OnAddRecentFn func() error
+)
 
 type Queue interface {
-	Push(id interface{})
-	Remove(id interface{})
+	io.Closer
+	Touch(id fmt.Stringer) bool
+	Push(ctx context.Context, id fmt.Stringer, fn OnAddRecentFn) error
+	Remove(id fmt.Stringer)
 	Len() int
+	Volume() int
 	All() []interface{}
 }
 
 const (
-	DefaultRecentRatio  = 0.25
-	DefaultGhostEntries = 0.50
+	DefaultRecentRatio = 0.25
+
+	defaultEvictBatchSize = 10
 )
 
 var ErrInvalidSize = errors.New("invalid size")
 
 type lruQueue struct {
+	l          *logger.Logger
 	size       int
 	recentSize int
 	evictSize  int
@@ -49,15 +65,17 @@ type lruQueue struct {
 	frequent    simplelru.LRUCache
 	recentEvict simplelru.LRUCache
 	lock        sync.RWMutex
+
+	stopCh chan struct{}
 }
 
-func NewQueue(size int, evictFn EvictFn) (Queue, error) {
+func NewQueue(logger *logger.Logger, size int, maxSize int, clock timestamp.Clock, evictFn EvictFn) (Queue, error) {
 	if size <= 0 {
 		return nil, ErrInvalidSize
 	}
 
 	recentSize := int(float64(size) * DefaultRecentRatio)
-	evictSize := int(float64(size) * DefaultGhostEntries)
+	evictSize := maxSize - size
 
 	recent, err := simplelru.NewLRU(size, nil)
 	if err != nil {
@@ -79,37 +97,108 @@ func NewQueue(size int, evictFn EvictFn) (Queue, error) {
 		recentEvict: recentEvict,
 		evictSize:   evictSize,
 		evictFn:     evictFn,
+		l:           logger,
+		stopCh:      make(chan struct{}),
 	}
+	parser := cron.NewParser(cron.Second)
+	// every 60 seconds to clean up recentEvict
+	scheduler, err := parser.Parse("59")
+	if err != nil {
+		return nil, err
+	}
+	go func() {
+		now := clock.Now()
+		for {
+			next := scheduler.Next(now)
+			timer := clock.Timer(next.Sub(now))
+			select {
+			case now = <-timer.C:
+				c.l.Info().Time("now", now).Msg("wakes")
+				var evictLen int
+				c.lock.RLock()
+				evictLen = c.recentEvict.Len()
+				c.lock.RUnlock()
+				if evictLen < 1 {
+					continue
+				}
+				for i := 0; i < defaultEvictBatchSize; i++ {
+					c.lock.Lock()
+					if evictLen < 1 {
+						continue
+					}
+					ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+					if err := c.removeOldest(ctx, c.recentEvict); err != nil {
+						c.l.Error().Err(err).Msg("failed to remove oldest blocks")
+					}
+					cancel()
+					c.lock.Unlock()
+				}
+			case <-c.stopCh:
+				c.l.Info().Msg("stop")
+				timer.Stop()
+				return
+			}
+		}
+	}()
 	return c, nil
 }
 
-func (q *lruQueue) Push(id interface{}) {
+func (q *lruQueue) Touch(id fmt.Stringer) bool {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
 	if q.frequent.Contains(id) {
+		q.l.Debug().Stringer("id", id).Msg("get from frequent")
+		return true
+	}
+
+	if q.recent.Contains(id) {
+		q.l.Debug().Stringer("id", id).Msg("promote from recent to frequent")
+		q.recent.Remove(id)
 		q.frequent.Add(id, nil)
-		return
+		return true
+	}
+	return false
+}
+
+func (q *lruQueue) Push(ctx context.Context, id fmt.Stringer, fn OnAddRecentFn) error {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	if q.frequent.Contains(id) {
+		q.l.Debug().Stringer("id", id).Msg("push to frequent")
+		q.frequent.Add(id, nil)
+		return nil
 	}
 
 	if q.recent.Contains(id) {
+		q.l.Debug().Stringer("id", id).Msg("promote from recent to frequent")
 		q.recent.Remove(id)
 		q.frequent.Add(id, nil)
-		return
+		return nil
 	}
 
 	if q.recentEvict.Contains(id) {
-		q.ensureSpace(true)
+		q.l.Debug().Stringer("id", id).Msg("restore from recentEvict")
+		if err := q.ensureSpace(ctx, true); err != nil {
+			return err
+		}
 		q.recentEvict.Remove(id)
 		q.frequent.Add(id, nil)
-		return
+		return nil
 	}
 
-	q.ensureSpace(false)
+	if err := q.ensureSpace(ctx, false); err != nil {
+		return err
+	}
 	q.recent.Add(id, nil)
+	if fn == nil {
+		return nil
+	}
+	return fn()
 }
 
-func (q *lruQueue) Remove(id interface{}) {
+func (q *lruQueue) Remove(id fmt.Stringer) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
@@ -134,6 +223,10 @@ func (q *lruQueue) Len() int {
 	return q.recent.Len() + q.frequent.Len()
 }
 
+func (q *lruQueue) Volume() int {
+	return q.size + q.recentSize + q.evictSize
+}
+
 func (q *lruQueue) All() []interface{} {
 	q.lock.RLock()
 	defer q.lock.RUnlock()
@@ -144,32 +237,50 @@ func (q *lruQueue) All() []interface{} {
 	return all
 }
 
-func (q *lruQueue) ensureSpace(recentEvict bool) {
+func (q *lruQueue) ensureSpace(ctx context.Context, recentEvict bool) error {
 	recentLen := q.recent.Len()
 	freqLen := q.frequent.Len()
 	if recentLen+freqLen < q.size {
-		return
+		return nil
 	}
 	if recentLen > 0 && (recentLen > q.recentSize || (recentLen == q.recentSize && !recentEvict)) {
-		k, _, _ := q.recent.RemoveOldest()
-		q.addLst(q.recentEvict, q.evictSize, k)
-		return
+		k, _, ok := q.recent.GetOldest()
+		if !ok {
+			return errors.New("failed to get oldest from recent queue")
+		}
+		if err := q.addLst(ctx, q.recentEvict, q.evictSize, k); err != nil {
+			return err
+		}
+		q.recent.Remove(k)
+		return nil
 	}
-	q.removeOldest(q.frequent)
+	return q.removeOldest(ctx, q.frequent)
 }
 
-func (q *lruQueue) addLst(lst simplelru.LRUCache, size int, id interface{}) {
+func (q *lruQueue) addLst(ctx context.Context, lst simplelru.LRUCache, size int, id interface{}) error {
 	if lst.Len() < size {
 		lst.Add(id, nil)
-		return
+		return nil
+	}
+	if err := q.removeOldest(ctx, lst); err != nil {
+		return err
 	}
-	q.removeOldest(lst)
 	lst.Add(id, nil)
+	return nil
 }
 
-func (q *lruQueue) removeOldest(lst simplelru.LRUCache) {
-	oldestID, _, ok := lst.RemoveOldest()
+func (q *lruQueue) removeOldest(ctx context.Context, lst simplelru.LRUCache) error {
+	oldestID, _, ok := lst.GetOldest()
 	if ok && q.evictFn != nil {
-		q.evictFn(oldestID)
+		if err := q.evictFn(ctx, oldestID); err != nil {
+			return err
+		}
+		_ = lst.Remove(oldestID)
 	}
+	return nil
+}
+
+func (q *lruQueue) Close() error {
+	close(q.stopCh)
+	return nil
 }
diff --git a/banyand/tsdb/bucket/queue_test.go b/banyand/tsdb/bucket/queue_test.go
index 0eb6fe0..f256358 100644
--- a/banyand/tsdb/bucket/queue_test.go
+++ b/banyand/tsdb/bucket/queue_test.go
@@ -17,11 +17,17 @@
 package bucket_test
 
 import (
+	"context"
+	"strconv"
+	"time"
+
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
 	"github.com/onsi/gomega/gleak"
 
 	"github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 type queueEntryID struct {
@@ -29,6 +35,10 @@ type queueEntryID struct {
 	second uint16
 }
 
+func (q queueEntryID) String() string {
+	return strconv.Itoa(int(q.first))
+}
+
 func entryID(id uint16) queueEntryID {
 	return queueEntryID{
 		first:  id,
@@ -37,26 +47,87 @@ func entryID(id uint16) queueEntryID {
 }
 
 var _ = Describe("Queue", func() {
+	var evictLst []queueEntryID
+	var l bucket.Queue
+	var clock timestamp.MockClock
 	BeforeEach(func() {
 		goods := gleak.Goroutines()
 		DeferCleanup(func() {
 			Eventually(gleak.Goroutines).ShouldNot(gleak.HaveLeaked(goods))
 		})
-	})
-	It("pushes data", func() {
-		evictLst := make([]queueEntryID, 0)
-		l, err := bucket.NewQueue(128, func(id interface{}) {
+		evictLst = make([]queueEntryID, 0)
+		clock = timestamp.NewMockClock()
+		clock.Set(time.Date(1970, 0o1, 0o1, 0, 0, 0, 0, time.Local))
+		var err error
+		l, err = bucket.NewQueue(logger.GetLogger("test"), 128, 192, clock, func(_ context.Context, id interface{}) error {
 			evictLst = append(evictLst, id.(queueEntryID))
+			return nil
 		})
 		Expect(err).ShouldNot(HaveOccurred())
-
+		DeferCleanup(func() {
+			evictLst = evictLst[:0]
+			Expect(l.Close()).To(Succeed())
+		})
+	})
+	It("pushes to recent", func() {
+		enRecentSize := 0
 		for i := 0; i < 256; i++ {
-			l.Push(entryID(uint16(i)))
+			Expect(l.Push(context.Background(), entryID(uint16(i)), func() error {
+				enRecentSize++
+				return nil
+			})).To(Succeed())
 		}
+		Expect(enRecentSize).To(Equal(256))
 		Expect(l.Len()).To(Equal(128))
 		Expect(len(evictLst)).To(Equal(64))
 		for i := 0; i < 64; i++ {
 			Expect(evictLst[i]).To(Equal(entryID(uint16(i))))
 		}
 	})
+
+	It("promotes to frequent", func() {
+		enRecentSize := 0
+		for i := 0; i < 128; i++ {
+			Expect(l.Push(context.Background(), entryID(uint16(i)), func() error {
+				enRecentSize++
+				return nil
+			})).To(Succeed())
+		}
+		Expect(enRecentSize).To(Equal(128))
+		Expect(l.Len()).To(Equal(128))
+		Expect(len(evictLst)).To(Equal(0))
+		for i := 0; i < 64; i++ {
+			Expect(l.Touch(entryID(uint16(i)))).To(BeTrue())
+		}
+		enRecentSize = 0
+		for i := 128; i < 256; i++ {
+			Expect(l.Push(context.Background(), entryID(uint16(i)), func() error {
+				enRecentSize++
+				return nil
+			})).To(Succeed())
+		}
+		Expect(enRecentSize).To(Equal(128))
+		Expect(l.Len()).To(Equal(128))
+		Expect(len(evictLst)).To(Equal(64))
+		for i := 0; i < 64; i++ {
+			Expect(evictLst[i]).To(Equal(entryID(uint16(i + 64))))
+		}
+	})
+
+	It("cleans up evict queue", func() {
+		enRecentSize := 0
+
+		for i := 0; i < 192; i++ {
+			Expect(l.Push(context.Background(), entryID(uint16(i)), func() error {
+				enRecentSize++
+				return nil
+			})).To(Succeed())
+		}
+		Expect(enRecentSize).To(Equal(192))
+		Expect(l.Len()).To(Equal(128))
+		Expect(len(evictLst)).To(Equal(0))
+		clock.Add(time.Minute)
+		GinkgoWriter.Printf("evicted size:%d \n", len(evictLst))
+		Expect(len(evictLst)).To(BeNumerically(">", 1))
+	})
 })
diff --git a/banyand/tsdb/bucket/strategy.go b/banyand/tsdb/bucket/strategy.go
index 4a8d14e..6bf51f2 100644
--- a/banyand/tsdb/bucket/strategy.go
+++ b/banyand/tsdb/bucket/strategy.go
@@ -117,7 +117,7 @@ func (s *Strategy) observe(c Channel) bool {
 		select {
 		case status, more := <-c:
 			if !more {
-				return true
+				return moreBucket
 			}
 			ratio := Ratio(status.Volume) / Ratio(status.Capacity)
 			atomic.StoreUint64(&s.currentRatio, math.Float64bits(float64(ratio)))
@@ -136,7 +136,7 @@ func (s *Strategy) observe(c Channel) bool {
 				if next != nil {
 					s.current.Store(next)
 				}
-				return true
+				return moreBucket
 			}
 		case <-s.stopCh:
 			return false
diff --git a/banyand/tsdb/indexdb.go b/banyand/tsdb/indexdb.go
index e7ce863..c4281d0 100644
--- a/banyand/tsdb/indexdb.go
+++ b/banyand/tsdb/indexdb.go
@@ -28,7 +28,6 @@ import (
 	"github.com/apache/skywalking-banyandb/banyand/kv"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/index"
-	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 type IndexDatabase interface {
@@ -97,7 +96,6 @@ type indexWriterBuilder struct {
 	scope        Entry
 	segCtrl      *segmentController
 	ts           time.Time
-	seg          *segment
 	globalItemID *GlobalItemID
 }
 
@@ -108,11 +106,6 @@ func (i *indexWriterBuilder) Scope(scope Entry) IndexWriterBuilder {
 
 func (i *indexWriterBuilder) Time(ts time.Time) IndexWriterBuilder {
 	i.ts = ts
-	segs := i.segCtrl.span(timestamp.NewTimeRangeDuration(ts, 0, true, false))
-	if len(segs) != 1 {
-		return i
-	}
-	i.seg = segs[0]
 	return i
 }
 
@@ -122,15 +115,16 @@ func (i *indexWriterBuilder) GlobalItemID(itemID GlobalItemID) IndexWriterBuilde
 }
 
 func (i *indexWriterBuilder) Build() (IndexWriter, error) {
-	if i.seg == nil {
-		return nil, errors.WithStack(ErrNoTime)
+	seg, err := i.segCtrl.create(i.segCtrl.Format(i.ts), false)
+	if err != nil {
+		return nil, err
 	}
 	if i.globalItemID == nil {
 		return nil, errors.WithStack(ErrNoVal)
 	}
 	return &indexWriter{
 		scope:  i.scope,
-		seg:    i.seg,
+		seg:    seg,
 		ts:     i.ts,
 		itemID: i.globalItemID,
 	}, nil
diff --git a/banyand/tsdb/retention.go b/banyand/tsdb/retention.go
index ab61a8d..755bf41 100644
--- a/banyand/tsdb/retention.go
+++ b/banyand/tsdb/retention.go
@@ -18,6 +18,7 @@
 package tsdb
 
 import (
+	"context"
 	"sync"
 	"time"
 
@@ -76,19 +77,18 @@ func (rc *retentionController) run() {
 	for {
 		next := rc.scheduler.Next(now)
 		timer := rc.segment.clock.Timer(next.Sub(now))
-		for {
-			select {
-			case now = <-timer.C:
-				rc.l.Info().Time("now", now).Msg("wake")
-				if err := rc.segment.remove(now.Add(-rc.duration)); err != nil {
-					rc.l.Error().Err(err)
-				}
-			case <-rc.stopCh:
-				timer.Stop()
-				rc.l.Info().Msg("stop")
-				return
+		select {
+		case now = <-timer.C:
+			rc.l.Info().Time("now", now).Msg("wake")
+			ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
+			if err := rc.segment.remove(ctx, now.Add(-rc.duration)); err != nil {
+				rc.l.Error().Err(err)
 			}
-			break
+			cancel()
+		case <-rc.stopCh:
+			timer.Stop()
+			rc.l.Info().Msg("stop")
+			return
 		}
 	}
 }
diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go
index 3393f67..6d08e2f 100644
--- a/banyand/tsdb/segment.go
+++ b/banyand/tsdb/segment.go
@@ -111,10 +111,14 @@ func openSegment(ctx context.Context, startTime time.Time, path, suffix string,
 	return s, nil
 }
 
-func (s *segment) close() {
-	s.blockController.close()
+func (s *segment) close(ctx context.Context) error {
+	if err := s.blockController.close(ctx); err != nil {
+		return err
+	}
 	if s.globalIndex != nil {
-		s.globalIndex.Close()
+		if err := s.globalIndex.Close(); err != nil {
+			return err
+		}
 	}
 	if s.blockManageStrategy != nil {
 		s.blockManageStrategy.Close()
@@ -122,19 +126,22 @@ func (s *segment) close() {
 	if s.Reporter != nil {
 		s.Stop()
 	}
+	return nil
 }
 
-func (s *segment) closeBlock(id uint16) {
-	s.blockController.closeBlock(id)
+func (s *segment) closeBlock(ctx context.Context, id uint16) error {
+	return s.blockController.closeBlock(ctx, id)
 }
 
-func (s *segment) delete() error {
-	s.close()
+func (s *segment) delete(ctx context.Context) error {
+	if err := s.close(ctx); err != nil {
+		return err
+	}
 	return os.RemoveAll(s.path)
 }
 
 func (s segment) String() string {
-	return fmt.Sprintf("SegID-%d", s.id)
+	return fmt.Sprintf("SegID-%d", parseSuffix(s.id))
 }
 
 func (s *segment) Stats() observability.Statistics {
@@ -177,7 +184,7 @@ func newBlockController(segCtx context.Context, segID uint16, location string, s
 func (bc *blockController) Current() (bucket.Reporter, error) {
 	now := bc.clock.Now()
 	ns := uint64(now.UnixNano())
-	if b := func() bucket.Reporter {
+	if b := func() *block {
 		bc.RLock()
 		defer bc.RUnlock()
 		for _, s := range bc.lst {
@@ -187,9 +194,14 @@ func (bc *blockController) Current() (bucket.Reporter, error) {
 		}
 		return nil
 	}(); b != nil {
+		if b.Closed() {
+			if err := b.open(); err != nil {
+				return nil, err
+			}
+		}
 		return b, nil
 	}
-	return bc.create(now)
+	return bc.newHeadBlock(now)
 }
 
 func (bc *blockController) Next() (bucket.Reporter, error) {
@@ -198,17 +210,34 @@ func (bc *blockController) Next() (bucket.Reporter, error) {
 		return nil, err
 	}
 	b := c.(*block)
-	return bc.create(bc.blockSize.NextTime(b.Start))
+
+	return bc.newHeadBlock(bc.blockSize.NextTime(b.Start))
+}
+
+func (bc *blockController) newHeadBlock(now time.Time) (*block, error) {
+	b, err := bc.create(now)
+	if err != nil {
+		return nil, err
+	}
+	return b, nil
 }
 
 func (bc *blockController) OnMove(prev bucket.Reporter, next bucket.Reporter) {
 	event := bc.l.Info()
 	if prev != nil {
 		event.Stringer("prev", prev)
-		bc.blockQueue.Push(BlockID{
+		b := prev.(*block)
+		ctx, cancel := context.WithTimeout(context.Background(), defaultEnqueueTimeout)
+		defer cancel()
+		if err := bc.blockQueue.Push(ctx, BlockID{
 			SegID:   bc.segID,
-			BlockID: prev.(*block).blockID,
-		})
+			BlockID: b.blockID,
+		}, nil); err != nil {
+			bc.l.Debug().Err(err).Msg("failed to push a expired head block to the queue")
+			ctxClosing, cancelClosing := context.WithTimeout(context.Background(), defaultEnqueueTimeout)
+			defer cancelClosing()
+			b.close(ctxClosing)
+		}
 	}
 	if next != nil {
 		event.Stringer("next", next)
@@ -236,7 +265,7 @@ func (bc *blockController) Parse(value string) (time.Time, error) {
 	panic("invalid interval unit")
 }
 
-func (bc *blockController) span(timeRange timestamp.TimeRange) ([]BlockDelegate, error) {
+func (bc *blockController) span(ctx context.Context, timeRange timestamp.TimeRange) ([]BlockDelegate, error) {
 	bb := bc.search(func(b *block) bool {
 		return b.Overlapping(timeRange)
 	})
@@ -245,7 +274,7 @@ func (bc *blockController) span(timeRange timestamp.TimeRange) ([]BlockDelegate,
 	}
 	dd := make([]BlockDelegate, len(bb))
 	for i, b := range bb {
-		d, err := b.delegate()
+		d, err := b.delegate(ctx)
 		if err != nil {
 			return nil, err
 		}
@@ -254,10 +283,10 @@ func (bc *blockController) span(timeRange timestamp.TimeRange) ([]BlockDelegate,
 	return dd, nil
 }
 
-func (bc *blockController) get(blockID uint16) (BlockDelegate, error) {
+func (bc *blockController) get(ctx context.Context, blockID uint16) (BlockDelegate, error) {
 	b := bc.getBlock(blockID)
 	if b != nil {
-		return b.delegate()
+		return b.delegate(ctx)
 	}
 	return nil, nil
 }
@@ -297,14 +326,14 @@ func (bc *blockController) search(matcher func(*block) bool) (bb []*block) {
 	return bb
 }
 
-func (bc *blockController) closeBlock(blockID uint16) {
+func (bc *blockController) closeBlock(ctx context.Context, blockID uint16) error {
 	bc.RLock()
-	defer bc.RUnlock()
 	b := bc.getBlock(blockID)
+	bc.RUnlock()
 	if b == nil {
-		return
+		return nil
 	}
-	b.close()
+	return b.close(ctx)
 }
 
 func (bc *blockController) startTime(suffix string) (time.Time, error) {
@@ -327,7 +356,7 @@ func (bc *blockController) startTime(suffix string) (time.Time, error) {
 func (bc *blockController) open() error {
 	return WalkDir(
 		bc.location,
-		segPathPrefix,
+		blockPathPrefix,
 		func(suffix, absolutePath string) error {
 			bc.Lock()
 			defer bc.Unlock()
@@ -355,7 +384,15 @@ func (bc *blockController) create(startTime time.Time) (*block, error) {
 	if err != nil {
 		return nil, err
 	}
-	return bc.load(suffix, segPath)
+	b, err := bc.load(suffix, segPath)
+	if err != nil {
+		return nil, err
+	}
+	err = b.open()
+	if err != nil {
+		return nil, err
+	}
+	return b, nil
 }
 
 func (bc *blockController) load(suffix, path string) (b *block, err error) {
@@ -393,17 +430,18 @@ func (bc *blockController) sortLst() {
 	})
 }
 
-func (bc *blockController) close() {
+func (bc *blockController) close(ctx context.Context) (err error) {
 	for _, s := range bc.lst {
-		s.stopThenClose()
+		err = multierr.Append(err, s.stopThenClose(ctx))
 	}
+	return err
 }
 
-func (bc *blockController) remove(deadline time.Time) (err error) {
+func (bc *blockController) remove(ctx context.Context, deadline time.Time) (err error) {
 	for _, b := range bc.blocks() {
 		if b.End.Before(deadline) {
 			bc.Lock()
-			if errDel := b.delete(); errDel != nil {
+			if errDel := b.delete(ctx); errDel != nil {
 				err = multierr.Append(err, errDel)
 			} else {
 				b.queue.Remove(BlockID{
diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go
index c0956de..09d62aa 100644
--- a/banyand/tsdb/series.go
+++ b/banyand/tsdb/series.go
@@ -75,9 +75,9 @@ func (i *GlobalItemID) UnMarshal(data []byte) error {
 
 type Series interface {
 	ID() common.SeriesID
-	Span(timeRange timestamp.TimeRange) (SeriesSpan, error)
-	Create(t time.Time) (SeriesSpan, error)
-	Get(id GlobalItemID) (Item, io.Closer, error)
+	Span(ctx context.Context, timeRange timestamp.TimeRange) (SeriesSpan, error)
+	Create(ctx context.Context, t time.Time) (SeriesSpan, error)
+	Get(ctx context.Context, id GlobalItemID) (Item, io.Closer, error)
 }
 
 type SeriesSpan interface {
@@ -95,8 +95,8 @@ type series struct {
 	l       *logger.Logger
 }
 
-func (s *series) Get(id GlobalItemID) (Item, io.Closer, error) {
-	b, err := s.blockDB.block(id)
+func (s *series) Get(ctx context.Context, id GlobalItemID) (Item, io.Closer, error) {
+	b, err := s.blockDB.block(ctx, id)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -114,8 +114,8 @@ func (s *series) ID() common.SeriesID {
 	return s.id
 }
 
-func (s *series) Span(timeRange timestamp.TimeRange) (SeriesSpan, error) {
-	blocks, err := s.blockDB.span(timeRange)
+func (s *series) Span(ctx context.Context, timeRange timestamp.TimeRange) (SeriesSpan, error) {
+	blocks, err := s.blockDB.span(ctx, timeRange)
 	if err != nil {
 		return nil, err
 	}
@@ -128,9 +128,9 @@ func (s *series) Span(timeRange timestamp.TimeRange) (SeriesSpan, error) {
 	return newSeriesSpan(context.WithValue(context.Background(), logger.ContextKey, s.l), timeRange, blocks, s.id, s.shardID), nil
 }
 
-func (s *series) Create(t time.Time) (SeriesSpan, error) {
+func (s *series) Create(ctx context.Context, t time.Time) (SeriesSpan, error) {
 	tr := timestamp.NewInclusiveTimeRange(t, t)
-	blocks, err := s.blockDB.span(tr)
+	blocks, err := s.blockDB.span(ctx, tr)
 	if err != nil {
 		return nil, err
 	}
@@ -140,7 +140,7 @@ func (s *series) Create(t time.Time) (SeriesSpan, error) {
 			Msg("load a series span")
 		return newSeriesSpan(context.WithValue(context.Background(), logger.ContextKey, s.l), tr, blocks, s.id, s.shardID), nil
 	}
-	b, err := s.blockDB.create(t)
+	b, err := s.blockDB.create(ctx, t)
 	if err != nil {
 		return nil, err
 	}
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index 7825338..b878b38 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -148,9 +148,9 @@ type SeriesDatabase interface {
 
 type blockDatabase interface {
 	shardID() common.ShardID
-	span(timeRange timestamp.TimeRange) ([]BlockDelegate, error)
-	create(ts time.Time) (BlockDelegate, error)
-	block(id GlobalItemID) (BlockDelegate, error)
+	span(ctx context.Context, timeRange timestamp.TimeRange) ([]BlockDelegate, error)
+	create(ctx context.Context, ts time.Time) (BlockDelegate, error)
+	block(ctx context.Context, id GlobalItemID) (BlockDelegate, error)
 }
 
 var (
@@ -189,12 +189,12 @@ func (s *seriesDB) GetByID(id common.SeriesID) (Series, error) {
 	return newSeries(s.context(), id, s), nil
 }
 
-func (s *seriesDB) block(id GlobalItemID) (BlockDelegate, error) {
+func (s *seriesDB) block(ctx context.Context, id GlobalItemID) (BlockDelegate, error) {
 	seg := s.segCtrl.get(id.segID)
 	if seg == nil {
 		return nil, nil
 	}
-	return seg.blockController.get(id.blockID)
+	return seg.blockController.get(ctx, id.blockID)
 }
 
 func (s *seriesDB) shardID() common.ShardID {
@@ -251,11 +251,11 @@ func (s *seriesDB) List(path Path) (SeriesList, error) {
 	return result, err
 }
 
-func (s *seriesDB) span(timeRange timestamp.TimeRange) ([]BlockDelegate, error) {
+func (s *seriesDB) span(ctx context.Context, timeRange timestamp.TimeRange) ([]BlockDelegate, error) {
 	// TODO: return correct blocks
 	result := make([]BlockDelegate, 0)
 	for _, s := range s.segCtrl.span(timeRange) {
-		dd, err := s.blockController.span(timeRange)
+		dd, err := s.blockController.span(ctx, timeRange)
 		if err != nil {
 			return nil, err
 		}
@@ -267,14 +267,14 @@ func (s *seriesDB) span(timeRange timestamp.TimeRange) ([]BlockDelegate, error)
 	return result, nil
 }
 
-func (s *seriesDB) create(ts time.Time) (BlockDelegate, error) {
+func (s *seriesDB) create(ctx context.Context, ts time.Time) (BlockDelegate, error) {
 	s.Lock()
 	defer s.Unlock()
 	timeRange := timestamp.NewInclusiveTimeRange(ts, ts)
 	ss := s.segCtrl.span(timeRange)
 	if len(ss) > 0 {
 		s := ss[0]
-		dd, err := s.blockController.span(timeRange)
+		dd, err := s.blockController.span(ctx, timeRange)
 		if err != nil {
 			return nil, err
 		}
@@ -285,7 +285,7 @@ func (s *seriesDB) create(ts time.Time) (BlockDelegate, error) {
 		if err != nil {
 			return nil, err
 		}
-		return block.delegate()
+		return block.delegate(ctx)
 	}
 	seg, err := s.segCtrl.create(s.segCtrl.Format(timeRange.Start), false)
 	if err != nil {
@@ -295,7 +295,7 @@ func (s *seriesDB) create(ts time.Time) (BlockDelegate, error) {
 	if err != nil {
 		return nil, err
 	}
-	return block.delegate()
+	return block.delegate(ctx)
 }
 
 func (s *seriesDB) context() context.Context {
diff --git a/banyand/tsdb/seriesdb_test.go b/banyand/tsdb/seriesdb_test.go
index f5b1875..3c4623c 100644
--- a/banyand/tsdb/seriesdb_test.go
+++ b/banyand/tsdb/seriesdb_test.go
@@ -30,6 +30,7 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/test"
+	"github.com/apache/skywalking-banyandb/pkg/test/flags"
 )
 
 func TestEntity(t *testing.T) {
@@ -355,7 +356,7 @@ func Test_SeriesDatabase_Get(t *testing.T) {
 	tester := assert.New(t)
 	tester.NoError(logger.Init(logger.Logging{
 		Env:   "dev",
-		Level: "warn",
+		Level: flags.LogLevel,
 	}))
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -376,7 +377,7 @@ func Test_SeriesDatabase_List(t *testing.T) {
 	tester := assert.New(t)
 	tester.NoError(logger.Init(logger.Logging{
 		Env:   "dev",
-		Level: "warn",
+		Level: flags.LogLevel,
 	}))
 	dir, deferFunc := test.Space(require.New(t))
 	defer deferFunc()
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index 277ee03..b77a232 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -34,8 +34,9 @@ import (
 )
 
 const (
-	defaultBlockQueueSize = 1 << 4
-	defaultKVMemorySize   = 1 << 20
+	defaultBlockQueueSize    = 4
+	defaultMaxBlockQueueSize = 64
+	defaultKVMemorySize      = 1 << 20
 )
 
 var _ Shard = (*shard)(nil)
@@ -54,7 +55,7 @@ type shard struct {
 }
 
 func OpenShard(ctx context.Context, id common.ShardID,
-	root string, segmentSize, blockSize, ttl IntervalRule, openedBlockSize int,
+	root string, segmentSize, blockSize, ttl IntervalRule, openedBlockSize, maxOpenedBlockSize int,
 ) (Shard, error) {
 	path, err := mkdir(shardTemplate, root, int(id))
 	if err != nil {
@@ -70,7 +71,7 @@ func OpenShard(ctx context.Context, id common.ShardID,
 		p.Shard = strconv.Itoa(int(id))
 		return p
 	})
-	sc, err := newSegmentController(shardCtx, path, segmentSize, blockSize, openedBlockSize, l)
+	sc, err := newSegmentController(shardCtx, path, segmentSize, blockSize, openedBlockSize, maxOpenedBlockSize, l)
 	if err != nil {
 		return nil, errors.Wrapf(err, "create the segment controller of the shard %d", int(id))
 	}
@@ -164,8 +165,9 @@ func (s *shard) State() (shardState ShardState) {
 func (s *shard) Close() error {
 	s.retentionController.stop()
 	s.segmentManageStrategy.Close()
-	s.segmentController.close()
-	err := s.seriesDatabase.Close()
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	err := multierr.Combine(s.segmentController.close(ctx), s.seriesDatabase.Close())
 	close(s.stopCh)
 	return err
 }
@@ -236,7 +238,7 @@ type segmentController struct {
 }
 
 func newSegmentController(shardCtx context.Context, location string, segmentSize, blockSize IntervalRule,
-	openedBlockSize int, l *logger.Logger,
+	openedBlockSize, maxOpenedBlockSize int, l *logger.Logger,
 ) (*segmentController, error) {
 	clock, _ := timestamp.GetClock(shardCtx)
 	sc := &segmentController{
@@ -248,16 +250,21 @@ func newSegmentController(shardCtx context.Context, location string, segmentSize
 		clock:       clock,
 	}
 	var err error
-	sc.blockQueue, err = bucket.NewQueue(openedBlockSize, func(id interface{}) {
-		bsID := id.(BlockID)
-		seg := sc.get(bsID.SegID)
-		if seg == nil {
-			l.Warn().Uint16("segID", bsID.SegID).Msg("segment is absent")
-			return
-		}
-		l.Info().Uint16("blockID", bsID.BlockID).Uint16("segID", bsID.SegID).Msg("closing the block")
-		seg.closeBlock(bsID.BlockID)
-	})
+	sc.blockQueue, err = bucket.NewQueue(
+		l.Named("block-queue"),
+		openedBlockSize,
+		maxOpenedBlockSize,
+		clock,
+		func(ctx context.Context, id interface{}) error {
+			bsID := id.(BlockID)
+			seg := sc.get(bsID.SegID)
+			if seg == nil {
+				l.Warn().Int("segID", parseSuffix(bsID.SegID)).Msg("segment is absent")
+				return nil
+			}
+			l.Info().Uint16("blockID", bsID.BlockID).Int("segID", parseSuffix(bsID.SegID)).Msg("closing the block")
+			return seg.closeBlock(ctx, bsID.BlockID)
+		})
 	return sc, err
 }
 
@@ -409,14 +416,14 @@ func (sc *segmentController) load(suffix, path string, createBlockIfEmpty bool)
 	return seg, nil
 }
 
-func (sc *segmentController) remove(deadline time.Time) (err error) {
+func (sc *segmentController) remove(ctx context.Context, deadline time.Time) (err error) {
 	sc.l.Info().Time("deadline", deadline).Msg("start to remove before deadline")
 	for _, s := range sc.segments() {
 		if s.End.Before(deadline) || s.Contains(uint64(deadline.UnixNano())) {
-			err = multierr.Append(err, s.blockController.remove(deadline))
+			err = multierr.Append(err, s.blockController.remove(ctx, deadline))
 			if s.End.Before(deadline) {
 				sc.Lock()
-				if errDel := s.delete(); errDel != nil {
+				if errDel := s.delete(ctx); errDel != nil {
 					err = multierr.Append(err, errDel)
 				} else {
 					sc.removeSeg(s.id)
@@ -437,8 +444,10 @@ func (sc *segmentController) removeSeg(segID uint16) {
 	}
 }
 
-func (sc *segmentController) close() {
+func (sc *segmentController) close(ctx context.Context) (err error) {
 	for _, s := range sc.lst {
-		s.close()
+		err = multierr.Append(err, s.close(ctx))
 	}
+	err = multierr.Append(err, sc.blockQueue.Close())
+	return err
 }
diff --git a/banyand/tsdb/shard_test.go b/banyand/tsdb/shard_test.go
index 1e1197e..6b66ea4 100644
--- a/banyand/tsdb/shard_test.go
+++ b/banyand/tsdb/shard_test.go
@@ -27,6 +27,7 @@ import (
 	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/banyand/tsdb"
 	"github.com/apache/skywalking-banyandb/pkg/test"
+	"github.com/apache/skywalking-banyandb/pkg/test/flags"
 	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
@@ -70,13 +71,14 @@ var _ = Describe("Shard", func() {
 					Num:  7,
 				},
 				2,
+				3,
 			)
 			Expect(err).NotTo(HaveOccurred())
 			By("01/01 00:00 1st block is opened")
 			t1 := clock.Now()
 			Eventually(func() []tsdb.BlockState {
 				return shard.State().Blocks
-			}, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+			}, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{
 				{
 					ID: tsdb.BlockID{
 						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -93,7 +95,7 @@ var _ = Describe("Shard", func() {
 					GinkgoWriter.Println("01/01 10:00 has been triggered")
 				}
 				return shard.State().Blocks
-			}, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+			}, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{
 				{
 					ID: tsdb.BlockID{
 						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -111,7 +113,7 @@ var _ = Describe("Shard", func() {
 			}))
 			Eventually(func() []tsdb.BlockID {
 				return shard.State().OpenBlocks
-			}, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{}))
+			}, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockID{}))
 			By("01/01 13:00 moves to the 2nd block")
 			clock.Add(3 * time.Hour)
 			Eventually(func() []tsdb.BlockID {
@@ -119,7 +121,7 @@ var _ = Describe("Shard", func() {
 					GinkgoWriter.Println("01/01 13:00 has been triggered")
 				}
 				return shard.State().OpenBlocks
-			}, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{
+			}, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockID{
 				{
 					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
 					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
@@ -133,7 +135,7 @@ var _ = Describe("Shard", func() {
 					GinkgoWriter.Println("01/01 22:00 has been triggered")
 				}
 				return shard.State().Blocks
-			}, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+			}, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{
 				{
 					ID: tsdb.BlockID{
 						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -163,7 +165,7 @@ var _ = Describe("Shard", func() {
 					GinkgoWriter.Println("01/02 01:00 has been triggered")
 				}
 				return shard.State().OpenBlocks
-			}, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{
+			}, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockID{
 				{
 					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
 					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
@@ -181,7 +183,7 @@ var _ = Describe("Shard", func() {
 					GinkgoWriter.Println("01/02 10:00 has been triggered")
 				}
 				return shard.State().Blocks
-			}, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+			}, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{
 				{
 					ID: tsdb.BlockID{
 						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -218,7 +220,7 @@ var _ = Describe("Shard", func() {
 					GinkgoWriter.Println("01/02 13:00 has been triggered")
 				}
 				return shard.State().OpenBlocks
-			}, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{
+			}, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockID{
 				{
 					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
 					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
@@ -240,7 +242,7 @@ var _ = Describe("Shard", func() {
 					GinkgoWriter.Println("01/02 22:00 has been triggered")
 				}
 				return shard.State().Blocks
-			}, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+			}, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{
 				{
 					ID: tsdb.BlockID{
 						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -284,11 +286,7 @@ var _ = Describe("Shard", func() {
 					GinkgoWriter.Println("01/03 01:00 has been triggered")
 				}
 				return shard.State().OpenBlocks
-			}, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{
-				{
-					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
-					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
-				},
+			}, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockID{
 				{
 					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
 					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
@@ -300,7 +298,7 @@ var _ = Describe("Shard", func() {
 			}))
 			Eventually(func() []tsdb.BlockState {
 				return shard.State().Blocks
-			}, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+			}, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{
 				{
 					ID: tsdb.BlockID{
 						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -315,6 +313,7 @@ var _ = Describe("Shard", func() {
 						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
 					},
 					TimeRange: timestamp.NewTimeRangeDuration(t2, 12*time.Hour, true, false),
+					Closed:    true,
 				},
 				{
 					ID: tsdb.BlockID{
@@ -342,7 +341,7 @@ var _ = Describe("Shard", func() {
 			series, err := shard.Series().GetByID(common.SeriesID(11))
 			Expect(err).NotTo(HaveOccurred())
 			t1Range := timestamp.NewInclusiveTimeRangeDuration(t1, 1*time.Hour)
-			span, err := series.Span(t1Range)
+			span, err := series.Span(context.Background(), t1Range)
 			Expect(err).NotTo(HaveOccurred())
 			defer span.Close()
 			writer, err := span.WriterBuilder().Family([]byte("test"), []byte("test")).Time(t1Range.End).Build()
@@ -351,14 +350,14 @@ var _ = Describe("Shard", func() {
 			Expect(err).NotTo(HaveOccurred())
 			Eventually(func() []tsdb.BlockID {
 				return shard.State().OpenBlocks
-			}, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{
+			}, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockID{
 				{
 					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
 					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
 				},
 				{
-					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
-					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
 				},
 				{
 					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
@@ -367,7 +366,7 @@ var _ = Describe("Shard", func() {
 			}))
 			Eventually(func() []tsdb.BlockState {
 				return shard.State().Blocks
-			}, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+			}, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{
 				{
 					ID: tsdb.BlockID{
 						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -381,6 +380,7 @@ var _ = Describe("Shard", func() {
 						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
 					},
 					TimeRange: timestamp.NewTimeRangeDuration(t2, 12*time.Hour, true, false),
+					Closed:    true,
 				},
 				{
 					ID: tsdb.BlockID{
@@ -388,7 +388,6 @@ var _ = Describe("Shard", func() {
 						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
 					},
 					TimeRange: timestamp.NewTimeRangeDuration(t3, 12*time.Hour, true, false),
-					Closed:    true,
 				},
 				{
 					ID: tsdb.BlockID{
@@ -422,6 +421,7 @@ var _ = Describe("Shard", func() {
 					Num:  1,
 				},
 				10,
+				15,
 			)
 			Expect(err).NotTo(HaveOccurred())
 			By("open 4 blocks")
@@ -451,7 +451,7 @@ var _ = Describe("Shard", func() {
 					GinkgoWriter.Println("01/02 13:00 has been triggered")
 				}
 				return shard.State().Blocks
-			}, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+			}, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{
 				{
 					ID: tsdb.BlockID{
 						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -497,7 +497,7 @@ var _ = Describe("Shard", func() {
 					GinkgoWriter.Println("01/03 01:00 has been triggered")
 				}
 				return shard.State().Blocks
-			}, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+			}, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{
 				{
 					ID: tsdb.BlockID{
 						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
@@ -558,13 +558,14 @@ var _ = Describe("Shard", func() {
 					Num:  7,
 				},
 				2,
+				3,
 			)
 			Expect(err).NotTo(HaveOccurred())
 			By("01/01 00:01 1st block is opened")
 			t1 := clock.Now()
 			Eventually(func() []tsdb.BlockState {
 				return shard.State().Blocks
-			}, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+			}, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{
 				{
 					ID: tsdb.BlockID{
 						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -581,7 +582,7 @@ var _ = Describe("Shard", func() {
 					GinkgoWriter.Println("01/01 10:00 has been triggered")
 				}
 				return shard.State().Blocks
-			}, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+			}, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{
 				{
 					ID: tsdb.BlockID{
 						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -600,7 +601,7 @@ var _ = Describe("Shard", func() {
 			}))
 			Eventually(func() []tsdb.BlockID {
 				return shard.State().OpenBlocks
-			}, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{}))
+			}, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockID{}))
 		})
 	})
 })
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index 5710837..42bb4f5 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -107,10 +107,18 @@ type BlockID struct {
 	BlockID uint16
 }
 
+func (b BlockID) String() string {
+	return fmt.Sprintf("BlockID-%d-%d", parseSuffix(b.SegID), parseSuffix(b.BlockID))
+}
+
 func GenerateInternalID(unit IntervalUnit, suffix int) uint16 {
 	return uint16(unit)<<12 | ((uint16(suffix) << 4) >> 4)
 }
 
+func parseSuffix(id uint16) int {
+	return int((id << 12) >> 12)
+}
+
 type BlockState struct {
 	ID        BlockID
 	TimeRange timestamp.TimeRange
@@ -211,7 +219,7 @@ func createDatabase(ctx context.Context, db *database, startID int) (Database, e
 	for i := startID; i < int(db.shardNum); i++ {
 		db.logger.Info().Int("shard_id", i).Msg("creating a shard")
 		so, errNewShard := OpenShard(ctx, common.ShardID(i),
-			db.location, db.segmentSize, db.blockSize, db.ttl, defaultBlockQueueSize)
+			db.location, db.segmentSize, db.blockSize, db.ttl, defaultBlockQueueSize, defaultMaxBlockQueueSize)
 		if errNewShard != nil {
 			err = multierr.Append(err, errNewShard)
 			continue
@@ -243,6 +251,7 @@ func loadDatabase(ctx context.Context, db *database) (Database, error) {
 			db.blockSize,
 			db.ttl,
 			defaultBlockQueueSize,
+			defaultMaxBlockQueueSize,
 		)
 		if errOpenShard != nil {
 			return errOpenShard
diff --git a/banyand/tsdb/tsdb_suite_test.go b/banyand/tsdb/tsdb_suite_test.go
index cd11765..52e1157 100644
--- a/banyand/tsdb/tsdb_suite_test.go
+++ b/banyand/tsdb/tsdb_suite_test.go
@@ -18,16 +18,14 @@ package tsdb_test
 
 import (
 	"testing"
-	"time"
 
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
 
 	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/test/flags"
 )
 
-var defaultEventuallyTimeout = 30 * time.Second
-
 func TestTsdb(t *testing.T) {
 	RegisterFailHandler(Fail)
 	RunSpecs(t, "Tsdb Suite")
@@ -36,6 +34,6 @@ func TestTsdb(t *testing.T) {
 var _ = BeforeSuite(func() {
 	Expect(logger.Init(logger.Logging{
 		Env:   "dev",
-		Level: "warn",
+		Level: flags.LogLevel,
 	})).Should(Succeed())
 })
diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go
index 684269f..5a482a8 100644
--- a/banyand/tsdb/tsdb_test.go
+++ b/banyand/tsdb/tsdb_test.go
@@ -30,6 +30,7 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/encoding"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/test"
+	"github.com/apache/skywalking-banyandb/pkg/test/flags"
 	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
@@ -100,7 +101,7 @@ func verifyDatabaseStructure(tester *assert.Assertions, tempDir string, now time
 func openDatabase(ctx context.Context, t *require.Assertions, path string) (db Database) {
 	t.NoError(logger.Init(logger.Logging{
 		Env:   "dev",
-		Level: "warn",
+		Level: flags.LogLevel,
 	}))
 	db, err := OpenDatabase(
 		context.WithValue(ctx, logger.ContextKey, logger.GetLogger("test")),
diff --git a/bydbctl/internal/cmd/cmd_suite_test.go b/bydbctl/internal/cmd/cmd_suite_test.go
index 6dc56ae..dc563e3 100644
--- a/bydbctl/internal/cmd/cmd_suite_test.go
+++ b/bydbctl/internal/cmd/cmd_suite_test.go
@@ -24,6 +24,7 @@ import (
 	. "github.com/onsi/gomega"
 
 	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/test/flags"
 )
 
 func TestCmd(t *testing.T) {
@@ -34,6 +35,6 @@ func TestCmd(t *testing.T) {
 var _ = BeforeSuite(func() {
 	Expect(logger.Init(logger.Logging{
 		Env:   "dev",
-		Level: "warn",
+		Level: flags.LogLevel,
 	})).To(Succeed())
 })
diff --git a/docs/crud/measure/query.md b/docs/crud/measure/query.md
index 5dd8c1d..40639a7 100644
--- a/docs/crud/measure/query.md
+++ b/docs/crud/measure/query.md
@@ -43,6 +43,7 @@ fieldProjection:
 timeRange:
   begin: 2022-10-15T22:32:48Z
   end: 2022-10-15T23:32:48Z
+EOF
 ```
 
 The below command could query data in the last 30 minutes using relative time duration :
@@ -58,6 +59,7 @@ tagProjection:
     tags: ["id", "entity_id"]
 fieldProjection:
   names: ["total", "value"]
+EOF
 ```
 
 ## API Reference
diff --git a/docs/crud/stream/query.md b/docs/crud/stream/query.md
index cc7dea3..620f572 100644
--- a/docs/crud/stream/query.md
+++ b/docs/crud/stream/query.md
@@ -43,6 +43,7 @@ projection:
 timeRange:
   begin: 2022-10-15T22:32:48+08:00
   end: 2022-10-15T23:32:48+08:00
+EOF
 ```
 
 The below command could query data in the last 30 minutes using relative time duration :
@@ -58,6 +59,7 @@ projection:
     tags: ["trace_id"]
   - name: "data"
     tags: ["data_binary"]
+EOF
 ```
 
 ## API Reference
diff --git a/pkg/index/inverted/inverted_test.go b/pkg/index/inverted/inverted_test.go
index 4d889cd..fda5215 100644
--- a/pkg/index/inverted/inverted_test.go
+++ b/pkg/index/inverted/inverted_test.go
@@ -32,6 +32,7 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/index/testcases"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/test"
+	"github.com/apache/skywalking-banyandb/pkg/test/flags"
 )
 
 var serviceName = index.FieldKey{
@@ -173,7 +174,7 @@ func TestStore_Iterator(t *testing.T) {
 func setUp(t *require.Assertions) (tempDir string, deferFunc func()) {
 	t.NoError(logger.Init(logger.Logging{
 		Env:   "dev",
-		Level: "warn",
+		Level: flags.LogLevel,
 	}))
 	tempDir, deferFunc = test.Space(t)
 	return tempDir, deferFunc
diff --git a/pkg/index/lsm/lsm_test.go b/pkg/index/lsm/lsm_test.go
index c0de013..4af5eb5 100644
--- a/pkg/index/lsm/lsm_test.go
+++ b/pkg/index/lsm/lsm_test.go
@@ -26,6 +26,7 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/index/testcases"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/test"
+	"github.com/apache/skywalking-banyandb/pkg/test/flags"
 )
 
 func TestStore_MatchTerm(t *testing.T) {
@@ -63,7 +64,7 @@ func TestStore_Iterator(t *testing.T) {
 func setUp(t *require.Assertions) (tempDir string, deferFunc func()) {
 	t.NoError(logger.Init(logger.Logging{
 		Env:   "dev",
-		Level: "warn",
+		Level: flags.LogLevel,
 	}))
 	tempDir, deferFunc = test.Space(t)
 	return tempDir, deferFunc
diff --git a/pkg/query/logical/common.go b/pkg/query/logical/common.go
index 3537ce7..716be97 100644
--- a/pkg/query/logical/common.go
+++ b/pkg/query/logical/common.go
@@ -19,6 +19,9 @@ package logical
 
 import (
 	"bytes"
+	"context"
+	"io"
+	"time"
 
 	"github.com/pkg/errors"
 
@@ -101,16 +104,15 @@ func ProjectItem(ec executor.ExecutionContext, item tsdb.Item, projectionFieldRe
 // This method is used by the underlying tableScan and localIndexScan plans.
 func ExecuteForShard(series tsdb.SeriesList, timeRange timestamp.TimeRange,
 	builders ...SeekerBuilder,
-) ([]tsdb.Iterator, error) {
+) ([]tsdb.Iterator, []io.Closer, error) {
 	var itersInShard []tsdb.Iterator
+	var closers []io.Closer
 	for _, seriesFound := range series {
 		itersInSeries, err := func() ([]tsdb.Iterator, error) {
-			sp, errInner := seriesFound.Span(timeRange)
-			defer func(sp tsdb.SeriesSpan) {
-				if sp != nil {
-					_ = sp.Close()
-				}
-			}(sp)
+			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+			defer cancel()
+			sp, errInner := seriesFound.Span(ctx, timeRange)
+			closers = append(closers, sp)
 			if errInner != nil {
 				return nil, errInner
 			}
@@ -129,16 +131,16 @@ func ExecuteForShard(series tsdb.SeriesList, timeRange timestamp.TimeRange,
 			return iters, nil
 		}()
 		if err != nil {
-			return nil, err
+			return nil, nil, err
 		}
 		if len(itersInSeries) > 0 {
 			itersInShard = append(itersInShard, itersInSeries...)
 		}
 	}
-	return itersInShard, nil
+	return itersInShard, closers, nil
 }
 
-var DefaultLimit uint32 = 100
+var DefaultLimit uint32 = 20
 
 type Tag struct {
 	familyName, name string
diff --git a/pkg/query/logical/measure/measure_plan_indexscan_local.go b/pkg/query/logical/measure/measure_plan_indexscan_local.go
index c9147d4..69df5b3 100644
--- a/pkg/query/logical/measure/measure_plan_indexscan_local.go
+++ b/pkg/query/logical/measure/measure_plan_indexscan_local.go
@@ -19,6 +19,7 @@ package measure
 
 import (
 	"fmt"
+	"io"
 	"time"
 
 	"go.uber.org/multierr"
@@ -132,7 +133,14 @@ func (i *localIndexScan) Execute(ec executor.MeasureExecutionContext) (executor.
 			b.Filter(i.filter)
 		})
 	}
-	iters, innerErr := logical.ExecuteForShard(seriesList, i.timeRange, builders...)
+	iters, closers, innerErr := logical.ExecuteForShard(seriesList, i.timeRange, builders...)
+	if len(closers) > 0 {
+		defer func(closers []io.Closer) {
+			for _, c := range closers {
+				_ = c.Close()
+			}
+		}(closers)
+	}
 	if innerErr != nil {
 		return nil, innerErr
 	}
diff --git a/pkg/query/logical/stream/stream_plan_indexscan_global.go b/pkg/query/logical/stream/stream_plan_indexscan_global.go
index 75549e1..549a83d 100644
--- a/pkg/query/logical/stream/stream_plan_indexscan_global.go
+++ b/pkg/query/logical/stream/stream_plan_indexscan_global.go
@@ -18,6 +18,7 @@
 package stream
 
 import (
+	"context"
 	"fmt"
 	"io"
 	"time"
@@ -96,7 +97,9 @@ func (t *globalIndexScan) executeForShard(ec executor.StreamExecutionContext, sh
 			return elementsInShard, errors.WithStack(err)
 		}
 		err = func() error {
-			item, closer, errInner := series.Get(itemID)
+			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+			defer cancel()
+			item, closer, errInner := series.Get(ctx, itemID)
 			defer func(closer io.Closer) {
 				if closer != nil {
 					_ = closer.Close()
diff --git a/pkg/query/logical/stream/stream_plan_indexscan_local.go b/pkg/query/logical/stream/stream_plan_indexscan_local.go
index 6cc4403..b33e4ff 100644
--- a/pkg/query/logical/stream/stream_plan_indexscan_local.go
+++ b/pkg/query/logical/stream/stream_plan_indexscan_local.go
@@ -19,6 +19,7 @@ package stream
 
 import (
 	"fmt"
+	"io"
 	"time"
 
 	"google.golang.org/protobuf/types/known/timestamppb"
@@ -77,7 +78,14 @@ func (i *localIndexScan) Execute(ec executor.StreamExecutionContext) ([]*streamv
 			b.Filter(i.filter)
 		})
 	}
-	iters, innerErr := logical.ExecuteForShard(seriesList, i.timeRange, builders...)
+	iters, closers, innerErr := logical.ExecuteForShard(seriesList, i.timeRange, builders...)
+	if len(closers) > 0 {
+		defer func(closers []io.Closer) {
+			for _, c := range closers {
+				_ = c.Close()
+			}
+		}(closers)
+	}
 	if innerErr != nil {
 		return nil, innerErr
 	}
diff --git a/banyand/tsdb/bucket/bucket_suite_test.go b/pkg/test/flags/flags.go
similarity index 70%
copy from banyand/tsdb/bucket/bucket_suite_test.go
copy to pkg/test/flags/flags.go
index 59ca1fe..b2f83f5 100644
--- a/banyand/tsdb/bucket/bucket_suite_test.go
+++ b/pkg/test/flags/flags.go
@@ -14,24 +14,26 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-package bucket_test
+package flags
 
 import (
-	"testing"
-
-	. "github.com/onsi/ginkgo/v2"
-	. "github.com/onsi/gomega"
+	"time"
+)
 
-	"github.com/apache/skywalking-banyandb/pkg/logger"
+var (
+	eventuallyTimeout string
+	EventuallyTimeout time.Duration
+	LogLevel          = "debug"
 )
 
-func TestBucket(t *testing.T) {
-	RegisterFailHandler(Fail)
-	BeforeSuite(func() {
-		Expect(logger.Init(logger.Logging{
-			Env:   "dev",
-			Level: "warn",
-		})).Should(Succeed())
-	})
-	RunSpecs(t, "Bucket Suite")
+func init() {
+	if eventuallyTimeout == "" {
+		EventuallyTimeout = time.Second * 3
+		return
+	}
+	d, err := time.ParseDuration(eventuallyTimeout)
+	if err != nil {
+		panic(err)
+	}
+	EventuallyTimeout = d
 }
diff --git a/test/integration/cold_query/query_suite_test.go b/test/integration/cold_query/query_suite_test.go
index 9b3a190..849a059 100644
--- a/test/integration/cold_query/query_suite_test.go
+++ b/test/integration/cold_query/query_suite_test.go
@@ -27,6 +27,7 @@ import (
 	"google.golang.org/grpc/credentials/insecure"
 
 	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/test/flags"
 	"github.com/apache/skywalking-banyandb/pkg/test/helpers"
 	"github.com/apache/skywalking-banyandb/pkg/test/setup"
 	"github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -51,7 +52,7 @@ var (
 var _ = SynchronizedBeforeSuite(func() []byte {
 	Expect(logger.Init(logger.Logging{
 		Env:   "dev",
-		Level: "warn",
+		Level: flags.LogLevel,
 	})).To(Succeed())
 	var addr string
 	addr, _, deferFunc = setup.SetUp()
diff --git a/test/integration/query/query_suite_test.go b/test/integration/load/load_suite_test.go
similarity index 61%
copy from test/integration/query/query_suite_test.go
copy to test/integration/load/load_suite_test.go
index 0bb67be..b7430f2 100644
--- a/test/integration/query/query_suite_test.go
+++ b/test/integration/load/load_suite_test.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package integration_query_test
+package integration_load_test
 
 import (
 	"testing"
@@ -29,17 +29,13 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/test/helpers"
 	"github.com/apache/skywalking-banyandb/pkg/test/setup"
-	"github.com/apache/skywalking-banyandb/pkg/timestamp"
-	cases_measure "github.com/apache/skywalking-banyandb/test/cases/measure"
-	cases_measure_data "github.com/apache/skywalking-banyandb/test/cases/measure/data"
 	cases_stream "github.com/apache/skywalking-banyandb/test/cases/stream"
 	cases_stream_data "github.com/apache/skywalking-banyandb/test/cases/stream/data"
-	cases_topn "github.com/apache/skywalking-banyandb/test/cases/topn"
 )
 
-func TestIntegrationQuery(t *testing.T) {
+func TestIntegrationLoad(t *testing.T) {
 	RegisterFailHandler(Fail)
-	RunSpecs(t, "Integration Query Suite")
+	RunSpecs(t, "Integration Load Suite", Label("integration", "slow"))
 }
 
 var (
@@ -60,18 +56,26 @@ var _ = SynchronizedBeforeSuite(func() []byte {
 		grpclib.WithTransportCredentials(insecure.NewCredentials()),
 	)
 	Expect(err).NotTo(HaveOccurred())
-	ns := timestamp.NowMilli().UnixNano()
-	now = time.Unix(0, ns-ns%int64(time.Minute))
-	interval := 500 * time.Millisecond
-	// stream
-	cases_stream_data.Write(conn, "data.json", now, interval)
-	// measure
-	interval = time.Minute
-	cases_measure_data.Write(conn, "service_traffic", "sw_metric", "service_traffic_data.json", now, interval)
-	cases_measure_data.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval)
-	cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval)
-	cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data1.json", now.Add(10*time.Second), interval)
-	cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data2.json", now.Add(10*time.Minute), interval)
+	days := 7
+	hours := 24
+	minutes := 60
+	interval := 10 * time.Second
+	c := time.Now()
+	for i := 0; i < days; i++ {
+		date := c.Add(-time.Hour * time.Duration((days-i)*24))
+		for h := 0; h < hours; h++ {
+			hour := date.Add(time.Hour * time.Duration(h))
+			start := time.Now()
+			for j := 0; j < minutes; j++ {
+				n := hour.Add(time.Minute * time.Duration(j))
+				ns := n.UnixNano()
+				now = time.Unix(0, ns-ns%int64(time.Minute))
+				// stream
+				cases_stream_data.Write(conn, "data.json", now, interval)
+			}
+			GinkgoWriter.Printf("written stream in %s took %s \n", hour, time.Since(start))
+		}
+	}
 	Expect(conn.Close()).To(Succeed())
 	return []byte(addr)
 }, func(address []byte) {
@@ -85,14 +89,6 @@ var _ = SynchronizedBeforeSuite(func() []byte {
 		Connection: connection,
 		BaseTime:   now,
 	}
-	cases_measure.SharedContext = helpers.SharedContext{
-		Connection: connection,
-		BaseTime:   now,
-	}
-	cases_topn.SharedContext = helpers.SharedContext{
-		Connection: connection,
-		BaseTime:   now,
-	}
 	Expect(err).NotTo(HaveOccurred())
 })
 
diff --git a/test/integration/other/other_suite_test.go b/test/integration/other/other_suite_test.go
index 4eaee84..eaf2474 100644
--- a/test/integration/other/other_suite_test.go
+++ b/test/integration/other/other_suite_test.go
@@ -24,6 +24,7 @@ import (
 	gm "github.com/onsi/gomega"
 
 	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/test/flags"
 )
 
 func TestIntegrationOther(t *testing.T) {
@@ -34,6 +35,6 @@ func TestIntegrationOther(t *testing.T) {
 var _ = g.BeforeSuite(func() {
 	gm.Expect(logger.Init(logger.Logging{
 		Env:   "dev",
-		Level: "warn",
+		Level: flags.LogLevel,
 	})).To(gm.Succeed())
 })
diff --git a/test/integration/query/query_suite_test.go b/test/integration/query/query_suite_test.go
index 0bb67be..69be763 100644
--- a/test/integration/query/query_suite_test.go
+++ b/test/integration/query/query_suite_test.go
@@ -27,6 +27,7 @@ import (
 	"google.golang.org/grpc/credentials/insecure"
 
 	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/test/flags"
 	"github.com/apache/skywalking-banyandb/pkg/test/helpers"
 	"github.com/apache/skywalking-banyandb/pkg/test/setup"
 	"github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -51,7 +52,7 @@ var (
 var _ = SynchronizedBeforeSuite(func() []byte {
 	Expect(logger.Init(logger.Logging{
 		Env:   "dev",
-		Level: "warn",
+		Level: flags.LogLevel,
 	})).To(Succeed())
 	var addr string
 	addr, _, deferFunc = setup.SetUp()