You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/04/18 11:03:32 UTC

[skywalking-banyandb] branch patch-metadata created (now 25ed40d)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a change to branch patch-metadata
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


      at 25ed40d  Introduce the clock mock to test block controller

This branch includes the following new commits:

     new 25ed40d  Introduce the clock mock to test block controller

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Any revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[skywalking-banyandb] 01/01: Introduce the clock mock to test block controller

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch patch-metadata
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 25ed40db6b82c4f66b1020ad879eae84e1fcc5ff
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Mon Apr 18 10:48:23 2022 +0000

    Introduce the clock mock to test block controller
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/tsdb/block.go           |   5 +-
 banyand/tsdb/bucket/bucket.go   |   8 +-
 banyand/tsdb/bucket/queue.go    |  11 +
 banyand/tsdb/bucket/strategy.go |   1 +
 banyand/tsdb/segment.go         |  40 ++--
 banyand/tsdb/shard.go           |  33 ++-
 banyand/tsdb/shard_test.go      | 436 +++++++++++++++++++++++++++++-----------
 banyand/tsdb/tsdb.go            |  20 +-
 banyand/tsdb/tsdb_suite_test.go |   2 +-
 go.mod                          |   6 +-
 go.sum                          |   3 +-
 pkg/timestamp/clock.go          |  67 ++++++
 12 files changed, 465 insertions(+), 167 deletions(-)

diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 88d88c2..e5af100 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -82,7 +82,7 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
 	if err != nil {
 		return nil, err
 	}
-	id := uint16(opts.blockSize.Unit)<<12 | ((uint16(suffixInteger) << 4) >> 4)
+	id := GenerateInternalID(opts.blockSize.Unit, suffixInteger)
 	timeRange := timestamp.NewTimeRange(opts.startTime, opts.blockSize.NextTime(opts.startTime), true, false)
 	encodingMethodObject := ctx.Value(encodingMethodKey)
 	if encodingMethodObject == nil {
@@ -91,13 +91,14 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
 			DecoderPool: encoding.NewPlainDecoderPool(0),
 		}
 	}
+	clock, _ := timestamp.GetClock(ctx)
 	b = &block{
 		segID:          opts.segID,
 		blockID:        id,
 		path:           opts.path,
 		l:              logger.Fetch(ctx, "block"),
 		TimeRange:      timeRange,
-		Reporter:       bucket.NewTimeBasedReporter(timeRange),
+		Reporter:       bucket.NewTimeBasedReporter(timeRange, clock),
 		closed:         atomic.NewBool(true),
 		encodingMethod: encodingMethodObject.(EncodingMethod),
 	}
