You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/11/02 04:22:12 UTC

[skywalking-banyandb] branch tsdb created (now c875b64)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a change to branch tsdb
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


      at c875b64  Fix bugs of tsdb

This branch includes the following new commits:

     new c875b64  Fix bugs of tsdb

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[skywalking-banyandb] 01/01: Fix bugs of tsdb

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch tsdb
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit c875b64d7666db702ae2e35082f362d966311031
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Wed Nov 2 04:16:18 2022 +0000

    Fix bugs of tsdb
    
    * Strategy manager failed to create the current block if there are
      blocks existed
    * A non-aligned block overflows the segment
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 Makefile                                           |   2 +-
 banyand/tsdb/block.go                              |   7 +-
 banyand/tsdb/bucket/bucket.go                      |   4 +-
 banyand/tsdb/bucket/strategy.go                    |  14 +-
 banyand/tsdb/bucket/strategy_test.go               |   4 +-
 banyand/tsdb/segment.go                            |  71 ++++----
 banyand/tsdb/series.go                             |   2 +-
 banyand/tsdb/seriesdb.go                           |   6 +-
 banyand/tsdb/shard.go                              |  47 +++---
 banyand/tsdb/shard_test.go                         |  62 +++++++
 banyand/tsdb/tsdb.go                               |   4 +
 banyand/tsdb/tsdb_test.go                          |  52 ++++--
 pkg/timestamp/range.go                             |   4 +
 .../data/testdata/service_cpm_minute_data.json     | 182 ---------------------
 14 files changed, 190 insertions(+), 271 deletions(-)

diff --git a/Makefile b/Makefile
index db19f92..37eee1c 100644
--- a/Makefile
+++ b/Makefile
@@ -65,7 +65,7 @@ test-coverage: default ## Run the unit tests in all projects with coverage analy
 include scripts/build/ginkgo.mk
 
 test-ci: $(GINKGO) ## Run the unit tests in CI
