You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/11/02 04:22:13 UTC

[skywalking-banyandb] 01/01: Fix bugs of tsdb

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch tsdb
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit c875b64d7666db702ae2e35082f362d966311031
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Wed Nov 2 04:16:18 2022 +0000

    Fix bugs of tsdb
    
    * The strategy manager failed to create the current block when
      blocks already existed
    * A non-aligned block overflows the segment
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 Makefile                                           |   2 +-
 banyand/tsdb/block.go                              |   7 +-
 banyand/tsdb/bucket/bucket.go                      |   4 +-
 banyand/tsdb/bucket/strategy.go                    |  14 +-
 banyand/tsdb/bucket/strategy_test.go               |   4 +-
 banyand/tsdb/segment.go                            |  71 ++++----
 banyand/tsdb/series.go                             |   2 +-
 banyand/tsdb/seriesdb.go                           |   6 +-
 banyand/tsdb/shard.go                              |  47 +++---
 banyand/tsdb/shard_test.go                         |  62 +++++++
 banyand/tsdb/tsdb.go                               |   4 +
 banyand/tsdb/tsdb_test.go                          |  52 ++++--
 pkg/timestamp/range.go                             |   4 +
 .../data/testdata/service_cpm_minute_data.json     | 182 ---------------------
 14 files changed, 190 insertions(+), 271 deletions(-)

diff --git a/Makefile b/Makefile
index db19f92..37eee1c 100644
--- a/Makefile
+++ b/Makefile
@@ -65,7 +65,7 @@ test-coverage: default ## Run the unit tests in all projects with coverage analy
 include scripts/build/ginkgo.mk
 
 test-ci: $(GINKGO) ## Run the unit tests in CI