diff --git a/banyand/tsdb/bucket/bucket.go b/banyand/tsdb/bucket/bucket.go
index dc1b77f..615293b 100644
--- a/banyand/tsdb/bucket/bucket.go
+++ b/banyand/tsdb/bucket/bucket.go
@@ -45,12 +45,14 @@ type Reporter interface {
 type timeBasedReporter struct {
 	timestamp.TimeRange
 	reporterStopCh chan struct{}
+	clock          timestamp.Clock
 }
 
-func NewTimeBasedReporter(timeRange timestamp.TimeRange) Reporter {
+func NewTimeBasedReporter(timeRange timestamp.TimeRange, clock timestamp.Clock) Reporter {
 	return &timeBasedReporter{
 		TimeRange:      timeRange,
 		reporterStopCh: make(chan struct{}),
+		clock:          clock,
 	}
 }
 
@@ -62,14 +64,14 @@ func (tr *timeBasedReporter) Report() Channel {
 	}
 	go func() {
 		defer close(ch)
-		ticker := time.NewTicker(interval)
+		ticker := tr.clock.Ticker(interval)
 		defer ticker.Stop()
 		for {
 			select {
 			case <-ticker.C:
 				status := Status{
 					Capacity: int(tr.End.UnixNano() - tr.Start.UnixNano()),
-					Volume:   int(time.Now().UnixNano() - tr.Start.UnixNano()),
+					Volume:   int(tr.clock.Now().UnixNano() - tr.Start.UnixNano()),
 				}
 				ch <- status
 				if status.Volume >= status.Capacity {
diff --git a/banyand/tsdb/bucket/queue.go b/banyand/tsdb/bucket/queue.go
index e26109d..85b73fa 100644
--- a/banyand/tsdb/bucket/queue.go
+++ b/banyand/tsdb/bucket/queue.go
@@ -29,6 +29,7 @@ type EvictFn func(id interface{})
 type Queue interface {
 	Push(id interface{})
 	Len() int
+	All() []interface{}
 }
 
 const (
@@ -114,6 +115,16 @@ func (q *lruQueue) Len() int {
 	return q.recent.Len() + q.frequent.Len()
 }
 
+func (q *lruQueue) All() []interface{} {
+	q.lock.RLock()
+	defer q.lock.RUnlock()
+	all := make([]interface{}, q.recent.Len()+q.frequent.Len()+q.recentEvict.Len())
+	copy(all, q.recent.Keys())
+	copy(all[q.recent.Len():], q.frequent.Keys())
+	copy(all[q.recent.Len()+q.frequent.Len():], q.recentEvict.Keys())
+	return all
+}
+
 func (q *lruQueue) ensureSpace(recentEvict bool) {
 	recentLen := q.recent.Len()
 	freqLen := q.frequent.Len()
diff --git a/banyand/tsdb/bucket/strategy.go b/banyand/tsdb/bucket/strategy.go
index 581ebd8..e31cb74 100644
--- a/banyand/tsdb/bucket/strategy.go
+++ b/banyand/tsdb/bucket/strategy.go
@@ -124,5 +124,6 @@ func (s *Strategy) Run() {
 }
 
 func (s *Strategy) Close() {
+	s.ctrl.OnMove(s.current, nil)
 	close(s.stopCh)
 }
diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go
index bf50669..42756c5 100644
--- a/banyand/tsdb/segment.go
+++ b/banyand/tsdb/segment.go
@@ -47,16 +47,17 @@ type segment struct {
 	blockManageStrategy *bucket.Strategy
 }
 
-func openSegment(ctx context.Context, startTime time.Time, path, suffix string, segmentSize, blockSize IntervalRule, blockQueue bucket.Queue) (s *segment, err error) {
+func openSegment(ctx context.Context, startTime time.Time, path, suffix string,
+	segmentSize, blockSize IntervalRule, blockQueue bucket.Queue) (s *segment, err error) {
 	suffixInteger, err := strconv.Atoi(suffix)
 	if err != nil {
 		return nil, err
 	}
-	// TODO: fix id overflow
-	id := uint16(segmentSize.Unit)<<12 | ((uint16(suffixInteger) << 4) >> 4)
+	id := GenerateInternalID(segmentSize.Unit, suffixInteger)
 	timeRange := timestamp.NewTimeRange(startTime, segmentSize.NextTime(startTime), true, false)
 	l := logger.Fetch(ctx, "segment")
 	segCtx := context.WithValue(ctx, logger.ContextKey, l)
+	clock, segCtx := timestamp.GetClock(segCtx)
 	s = &segment{
 		id:              id,
 		path:            path,
@@ -64,7 +65,7 @@ func openSegment(ctx context.Context, startTime time.Time, path, suffix string,
 		l:               l,
 		blockController: newBlockController(segCtx, id, path, timeRange, blockSize, l, blockQueue),
 		TimeRange:       timeRange,
-		Reporter:        bucket.NewTimeBasedReporter(timeRange),
+		Reporter:        bucket.NewTimeBasedReporter(timeRange, clock),
 	}
 	err = s.blockController.open()
 	if err != nil {
@@ -87,7 +88,6 @@ func openSegment(ctx context.Context, startTime time.Time, path, suffix string,
 }
 
 func (s *segment) close() {
-	s.blockManageStrategy.Close()
 	s.blockController.close()
 	s.globalIndex.Close()
 	s.Stop()
@@ -114,12 +114,14 @@ type blockController struct {
 	blockSize    IntervalRule
 	lst          []*block
 	blockQueue   bucket.Queue
+	clock        timestamp.Clock
 
 	l *logger.Logger
 }
 
 func newBlockController(segCtx context.Context, segID uint16, location string, segTimeRange timestamp.TimeRange,
 	blockSize IntervalRule, l *logger.Logger, blockQueue bucket.Queue) *blockController {
+	clock, _ := timestamp.GetClock(segCtx)
 	return &blockController{
 		segCtx:       segCtx,
 		segID:        segID,
@@ -128,13 +130,14 @@ func newBlockController(segCtx context.Context, segID uint16, location string, s
 		segTimeRange: segTimeRange,
 		blockQueue:   blockQueue,
 		l:            l,
+		clock:        clock,
 	}
 }
 
 func (bc *blockController) Current() bucket.Reporter {
 	bc.RLock()
 	defer bc.RUnlock()
-	now := time.Now()
+	now := bc.clock.Now()
 	for _, s := range bc.lst {
 		if s.suffix == bc.Format(now) {
 			return s
@@ -162,11 +165,18 @@ func (bc *blockController) Next() (bucket.Reporter, error) {
 }
 
 func (bc *blockController) OnMove(prev bucket.Reporter, next bucket.Reporter) {
-	bc.l.Info().Stringer("prev", prev).Stringer("next", next).Msg("move to the next block")
-	bc.blockQueue.Push(BlockID{
-		SegID:   bc.segID,
-		BlockID: prev.(*block).blockID,
-	})
+	event := bc.l.Info()
+	if prev != nil {
+		event.Stringer("prev", prev)
+		bc.blockQueue.Push(BlockID{
+			SegID:   bc.segID,
+			BlockID: prev.(*block).blockID,
+		})
+	}
+	if next != nil {
+		event.Stringer("next", next)
+	}
+	event.Msg("move to the next block")
 }
 
 func (bc *blockController) Format(tm time.Time) string {
@@ -175,8 +185,6 @@ func (bc *blockController) Format(tm time.Time) string {
 		return tm.Format(blockHourFormat)
 	case DAY:
 		return tm.Format(blockDayFormat)
-	case MILLISECOND:
-		return tm.Format(millisecondFormat)
 	}
 	panic("invalid interval unit")
 }
@@ -187,8 +195,6 @@ func (bc *blockController) Parse(value string) (time.Time, error) {
 		return time.Parse(blockHourFormat, value)
 	case DAY:
 		return time.Parse(blockDayFormat, value)
-	case MILLISECOND:
-		return time.Parse(millisecondFormat, value)
 	}
 	panic("invalid interval unit")
 }
@@ -285,8 +291,6 @@ func (bc *blockController) startTime(suffix string) (time.Time, error) {
 	case DAY:
 		return time.Date(startTime.Year(), startTime.Month(),
 			t.Day(), t.Hour(), 0, 0, 0, startTime.Location()), nil
-	case MILLISECOND:
-		return time.ParseInLocation(millisecondFormat, suffix, startTime.Location())
 	}
 	panic("invalid interval unit")
 }
@@ -303,7 +307,7 @@ func (bc *blockController) open() error {
 		return err
 	}
 	if bc.Current() == nil {
-		b, err := bc.create(time.Now())
+		b, err := bc.create(bc.clock.Now())
 		if err != nil {
 			return err
 		}
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index 3299e57..b3d3ca6 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -119,7 +119,7 @@ func (s *shard) Index() IndexDatabase {
 func (s *shard) State() (shardState ShardState) {
 	for _, seg := range s.segmentController.segments() {
 		for _, b := range seg.blockController.blocks() {
-			shardState.OpenedBlocks = append(shardState.OpenedBlocks, BlockState{
+			shardState.Blocks = append(shardState.Blocks, BlockState{
 				ID: BlockID{
 					SegID:   b.segID,
 					BlockID: b.blockID,
@@ -129,6 +129,11 @@ func (s *shard) State() (shardState ShardState) {
 			})
 		}
 	}
+	all := s.segmentController.blockQueue.All()
+	shardState.OpenBlocks = make([]BlockID, len(all))
+	for i, v := range s.segmentController.blockQueue.All() {
+		shardState.OpenBlocks[i] = v.(BlockID)
+	}
 	return shardState
 }
 
@@ -143,8 +148,7 @@ func (s *shard) Close() error {
 type IntervalUnit int
 
 const (
-	MILLISECOND IntervalUnit = iota // only for testing
-	HOUR
+	HOUR IntervalUnit = iota
 	DAY
 )
 
@@ -154,9 +158,6 @@ func (iu IntervalUnit) String() string {
 		return "hour"
 	case DAY:
 		return "day"
-	case MILLISECOND:
-		return "millis"
-
 	}
 	panic("invalid interval unit")
 }
@@ -172,8 +173,6 @@ func (ir IntervalRule) NextTime(current time.Time) time.Time {
 		return current.Add(time.Hour * time.Duration(ir.Num))
 	case DAY:
 		return current.AddDate(0, 0, ir.Num)
-	case MILLISECOND:
-		return current.Add(time.Millisecond * time.Duration(ir.Num))
 	}
 	panic("invalid interval unit")
 }
@@ -184,8 +183,6 @@ func (ir IntervalRule) EstimatedDuration() time.Duration {
 		return time.Hour * time.Duration(ir.Num)
 	case DAY:
 		return 24 * time.Hour * time.Duration(ir.Num)
-	case MILLISECOND:
-		return time.Microsecond * time.Duration(ir.Num)
 	}
 	panic("invalid interval unit")
 }
@@ -198,18 +195,21 @@ type segmentController struct {
 	blockSize   IntervalRule
 	lst         []*segment
 	blockQueue  bucket.Queue
+	clock       timestamp.Clock
 
 	l *logger.Logger
 }
 
-func newSegmentController(shardCtx context.Context, location string,
-	segmentSize, blockSize IntervalRule, openedBlockSize int, l *logger.Logger) (*segmentController, error) {
+func newSegmentController(shardCtx context.Context, location string, segmentSize, blockSize IntervalRule,
+	openedBlockSize int, l *logger.Logger) (*segmentController, error) {
+	clock, _ := timestamp.GetClock(shardCtx)
 	sc := &segmentController{
 		shardCtx:    shardCtx,
 		location:    location,
 		segmentSize: segmentSize,
 		blockSize:   blockSize,
 		l:           l,
+		clock:       clock,
 	}
 	var err error
 	sc.blockQueue, err = bucket.NewQueue(openedBlockSize, func(id interface{}) {
@@ -266,7 +266,7 @@ func (sc *segmentController) segments() (ss []*segment) {
 func (sc *segmentController) Current() bucket.Reporter {
 	sc.RLock()
 	defer sc.RUnlock()
-	now := time.Now()
+	now := sc.clock.Now()
 	for _, s := range sc.lst {
 		if s.suffix == sc.Format(now) {
 			return s
@@ -293,6 +293,7 @@ func (sc *segmentController) OnMove(prev bucket.Reporter, next bucket.Reporter)
 	event := sc.l.Info()
 	if prev != nil {
 		event.Stringer("prev", prev)
+		prev.(*segment).blockManageStrategy.Close()
 	}
 	if next != nil {
 		event.Stringer("next", next)
@@ -306,8 +307,6 @@ func (sc *segmentController) Format(tm time.Time) string {
 		return tm.Format(segHourFormat)
 	case DAY:
 		return tm.Format(segDayFormat)
-	case MILLISECOND:
-		return tm.Format(millisecondFormat)
 	}
 	panic("invalid interval unit")
 }
@@ -318,8 +317,6 @@ func (sc *segmentController) Parse(value string) (time.Time, error) {
 		return time.ParseInLocation(segHourFormat, value, time.Local)
 	case DAY:
 		return time.ParseInLocation(segDayFormat, value, time.Local)
-	case MILLISECOND:
-		return time.ParseInLocation(millisecondFormat, value, time.Local)
 	}
 	panic("invalid interval unit")
 }
@@ -339,7 +336,7 @@ func (sc *segmentController) open() error {
 		return err
 	}
 	if sc.Current() == nil {
-		_, err = sc.create(sc.Format(time.Now()))
+		_, err = sc.create(sc.Format(sc.clock.Now()))
 		if err != nil {
 			return err
 		}
diff --git a/banyand/tsdb/shard_test.go b/banyand/tsdb/shard_test.go
index 647970e..9a3005f 100644
--- a/banyand/tsdb/shard_test.go
+++ b/banyand/tsdb/shard_test.go
@@ -19,9 +19,6 @@ package tsdb_test
 
 import (
 	"context"
-	"errors"
-	"os"
-	"path"
 	"time"
 
 	. "github.com/onsi/ginkgo/v2"
@@ -30,150 +27,359 @@ import (
 	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/banyand/tsdb"
 	"github.com/apache/skywalking-banyandb/pkg/test"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
-var defaultEventallyTimeout = time.Minute
-
 var _ = Describe("Shard", func() {
 	Describe("Generate segments and blocks", func() {
 		var tmp string
 		var deferFn func()
 		var shard tsdb.Shard
+		var clock timestamp.MockClock
 
 		BeforeEach(func() {
 			var err error
 			tmp, deferFn, err = test.NewSpace()
 			Expect(err).NotTo(HaveOccurred())
+			clock = timestamp.NewMockClock()
+			Expect(err).NotTo(HaveOccurred())
 		})
 		AfterEach(func() {
 			shard.Close()
 			deferFn()
 		})
 		It("generates several segments and blocks", func() {
+			By("open 4 blocks")
 			var err error
-			shard, err = tsdb.OpenShard(context.TODO(), common.ShardID(0), tmp,
-				tsdb.IntervalRule{
-					Unit: tsdb.MILLISECOND,
-					Num:  3000,
-				},
-				tsdb.IntervalRule{
-					Unit: tsdb.MILLISECOND,
-					Num:  1000,
-				},
-				1<<4,
-			)
-			Expect(err).NotTo(HaveOccurred())
-			segDirectories := make([]string, 3)
-			Eventually(func() int {
-				num := 0
-				err := tsdb.WalkDir(tmp+"/shard-0", "seg-", func(suffix, absolutePath string) error {
-					if num < 3 {
-						segDirectories[num] = absolutePath
-					}
-					num++
-					return nil
-				})
-				Expect(err).NotTo(HaveOccurred())
-				return num
-			}).WithTimeout(defaultEventallyTimeout).Should(BeNumerically(">=", 3))
-			for _, d := range segDirectories {
-				Eventually(func() int {
-					num := 0
-					err := tsdb.WalkDir(d, "block-", func(suffix, absolutePath string) error {
-						num++
-						return nil
-					})
-					Expect(err).NotTo(HaveOccurred())
-					return num
-				}).WithTimeout(defaultEventallyTimeout).Should(BeNumerically(">=", 3))
-			}
-		})
-		It("closes blocks", func() {
-			var err error
-			shard, err = tsdb.OpenShard(context.TODO(), common.ShardID(0), tmp,
+			shard, err = tsdb.OpenShard(timestamp.SetClock(context.Background(), clock), common.ShardID(0), tmp,
 				tsdb.IntervalRule{
 					Unit: tsdb.DAY,
 					Num:  1,
 				},
 				tsdb.IntervalRule{
-					Unit: tsdb.MILLISECOND,
-					Num:  1000,
+					Unit: tsdb.HOUR,
+					Num:  12,
 				},
 				2,
 			)
 			Expect(err).NotTo(HaveOccurred())
-			var segDirectory string
-			Eventually(func() int {
-				num := 0
-				errInternal := tsdb.WalkDir(tmp+"/shard-0", "seg-", func(suffix, absolutePath string) error {
-					if num < 1 {
-						segDirectory = absolutePath
-					}
-					num++
-					return nil
-				})
-				Expect(errInternal).NotTo(HaveOccurred())
-				return num
-			}).WithTimeout(defaultEventallyTimeout).Should(BeNumerically(">=", 1))
-			Eventually(func() int {
-				num := 0
-				errInternal := tsdb.WalkDir(segDirectory, "block-", func(suffix, absolutePath string) error {
-					if _, err := os.Stat(path.Join(absolutePath, "store", "LOCK")); errors.Is(err, os.ErrNotExist) {
-						num++
-					}
-					return nil
-				})
-				Expect(errInternal).NotTo(HaveOccurred())
-				return num
-			}).WithTimeout(defaultEventallyTimeout).Should(BeNumerically(">=", 1))
-		})
-		It("reopens closed blocks", func() {
-			var err error
-			shard, err = tsdb.OpenShard(context.TODO(), common.ShardID(0), tmp,
-				tsdb.IntervalRule{
-					Unit: tsdb.MILLISECOND,
-					Num:  3000,
+			By("1st block is opened")
+			t1 := clock.Now()
+			Eventually(func() []tsdb.BlockState {
+				return shard.State().Blocks
+			}).Should(Equal([]tsdb.BlockState{
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t1, 12*time.Hour, true, false),
 				},
-				tsdb.IntervalRule{
-					Unit: tsdb.MILLISECOND,
-					Num:  1000,
+			}))
+			By("2nd block is opened")
+			// 01 10:00
+			clock.Add(10 * time.Hour)
+			t2 := clock.Now().Add(2 * time.Hour)
+			Eventually(func() []tsdb.BlockState {
+				return shard.State().Blocks
+			}).Should(Equal([]tsdb.BlockState{
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t1, 12*time.Hour, true, false),
 				},
-				2,
-			)
-			Expect(err).NotTo(HaveOccurred())
-			Eventually(func() int {
-				num := 0
-				for _, bs := range shard.State().OpenedBlocks {
-					if !bs.Closed {
-						num++
-					}
-				}
-				return num
-			}).WithTimeout(defaultEventallyTimeout).Should(BeNumerically(">=", 2))
-			var closedBlocks []tsdb.BlockState
-			Eventually(func() int {
-				closedBlocks = nil
-				for _, ob := range shard.State().OpenedBlocks {
-					if ob.Closed {
-						closedBlocks = append(closedBlocks, ob)
-					}
-				}
-				return len(closedBlocks)
-			}).WithTimeout(defaultEventallyTimeout).Should(BeNumerically(">=", 1))
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t2, 12*time.Hour, true, false),
+				},
+			}))
+			Eventually(func() []tsdb.BlockID {
+				return shard.State().OpenBlocks
+			}).Should(Equal([]tsdb.BlockID{}))
+			By("moves to the 2nd block")
+			// 01 13:00
+			clock.Add(3 * time.Hour)
+			Eventually(func() []tsdb.BlockID {
+				return shard.State().OpenBlocks
+			}).Should(Equal([]tsdb.BlockID{
+				{
+					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+				},
+			}))
+			By("3rd block is opened")
+			// 01 22:00
+			clock.Add(9 * time.Hour)
+			t3 := clock.Now().Add(2 * time.Hour)
+			Eventually(func() []tsdb.BlockState {
+				return shard.State().Blocks
+			}).Should(Equal([]tsdb.BlockState{
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t1, 12*time.Hour, true, false),
+				},
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t2, 12*time.Hour, true, false),
+				},
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t3, 12*time.Hour, true, false),
+				},
+			}))
+			By("moves to 3rd block")
+			// 02 01:00
+			clock.Add(3 * time.Hour)
+			Eventually(func() []tsdb.BlockID {
+				return shard.State().OpenBlocks
+			}).Should(Equal([]tsdb.BlockID{
+				{
+					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+				},
+				{
+					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+				},
+			}))
+			By("4th block is opened")
+			// 02 10:00
+			clock.Add(9 * time.Hour)
+			t4 := clock.Now().Add(2 * time.Hour)
+			Eventually(func() []tsdb.BlockState {
+				return shard.State().Blocks
+			}).Should(Equal([]tsdb.BlockState{
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t1, 12*time.Hour, true, false),
+				},
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t2, 12*time.Hour, true, false),
+				},
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t3, 12*time.Hour, true, false),
+				},
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t4, 12*time.Hour, true, false),
+				},
+			}))
+			By("moves to 4th block")
+			// 02 13:00
+			clock.Add(3 * time.Hour)
+			Eventually(func() []tsdb.BlockID {
+				return shard.State().OpenBlocks
+			}).Should(Equal([]tsdb.BlockID{
+				{
+					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+				},
+				{
+					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+				},
+				{
+					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+				},
+			}))
+			By("5th block is opened")
+			// 02 22:00
+			clock.Add(9 * time.Hour)
+			t5 := clock.Now().Add(2 * time.Hour)
+			Eventually(func() []tsdb.BlockState {
+				return shard.State().Blocks
+			}).Should(Equal([]tsdb.BlockState{
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t1, 12*time.Hour, true, false),
+				},
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t2, 12*time.Hour, true, false),
+				},
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t3, 12*time.Hour, true, false),
+				},
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t4, 12*time.Hour, true, false),
+				},
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700103),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t5, 12*time.Hour, true, false),
+				},
+			}))
+			By("close 1st block by adding 5th block")
+			// 03 01:00
+			clock.Add(3 * time.Hour)
+			Eventually(func() []tsdb.BlockID {
+				return shard.State().OpenBlocks
+			}).Should(Equal([]tsdb.BlockID{
+				{
+					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+				},
+				{
+					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+				},
+				{
+					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+				},
+			}))
+			Eventually(func() []tsdb.BlockState {
+				return shard.State().Blocks
+			}).Should(Equal([]tsdb.BlockState{
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t1, 12*time.Hour, true, false),
+					Closed:    true,
+				},
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t2, 12*time.Hour, true, false),
+				},
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t3, 12*time.Hour, true, false),
+				},
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t4, 12*time.Hour, true, false),
+				},
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700103),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t5, 12*time.Hour, true, false),
+				},
+			}))
+			By("reopen 1st block")
 			series, err := shard.Series().GetByID(common.SeriesID(11))
 			Expect(err).NotTo(HaveOccurred())