-	$(GINKGO) -v --race --cover --covermode atomic --coverprofile=coverage.out ./... 
+	$(GINKGO) --race --cover --covermode atomic --coverprofile=coverage.out ./... 
 
 ##@ Code quality targets
 
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 0043528..3bc87ea 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -80,7 +80,7 @@ type block struct {
 type blockOpts struct {
 	segID     uint16
 	blockSize IntervalRule
-	startTime time.Time
+	timeRange timestamp.TimeRange
 	suffix    string
 	path      string
 	queue     bucket.Queue
@@ -92,15 +92,14 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
 		return nil, err
 	}
 	id := GenerateInternalID(opts.blockSize.Unit, suffixInteger)
-	timeRange := timestamp.NewTimeRange(opts.startTime, opts.blockSize.NextTime(opts.startTime), true, false)
 	clock, _ := timestamp.GetClock(ctx)
 	b = &block{
 		segID:     opts.segID,
 		blockID:   id,
 		path:      opts.path,
 		l:         logger.Fetch(ctx, "block"),
-		TimeRange: timeRange,
-		Reporter:  bucket.NewTimeBasedReporter(timeRange, clock),
+		TimeRange: opts.timeRange,
+		Reporter:  bucket.NewTimeBasedReporter(opts.timeRange, clock),
 		flushCh:   make(chan struct{}, 1),
 		ref:       &atomic.Int32{},
 		closed:    &atomic.Bool{},
diff --git a/banyand/tsdb/bucket/bucket.go b/banyand/tsdb/bucket/bucket.go
index ad5b051..5838c26 100644
--- a/banyand/tsdb/bucket/bucket.go
+++ b/banyand/tsdb/bucket/bucket.go
@@ -24,7 +24,7 @@ import (
 )
 
 type Controller interface {
-	Current() Reporter
+	Current() (Reporter, error)
 	Next() (Reporter, error)
 	OnMove(prev, next Reporter)
 }
@@ -42,6 +42,8 @@ type Reporter interface {
 	String() string
 }
 
+var _ Reporter = (*timeBasedReporter)(nil)
+
 type timeBasedReporter struct {
 	timestamp.TimeRange
 	reporterStopCh chan struct{}
diff --git a/banyand/tsdb/bucket/strategy.go b/banyand/tsdb/bucket/strategy.go
index de8011c..4a8d14e 100644
--- a/banyand/tsdb/bucket/strategy.go
+++ b/banyand/tsdb/bucket/strategy.go
@@ -82,16 +82,17 @@ func NewStrategy(ctrl Controller, options ...StrategyOptions) (*Strategy, error)
 	if strategy.logger == nil {
 		strategy.logger = logger.GetLogger("bucket-strategy")
 	}
+	c, err := ctrl.Current()
+	if err != nil {
+		return nil, err
+	}
+	strategy.current.Store(c)
 	return strategy, nil
 }
 
 func (s *Strategy) Run() {
-	for s.current.Load() == nil {
-		s.current.Store(s.ctrl.Current())
-	}
 	go func(s *Strategy) {
 		for {
-
 			c := s.current.Load().(Reporter).Report()
 			if !s.observe(c) {
 				return
@@ -110,7 +111,6 @@ func (s *Strategy) String() string {
 }
 
 func (s *Strategy) observe(c Channel) bool {
-	var err error
 	var next Reporter
 	moreBucket := true
 	for {
@@ -122,11 +122,13 @@ func (s *Strategy) observe(c Channel) bool {
 			ratio := Ratio(status.Volume) / Ratio(status.Capacity)
 			atomic.StoreUint64(&s.currentRatio, math.Float64bits(float64(ratio)))
 			if ratio >= s.ratio && next == nil && moreBucket {
-				next, err = s.ctrl.Next()
+				n, err := s.ctrl.Next()
 				if errors.Is(err, ErrNoMoreBucket) {
 					moreBucket = false
 				} else if err != nil {
 					s.logger.Err(err).Msg("failed to create the next bucket")
+				} else {
+					next = n
 				}
 			}
 			if ratio >= 1.0 {
diff --git a/banyand/tsdb/bucket/strategy_test.go b/banyand/tsdb/bucket/strategy_test.go
index aca1022..00f6ef5 100644
--- a/banyand/tsdb/bucket/strategy_test.go
+++ b/banyand/tsdb/bucket/strategy_test.go
@@ -109,10 +109,10 @@ func (c *controller) Next() (bucket.Reporter, error) {
 	return c.reporter, nil
 }
 
-func (c *controller) Current() bucket.Reporter {
+func (c *controller) Current() (bucket.Reporter, error) {
 	c.mux.RLock()
 	defer c.mux.RUnlock()
-	return c.reporter
+	return c.reporter, nil
 }
 
 func (c *controller) OnMove(prev bucket.Reporter, next bucket.Reporter) {
diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go
index b516bd2..3393f67 100644
--- a/banyand/tsdb/segment.go
+++ b/banyand/tsdb/segment.go
@@ -60,7 +60,7 @@ func openSegment(ctx context.Context, startTime time.Time, path, suffix string,
 		return nil, err
 	}
 	id := GenerateInternalID(segmentSize.Unit, suffixInteger)
-	timeRange := timestamp.NewTimeRange(startTime, segmentSize.NextTime(startTime), true, false)
+	timeRange := timestamp.NewSectionTimeRange(startTime, segmentSize.NextTime(startTime))
 	l := logger.Fetch(ctx, "segment")
 	segCtx := context.WithValue(ctx, logger.ContextKey, l)
 	clock, segCtx := timestamp.GetClock(segCtx)
@@ -73,8 +73,7 @@ func openSegment(ctx context.Context, startTime time.Time, path, suffix string,
 		TimeRange:       timeRange,
 		Reporter:        bucket.NewTimeBasedReporter(timeRange, clock),
 	}
-	isHead := s.End.After(clock.Now())
-	err = s.blockController.open(isHead)
+	err = s.blockController.open()
 	if err != nil {
 		return nil, err
 	}
@@ -101,7 +100,7 @@ func openSegment(ctx context.Context, startTime time.Time, path, suffix string,
 			}
 		}
 	}
-	if !isHead {
+	if !s.End.After(clock.Now()) {
 		return
 	}
 	s.blockManageStrategy, err = bucket.NewStrategy(s.blockController, bucket.WithLogger(s.l))
@@ -175,33 +174,31 @@ func newBlockController(segCtx context.Context, segID uint16, location string, s
 	}
 }
 
-func (bc *blockController) Current() bucket.Reporter {
-	bc.RLock()
-	defer bc.RUnlock()
+func (bc *blockController) Current() (bucket.Reporter, error) {
 	now := bc.clock.Now()
-	for _, s := range bc.lst {
-		if s.suffix == bc.Format(now) {
-			return s
+	ns := uint64(now.UnixNano())
+	if b := func() bucket.Reporter {
+		bc.RLock()
+		defer bc.RUnlock()
+		for _, s := range bc.lst {
+			if s.Contains(ns) {
+				return s
+			}
 		}
+		return nil
+	}(); b != nil {
+		return b, nil
 	}
-	// return the latest segment before now
-	if len(bc.lst) > 0 {
-		return bc.lst[len(bc.lst)-1]
-	}
-	return nil
+	return bc.create(now)
 }
 
 func (bc *blockController) Next() (bucket.Reporter, error) {
-	b := bc.Current().(*block)
-	reporter, err := bc.create(
-		bc.blockSize.NextTime(b.Start))
-	if errors.Is(err, ErrEndOfSegment) {
-		return nil, bucket.ErrNoMoreBucket
-	}
+	c, err := bc.Current()
 	if err != nil {
 		return nil, err
 	}
-	return reporter, err
+	b := c.(*block)
+	return bc.create(bc.blockSize.NextTime(b.Start))
 }
 
 func (bc *blockController) OnMove(prev bucket.Reporter, next bucket.Reporter) {
@@ -327,8 +324,8 @@ func (bc *blockController) startTime(suffix string) (time.Time, error) {
 	panic("invalid interval unit")
 }
 
-func (bc *blockController) open(createIfEmpty bool) error {
-	err := WalkDir(
+func (bc *blockController) open() error {
+	return WalkDir(
 		bc.location,
 		segPathPrefix,
 		func(suffix, absolutePath string) error {
@@ -337,19 +334,6 @@ func (bc *blockController) open(createIfEmpty bool) error {
 			_, err := bc.load(suffix, absolutePath)
 			return err
 		})
-	if err != nil {
-		return err
-	}
-	if !createIfEmpty {
-		return nil
-	}
-	if bc.Current() == nil {
-		_, err := bc.create(bc.clock.Now())
-		if err != nil {
-			return err
-		}
-	}
-	return nil
 }
 
 func (bc *blockController) create(startTime time.Time) (*block, error) {
@@ -357,11 +341,16 @@ func (bc *blockController) create(startTime time.Time) (*block, error) {
 		startTime = bc.segTimeRange.Start
 	}
 	if !startTime.Before(bc.segTimeRange.End) {
-		return nil, ErrEndOfSegment
+		return nil, bucket.ErrNoMoreBucket
 	}
 	bc.Lock()
 	defer bc.Unlock()
 	suffix := bc.Format(startTime)
+	for _, b := range bc.lst {
+		if b.suffix == suffix {
+			return b, nil
+		}
+	}
 	segPath, err := mkdir(blockTemplate, bc.location, suffix)
 	if err != nil {
 		return nil, err
@@ -374,6 +363,10 @@ func (bc *blockController) load(suffix, path string) (b *block, err error) {
 	if err != nil {
 		return nil, err
 	}
+	endTime := bc.blockSize.NextTime(starTime)
+	if endTime.After(bc.segTimeRange.End) {
+		endTime = bc.segTimeRange.End
+	}
 	if b, err = newBlock(
 		common.SetPosition(bc.segCtx, func(p common.Position) common.Position {
 			p.Block = suffix
@@ -382,7 +375,7 @@ func (bc *blockController) load(suffix, path string) (b *block, err error) {
 		blockOpts{
 			segID:     bc.segID,
 			path:      path,
-			startTime: starTime,
+			timeRange: timestamp.NewSectionTimeRange(starTime, endTime),
 			suffix:    suffix,
 			blockSize: bc.blockSize,
 			queue:     bc.blockQueue,
diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go
index 2b9c355..c0956de 100644
--- a/banyand/tsdb/series.go
+++ b/banyand/tsdb/series.go
@@ -140,7 +140,7 @@ func (s *series) Create(t time.Time) (SeriesSpan, error) {
 			Msg("load a series span")
 		return newSeriesSpan(context.WithValue(context.Background(), logger.ContextKey, s.l), tr, blocks, s.id, s.shardID), nil
 	}
-	b, err := s.blockDB.create(tr)
+	b, err := s.blockDB.create(t)
 	if err != nil {
 		return nil, err
 	}
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index 61332f7..7825338 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -24,6 +24,7 @@ import (
 	"math"
 	"sort"
 	"sync"
+	"time"
 
 	"go.uber.org/multierr"
 
@@ -148,7 +149,7 @@ type SeriesDatabase interface {
 type blockDatabase interface {
 	shardID() common.ShardID
 	span(timeRange timestamp.TimeRange) ([]BlockDelegate, error)
-	create(timeRange timestamp.TimeRange) (BlockDelegate, error)
+	create(ts time.Time) (BlockDelegate, error)
 	block(id GlobalItemID) (BlockDelegate, error)
 }
 
@@ -266,9 +267,10 @@ func (s *seriesDB) span(timeRange timestamp.TimeRange) ([]BlockDelegate, error)
 	return result, nil
 }
 
-func (s *seriesDB) create(timeRange timestamp.TimeRange) (BlockDelegate, error) {
+func (s *seriesDB) create(ts time.Time) (BlockDelegate, error) {
 	s.Lock()
 	defer s.Unlock()
+	timeRange := timestamp.NewInclusiveTimeRange(ts, ts)
 	ss := s.segCtrl.span(timeRange)
 	if len(ss) > 0 {
 		s := ss[0]
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index 4f3c168..277ee03 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -293,24 +293,30 @@ func (sc *segmentController) segments() (ss []*segment) {
 	return r
 }
 
-func (sc *segmentController) Current() bucket.Reporter {
-	sc.RLock()
-	defer sc.RUnlock()
+func (sc *segmentController) Current() (bucket.Reporter, error) {
 	now := sc.clock.Now()
-	for _, s := range sc.lst {
-		if s.suffix == sc.Format(now) {
-			return s
+	ns := uint64(now.UnixNano())
+	if b := func() bucket.Reporter {
+		sc.RLock()
+		defer sc.RUnlock()
+		for _, s := range sc.lst {
+			if s.Contains(ns) {
+				return s
+			}
 		}
+		return nil
+	}(); b != nil {
+		return b, nil
 	}
-	// return the latest segment before now
-	if len(sc.lst) > 0 {
-		return sc.lst[len(sc.lst)-1]
-	}
-	return nil
+	return sc.create(sc.Format(now), true)
 }
 
 func (sc *segmentController) Next() (bucket.Reporter, error) {
-	seg := sc.Current().(*segment)
+	c, err := sc.Current()
+	if err != nil {
+		return nil, err
+	}
+	seg := c.(*segment)
 	reporter, err := sc.create(sc.Format(
 		sc.segmentSize.NextTime(seg.Start)), true)
 	if errors.Is(err, ErrEndOfSegment) {
@@ -351,7 +357,7 @@ func (sc *segmentController) Parse(value string) (time.Time, error) {
 }
 
 func (sc *segmentController) open() error {
-	err := WalkDir(
+	return WalkDir(
 		sc.location,
 		segPathPrefix,
 		func(suffix, absolutePath string) error {
@@ -363,21 +369,16 @@ func (sc *segmentController) open() error {
 			}
 			return err
 		})
-	if err != nil {
-		return err
-	}
-	if sc.Current() == nil {
-		_, err = sc.create(sc.Format(sc.clock.Now()), true)
-		if err != nil {
-			return err
-		}
-	}
-	return nil
 }
 
 func (sc *segmentController) create(suffix string, createBlockIfEmpty bool) (*segment, error) {
 	sc.Lock()
 	defer sc.Unlock()
+	for _, s := range sc.lst {
+		if s.suffix == suffix {
+			return s, nil
+		}
+	}
 	segPath, err := mkdir(segTemplate, sc.location, suffix)
 	if err != nil {
 		return nil, err
diff --git a/banyand/tsdb/shard_test.go b/banyand/tsdb/shard_test.go
index 350085a..1e1197e 100644
--- a/banyand/tsdb/shard_test.go
+++ b/banyand/tsdb/shard_test.go
@@ -540,5 +540,67 @@ var _ = Describe("Shard", func() {
 				},
 			}))
 		})
+		It("creates arbitrary blocks", func() {
+			clock.Set(time.Date(1970, 0o1, 0o1, 1, 0, 0, 0, time.Local))
+			By("open 1 block")
+			var err error
+			shard, err = tsdb.OpenShard(timestamp.SetClock(context.Background(), clock), common.ShardID(0), tmp,
+				tsdb.IntervalRule{
+					Unit: tsdb.DAY,
+					Num:  1,
+				},
+				tsdb.IntervalRule{
+					Unit: tsdb.HOUR,
+					Num:  12,
+				},
+				tsdb.IntervalRule{
+					Unit: tsdb.DAY,
+					Num:  7,
+				},
+				2,
+			)
+			Expect(err).NotTo(HaveOccurred())
+			By("01/01 00:01 1st block is opened")
+			t1 := clock.Now()
+			Eventually(func() []tsdb.BlockState {
+				return shard.State().Blocks
+			}, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o1),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t1, 12*time.Hour, true, false),
+				},
+			}))
+			By("01/01 11:00 2nd block is opened")
+			clock.Add(10 * time.Hour)
+			t2 := clock.Now().Add(2 * time.Hour)
+			Eventually(func() []tsdb.BlockState {
+				if clock.TriggerTimer() {
+					GinkgoWriter.Println("01/01 10:00 has been triggered")
+				}
+				return shard.State().Blocks
+			}, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o1),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t1, 12*time.Hour, true, false),
+				},
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 13),
+					},
+					// The last block only takes 11 hours to align the segment's size
+					TimeRange: timestamp.NewTimeRangeDuration(t2, 11*time.Hour, true, false),
+				},
+			}))
+			Eventually(func() []tsdb.BlockID {
+				return shard.State().OpenBlocks
+			}, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{}))
+		})
 	})
 })
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index dc6b62a..5710837 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -25,6 +25,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"time"
 
 	"github.com/pkg/errors"
 	"go.uber.org/multierr"
@@ -174,6 +175,9 @@ func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) {
 	if opts.TTL.Num == 0 {
 		return nil, errors.Wrap(ErrOpenDatabase, "ttl is absent")
 	}
+	if opts.SegmentInterval.EstimatedDuration() > 24*time.Hour {
+		return nil, errors.Wrap(ErrOpenDatabase, "segment interval should not be greater than 24 hours")
+	}
 	db := &database{
 		location:    opts.Location,
 		shardNum:    opts.ShardNum,
diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go
index fe6c9a6..684269f 100644
--- a/banyand/tsdb/tsdb_test.go
+++ b/banyand/tsdb/tsdb_test.go
@@ -30,15 +30,16 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/encoding"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/test"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 func TestOpenDatabase(t *testing.T) {
 	tester := assert.New(t)
 	req := require.New(t)
 	tempDir, deferFunc := test.Space(req)
-	openDatabase(req, tempDir)
+	openDatabase(context.Background(), req, tempDir)
 	defer deferFunc()
-	verifyDatabaseStructure(tester, tempDir)
+	verifyDatabaseStructure(tester, tempDir, time.Now())
 }
 
 func TestReOpenDatabase(t *testing.T) {
@@ -46,32 +47,63 @@ func TestReOpenDatabase(t *testing.T) {
 	req := require.New(t)
 	tempDir, deferFunc := test.Space(req)
 	defer deferFunc()
-	db := openDatabase(req, tempDir)
+	db := openDatabase(context.Background(), req, tempDir)
 	req.NoError(db.Close())
-	verifyDatabaseStructure(tester, tempDir)
-	db = openDatabase(req, tempDir)
+	verifyDatabaseStructure(tester, tempDir, time.Now())
+	db = openDatabase(context.Background(), req, tempDir)
 	req.NoError(db.Close())
-	verifyDatabaseStructure(tester, tempDir)
+	verifyDatabaseStructure(tester, tempDir, time.Now())
 }
 
-func verifyDatabaseStructure(tester *assert.Assertions, tempDir string) {
+func TestReOpenDatabaseNextBlock(t *testing.T) {
+	tester := assert.New(t)
+	req := require.New(t)
+	tempDir, deferFunc := test.Space(req)
+	defer deferFunc()
+	clock := timestamp.NewMockClock()
+	clock.Set(time.Date(1970, 0o1, 0o1, 0, 0, 0, 0, time.Local))
+	db := openDatabase(timestamp.SetClock(context.Background(), clock), req, tempDir)
+	req.NoError(db.Close())
+	verifyDatabaseStructure(tester, tempDir, clock.Now())
+	clock.Add(5 * time.Hour)
+	db = openDatabase(timestamp.SetClock(context.Background(), clock), req, tempDir)
+	req.NoError(db.Close())
+	verifyDatabaseStructure(tester, tempDir, clock.Now())
+}
+
+func TestReOpenDatabaseNextDay(t *testing.T) {
+	tester := assert.New(t)
+	req := require.New(t)
+	tempDir, deferFunc := test.Space(req)
+	defer deferFunc()
+	clock := timestamp.NewMockClock()
+	clock.Set(time.Date(1970, 0o1, 0o1, 0, 0, 0, 0, time.Local))
+	db := openDatabase(timestamp.SetClock(context.Background(), clock), req, tempDir)
+	req.NoError(db.Close())
+	verifyDatabaseStructure(tester, tempDir, clock.Now())
+	clock.Add(26 * time.Hour)
+	db = openDatabase(timestamp.SetClock(context.Background(), clock), req, tempDir)
+	req.NoError(db.Close())
+	verifyDatabaseStructure(tester, tempDir, clock.Now())
+}
+
+func verifyDatabaseStructure(tester *assert.Assertions, tempDir string, now time.Time) {
 	shardPath := fmt.Sprintf(shardTemplate, tempDir, 0)
 	validateDirectory(tester, shardPath)
 	seriesPath := fmt.Sprintf(seriesTemplate, shardPath)
 	validateDirectory(tester, seriesPath)
-	now := time.Now()
 	segPath := fmt.Sprintf(segTemplate, shardPath, now.Format(segDayFormat))
 	validateDirectory(tester, segPath)
 	validateDirectory(tester, fmt.Sprintf(blockTemplate, segPath, now.Format(blockHourFormat)))
 }
 
-func openDatabase(t *require.Assertions, path string) (db Database) {
+func openDatabase(ctx context.Context, t *require.Assertions, path string) (db Database) {
 	t.NoError(logger.Init(logger.Logging{
 		Env:   "dev",
 		Level: "warn",
 	}))
 	db, err := OpenDatabase(
-		context.WithValue(context.Background(), logger.ContextKey, logger.GetLogger("test")),
+		context.WithValue(ctx, logger.ContextKey, logger.GetLogger("test")),
 		DatabaseOpts{
 			Location: path,
 			ShardNum: 1,
diff --git a/pkg/timestamp/range.go b/pkg/timestamp/range.go
index 3c5b77b..46ac360 100644
--- a/pkg/timestamp/range.go
+++ b/pkg/timestamp/range.go
@@ -84,6 +84,10 @@ func NewInclusiveTimeRangeDuration(start time.Time, duration time.Duration) Time
 	return NewTimeRangeDuration(start, duration, true, true)
 }
 
+func NewSectionTimeRange(start, end time.Time) TimeRange {
+	return NewTimeRange(start, end, true, false)
+}
+
 func NewTimeRange(start, end time.Time, includeStart, includeEnd bool) TimeRange {
 	return TimeRange{
 		Start:        start,
diff --git a/test/cases/stream/data/testdata/service_cpm_minute_data.json b/test/cases/stream/data/testdata/service_cpm_minute_data.json
deleted file mode 100644
index 877290c..0000000
--- a/test/cases/stream/data/testdata/service_cpm_minute_data.json
+++ /dev/null
@@ -1,182 +0,0 @@
-[
-  {
-    "tag_families": [
-      {
-        "tags": [
-          {
-            "id": {
-              "value": "1"
-            }
-          },
-          {
-            "str": {
-              "value": "entity_1"
-            }
-          }
-        ]
-      }
-    ],
-    "fields": [
-      {
-        "int": {
-          "value": 100
-        }
-      },
-      {
-        "int": {
-          "value": 1
-        }
-      }
-    ]
-  },
-  {
-    "tag_families": [
-      {
-        "tags": [
-          {
-            "id": {
-              "value": "4"
-            }
-          },
-          {
-            "str": {
-              "value": "entity_2"
-            }
-          }
-        ]
-      }
-    ],
-    "fields": [
-      {
-        "int": {
-          "value": 100
-        }
-      },
-      {
-        "int": {
-          "value": 1
-        }
-      }
-    ]
-  },
-  {
-    "tag_families": [
-      {
-        "tags": [
-          {
-            "id": {
-              "value": "5"
-            }
-          },
-          {
-            "str": {
-              "value": "entity_2"
-            }
-          }
-        ]
-      }
-    ],
-    "fields": [
-      {
-        "int": {
-          "value": 100
-        }
-      },
-      {
-        "int": {
-          "value": 1
-        }
-      }
-    ]
-  },
-  {
-    "tag_families": [
-      {
-        "tags": [
-          {
-            "id": {
-              "value": "6"
-            }
-          },
-          {
-            "str": {
-              "value": "entity_3"
-            }
-          }
-        ]
-      }
-    ],
-    "fields": [
-      {
-        "int": {
-          "value": 100
-        }
-      },
-      {
-        "int": {
-          "value": 5
-        }
-      }
-    ]
-  },
-  {
-    "tag_families": [
-      {
-        "tags": [
-          {
-            "id": {
-              "value": "2"
-            }
-          },
-          {
-            "str": {
-              "value": "entity_1"
-            }
-          }
-        ]
-      }
-    ],
-    "fields": [
-      {
-        "int": {
-          "value": 50
-        }
-      },
-      {
-        "int": {
-          "value": 4
-        }
-      }
-    ]
-  },
-  {
-    "tag_families": [
-      {
-        "tags": [
-          {
-            "id": {
-              "value": "3"
-            }
-          },
-          {
-            "str": {
-              "value": "entity_1"
-            }
-          }
-        ]
-      }
-    ],
-    "fields": [
-      {
-        "int": {
-          "value": 300
-        }
-      },
-      {
-        "int": {
-          "value": 5
-        }
-      }
-    ]
-  }
-]