-	$(GINKGO) -v --race --cover --covermode atomic --coverprofile=coverage.out ./... 
+	$(GINKGO) --race --cover --covermode atomic --coverprofile=coverage.out ./... 
 
 ##@ Code quality targets
 
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 0043528..3bc87ea 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -80,7 +80,7 @@ type block struct {
 type blockOpts struct {
 	segID     uint16
 	blockSize IntervalRule
-	startTime time.Time
+	timeRange timestamp.TimeRange
 	suffix    string
 	path      string
 	queue     bucket.Queue
@@ -92,15 +92,14 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
 		return nil, err
 	}
 	id := GenerateInternalID(opts.blockSize.Unit, suffixInteger)
-	timeRange := timestamp.NewTimeRange(opts.startTime, opts.blockSize.NextTime(opts.startTime), true, false)
 	clock, _ := timestamp.GetClock(ctx)
 	b = &block{
 		segID:     opts.segID,
 		blockID:   id,
 		path:      opts.path,
 		l:         logger.Fetch(ctx, "block"),
-		TimeRange: timeRange,
-		Reporter:  bucket.NewTimeBasedReporter(timeRange, clock),
+		TimeRange: opts.timeRange,
+		Reporter:  bucket.NewTimeBasedReporter(opts.timeRange, clock),
 		flushCh:   make(chan struct{}, 1),
 		ref:       &atomic.Int32{},
 		closed:    &atomic.Bool{},
diff --git a/banyand/tsdb/bucket/bucket.go b/banyand/tsdb/bucket/bucket.go
index ad5b051..5838c26 100644
--- a/banyand/tsdb/bucket/bucket.go
+++ b/banyand/tsdb/bucket/bucket.go
@@ -24,7 +24,7 @@ import (
 )
 
 type Controller interface {
-	Current() Reporter
+	Current() (Reporter, error)
 	Next() (Reporter, error)
 	OnMove(prev, next Reporter)
 }
@@ -42,6 +42,8 @@ type Reporter interface {
 	String() string
 }
 
+var _ Reporter = (*timeBasedReporter)(nil)
+
 type timeBasedReporter struct {
 	timestamp.TimeRange
 	reporterStopCh chan struct{}
diff --git a/banyand/tsdb/bucket/strategy.go b/banyand/tsdb/bucket/strategy.go
index de8011c..4a8d14e 100644
--- a/banyand/tsdb/bucket/strategy.go
+++ b/banyand/tsdb/bucket/strategy.go
@@ -82,16 +82,17 @@ func NewStrategy(ctrl Controller, options ...StrategyOptions) (*Strategy, error)
 	if strategy.logger == nil {
 		strategy.logger = logger.GetLogger("bucket-strategy")
 	}
+	c, err := ctrl.Current()
+	if err != nil {
+		return nil, err
+	}
+	strategy.current.Store(c)
 	return strategy, nil
 }
 
 func (s *Strategy) Run() {
-	for s.current.Load() == nil {
-		s.current.Store(s.ctrl.Current())
-	}
 	go func(s *Strategy) {
 		for {
-
 			c := s.current.Load().(Reporter).Report()
 			if !s.observe(c) {
 				return
@@ -110,7 +111,6 @@ func (s *Strategy) String() string {
 }
 
 func (s *Strategy) observe(c Channel) bool {
-	var err error
 	var next Reporter
 	moreBucket := true
 	for {
@@ -122,11 +122,13 @@ func (s *Strategy) observe(c Channel) bool {
 			ratio := Ratio(status.Volume) / Ratio(status.Capacity)
 			atomic.StoreUint64(&s.currentRatio, math.Float64bits(float64(ratio)))
 			if ratio >= s.ratio && next == nil && moreBucket {
-				next, err = s.ctrl.Next()
+				n, err := s.ctrl.Next()
 				if errors.Is(err, ErrNoMoreBucket) {
 					moreBucket = false
 				} else if err != nil {
 					s.logger.Err(err).Msg("failed to create the next bucket")
+				} else {
+					next = n
 				}
 			}
 			if ratio >= 1.0 {
diff --git a/banyand/tsdb/bucket/strategy_test.go b/banyand/tsdb/bucket/strategy_test.go
index aca1022..00f6ef5 100644
--- a/banyand/tsdb/bucket/strategy_test.go
+++ b/banyand/tsdb/bucket/strategy_test.go
@@ -109,10 +109,10 @@ func (c *controller) Next() (bucket.Reporter, error) {
 	return c.reporter, nil
 }
 
-func (c *controller) Current() bucket.Reporter {
+func (c *controller) Current() (bucket.Reporter, error) {
 	c.mux.RLock()
 	defer c.mux.RUnlock()
-	return c.reporter
+	return c.reporter, nil
 }
 
 func (c *controller) OnMove(prev bucket.Reporter, next bucket.Reporter) {
diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go
index b516bd2..3393f67 100644
--- a/banyand/tsdb/segment.go
+++ b/banyand/tsdb/segment.go
@@ -60,7 +60,7 @@ func openSegment(ctx context.Context, startTime time.Time, path, suffix string,
 		return nil, err
 	}
 	id := GenerateInternalID(segmentSize.Unit, suffixInteger)
-	timeRange := timestamp.NewTimeRange(startTime, segmentSize.NextTime(startTime), true, false)
+	timeRange := timestamp.NewSectionTimeRange(startTime, segmentSize.NextTime(startTime))
 	l := logger.Fetch(ctx, "segment")
 	segCtx := context.WithValue(ctx, logger.ContextKey, l)
 	clock, segCtx := timestamp.GetClock(segCtx)
@@ -73,8 +73,7 @@ func openSegment(ctx context.Context, startTime time.Time, path, suffix string,
 		TimeRange:       timeRange,
 		Reporter:        bucket.NewTimeBasedReporter(timeRange, clock),
 	}
-	isHead := s.End.After(clock.Now())
-	err = s.blockController.open(isHead)
+	err = s.blockController.open()
 	if err != nil {
 		return nil, err
 	}
@@ -101,7 +100,7 @@ func openSegment(ctx context.Context, startTime time.Time, path, suffix string,
 			}
 		}
 	}
-	if !isHead {
+	if !s.End.After(clock.Now()) {
 		return
 	}
 	s.blockManageStrategy, err = bucket.NewStrategy(s.blockController, bucket.WithLogger(s.l))
@@ -175,33 +174,31 @@ func newBlockController(segCtx context.Context, segID uint16, location string, s
 	}
 }
 
-func (bc *blockController) Current() bucket.Reporter {
-	bc.RLock()
-	defer bc.RUnlock()
+func (bc *blockController) Current() (bucket.Reporter, error) {
 	now := bc.clock.Now()
-	for _, s := range bc.lst {
-		if s.suffix == bc.Format(now) {
-			return s
+	ns := uint64(now.UnixNano())
+	if b := func() bucket.Reporter {
+		bc.RLock()
+		defer bc.RUnlock()
+		for _, s := range bc.lst {
+			if s.Contains(ns) {
+				return s
+			}
 		}
+		return nil
+	}(); b != nil {
+		return b, nil
 	}
-	// return the latest segment before now
-	if len(bc.lst) > 0 {
-		return bc.lst[len(bc.lst)-1]
-	}
-	return nil
+	return bc.create(now)
 }
 
 func (bc *blockController) Next() (bucket.Reporter, error) {
-	b := bc.Current().(*block)
-	reporter, err := bc.create(
-		bc.blockSize.NextTime(b.Start))
-	if errors.Is(err, ErrEndOfSegment) {
-		return nil, bucket.ErrNoMoreBucket
-	}
+	c, err := bc.Current()
 	if err != nil {
 		return nil, err
 	}
-	return reporter, err
+	b := c.(*block)
+	return bc.create(bc.blockSize.NextTime(b.Start))
 }
 
 func (bc *blockController) OnMove(prev bucket.Reporter, next bucket.Reporter) {
@@ -327,8 +324,8 @@ func (bc *blockController) startTime(suffix string) (time.Time, error) {
 	panic("invalid interval unit")
 }
 
-func (bc *blockController) open(createIfEmpty bool) error {
-	err := WalkDir(
+func (bc *blockController) open() error {
+	return WalkDir(
 		bc.location,
 		segPathPrefix,
 		func(suffix, absolutePath string) error {
@@ -337,19 +334,6 @@ func (bc *blockController) open(createIfEmpty bool) error {
 			_, err := bc.load(suffix, absolutePath)
 			return err
 		})
-	if err != nil {
-		return err
-	}
-	if !createIfEmpty {
-		return nil
-	}
-	if bc.Current() == nil {
-		_, err := bc.create(bc.clock.Now())
-		if err != nil {
-			return err
-		}
-	}
-	return nil
 }
 
 func (bc *blockController) create(startTime time.Time) (*block, error) {
@@ -357,11 +341,16 @@ func (bc *blockController) create(startTime time.Time) (*block, error) {
 		startTime = bc.segTimeRange.Start
 	}
 	if !startTime.Before(bc.segTimeRange.End) {
-		return nil, ErrEndOfSegment
+		return nil, bucket.ErrNoMoreBucket
 	}
 	bc.Lock()
 	defer bc.Unlock()
 	suffix := bc.Format(startTime)
+	for _, b := range bc.lst {
+		if b.suffix == suffix {
+			return b, nil
+		}
+	}
 	segPath, err := mkdir(blockTemplate, bc.location, suffix)
 	if err != nil {
 		return nil, err
@@ -374,6 +363,10 @@ func (bc *blockController) load(suffix, path string) (b *block, err error) {
 	if err != nil {
 		return nil, err
 	}
+	endTime := bc.blockSize.NextTime(starTime)
+	if endTime.After(bc.segTimeRange.End) {
+		endTime = bc.segTimeRange.End
+	}
 	if b, err = newBlock(
 		common.SetPosition(bc.segCtx, func(p common.Position) common.Position {
 			p.Block = suffix
@@ -382,7 +375,7 @@ func (bc *blockController) load(suffix, path string) (b *block, err error) {
 		blockOpts{
 			segID:     bc.segID,
 			path:      path,
-			startTime: starTime,
+			timeRange: timestamp.NewSectionTimeRange(starTime, endTime),
 			suffix:    suffix,
 			blockSize: bc.blockSize,
 			queue:     bc.blockQueue,
diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go
index 2b9c355..c0956de 100644
--- a/banyand/tsdb/series.go
+++ b/banyand/tsdb/series.go
@@ -140,7 +140,7 @@ func (s *series) Create(t time.Time) (SeriesSpan, error) {
 			Msg("load a series span")
 		return newSeriesSpan(context.WithValue(context.Background(), logger.ContextKey, s.l), tr, blocks, s.id, s.shardID), nil
 	}
-	b, err := s.blockDB.create(tr)
+	b, err := s.blockDB.create(t)
 	if err != nil {
 		return nil, err
 	}
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index 61332f7..7825338 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -24,6 +24,7 @@ import (
 	"math"
 	"sort"
 	"sync"
+	"time"
 
 	"go.uber.org/multierr"
 
@@ -148,7 +149,7 @@ type SeriesDatabase interface {
 type blockDatabase interface {
 	shardID() common.ShardID
 	span(timeRange timestamp.TimeRange) ([]BlockDelegate, error)
-	create(timeRange timestamp.TimeRange) (BlockDelegate, error)
+	create(ts time.Time) (BlockDelegate, error)
 	block(id GlobalItemID) (BlockDelegate, error)
 }
 
@@ -266,9 +267,10 @@ func (s *seriesDB) span(timeRange timestamp.TimeRange) ([]BlockDelegate, error)
 	return result, nil
 }
 
-func (s *seriesDB) create(timeRange timestamp.TimeRange) (BlockDelegate, error) {
+func (s *seriesDB) create(ts time.Time) (BlockDelegate, error) {
 	s.Lock()
 	defer s.Unlock()
+	timeRange := timestamp.NewInclusiveTimeRange(ts, ts)
 	ss := s.segCtrl.span(timeRange)
 	if len(ss) > 0 {
 		s := ss[0]
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index 4f3c168..277ee03 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -293,24 +293,30 @@ func (sc *segmentController) segments() (ss []*segment) {
 	return r
 }
 
-func (sc *segmentController) Current() bucket.Reporter {
-	sc.RLock()
-	defer sc.RUnlock()
+func (sc *segmentController) Current() (bucket.Reporter, error) {
 	now := sc.clock.Now()
-	for _, s := range sc.lst {
-		if s.suffix == sc.Format(now) {
-			return s
+	ns := uint64(now.UnixNano())
+	if b := func() bucket.Reporter {
+		sc.RLock()
+		defer sc.RUnlock()
+		for _, s := range sc.lst {
+			if s.Contains(ns) {
+				return s
+			}
 		}
+		return nil
+	}(); b != nil {
+		return b, nil
 	}
-	// return the latest segment before now
-	if len(sc.lst) > 0 {
-		return sc.lst[len(sc.lst)-1]
-	}
-	return nil
+	return sc.create(sc.Format(now), true)
 }
 
 func (sc *segmentController) Next() (bucket.Reporter, error) {
-	seg := sc.Current().(*segment)
+	c, err := sc.Current()
+	if err != nil {
+		return nil, err
+	}
+	seg := c.(*segment)
 	reporter, err := sc.create(sc.Format(
 		sc.segmentSize.NextTime(seg.Start)), true)
 	if errors.Is(err, ErrEndOfSegment) {
@@ -351,7 +357,7 @@ func (sc *segmentController) Parse(value string) (time.Time, error) {
 }
 
 func (sc *segmentController) open() error {
-	err := WalkDir(
+	return WalkDir(
 		sc.location,
 		segPathPrefix,
 		func(suffix, absolutePath string) error {
@@ -363,21 +369,16 @@ func (sc *segmentController) open() error {
 			}
 			return err
 		})
-	if err != nil {
-		return err
-	}
-	if sc.Current() == nil {
-		_, err = sc.create(sc.Format(sc.clock.Now()), true)
-		if err != nil {
-			return err
-		}
-	}
-	return nil
 }
 
 func (sc *segmentController) create(suffix string, createBlockIfEmpty bool) (*segment, error) {
 	sc.Lock()
 	defer sc.Unlock()
+	for _, s := range sc.lst {
+		if s.suffix == suffix {
+			return s, nil
+		}
+	}
 	segPath, err := mkdir(segTemplate, sc.location, suffix)
 	if err != nil {
 		return nil, err
diff --git a/banyand/tsdb/shard_test.go b/banyand/tsdb/shard_test.go
index 350085a..1e1197e 100644
--- a/banyand/tsdb/shard_test.go
+++ b/banyand/tsdb/shard_test.go
@@ -540,5 +540,67 @@ var _ = Describe("Shard", func() {
 				},
 			}))
 		})
+		It("creates arbitrary blocks", func() {
+			clock.Set(time.Date(1970, 0o1, 0o1, 1, 0, 0, 0, time.Local))
+			By("open 1 block")
+			var err error
+			shard, err = tsdb.OpenShard(timestamp.SetClock(context.Background(), clock), common.ShardID(0), tmp,
+				tsdb.IntervalRule{
+					Unit: tsdb.DAY,
+					Num:  1,
+				},
+				tsdb.IntervalRule{
+					Unit: tsdb.HOUR,
+					Num:  12,
+				},
+				tsdb.IntervalRule{
+					Unit: tsdb.DAY,
+					Num:  7,
+				},
+				2,
+			)
+			Expect(err).NotTo(HaveOccurred())
+			By("01/01 00:01 1st block is opened")
+			t1 := clock.Now()
+			Eventually(func() []tsdb.BlockState {
+				return shard.State().Blocks
+			}, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o1),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t1, 12*time.Hour, true, false),
+				},
+			}))
+			By("01/01 11:00 2nd block is opened")
+			clock.Add(10 * time.Hour)
+			t2 := clock.Now().Add(2 * time.Hour)
+			Eventually(func() []tsdb.BlockState {
+				if clock.TriggerTimer() {
+					GinkgoWriter.Println("01/01 10:00 has been triggered")
+				}
+				return shard.State().Blocks
+			}, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o1),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t1, 12*time.Hour, true, false),
+				},
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 13),
+					},
+					// The last block only takes 11 hours to align the segment's size
+					TimeRange: timestamp.NewTimeRangeDuration(t2, 11*time.Hour, true, false),
+				},
+			}))
+			Eventually(func() []tsdb.BlockID {
+				return shard.State().OpenBlocks
+			}, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{}))
+		})
 	})
 })
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index dc6b62a..5710837 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -25,6 +25,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"time"
 
 	"github.com/pkg/errors"
 	"go.uber.org/multierr"
@@ -174,6 +175,9 @@ func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) {
 	if opts.TTL.Num == 0 {
 		return nil, errors.Wrap(ErrOpenDatabase, "ttl is absent")
 	}
+	if opts.SegmentInterval.EstimatedDuration() > 24*time.Hour {
+		return nil, errors.Wrap(ErrOpenDatabase, "segment interval should not be greater than 24 hours")
+	}
 	db := &database{
 		location:    opts.Location,
 		shardNum:    opts.ShardNum,
diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go
index fe6c9a6..684269f 100644
--- a/banyand/tsdb/tsdb_test.go
+++ b/banyand/tsdb/tsdb_test.go
@@ -30,15 +30,16 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/encoding"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/test"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 func TestOpenDatabase(t *testing.T) {
 	tester := assert.New(t)
 	req := require.New(t)
 	tempDir, deferFunc := test.Space(req)
-	openDatabase(req, tempDir)
+	openDatabase(context.Background(), req, tempDir)
 	defer deferFunc()
-	verifyDatabaseStructure(tester, tempDir)
+	verifyDatabaseStructure(tester, tempDir, time.Now())
 }
 
 func TestReOpenDatabase(t *testing.T) {
@@ -46,32 +47,63 @@ func TestReOpenDatabase(t *testing.T) {
 	req := require.New(t)
 	tempDir, deferFunc := test.Space(req)
 	defer deferFunc()
-	db := openDatabase(req, tempDir)
+	db := openDatabase(context.Background(), req, tempDir)
 	req.NoError(db.Close())
-	verifyDatabaseStructure(tester, tempDir)
-	db = openDatabase(req, tempDir)
+	verifyDatabaseStructure(tester, tempDir, time.Now())
+	db = openDatabase(context.Background(), req, tempDir)
 	req.NoError(db.Close())
-	verifyDatabaseStructure(tester, tempDir)
+	verifyDatabaseStructure(tester, tempDir, time.Now())
 }
 
-func verifyDatabaseStructure(tester *assert.Assertions, tempDir string) {
+func TestReOpenDatabaseNextBlock(t *testing.T) {
+	tester := assert.New(t)
+	req := require.New(t)
+	tempDir, deferFunc := test.Space(req)
+	defer deferFunc()
+	clock := timestamp.NewMockClock()
+	clock.Set(time.Date(1970, 0o1, 0o1, 0, 0, 0, 0, time.Local))
+	db := openDatabase(timestamp.SetClock(context.Background(), clock), req, tempDir)
+	req.NoError(db.Close())
+	verifyDatabaseStructure(tester, tempDir, clock.Now())
+	clock.Add(5 * time.Hour)
+	db = openDatabase(timestamp.SetClock(context.Background(), clock), req, tempDir)
+	req.NoError(db.Close())
+	verifyDatabaseStructure(tester, tempDir, clock.Now())
+}
+
+func TestReOpenDatabaseNextDay(t *testing.T) {
+	tester := assert.New(t)
+	req := require.New(t)
+	tempDir, deferFunc := test.Space(req)
+	defer deferFunc()
+	clock := timestamp.NewMockClock()
+	clock.Set(time.Date(1970, 0o1, 0o1, 0, 0, 0, 0, time.Local))
+	db := openDatabase(timestamp.SetClock(context.Background(), clock), req, tempDir)
+	req.NoError(db.Close())
+	verifyDatabaseStructure(tester, tempDir, clock.Now())
+	clock.Add(26 * time.Hour)
+	db = openDatabase(timestamp.SetClock(context.Background(), clock), req, tempDir)
+	req.NoError(db.Close())
+	verifyDatabaseStructure(tester, tempDir, clock.Now())
+}
+
+func verifyDatabaseStructure(tester *assert.Assertions, tempDir string, now time.Time) {
 	shardPath := fmt.Sprintf(shardTemplate, tempDir, 0)
 	validateDirectory(tester, shardPath)
 	seriesPath := fmt.Sprintf(seriesTemplate, shardPath)
 	validateDirectory(tester, seriesPath)
-	now := time.Now()
 	segPath := fmt.Sprintf(segTemplate, shardPath, now.Format(segDayFormat))
 	validateDirectory(tester, segPath)
 	validateDirectory(tester, fmt.Sprintf(blockTemplate, segPath, now.Format(blockHourFormat)))
 }
 
-func openDatabase(t *require.Assertions, path string) (db Database) {
+func openDatabase(ctx context.Context, t *require.Assertions, path string) (db Database) {
 	t.NoError(logger.Init(logger.Logging{
 		Env:   "dev",
 		Level: "warn",
 	}))
 	db, err := OpenDatabase(
-		context.WithValue(context.Background(), logger.ContextKey, logger.GetLogger("test")),
+		context.WithValue(ctx, logger.ContextKey, logger.GetLogger("test")),
 		DatabaseOpts{
 			Location: path,
 			ShardNum: 1,
diff --git a/pkg/timestamp/range.go b/pkg/timestamp/range.go
index 3c5b77b..46ac360 100644
--- a/pkg/timestamp/range.go
+++ b/pkg/timestamp/range.go
@@ -84,6 +84,10 @@ func NewInclusiveTimeRangeDuration(start time.Time, duration time.Duration) Time
 	return NewTimeRangeDuration(start, duration, true, true)
 }
 
+func NewSectionTimeRange(start, end time.Time) TimeRange {
+	return NewTimeRange(start, end, true, false)
+}
+
 func NewTimeRange(start, end time.Time, includeStart, includeEnd bool) TimeRange {
 	return TimeRange{
 		Start:        start,
diff --git a/test/cases/stream/data/testdata/service_cpm_minute_data.json b/test/cases/stream/data/testdata/service_cpm_minute_data.json
deleted file mode 100644
index 877290c..0000000
--- a/test/cases/stream/data/testdata/service_cpm_minute_data.json
+++ /dev/null
@@ -1,182 +0,0 @@
-[
-  {
-    "tag_families": [
-      {
-        "tags": [
-          {
-            "id": {
-              "value": "1"
-            }
-          },
-          {
-            "str": {
-              "value": "entity_1"
-            }
-          }
-        ]
-      }
-    ],
-    "fields": [
-      {
-        "int": {
-          "value": 100
-        }
-      },
-      {
-        "int": {
-          "value": 1
-        }
-      }
-    ]
-  },
-  {
-    "tag_families": [
-      {
-        "tags": [
-          {
-            "id": {
-              "value": "4"
-            }
-          },
-          {
-            "str": {
-              "value": "entity_2"
-            }
-          }
-        ]
-      }
-    ],
-    "fields": [
-      {
-        "int": {
-          "value": 100
-        }
-      },
-      {
-        "int": {
-          "value": 1
-        }
-      }
-    ]
-  },
-  {
-    "tag_families": [
-      {
-        "tags": [
-          {
-            "id": {
-              "value": "5"
-            }
-          },
-          {
-            "str": {
-              "value": "entity_2"
-            }
-          }
-        ]
-      }
-    ],
-    "fields": [
-      {
-        "int": {
-          "value": 100
-        }
-      },
-      {
-        "int": {
-          "value": 1
-        }
-      }
-    ]
-  },
-  {
-    "tag_families": [
-      {
-        "tags": [
-          {
-            "id": {
-              "value": "6"
-            }
-          },
-          {
-            "str": {
-              "value": "entity_3"
-            }
-          }
-        ]
-      }
-    ],
-    "fields": [
-      {
-        "int": {
-          "value": 100
-        }
-      },
-      {
-        "int": {
-          "value": 5
-        }
-      }
-    ]
-  },
-  {
-    "tag_families": [
-      {
-        "tags": [
-          {
-            "id": {
-              "value": "2"
-            }
-          },
-          {
-            "str": {
-              "value": "entity_1"
-            }
-          }
-        ]
-      }
-    ],
-    "fields": [
-      {
-        "int": {
-          "value": 50
-        }
-      },
-      {
-        "int": {
-          "value": 4
-        }
-      }
-    ]
-  },
-  {
-    "tag_families": [
-      {
-        "tags": [
-          {
-            "id": {
-              "value": "3"
-            }
-          },
-          {
-            "str": {
-              "value": "entity_1"
-            }
-          }
-        ]
-      }
-    ],
-    "fields": [
-      {
-        "int": {
-          "value": 300
-        }
-      },
-      {
-        "int": {
-          "value": 5
-        }
-      }
-    ]
-  }
-]