-			writeFn := func(bs tsdb.BlockState) {
-				span, err := series.Span(bs.TimeRange)
-				Expect(err).NotTo(HaveOccurred())
-				defer span.Close()
-				writer, err := span.WriterBuilder().Family([]byte("test"), []byte("test")).Time(bs.TimeRange.Start).Build()
-				Expect(err).NotTo(HaveOccurred())
-				_, err = writer.Write()
-				Expect(err).NotTo(HaveOccurred())
-			}
-			for _, bs := range closedBlocks {
-				writeFn(bs)
-			}
+			t1Range := timestamp.NewInclusiveTimeRangeDuration(t1, 1*time.Hour)
+			span, err := series.Span(t1Range)
+			Expect(err).NotTo(HaveOccurred())
+			defer span.Close()
+			writer, err := span.WriterBuilder().Family([]byte("test"), []byte("test")).Time(t1Range.End).Build()
+			Expect(err).NotTo(HaveOccurred())
+			_, err = writer.Write()
+			Expect(err).NotTo(HaveOccurred())
+			Eventually(func() []tsdb.BlockID {
+				return shard.State().OpenBlocks
+			}).Should(Equal([]tsdb.BlockID{
+				{
+					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+				},
+				{
+					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+				},
+				{
+					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+				},
+			}))
+			Eventually(func() []tsdb.BlockState {
+				return shard.State().Blocks
+			}).Should(Equal([]tsdb.BlockState{
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t1, 12*time.Hour, true, false),
+				},
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t2, 12*time.Hour, true, false),
+					Closed:    true,
+				},
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t3, 12*time.Hour, true, false),
+				},
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t4, 12*time.Hour, true, false),
+				},
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700103),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t5, 12*time.Hour, true, false),
+				},
+			}))
 		})
 	})
 })
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index cf40414..8537752 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -49,11 +49,10 @@ const (
 	blockTemplate       = rootPrefix + blockPathPrefix + "-%s"
 	globalIndexTemplate = rootPrefix + "index"
 
-	segHourFormat     = "2006010215"
-	segDayFormat      = "20060102"
-	millisecondFormat = "20060102150405000"
-	blockHourFormat   = "15"
-	blockDayFormat    = "0102"
+	segHourFormat   = "2006010215"
+	segDayFormat    = "20060102"
+	blockHourFormat = "15"
+	blockDayFormat  = "0102"
 
 	dirPerm = 0700
 )
