You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/04/18 11:03:33 UTC

[skywalking-banyandb] 01/01: Introduce the clock mock to test block controller

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch patch-metadata
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 25ed40db6b82c4f66b1020ad879eae84e1fcc5ff
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Mon Apr 18 10:48:23 2022 +0000

    Introduce the clock mock to test block controller
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/tsdb/block.go           |   5 +-
 banyand/tsdb/bucket/bucket.go   |   8 +-
 banyand/tsdb/bucket/queue.go    |  11 +
 banyand/tsdb/bucket/strategy.go |   1 +
 banyand/tsdb/segment.go         |  40 ++--
 banyand/tsdb/shard.go           |  33 ++-
 banyand/tsdb/shard_test.go      | 436 +++++++++++++++++++++++++++++-----------
 banyand/tsdb/tsdb.go            |  20 +-
 banyand/tsdb/tsdb_suite_test.go |   2 +-
 go.mod                          |   6 +-
 go.sum                          |   3 +-
 pkg/timestamp/clock.go          |  67 ++++++
 12 files changed, 465 insertions(+), 167 deletions(-)

diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 88d88c2..e5af100 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -82,7 +82,7 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
 	if err != nil {
 		return nil, err
 	}
-	id := uint16(opts.blockSize.Unit)<<12 | ((uint16(suffixInteger) << 4) >> 4)
+	id := GenerateInternalID(opts.blockSize.Unit, suffixInteger)
 	timeRange := timestamp.NewTimeRange(opts.startTime, opts.blockSize.NextTime(opts.startTime), true, false)
 	encodingMethodObject := ctx.Value(encodingMethodKey)
 	if encodingMethodObject == nil {
@@ -91,13 +91,14 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
 			DecoderPool: encoding.NewPlainDecoderPool(0),
 		}
 	}
+	clock, _ := timestamp.GetClock(ctx)
 	b = &block{
 		segID:          opts.segID,
 		blockID:        id,
 		path:           opts.path,
 		l:              logger.Fetch(ctx, "block"),
 		TimeRange:      timeRange,
-		Reporter:       bucket.NewTimeBasedReporter(timeRange),
+		Reporter:       bucket.NewTimeBasedReporter(timeRange, clock),
 		closed:         atomic.NewBool(true),
 		encodingMethod: encodingMethodObject.(EncodingMethod),
 	}