@@ -104,13 +103,18 @@ type BlockID struct {
 	BlockID uint16
 }
 
+func GenerateInternalID(unit IntervalUnit, suffix int) uint16 {
+	return uint16(unit)<<12 | ((uint16(suffix) << 4) >> 4)
+}
+
 type BlockState struct {
 	ID        BlockID
 	TimeRange timestamp.TimeRange
 	Closed    bool
 }
 type ShardState struct {
-	OpenedBlocks []BlockState
+	Blocks     []BlockState
+	OpenBlocks []BlockID
 }
 
 type database struct {
@@ -154,14 +158,14 @@ func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) {
 		return nil, err
 	}
 	segmentSize := opts.SegmentSize
-	if segmentSize.Unit == MILLISECOND {
+	if segmentSize.Num == 0 {
 		segmentSize = IntervalRule{
 			Unit: DAY,
 			Num:  1,
 		}
 	}
 	blockSize := opts.BlockSize
-	if blockSize.Unit == MILLISECOND {
+	if blockSize.Num == 0 {
 		blockSize = IntervalRule{
 			Unit: HOUR,
 			Num:  2,
diff --git a/banyand/tsdb/tsdb_suite_test.go b/banyand/tsdb/tsdb_suite_test.go
index c761503..07ab946 100644
--- a/banyand/tsdb/tsdb_suite_test.go
+++ b/banyand/tsdb/tsdb_suite_test.go
@@ -34,6 +34,6 @@ func TestTsdb(t *testing.T) {
 var _ = BeforeSuite(func() {
 	Expect(logger.Init(logger.Logging{
 		Env:   "dev",
-		Level: "info",
+		Level: "debug",
 	})).Should(Succeed())
 })
diff --git a/go.mod b/go.mod
index e56cf81..dead45b 100644
--- a/go.mod
+++ b/go.mod
@@ -4,6 +4,7 @@ go 1.17
 
 require (
 	github.com/RoaringBitmap/roaring v0.9.1
+	github.com/benbjohnson/clock v1.3.0
 	github.com/cespare/xxhash v1.1.0
 	github.com/dgraph-io/badger/v3 v3.2011.1
 	github.com/dgraph-io/ristretto v0.1.0
@@ -107,4 +108,7 @@ require (
 	sigs.k8s.io/yaml v1.2.0 // indirect
 )
 
-replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20220403004319-fea65bd5e9e4
+replace (
+	github.com/benbjohnson/clock v1.3.0 => github.com/SkyAPM/clock v1.3.1-0.20220416123716-97dcb111a8d8
+	github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20220403004319-fea65bd5e9e4
+)
diff --git a/go.sum b/go.sum
index 5057b2a..4a5d9d8 100644
--- a/go.sum
+++ b/go.sum
@@ -49,6 +49,8 @@ github.com/RoaringBitmap/roaring v0.9.1 h1:5PRizBmoN/PfV17nPNQou4dHQ7NcJi8FO/bih
 github.com/RoaringBitmap/roaring v0.9.1/go.mod h1:h1B7iIUOmnAeb5ytYMvnHJwxMc6LUrwBnzXWRuqTQUc=
 github.com/SkyAPM/badger/v3 v3.0.0-20220403004319-fea65bd5e9e4 h1:iLwRXI6WHBMb2VkWrlYKIFngPKwgs2OnjliXdMB5DY0=
 github.com/SkyAPM/badger/v3 v3.0.0-20220403004319-fea65bd5e9e4/go.mod h1:Q0luV7nB94o3Bl4hYqAPy03+QTtLxs9pWdUEQb0i0K0=
+github.com/SkyAPM/clock v1.3.1-0.20220416123716-97dcb111a8d8 h1:TK3KN7H7ROhT0/sfY8JWWa5xHOo5jUqW7Stc/VxNJyA=
+github.com/SkyAPM/clock v1.3.1-0.20220416123716-97dcb111a8d8/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
@@ -59,7 +61,6 @@ github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hC
 github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
 github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
 github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
-github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg=
 github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
diff --git a/pkg/timestamp/clock.go b/pkg/timestamp/clock.go
new file mode 100644
index 0000000..feba400
--- /dev/null
+++ b/pkg/timestamp/clock.go
@@ -0,0 +1,67 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package timestamp
+
+import (
+	"context"
+	"time"
+
+	"github.com/benbjohnson/clock"
+)
+
+// Clock represents an interface that contains all functions in the standard library time package.
+type Clock interface {
+	clock.Clock
+}
+
+type MockClock interface {
+	clock.Clock
+	// Add moves the current time of the mock clock forward by the specified duration.
+	Add(d time.Duration)
+}
+
+// NewClock returns an instance of a real-time clock.
+func NewClock() Clock {
+	return clock.New()
+}
+
+// NewMockClock returns an instance of a mock clock.
+func NewMockClock() MockClock {
+	return clock.NewMock()
+}
+
+var clockKey = contextClockKey{}
+
+type contextClockKey struct{}
+
+// GetClock returns a Clock from the context. If the context doesn't contain
+// a Clock, a real-time one will be created and returned with a child context
+// which contains the new one.
+func GetClock(ctx context.Context) (Clock, context.Context) {
+	c := ctx.Value(clockKey)
+	if c == nil {
+		realClock := NewClock()
+		return realClock, SetClock(ctx, realClock)
+	}
+	return c.(Clock), ctx
+}
+
+// SetClock returns a sub context with the passed Clock.
+func SetClock(ctx context.Context, clock Clock) context.Context {
+	return context.WithValue(ctx, clockKey, clock)
+}