diff --git a/banyand/tsdb/bucket/bucket.go b/banyand/tsdb/bucket/bucket.go
index dc1b77f..615293b 100644
--- a/banyand/tsdb/bucket/bucket.go
+++ b/banyand/tsdb/bucket/bucket.go
@@ -45,12 +45,14 @@ type Reporter interface {
 type timeBasedReporter struct {
 	timestamp.TimeRange
 	reporterStopCh chan struct{}
+	clock          timestamp.Clock
 }
 
-func NewTimeBasedReporter(timeRange timestamp.TimeRange) Reporter {
+func NewTimeBasedReporter(timeRange timestamp.TimeRange, clock timestamp.Clock) Reporter {
 	return &timeBasedReporter{
 		TimeRange:      timeRange,
 		reporterStopCh: make(chan struct{}),
+		clock:          clock,
 	}
 }
 
@@ -62,14 +64,14 @@ func (tr *timeBasedReporter) Report() Channel {
 	}
 	go func() {
 		defer close(ch)
-		ticker := time.NewTicker(interval)
+		ticker := tr.clock.Ticker(interval)
 		defer ticker.Stop()
 		for {
 			select {
 			case <-ticker.C:
 				status := Status{
 					Capacity: int(tr.End.UnixNano() - tr.Start.UnixNano()),
-					Volume:   int(time.Now().UnixNano() - tr.Start.UnixNano()),
+					Volume:   int(tr.clock.Now().UnixNano() - tr.Start.UnixNano()),
 				}
 				ch <- status
 				if status.Volume >= status.Capacity {
diff --git a/banyand/tsdb/bucket/queue.go b/banyand/tsdb/bucket/queue.go
index e26109d..85b73fa 100644
--- a/banyand/tsdb/bucket/queue.go
+++ b/banyand/tsdb/bucket/queue.go
@@ -29,6 +29,7 @@ type EvictFn func(id interface{})
 type Queue interface {
 	Push(id interface{})
 	Len() int
+	All() []interface{}
 }
 
 const (
@@ -114,6 +115,16 @@ func (q *lruQueue) Len() int {
 	return q.recent.Len() + q.frequent.Len()
 }
 
+func (q *lruQueue) All() []interface{} {
+	q.lock.RLock()
+	defer q.lock.RUnlock()
+	all := make([]interface{}, q.recent.Len()+q.frequent.Len()+q.recentEvict.Len())
+	copy(all, q.recent.Keys())
+	copy(all[q.recent.Len():], q.frequent.Keys())
+	copy(all[q.recent.Len()+q.frequent.Len():], q.recentEvict.Keys())
+	return all
+}
+
 func (q *lruQueue) ensureSpace(recentEvict bool) {
 	recentLen := q.recent.Len()
 	freqLen := q.frequent.Len()
diff --git a/banyand/tsdb/bucket/strategy.go b/banyand/tsdb/bucket/strategy.go
index 581ebd8..e31cb74 100644
--- a/banyand/tsdb/bucket/strategy.go
+++ b/banyand/tsdb/bucket/strategy.go
@@ -124,5 +124,6 @@ func (s *Strategy) Run() {
 }
 
 func (s *Strategy) Close() {
+	s.ctrl.OnMove(s.current, nil)
 	close(s.stopCh)
 }
diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go
index bf50669..42756c5 100644
--- a/banyand/tsdb/segment.go
+++ b/banyand/tsdb/segment.go
@@ -47,16 +47,17 @@ type segment struct {
 	blockManageStrategy *bucket.Strategy
 }
 
-func openSegment(ctx context.Context, startTime time.Time, path, suffix string, segmentSize, blockSize IntervalRule, blockQueue bucket.Queue) (s *segment, err error) {
+func openSegment(ctx context.Context, startTime time.Time, path, suffix string,
+	segmentSize, blockSize IntervalRule, blockQueue bucket.Queue) (s *segment, err error) {
 	suffixInteger, err := strconv.Atoi(suffix)
 	if err != nil {
 		return nil, err
 	}
-	// TODO: fix id overflow
-	id := uint16(segmentSize.Unit)<<12 | ((uint16(suffixInteger) << 4) >> 4)
+	id := GenerateInternalID(segmentSize.Unit, suffixInteger)
 	timeRange := timestamp.NewTimeRange(startTime, segmentSize.NextTime(startTime), true, false)
 	l := logger.Fetch(ctx, "segment")
 	segCtx := context.WithValue(ctx, logger.ContextKey, l)
+	clock, segCtx := timestamp.GetClock(segCtx)
 	s = &segment{
 		id:              id,
 		path:            path,
@@ -64,7 +65,7 @@ func openSegment(ctx context.Context, startTime time.Time, path, suffix string,
 		l:               l,
 		blockController: newBlockController(segCtx, id, path, timeRange, blockSize, l, blockQueue),
 		TimeRange:       timeRange,
-		Reporter:        bucket.NewTimeBasedReporter(timeRange),
+		Reporter:        bucket.NewTimeBasedReporter(timeRange, clock),
 	}
 	err = s.blockController.open()
 	if err != nil {
@@ -87,7 +88,6 @@ func openSegment(ctx context.Context, startTime time.Time, path, suffix string,
 }
 
 func (s *segment) close() {
-	s.blockManageStrategy.Close()
 	s.blockController.close()
 	s.globalIndex.Close()
 	s.Stop()
@@ -114,12 +114,14 @@ type blockController struct {
 	blockSize    IntervalRule
 	lst          []*block
 	blockQueue   bucket.Queue
+	clock        timestamp.Clock
 
 	l *logger.Logger
 }
 
 func newBlockController(segCtx context.Context, segID uint16, location string, segTimeRange timestamp.TimeRange,
 	blockSize IntervalRule, l *logger.Logger, blockQueue bucket.Queue) *blockController {
+	clock, _ := timestamp.GetClock(segCtx)
 	return &blockController{
 		segCtx:       segCtx,
 		segID:        segID,
@@ -128,13 +130,14 @@ func newBlockController(segCtx context.Context, segID uint16, location string, s
 		segTimeRange: segTimeRange,
 		blockQueue:   blockQueue,
 		l:            l,
+		clock:        clock,
 	}
 }
 
 func (bc *blockController) Current() bucket.Reporter {
 	bc.RLock()
 	defer bc.RUnlock()
-	now := time.Now()
+	now := bc.clock.Now()
 	for _, s := range bc.lst {
 		if s.suffix == bc.Format(now) {
 			return s
@@ -162,11 +165,18 @@ func (bc *blockController) Next() (bucket.Reporter, error) {
 }
 
 func (bc *blockController) OnMove(prev bucket.Reporter, next bucket.Reporter) {
-	bc.l.Info().Stringer("prev", prev).Stringer("next", next).Msg("move to the next block")
-	bc.blockQueue.Push(BlockID{
-		SegID:   bc.segID,
-		BlockID: prev.(*block).blockID,
-	})
+	event := bc.l.Info()
+	if prev != nil {
+		event.Stringer("prev", prev)
+		bc.blockQueue.Push(BlockID{
+			SegID:   bc.segID,
+			BlockID: prev.(*block).blockID,
+		})
+	}
+	if next != nil {
+		event.Stringer("next", next)
+	}
+	event.Msg("move to the next block")
 }
 
 func (bc *blockController) Format(tm time.Time) string {
@@ -175,8 +185,6 @@ func (bc *blockController) Format(tm time.Time) string {
 		return tm.Format(blockHourFormat)
 	case DAY:
 		return tm.Format(blockDayFormat)
-	case MILLISECOND:
-		return tm.Format(millisecondFormat)
 	}
 	panic("invalid interval unit")
 }
@@ -187,8 +195,6 @@ func (bc *blockController) Parse(value string) (time.Time, error) {
 		return time.Parse(blockHourFormat, value)
 	case DAY:
 		return time.Parse(blockDayFormat, value)
-	case MILLISECOND:
-		return time.Parse(millisecondFormat, value)
 	}
 	panic("invalid interval unit")
 }
@@ -285,8 +291,6 @@ func (bc *blockController) startTime(suffix string) (time.Time, error) {
 	case DAY:
 		return time.Date(startTime.Year(), startTime.Month(),
 			t.Day(), t.Hour(), 0, 0, 0, startTime.Location()), nil
-	case MILLISECOND:
-		return time.ParseInLocation(millisecondFormat, suffix, startTime.Location())
 	}
 	panic("invalid interval unit")
 }
@@ -303,7 +307,7 @@ func (bc *blockController) open() error {
 		return err
 	}
 	if bc.Current() == nil {
-		b, err := bc.create(time.Now())
+		b, err := bc.create(bc.clock.Now())
 		if err != nil {
 			return err
 		}
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index 3299e57..b3d3ca6 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -119,7 +119,7 @@ func (s *shard) Index() IndexDatabase {
 func (s *shard) State() (shardState ShardState) {
 	for _, seg := range s.segmentController.segments() {
 		for _, b := range seg.blockController.blocks() {
-			shardState.OpenedBlocks = append(shardState.OpenedBlocks, BlockState{
+			shardState.Blocks = append(shardState.Blocks, BlockState{
 				ID: BlockID{
 					SegID:   b.segID,
 					BlockID: b.blockID,
@@ -129,6 +129,11 @@ func (s *shard) State() (shardState ShardState) {
 			})
 		}
 	}
+	all := s.segmentController.blockQueue.All()
+	shardState.OpenBlocks = make([]BlockID, len(all))
+	for i, v := range s.segmentController.blockQueue.All() {
+		shardState.OpenBlocks[i] = v.(BlockID)
+	}
 	return shardState
 }
 
@@ -143,8 +148,7 @@ func (s *shard) Close() error {
 type IntervalUnit int
 
 const (
-	MILLISECOND IntervalUnit = iota // only for testing
-	HOUR
+	HOUR IntervalUnit = iota
 	DAY
 )
 
@@ -154,9 +158,6 @@ func (iu IntervalUnit) String() string {
 		return "hour"
 	case DAY:
 		return "day"
-	case MILLISECOND:
-		return "millis"
-
 	}
 	panic("invalid interval unit")
 }
@@ -172,8 +173,6 @@ func (ir IntervalRule) NextTime(current time.Time) time.Time {
 		return current.Add(time.Hour * time.Duration(ir.Num))
 	case DAY:
 		return current.AddDate(0, 0, ir.Num)
-	case MILLISECOND:
-		return current.Add(time.Millisecond * time.Duration(ir.Num))
 	}
 	panic("invalid interval unit")
 }
@@ -184,8 +183,6 @@ func (ir IntervalRule) EstimatedDuration() time.Duration {
 		return time.Hour * time.Duration(ir.Num)
 	case DAY:
 		return 24 * time.Hour * time.Duration(ir.Num)
-	case MILLISECOND:
-		return time.Microsecond * time.Duration(ir.Num)
 	}
 	panic("invalid interval unit")
 }
@@ -198,18 +195,21 @@ type segmentController struct {
 	blockSize   IntervalRule
 	lst         []*segment
 	blockQueue  bucket.Queue
+	clock       timestamp.Clock
 
 	l *logger.Logger
 }
 
-func newSegmentController(shardCtx context.Context, location string,
-	segmentSize, blockSize IntervalRule, openedBlockSize int, l *logger.Logger) (*segmentController, error) {
+func newSegmentController(shardCtx context.Context, location string, segmentSize, blockSize IntervalRule,
+	openedBlockSize int, l *logger.Logger) (*segmentController, error) {
+	clock, _ := timestamp.GetClock(shardCtx)
 	sc := &segmentController{
 		shardCtx:    shardCtx,
 		location:    location,
 		segmentSize: segmentSize,
 		blockSize:   blockSize,
 		l:           l,
+		clock:       clock,
 	}
 	var err error
 	sc.blockQueue, err = bucket.NewQueue(openedBlockSize, func(id interface{}) {
@@ -266,7 +266,7 @@ func (sc *segmentController) segments() (ss []*segment) {
 func (sc *segmentController) Current() bucket.Reporter {
 	sc.RLock()
 	defer sc.RUnlock()
-	now := time.Now()
+	now := sc.clock.Now()
 	for _, s := range sc.lst {
 		if s.suffix == sc.Format(now) {
 			return s
@@ -293,6 +293,7 @@ func (sc *segmentController) OnMove(prev bucket.Reporter, next bucket.Reporter)
 	event := sc.l.Info()
 	if prev != nil {
 		event.Stringer("prev", prev)
+		prev.(*segment).blockManageStrategy.Close()
 	}
 	if next != nil {
 		event.Stringer("next", next)
@@ -306,8 +307,6 @@ func (sc *segmentController) Format(tm time.Time) string {
 		return tm.Format(segHourFormat)
 	case DAY:
 		return tm.Format(segDayFormat)
-	case MILLISECOND:
-		return tm.Format(millisecondFormat)
 	}
 	panic("invalid interval unit")
 }
@@ -318,8 +317,6 @@ func (sc *segmentController) Parse(value string) (time.Time, error) {
 		return time.ParseInLocation(segHourFormat, value, time.Local)
 	case DAY:
 		return time.ParseInLocation(segDayFormat, value, time.Local)
-	case MILLISECOND:
-		return time.ParseInLocation(millisecondFormat, value, time.Local)
 	}
 	panic("invalid interval unit")
 }
@@ -339,7 +336,7 @@ func (sc *segmentController) open() error {
 		return err
 	}
 	if sc.Current() == nil {
-		_, err = sc.create(sc.Format(time.Now()))
+		_, err = sc.create(sc.Format(sc.clock.Now()))
 		if err != nil {
 			return err
 		}
diff --git a/banyand/tsdb/shard_test.go b/banyand/tsdb/shard_test.go
index 647970e..9a3005f 100644
--- a/banyand/tsdb/shard_test.go
+++ b/banyand/tsdb/shard_test.go
@@ -19,9 +19,6 @@ package tsdb_test
 
 import (
 	"context"
-	"errors"
-	"os"
-	"path"
 	"time"
 
 	. "github.com/onsi/ginkgo/v2"
@@ -30,150 +27,359 @@ import (
 	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/banyand/tsdb"
 	"github.com/apache/skywalking-banyandb/pkg/test"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
-var defaultEventallyTimeout = time.Minute
-
 var _ = Describe("Shard", func() {
 	Describe("Generate segments and blocks", func() {
 		var tmp string
 		var deferFn func()
 		var shard tsdb.Shard
+		var clock timestamp.MockClock
 
 		BeforeEach(func() {
 			var err error
 			tmp, deferFn, err = test.NewSpace()
 			Expect(err).NotTo(HaveOccurred())
+			clock = timestamp.NewMockClock()
+			Expect(err).NotTo(HaveOccurred())
 		})
 		AfterEach(func() {
 			shard.Close()
 			deferFn()
 		})
 		It("generates several segments and blocks", func() {
+			By("open 4 blocks")
 			var err error
-			shard, err = tsdb.OpenShard(context.TODO(), common.ShardID(0), tmp,
-				tsdb.IntervalRule{
-					Unit: tsdb.MILLISECOND,
-					Num:  3000,
-				},
-				tsdb.IntervalRule{
-					Unit: tsdb.MILLISECOND,
-					Num:  1000,
-				},
-				1<<4,
-			)
-			Expect(err).NotTo(HaveOccurred())
-			segDirectories := make([]string, 3)
-			Eventually(func() int {
-				num := 0
-				err := tsdb.WalkDir(tmp+"/shard-0", "seg-", func(suffix, absolutePath string) error {
-					if num < 3 {
-						segDirectories[num] = absolutePath
-					}
-					num++
-					return nil
-				})
-				Expect(err).NotTo(HaveOccurred())
-				return num
-			}).WithTimeout(defaultEventallyTimeout).Should(BeNumerically(">=", 3))
-			for _, d := range segDirectories {
-				Eventually(func() int {
-					num := 0
-					err := tsdb.WalkDir(d, "block-", func(suffix, absolutePath string) error {
-						num++
-						return nil
-					})
-					Expect(err).NotTo(HaveOccurred())
-					return num
-				}).WithTimeout(defaultEventallyTimeout).Should(BeNumerically(">=", 3))
-			}
-		})
-		It("closes blocks", func() {
-			var err error
-			shard, err = tsdb.OpenShard(context.TODO(), common.ShardID(0), tmp,
+			shard, err = tsdb.OpenShard(timestamp.SetClock(context.Background(), clock), common.ShardID(0), tmp,
 				tsdb.IntervalRule{
 					Unit: tsdb.DAY,
 					Num:  1,
 				},
 				tsdb.IntervalRule{
-					Unit: tsdb.MILLISECOND,
-					Num:  1000,
+					Unit: tsdb.HOUR,
+					Num:  12,
 				},
 				2,
 			)
 			Expect(err).NotTo(HaveOccurred())
-			var segDirectory string
-			Eventually(func() int {
-				num := 0
-				errInternal := tsdb.WalkDir(tmp+"/shard-0", "seg-", func(suffix, absolutePath string) error {
-					if num < 1 {
-						segDirectory = absolutePath
-					}
-					num++
-					return nil
-				})
-				Expect(errInternal).NotTo(HaveOccurred())
-				return num
-			}).WithTimeout(defaultEventallyTimeout).Should(BeNumerically(">=", 1))
-			Eventually(func() int {
-				num := 0
-				errInternal := tsdb.WalkDir(segDirectory, "block-", func(suffix, absolutePath string) error {
-					if _, err := os.Stat(path.Join(absolutePath, "store", "LOCK")); errors.Is(err, os.ErrNotExist) {
-						num++
-					}
-					return nil
-				})
-				Expect(errInternal).NotTo(HaveOccurred())
-				return num
-			}).WithTimeout(defaultEventallyTimeout).Should(BeNumerically(">=", 1))
-		})
-		It("reopens closed blocks", func() {
-			var err error
-			shard, err = tsdb.OpenShard(context.TODO(), common.ShardID(0), tmp,
-				tsdb.IntervalRule{
-					Unit: tsdb.MILLISECOND,
-					Num:  3000,
+			By("1st block is opened")
+			t1 := clock.Now()
+			Eventually(func() []tsdb.BlockState {
+				return shard.State().Blocks
+			}).Should(Equal([]tsdb.BlockState{
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t1, 12*time.Hour, true, false),
 				},
-				tsdb.IntervalRule{
-					Unit: tsdb.MILLISECOND,
-					Num:  1000,
+			}))
+			By("2nd block is opened")
+			// 01 10:00
+			clock.Add(10 * time.Hour)
+			t2 := clock.Now().Add(2 * time.Hour)
+			Eventually(func() []tsdb.BlockState {
+				return shard.State().Blocks
+			}).Should(Equal([]tsdb.BlockState{
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t1, 12*time.Hour, true, false),
 				},
-				2,
-			)
-			Expect(err).NotTo(HaveOccurred())
-			Eventually(func() int {
-				num := 0
-				for _, bs := range shard.State().OpenedBlocks {
-					if !bs.Closed {
-						num++
-					}
-				}
-				return num
-			}).WithTimeout(defaultEventallyTimeout).Should(BeNumerically(">=", 2))
-			var closedBlocks []tsdb.BlockState
-			Eventually(func() int {
-				closedBlocks = nil
-				for _, ob := range shard.State().OpenedBlocks {
-					if ob.Closed {
-						closedBlocks = append(closedBlocks, ob)
-					}
-				}
-				return len(closedBlocks)
-			}).WithTimeout(defaultEventallyTimeout).Should(BeNumerically(">=", 1))
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t2, 12*time.Hour, true, false),
+				},
+			}))
+			Eventually(func() []tsdb.BlockID {
+				return shard.State().OpenBlocks
+			}).Should(Equal([]tsdb.BlockID{}))
+			By("moves to the 2nd block")
+			// 01 13:00
+			clock.Add(3 * time.Hour)
+			Eventually(func() []tsdb.BlockID {
+				return shard.State().OpenBlocks
+			}).Should(Equal([]tsdb.BlockID{
+				{
+					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+				},
+			}))
+			By("3rd block is opened")
+			// 01 22:00
+			clock.Add(9 * time.Hour)
+			t3 := clock.Now().Add(2 * time.Hour)
+			Eventually(func() []tsdb.BlockState {
+				return shard.State().Blocks
+			}).Should(Equal([]tsdb.BlockState{
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t1, 12*time.Hour, true, false),
+				},
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t2, 12*time.Hour, true, false),
+				},
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t3, 12*time.Hour, true, false),
+				},
+			}))
+			By("moves to 3rd block")
+			// 02 01:00
+			clock.Add(3 * time.Hour)
+			Eventually(func() []tsdb.BlockID {
+				return shard.State().OpenBlocks
+			}).Should(Equal([]tsdb.BlockID{
+				{
+					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+				},
+				{
+					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+				},
+			}))
+			By("4th block is opened")
+			// 02 10:00
+			clock.Add(9 * time.Hour)
+			t4 := clock.Now().Add(2 * time.Hour)
+			Eventually(func() []tsdb.BlockState {
+				return shard.State().Blocks
+			}).Should(Equal([]tsdb.BlockState{
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t1, 12*time.Hour, true, false),
+				},
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t2, 12*time.Hour, true, false),
+				},
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t3, 12*time.Hour, true, false),
+				},
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t4, 12*time.Hour, true, false),
+				},
+			}))
+			By("moves to 4th block")
+			// 02 13:00
+			clock.Add(3 * time.Hour)
+			Eventually(func() []tsdb.BlockID {
+				return shard.State().OpenBlocks
+			}).Should(Equal([]tsdb.BlockID{
+				{
+					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+				},
+				{
+					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+				},
+				{
+					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+				},
+			}))
+			By("5th block is opened")
+			// 02 22:00
+			clock.Add(9 * time.Hour)
+			t5 := clock.Now().Add(2 * time.Hour)
+			Eventually(func() []tsdb.BlockState {
+				return shard.State().Blocks
+			}).Should(Equal([]tsdb.BlockState{
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t1, 12*time.Hour, true, false),
+				},
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t2, 12*time.Hour, true, false),
+				},
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t3, 12*time.Hour, true, false),
+				},
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t4, 12*time.Hour, true, false),
+				},
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700103),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t5, 12*time.Hour, true, false),
+				},
+			}))
+			By("close 1st block by adding 5th block")
+			// 03 01:00
+			clock.Add(3 * time.Hour)
+			Eventually(func() []tsdb.BlockID {
+				return shard.State().OpenBlocks
+			}).Should(Equal([]tsdb.BlockID{
+				{
+					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+				},
+				{
+					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+				},
+				{
+					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+				},
+			}))
+			Eventually(func() []tsdb.BlockState {
+				return shard.State().Blocks
+			}).Should(Equal([]tsdb.BlockState{
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t1, 12*time.Hour, true, false),
+					Closed:    true,
+				},
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t2, 12*time.Hour, true, false),
+				},
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t3, 12*time.Hour, true, false),
+				},
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t4, 12*time.Hour, true, false),
+				},
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700103),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t5, 12*time.Hour, true, false),
+				},
+			}))
+			By("reopen 1st block")
 			series, err := shard.Series().GetByID(common.SeriesID(11))
 			Expect(err).NotTo(HaveOccurred())
-			writeFn := func(bs tsdb.BlockState) {
-				span, err := series.Span(bs.TimeRange)
-				Expect(err).NotTo(HaveOccurred())
-				defer span.Close()
-				writer, err := span.WriterBuilder().Family([]byte("test"), []byte("test")).Time(bs.TimeRange.Start).Build()
-				Expect(err).NotTo(HaveOccurred())
-				_, err = writer.Write()
-				Expect(err).NotTo(HaveOccurred())
-			}
-			for _, bs := range closedBlocks {
-				writeFn(bs)
-			}
+			t1Range := timestamp.NewInclusiveTimeRangeDuration(t1, 1*time.Hour)
+			span, err := series.Span(t1Range)
+			Expect(err).NotTo(HaveOccurred())
+			defer span.Close()
+			writer, err := span.WriterBuilder().Family([]byte("test"), []byte("test")).Time(t1Range.End).Build()
+			Expect(err).NotTo(HaveOccurred())
+			_, err = writer.Write()
+			Expect(err).NotTo(HaveOccurred())
+			Eventually(func() []tsdb.BlockID {
+				return shard.State().OpenBlocks
+			}).Should(Equal([]tsdb.BlockID{
+				{
+					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+				},
+				{
+					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+				},
+				{
+					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+				},
+			}))
+			Eventually(func() []tsdb.BlockState {
+				return shard.State().Blocks
+			}).Should(Equal([]tsdb.BlockState{
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t1, 12*time.Hour, true, false),
+				},
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t2, 12*time.Hour, true, false),
+					Closed:    true,
+				},
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t3, 12*time.Hour, true, false),
+				},
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t4, 12*time.Hour, true, false),
+				},
+				{
+					ID: tsdb.BlockID{
+						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700103),
+						BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+					},
+					TimeRange: timestamp.NewTimeRangeDuration(t5, 12*time.Hour, true, false),
+				},
+			}))
 		})
 	})
 })
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index cf40414..8537752 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -49,11 +49,10 @@ const (
 	blockTemplate       = rootPrefix + blockPathPrefix + "-%s"
 	globalIndexTemplate = rootPrefix + "index"
 
-	segHourFormat     = "2006010215"
-	segDayFormat      = "20060102"
-	millisecondFormat = "20060102150405000"
-	blockHourFormat   = "15"
-	blockDayFormat    = "0102"
+	segHourFormat   = "2006010215"
+	segDayFormat    = "20060102"
+	blockHourFormat = "15"
+	blockDayFormat  = "0102"
 
 	dirPerm = 0700
 )
@@ -104,13 +103,18 @@ type BlockID struct {
 	BlockID uint16
 }
 
+func GenerateInternalID(unit IntervalUnit, suffix int) uint16 {
+	return uint16(unit)<<12 | ((uint16(suffix) << 4) >> 4)
+}
+
 type BlockState struct {
 	ID        BlockID
 	TimeRange timestamp.TimeRange
 	Closed    bool
 }
 type ShardState struct {
-	OpenedBlocks []BlockState
+	Blocks     []BlockState
+	OpenBlocks []BlockID
 }
 
 type database struct {
@@ -154,14 +158,14 @@ func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) {
 		return nil, err
 	}
 	segmentSize := opts.SegmentSize
-	if segmentSize.Unit == MILLISECOND {
+	if segmentSize.Num == 0 {
 		segmentSize = IntervalRule{
 			Unit: DAY,
 			Num:  1,
 		}
 	}
 	blockSize := opts.BlockSize
-	if blockSize.Unit == MILLISECOND {
+	if blockSize.Num == 0 {
 		blockSize = IntervalRule{
 			Unit: HOUR,
 			Num:  2,
diff --git a/banyand/tsdb/tsdb_suite_test.go b/banyand/tsdb/tsdb_suite_test.go
index c761503..07ab946 100644
--- a/banyand/tsdb/tsdb_suite_test.go
+++ b/banyand/tsdb/tsdb_suite_test.go
@@ -34,6 +34,6 @@ func TestTsdb(t *testing.T) {
 var _ = BeforeSuite(func() {
 	Expect(logger.Init(logger.Logging{
 		Env:   "dev",
-		Level: "info",
+		Level: "debug",
 	})).Should(Succeed())
 })
diff --git a/go.mod b/go.mod
index e56cf81..dead45b 100644
--- a/go.mod
+++ b/go.mod
@@ -4,6 +4,7 @@ go 1.17
 
 require (
 	github.com/RoaringBitmap/roaring v0.9.1
+	github.com/benbjohnson/clock v1.3.0
 	github.com/cespare/xxhash v1.1.0
 	github.com/dgraph-io/badger/v3 v3.2011.1
 	github.com/dgraph-io/ristretto v0.1.0
@@ -107,4 +108,7 @@ require (
 	sigs.k8s.io/yaml v1.2.0 // indirect
 )
 
-replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20220403004319-fea65bd5e9e4
+replace (
+	github.com/benbjohnson/clock v1.3.0 => github.com/SkyAPM/clock v1.3.1-0.20220416123716-97dcb111a8d8
+	github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20220403004319-fea65bd5e9e4
+)
diff --git a/go.sum b/go.sum
index 5057b2a..4a5d9d8 100644
--- a/go.sum
+++ b/go.sum
@@ -49,6 +49,8 @@ github.com/RoaringBitmap/roaring v0.9.1 h1:5PRizBmoN/PfV17nPNQou4dHQ7NcJi8FO/bih
 github.com/RoaringBitmap/roaring v0.9.1/go.mod h1:h1B7iIUOmnAeb5ytYMvnHJwxMc6LUrwBnzXWRuqTQUc=
 github.com/SkyAPM/badger/v3 v3.0.0-20220403004319-fea65bd5e9e4 h1:iLwRXI6WHBMb2VkWrlYKIFngPKwgs2OnjliXdMB5DY0=
 github.com/SkyAPM/badger/v3 v3.0.0-20220403004319-fea65bd5e9e4/go.mod h1:Q0luV7nB94o3Bl4hYqAPy03+QTtLxs9pWdUEQb0i0K0=
+github.com/SkyAPM/clock v1.3.1-0.20220416123716-97dcb111a8d8 h1:TK3KN7H7ROhT0/sfY8JWWa5xHOo5jUqW7Stc/VxNJyA=
+github.com/SkyAPM/clock v1.3.1-0.20220416123716-97dcb111a8d8/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
@@ -59,7 +61,6 @@ github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hC
 github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
 github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
 github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
-github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg=
 github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
diff --git a/pkg/timestamp/clock.go b/pkg/timestamp/clock.go
new file mode 100644
index 0000000..feba400
--- /dev/null
+++ b/pkg/timestamp/clock.go
@@ -0,0 +1,67 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package timestamp
+
+import (
+	"context"
+	"time"
+
+	"github.com/benbjohnson/clock"
+)
+
+// Clock represents an interface that contains all functions in the standard library time package.
+type Clock interface {
+	clock.Clock
+}
+
+type MockClock interface {
+	clock.Clock
+	// Add moves the current time of the mock clock forward by the specified duration.
+	Add(d time.Duration)
+}
+
+// NewClock returns an instance of a real-time clock.
+func NewClock() Clock {
+	return clock.New()
+}
+
+// NewMockClock returns an instance of a mock clock.
+func NewMockClock() MockClock {
+	return clock.NewMock()
+}
+
+var clockKey = contextClockKey{}
+
+type contextClockKey struct{}
+
+// GetClock returns a Clock from the context. If the context doesn't contain
+// a Clock, a real-time one will be created and returned with a child context
+// which contains the new one.
+func GetClock(ctx context.Context) (Clock, context.Context) {
+	c := ctx.Value(clockKey)
+	if c == nil {
+		realClock := NewClock()
+		return realClock, SetClock(ctx, realClock)
+	}
+	return c.(Clock), ctx
+}
+
+// SetClock returns a sub context with the passed Clock.
+func SetClock(ctx context.Context, clock Clock) context.Context {
+	return context.WithValue(ctx, clockKey, clock)
+}