You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/04/18 11:03:33 UTC
[skywalking-banyandb] 01/01: Introduce the clock mock to test block controller
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch patch-metadata
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 25ed40db6b82c4f66b1020ad879eae84e1fcc5ff
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Mon Apr 18 10:48:23 2022 +0000
Introduce the clock mock to test block controller
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
banyand/tsdb/block.go | 5 +-
banyand/tsdb/bucket/bucket.go | 8 +-
banyand/tsdb/bucket/queue.go | 11 +
banyand/tsdb/bucket/strategy.go | 1 +
banyand/tsdb/segment.go | 40 ++--
banyand/tsdb/shard.go | 33 ++-
banyand/tsdb/shard_test.go | 436 +++++++++++++++++++++++++++++-----------
banyand/tsdb/tsdb.go | 20 +-
banyand/tsdb/tsdb_suite_test.go | 2 +-
go.mod | 6 +-
go.sum | 3 +-
pkg/timestamp/clock.go | 67 ++++++
12 files changed, 465 insertions(+), 167 deletions(-)
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 88d88c2..e5af100 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -82,7 +82,7 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
if err != nil {
return nil, err
}
- id := uint16(opts.blockSize.Unit)<<12 | ((uint16(suffixInteger) << 4) >> 4)
+ id := GenerateInternalID(opts.blockSize.Unit, suffixInteger)
timeRange := timestamp.NewTimeRange(opts.startTime, opts.blockSize.NextTime(opts.startTime), true, false)
encodingMethodObject := ctx.Value(encodingMethodKey)
if encodingMethodObject == nil {
@@ -91,13 +91,14 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
DecoderPool: encoding.NewPlainDecoderPool(0),
}
}
+ clock, _ := timestamp.GetClock(ctx)
b = &block{
segID: opts.segID,
blockID: id,
path: opts.path,
l: logger.Fetch(ctx, "block"),
TimeRange: timeRange,
- Reporter: bucket.NewTimeBasedReporter(timeRange),
+ Reporter: bucket.NewTimeBasedReporter(timeRange, clock),
closed: atomic.NewBool(true),
encodingMethod: encodingMethodObject.(EncodingMethod),
}
diff --git a/banyand/tsdb/bucket/bucket.go b/banyand/tsdb/bucket/bucket.go
index dc1b77f..615293b 100644
--- a/banyand/tsdb/bucket/bucket.go
+++ b/banyand/tsdb/bucket/bucket.go
@@ -45,12 +45,14 @@ type Reporter interface {
type timeBasedReporter struct {
timestamp.TimeRange
reporterStopCh chan struct{}
+ clock timestamp.Clock
}
-func NewTimeBasedReporter(timeRange timestamp.TimeRange) Reporter {
+func NewTimeBasedReporter(timeRange timestamp.TimeRange, clock timestamp.Clock) Reporter {
return &timeBasedReporter{
TimeRange: timeRange,
reporterStopCh: make(chan struct{}),
+ clock: clock,
}
}
@@ -62,14 +64,14 @@ func (tr *timeBasedReporter) Report() Channel {
}
go func() {
defer close(ch)
- ticker := time.NewTicker(interval)
+ ticker := tr.clock.Ticker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
status := Status{
Capacity: int(tr.End.UnixNano() - tr.Start.UnixNano()),
- Volume: int(time.Now().UnixNano() - tr.Start.UnixNano()),
+ Volume: int(tr.clock.Now().UnixNano() - tr.Start.UnixNano()),
}
ch <- status
if status.Volume >= status.Capacity {
diff --git a/banyand/tsdb/bucket/queue.go b/banyand/tsdb/bucket/queue.go
index e26109d..85b73fa 100644
--- a/banyand/tsdb/bucket/queue.go
+++ b/banyand/tsdb/bucket/queue.go
@@ -29,6 +29,7 @@ type EvictFn func(id interface{})
type Queue interface {
Push(id interface{})
Len() int
+ All() []interface{}
}
const (
@@ -114,6 +115,16 @@ func (q *lruQueue) Len() int {
return q.recent.Len() + q.frequent.Len()
}
+func (q *lruQueue) All() []interface{} {
+ q.lock.RLock()
+ defer q.lock.RUnlock()
+ all := make([]interface{}, q.recent.Len()+q.frequent.Len()+q.recentEvict.Len())
+ copy(all, q.recent.Keys())
+ copy(all[q.recent.Len():], q.frequent.Keys())
+ copy(all[q.recent.Len()+q.frequent.Len():], q.recentEvict.Keys())
+ return all
+}
+
func (q *lruQueue) ensureSpace(recentEvict bool) {
recentLen := q.recent.Len()
freqLen := q.frequent.Len()
diff --git a/banyand/tsdb/bucket/strategy.go b/banyand/tsdb/bucket/strategy.go
index 581ebd8..e31cb74 100644
--- a/banyand/tsdb/bucket/strategy.go
+++ b/banyand/tsdb/bucket/strategy.go
@@ -124,5 +124,6 @@ func (s *Strategy) Run() {
}
func (s *Strategy) Close() {
+ s.ctrl.OnMove(s.current, nil)
close(s.stopCh)
}
diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go
index bf50669..42756c5 100644
--- a/banyand/tsdb/segment.go
+++ b/banyand/tsdb/segment.go
@@ -47,16 +47,17 @@ type segment struct {
blockManageStrategy *bucket.Strategy
}
-func openSegment(ctx context.Context, startTime time.Time, path, suffix string, segmentSize, blockSize IntervalRule, blockQueue bucket.Queue) (s *segment, err error) {
+func openSegment(ctx context.Context, startTime time.Time, path, suffix string,
+ segmentSize, blockSize IntervalRule, blockQueue bucket.Queue) (s *segment, err error) {
suffixInteger, err := strconv.Atoi(suffix)
if err != nil {
return nil, err
}
- // TODO: fix id overflow
- id := uint16(segmentSize.Unit)<<12 | ((uint16(suffixInteger) << 4) >> 4)
+ id := GenerateInternalID(segmentSize.Unit, suffixInteger)
timeRange := timestamp.NewTimeRange(startTime, segmentSize.NextTime(startTime), true, false)
l := logger.Fetch(ctx, "segment")
segCtx := context.WithValue(ctx, logger.ContextKey, l)
+ clock, segCtx := timestamp.GetClock(segCtx)
s = &segment{
id: id,
path: path,
@@ -64,7 +65,7 @@ func openSegment(ctx context.Context, startTime time.Time, path, suffix string,
l: l,
blockController: newBlockController(segCtx, id, path, timeRange, blockSize, l, blockQueue),
TimeRange: timeRange,
- Reporter: bucket.NewTimeBasedReporter(timeRange),
+ Reporter: bucket.NewTimeBasedReporter(timeRange, clock),
}
err = s.blockController.open()
if err != nil {
@@ -87,7 +88,6 @@ func openSegment(ctx context.Context, startTime time.Time, path, suffix string,
}
func (s *segment) close() {
- s.blockManageStrategy.Close()
s.blockController.close()
s.globalIndex.Close()
s.Stop()
@@ -114,12 +114,14 @@ type blockController struct {
blockSize IntervalRule
lst []*block
blockQueue bucket.Queue
+ clock timestamp.Clock
l *logger.Logger
}
func newBlockController(segCtx context.Context, segID uint16, location string, segTimeRange timestamp.TimeRange,
blockSize IntervalRule, l *logger.Logger, blockQueue bucket.Queue) *blockController {
+ clock, _ := timestamp.GetClock(segCtx)
return &blockController{
segCtx: segCtx,
segID: segID,
@@ -128,13 +130,14 @@ func newBlockController(segCtx context.Context, segID uint16, location string, s
segTimeRange: segTimeRange,
blockQueue: blockQueue,
l: l,
+ clock: clock,
}
}
func (bc *blockController) Current() bucket.Reporter {
bc.RLock()
defer bc.RUnlock()
- now := time.Now()
+ now := bc.clock.Now()
for _, s := range bc.lst {
if s.suffix == bc.Format(now) {
return s
@@ -162,11 +165,18 @@ func (bc *blockController) Next() (bucket.Reporter, error) {
}
func (bc *blockController) OnMove(prev bucket.Reporter, next bucket.Reporter) {
- bc.l.Info().Stringer("prev", prev).Stringer("next", next).Msg("move to the next block")
- bc.blockQueue.Push(BlockID{
- SegID: bc.segID,
- BlockID: prev.(*block).blockID,
- })
+ event := bc.l.Info()
+ if prev != nil {
+ event.Stringer("prev", prev)
+ bc.blockQueue.Push(BlockID{
+ SegID: bc.segID,
+ BlockID: prev.(*block).blockID,
+ })
+ }
+ if next != nil {
+ event.Stringer("next", next)
+ }
+ event.Msg("move to the next block")
}
func (bc *blockController) Format(tm time.Time) string {
@@ -175,8 +185,6 @@ func (bc *blockController) Format(tm time.Time) string {
return tm.Format(blockHourFormat)
case DAY:
return tm.Format(blockDayFormat)
- case MILLISECOND:
- return tm.Format(millisecondFormat)
}
panic("invalid interval unit")
}
@@ -187,8 +195,6 @@ func (bc *blockController) Parse(value string) (time.Time, error) {
return time.Parse(blockHourFormat, value)
case DAY:
return time.Parse(blockDayFormat, value)
- case MILLISECOND:
- return time.Parse(millisecondFormat, value)
}
panic("invalid interval unit")
}
@@ -285,8 +291,6 @@ func (bc *blockController) startTime(suffix string) (time.Time, error) {
case DAY:
return time.Date(startTime.Year(), startTime.Month(),
t.Day(), t.Hour(), 0, 0, 0, startTime.Location()), nil
- case MILLISECOND:
- return time.ParseInLocation(millisecondFormat, suffix, startTime.Location())
}
panic("invalid interval unit")
}
@@ -303,7 +307,7 @@ func (bc *blockController) open() error {
return err
}
if bc.Current() == nil {
- b, err := bc.create(time.Now())
+ b, err := bc.create(bc.clock.Now())
if err != nil {
return err
}
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index 3299e57..b3d3ca6 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -119,7 +119,7 @@ func (s *shard) Index() IndexDatabase {
func (s *shard) State() (shardState ShardState) {
for _, seg := range s.segmentController.segments() {
for _, b := range seg.blockController.blocks() {
- shardState.OpenedBlocks = append(shardState.OpenedBlocks, BlockState{
+ shardState.Blocks = append(shardState.Blocks, BlockState{
ID: BlockID{
SegID: b.segID,
BlockID: b.blockID,
@@ -129,6 +129,11 @@ func (s *shard) State() (shardState ShardState) {
})
}
}
+ all := s.segmentController.blockQueue.All()
+ shardState.OpenBlocks = make([]BlockID, len(all))
+ for i, v := range s.segmentController.blockQueue.All() {
+ shardState.OpenBlocks[i] = v.(BlockID)
+ }
return shardState
}
@@ -143,8 +148,7 @@ func (s *shard) Close() error {
type IntervalUnit int
const (
- MILLISECOND IntervalUnit = iota // only for testing
- HOUR
+ HOUR IntervalUnit = iota
DAY
)
@@ -154,9 +158,6 @@ func (iu IntervalUnit) String() string {
return "hour"
case DAY:
return "day"
- case MILLISECOND:
- return "millis"
-
}
panic("invalid interval unit")
}
@@ -172,8 +173,6 @@ func (ir IntervalRule) NextTime(current time.Time) time.Time {
return current.Add(time.Hour * time.Duration(ir.Num))
case DAY:
return current.AddDate(0, 0, ir.Num)
- case MILLISECOND:
- return current.Add(time.Millisecond * time.Duration(ir.Num))
}
panic("invalid interval unit")
}
@@ -184,8 +183,6 @@ func (ir IntervalRule) EstimatedDuration() time.Duration {
return time.Hour * time.Duration(ir.Num)
case DAY:
return 24 * time.Hour * time.Duration(ir.Num)
- case MILLISECOND:
- return time.Microsecond * time.Duration(ir.Num)
}
panic("invalid interval unit")
}
@@ -198,18 +195,21 @@ type segmentController struct {
blockSize IntervalRule
lst []*segment
blockQueue bucket.Queue
+ clock timestamp.Clock
l *logger.Logger
}
-func newSegmentController(shardCtx context.Context, location string,
- segmentSize, blockSize IntervalRule, openedBlockSize int, l *logger.Logger) (*segmentController, error) {
+func newSegmentController(shardCtx context.Context, location string, segmentSize, blockSize IntervalRule,
+ openedBlockSize int, l *logger.Logger) (*segmentController, error) {
+ clock, _ := timestamp.GetClock(shardCtx)
sc := &segmentController{
shardCtx: shardCtx,
location: location,
segmentSize: segmentSize,
blockSize: blockSize,
l: l,
+ clock: clock,
}
var err error
sc.blockQueue, err = bucket.NewQueue(openedBlockSize, func(id interface{}) {
@@ -266,7 +266,7 @@ func (sc *segmentController) segments() (ss []*segment) {
func (sc *segmentController) Current() bucket.Reporter {
sc.RLock()
defer sc.RUnlock()
- now := time.Now()
+ now := sc.clock.Now()
for _, s := range sc.lst {
if s.suffix == sc.Format(now) {
return s
@@ -293,6 +293,7 @@ func (sc *segmentController) OnMove(prev bucket.Reporter, next bucket.Reporter)
event := sc.l.Info()
if prev != nil {
event.Stringer("prev", prev)
+ prev.(*segment).blockManageStrategy.Close()
}
if next != nil {
event.Stringer("next", next)
@@ -306,8 +307,6 @@ func (sc *segmentController) Format(tm time.Time) string {
return tm.Format(segHourFormat)
case DAY:
return tm.Format(segDayFormat)
- case MILLISECOND:
- return tm.Format(millisecondFormat)
}
panic("invalid interval unit")
}
@@ -318,8 +317,6 @@ func (sc *segmentController) Parse(value string) (time.Time, error) {
return time.ParseInLocation(segHourFormat, value, time.Local)
case DAY:
return time.ParseInLocation(segDayFormat, value, time.Local)
- case MILLISECOND:
- return time.ParseInLocation(millisecondFormat, value, time.Local)
}
panic("invalid interval unit")
}
@@ -339,7 +336,7 @@ func (sc *segmentController) open() error {
return err
}
if sc.Current() == nil {
- _, err = sc.create(sc.Format(time.Now()))
+ _, err = sc.create(sc.Format(sc.clock.Now()))
if err != nil {
return err
}
diff --git a/banyand/tsdb/shard_test.go b/banyand/tsdb/shard_test.go
index 647970e..9a3005f 100644
--- a/banyand/tsdb/shard_test.go
+++ b/banyand/tsdb/shard_test.go
@@ -19,9 +19,6 @@ package tsdb_test
import (
"context"
- "errors"
- "os"
- "path"
"time"
. "github.com/onsi/ginkgo/v2"
@@ -30,150 +27,359 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/test"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
)
-var defaultEventallyTimeout = time.Minute
-
var _ = Describe("Shard", func() {
Describe("Generate segments and blocks", func() {
var tmp string
var deferFn func()
var shard tsdb.Shard
+ var clock timestamp.MockClock
BeforeEach(func() {
var err error
tmp, deferFn, err = test.NewSpace()
Expect(err).NotTo(HaveOccurred())
+ clock = timestamp.NewMockClock()
+ Expect(err).NotTo(HaveOccurred())
})
AfterEach(func() {
shard.Close()
deferFn()
})
It("generates several segments and blocks", func() {
+ By("open 4 blocks")
var err error
- shard, err = tsdb.OpenShard(context.TODO(), common.ShardID(0), tmp,
- tsdb.IntervalRule{
- Unit: tsdb.MILLISECOND,
- Num: 3000,
- },
- tsdb.IntervalRule{
- Unit: tsdb.MILLISECOND,
- Num: 1000,
- },
- 1<<4,
- )
- Expect(err).NotTo(HaveOccurred())
- segDirectories := make([]string, 3)
- Eventually(func() int {
- num := 0
- err := tsdb.WalkDir(tmp+"/shard-0", "seg-", func(suffix, absolutePath string) error {
- if num < 3 {
- segDirectories[num] = absolutePath
- }
- num++
- return nil
- })
- Expect(err).NotTo(HaveOccurred())
- return num
- }).WithTimeout(defaultEventallyTimeout).Should(BeNumerically(">=", 3))
- for _, d := range segDirectories {
- Eventually(func() int {
- num := 0
- err := tsdb.WalkDir(d, "block-", func(suffix, absolutePath string) error {
- num++
- return nil
- })
- Expect(err).NotTo(HaveOccurred())
- return num
- }).WithTimeout(defaultEventallyTimeout).Should(BeNumerically(">=", 3))
- }
- })
- It("closes blocks", func() {
- var err error
- shard, err = tsdb.OpenShard(context.TODO(), common.ShardID(0), tmp,
+ shard, err = tsdb.OpenShard(timestamp.SetClock(context.Background(), clock), common.ShardID(0), tmp,
tsdb.IntervalRule{
Unit: tsdb.DAY,
Num: 1,
},
tsdb.IntervalRule{
- Unit: tsdb.MILLISECOND,
- Num: 1000,
+ Unit: tsdb.HOUR,
+ Num: 12,
},
2,
)
Expect(err).NotTo(HaveOccurred())
- var segDirectory string
- Eventually(func() int {
- num := 0
- errInternal := tsdb.WalkDir(tmp+"/shard-0", "seg-", func(suffix, absolutePath string) error {
- if num < 1 {
- segDirectory = absolutePath
- }
- num++
- return nil
- })
- Expect(errInternal).NotTo(HaveOccurred())
- return num
- }).WithTimeout(defaultEventallyTimeout).Should(BeNumerically(">=", 1))
- Eventually(func() int {
- num := 0
- errInternal := tsdb.WalkDir(segDirectory, "block-", func(suffix, absolutePath string) error {
- if _, err := os.Stat(path.Join(absolutePath, "store", "LOCK")); errors.Is(err, os.ErrNotExist) {
- num++
- }
- return nil
- })
- Expect(errInternal).NotTo(HaveOccurred())
- return num
- }).WithTimeout(defaultEventallyTimeout).Should(BeNumerically(">=", 1))
- })
- It("reopens closed blocks", func() {
- var err error
- shard, err = tsdb.OpenShard(context.TODO(), common.ShardID(0), tmp,
- tsdb.IntervalRule{
- Unit: tsdb.MILLISECOND,
- Num: 3000,
+ By("1st block is opened")
+ t1 := clock.Now()
+ Eventually(func() []tsdb.BlockState {
+ return shard.State().Blocks
+ }).Should(Equal([]tsdb.BlockState{
+ {
+ ID: tsdb.BlockID{
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+ },
+ TimeRange: timestamp.NewTimeRangeDuration(t1, 12*time.Hour, true, false),
},
- tsdb.IntervalRule{
- Unit: tsdb.MILLISECOND,
- Num: 1000,
+ }))
+ By("2nd block is opened")
+ // 01 10:00
+ clock.Add(10 * time.Hour)
+ t2 := clock.Now().Add(2 * time.Hour)
+ Eventually(func() []tsdb.BlockState {
+ return shard.State().Blocks
+ }).Should(Equal([]tsdb.BlockState{
+ {
+ ID: tsdb.BlockID{
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+ },
+ TimeRange: timestamp.NewTimeRangeDuration(t1, 12*time.Hour, true, false),
},
- 2,
- )
- Expect(err).NotTo(HaveOccurred())
- Eventually(func() int {
- num := 0
- for _, bs := range shard.State().OpenedBlocks {
- if !bs.Closed {
- num++
- }
- }
- return num
- }).WithTimeout(defaultEventallyTimeout).Should(BeNumerically(">=", 2))
- var closedBlocks []tsdb.BlockState
- Eventually(func() int {
- closedBlocks = nil
- for _, ob := range shard.State().OpenedBlocks {
- if ob.Closed {
- closedBlocks = append(closedBlocks, ob)
- }
- }
- return len(closedBlocks)
- }).WithTimeout(defaultEventallyTimeout).Should(BeNumerically(">=", 1))
+ {
+ ID: tsdb.BlockID{
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+ },
+ TimeRange: timestamp.NewTimeRangeDuration(t2, 12*time.Hour, true, false),
+ },
+ }))
+ Eventually(func() []tsdb.BlockID {
+ return shard.State().OpenBlocks
+ }).Should(Equal([]tsdb.BlockID{}))
+ By("moves to the 2nd block")
+ // 01 13:00
+ clock.Add(3 * time.Hour)
+ Eventually(func() []tsdb.BlockID {
+ return shard.State().OpenBlocks
+ }).Should(Equal([]tsdb.BlockID{
+ {
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+ },
+ }))
+ By("3rd block is opened")
+ // 01 22:00
+ clock.Add(9 * time.Hour)
+ t3 := clock.Now().Add(2 * time.Hour)
+ Eventually(func() []tsdb.BlockState {
+ return shard.State().Blocks
+ }).Should(Equal([]tsdb.BlockState{
+ {
+ ID: tsdb.BlockID{
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+ },
+ TimeRange: timestamp.NewTimeRangeDuration(t1, 12*time.Hour, true, false),
+ },
+ {
+ ID: tsdb.BlockID{
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+ },
+ TimeRange: timestamp.NewTimeRangeDuration(t2, 12*time.Hour, true, false),
+ },
+ {
+ ID: tsdb.BlockID{
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+ },
+ TimeRange: timestamp.NewTimeRangeDuration(t3, 12*time.Hour, true, false),
+ },
+ }))
+ By("moves to 3rd block")
+ // 02 01:00
+ clock.Add(3 * time.Hour)
+ Eventually(func() []tsdb.BlockID {
+ return shard.State().OpenBlocks
+ }).Should(Equal([]tsdb.BlockID{
+ {
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+ },
+ {
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+ },
+ }))
+ By("4th block is opened")
+ // 02 10:00
+ clock.Add(9 * time.Hour)
+ t4 := clock.Now().Add(2 * time.Hour)
+ Eventually(func() []tsdb.BlockState {
+ return shard.State().Blocks
+ }).Should(Equal([]tsdb.BlockState{
+ {
+ ID: tsdb.BlockID{
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+ },
+ TimeRange: timestamp.NewTimeRangeDuration(t1, 12*time.Hour, true, false),
+ },
+ {
+ ID: tsdb.BlockID{
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+ },
+ TimeRange: timestamp.NewTimeRangeDuration(t2, 12*time.Hour, true, false),
+ },
+ {
+ ID: tsdb.BlockID{
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+ },
+ TimeRange: timestamp.NewTimeRangeDuration(t3, 12*time.Hour, true, false),
+ },
+ {
+ ID: tsdb.BlockID{
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+ },
+ TimeRange: timestamp.NewTimeRangeDuration(t4, 12*time.Hour, true, false),
+ },
+ }))
+ By("moves to 4th block")
+ // 02 13:00
+ clock.Add(3 * time.Hour)
+ Eventually(func() []tsdb.BlockID {
+ return shard.State().OpenBlocks
+ }).Should(Equal([]tsdb.BlockID{
+ {
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+ },
+ {
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+ },
+ {
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+ },
+ }))
+ By("5th block is opened")
+ // 02 22:00
+ clock.Add(9 * time.Hour)
+ t5 := clock.Now().Add(2 * time.Hour)
+ Eventually(func() []tsdb.BlockState {
+ return shard.State().Blocks
+ }).Should(Equal([]tsdb.BlockState{
+ {
+ ID: tsdb.BlockID{
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+ },
+ TimeRange: timestamp.NewTimeRangeDuration(t1, 12*time.Hour, true, false),
+ },
+ {
+ ID: tsdb.BlockID{
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+ },
+ TimeRange: timestamp.NewTimeRangeDuration(t2, 12*time.Hour, true, false),
+ },
+ {
+ ID: tsdb.BlockID{
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+ },
+ TimeRange: timestamp.NewTimeRangeDuration(t3, 12*time.Hour, true, false),
+ },
+ {
+ ID: tsdb.BlockID{
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+ },
+ TimeRange: timestamp.NewTimeRangeDuration(t4, 12*time.Hour, true, false),
+ },
+ {
+ ID: tsdb.BlockID{
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700103),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+ },
+ TimeRange: timestamp.NewTimeRangeDuration(t5, 12*time.Hour, true, false),
+ },
+ }))
+ By("close 1st block by adding 5th block")
+ // 03 01:00
+ clock.Add(3 * time.Hour)
+ Eventually(func() []tsdb.BlockID {
+ return shard.State().OpenBlocks
+ }).Should(Equal([]tsdb.BlockID{
+ {
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+ },
+ {
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+ },
+ {
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+ },
+ }))
+ Eventually(func() []tsdb.BlockState {
+ return shard.State().Blocks
+ }).Should(Equal([]tsdb.BlockState{
+ {
+ ID: tsdb.BlockID{
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+ },
+ TimeRange: timestamp.NewTimeRangeDuration(t1, 12*time.Hour, true, false),
+ Closed: true,
+ },
+ {
+ ID: tsdb.BlockID{
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+ },
+ TimeRange: timestamp.NewTimeRangeDuration(t2, 12*time.Hour, true, false),
+ },
+ {
+ ID: tsdb.BlockID{
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+ },
+ TimeRange: timestamp.NewTimeRangeDuration(t3, 12*time.Hour, true, false),
+ },
+ {
+ ID: tsdb.BlockID{
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+ },
+ TimeRange: timestamp.NewTimeRangeDuration(t4, 12*time.Hour, true, false),
+ },
+ {
+ ID: tsdb.BlockID{
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700103),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+ },
+ TimeRange: timestamp.NewTimeRangeDuration(t5, 12*time.Hour, true, false),
+ },
+ }))
+ By("reopen 1st block")
series, err := shard.Series().GetByID(common.SeriesID(11))
Expect(err).NotTo(HaveOccurred())
- writeFn := func(bs tsdb.BlockState) {
- span, err := series.Span(bs.TimeRange)
- Expect(err).NotTo(HaveOccurred())
- defer span.Close()
- writer, err := span.WriterBuilder().Family([]byte("test"), []byte("test")).Time(bs.TimeRange.Start).Build()
- Expect(err).NotTo(HaveOccurred())
- _, err = writer.Write()
- Expect(err).NotTo(HaveOccurred())
- }
- for _, bs := range closedBlocks {
- writeFn(bs)
- }
+ t1Range := timestamp.NewInclusiveTimeRangeDuration(t1, 1*time.Hour)
+ span, err := series.Span(t1Range)
+ Expect(err).NotTo(HaveOccurred())
+ defer span.Close()
+ writer, err := span.WriterBuilder().Family([]byte("test"), []byte("test")).Time(t1Range.End).Build()
+ Expect(err).NotTo(HaveOccurred())
+ _, err = writer.Write()
+ Expect(err).NotTo(HaveOccurred())
+ Eventually(func() []tsdb.BlockID {
+ return shard.State().OpenBlocks
+ }).Should(Equal([]tsdb.BlockID{
+ {
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+ },
+ {
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+ },
+ {
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+ },
+ }))
+ Eventually(func() []tsdb.BlockState {
+ return shard.State().Blocks
+ }).Should(Equal([]tsdb.BlockState{
+ {
+ ID: tsdb.BlockID{
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+ },
+ TimeRange: timestamp.NewTimeRangeDuration(t1, 12*time.Hour, true, false),
+ },
+ {
+ ID: tsdb.BlockID{
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+ },
+ TimeRange: timestamp.NewTimeRangeDuration(t2, 12*time.Hour, true, false),
+ Closed: true,
+ },
+ {
+ ID: tsdb.BlockID{
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+ },
+ TimeRange: timestamp.NewTimeRangeDuration(t3, 12*time.Hour, true, false),
+ },
+ {
+ ID: tsdb.BlockID{
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+ },
+ TimeRange: timestamp.NewTimeRangeDuration(t4, 12*time.Hour, true, false),
+ },
+ {
+ ID: tsdb.BlockID{
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700103),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
+ },
+ TimeRange: timestamp.NewTimeRangeDuration(t5, 12*time.Hour, true, false),
+ },
+ }))
})
})
})
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index cf40414..8537752 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -49,11 +49,10 @@ const (
blockTemplate = rootPrefix + blockPathPrefix + "-%s"
globalIndexTemplate = rootPrefix + "index"
- segHourFormat = "2006010215"
- segDayFormat = "20060102"
- millisecondFormat = "20060102150405000"
- blockHourFormat = "15"
- blockDayFormat = "0102"
+ segHourFormat = "2006010215"
+ segDayFormat = "20060102"
+ blockHourFormat = "15"
+ blockDayFormat = "0102"
dirPerm = 0700
)
@@ -104,13 +103,18 @@ type BlockID struct {
BlockID uint16
}
+func GenerateInternalID(unit IntervalUnit, suffix int) uint16 {
+ return uint16(unit)<<12 | ((uint16(suffix) << 4) >> 4)
+}
+
type BlockState struct {
ID BlockID
TimeRange timestamp.TimeRange
Closed bool
}
type ShardState struct {
- OpenedBlocks []BlockState
+ Blocks []BlockState
+ OpenBlocks []BlockID
}
type database struct {
@@ -154,14 +158,14 @@ func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) {
return nil, err
}
segmentSize := opts.SegmentSize
- if segmentSize.Unit == MILLISECOND {
+ if segmentSize.Num == 0 {
segmentSize = IntervalRule{
Unit: DAY,
Num: 1,
}
}
blockSize := opts.BlockSize
- if blockSize.Unit == MILLISECOND {
+ if blockSize.Num == 0 {
blockSize = IntervalRule{
Unit: HOUR,
Num: 2,
diff --git a/banyand/tsdb/tsdb_suite_test.go b/banyand/tsdb/tsdb_suite_test.go
index c761503..07ab946 100644
--- a/banyand/tsdb/tsdb_suite_test.go
+++ b/banyand/tsdb/tsdb_suite_test.go
@@ -34,6 +34,6 @@ func TestTsdb(t *testing.T) {
var _ = BeforeSuite(func() {
Expect(logger.Init(logger.Logging{
Env: "dev",
- Level: "info",
+ Level: "debug",
})).Should(Succeed())
})
diff --git a/go.mod b/go.mod
index e56cf81..dead45b 100644
--- a/go.mod
+++ b/go.mod
@@ -4,6 +4,7 @@ go 1.17
require (
github.com/RoaringBitmap/roaring v0.9.1
+ github.com/benbjohnson/clock v1.3.0
github.com/cespare/xxhash v1.1.0
github.com/dgraph-io/badger/v3 v3.2011.1
github.com/dgraph-io/ristretto v0.1.0
@@ -107,4 +108,7 @@ require (
sigs.k8s.io/yaml v1.2.0 // indirect
)
-replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20220403004319-fea65bd5e9e4
+replace (
+ github.com/benbjohnson/clock v1.3.0 => github.com/SkyAPM/clock v1.3.1-0.20220416123716-97dcb111a8d8
+ github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20220403004319-fea65bd5e9e4
+)
diff --git a/go.sum b/go.sum
index 5057b2a..4a5d9d8 100644
--- a/go.sum
+++ b/go.sum
@@ -49,6 +49,8 @@ github.com/RoaringBitmap/roaring v0.9.1 h1:5PRizBmoN/PfV17nPNQou4dHQ7NcJi8FO/bih
github.com/RoaringBitmap/roaring v0.9.1/go.mod h1:h1B7iIUOmnAeb5ytYMvnHJwxMc6LUrwBnzXWRuqTQUc=
github.com/SkyAPM/badger/v3 v3.0.0-20220403004319-fea65bd5e9e4 h1:iLwRXI6WHBMb2VkWrlYKIFngPKwgs2OnjliXdMB5DY0=
github.com/SkyAPM/badger/v3 v3.0.0-20220403004319-fea65bd5e9e4/go.mod h1:Q0luV7nB94o3Bl4hYqAPy03+QTtLxs9pWdUEQb0i0K0=
+github.com/SkyAPM/clock v1.3.1-0.20220416123716-97dcb111a8d8 h1:TK3KN7H7ROhT0/sfY8JWWa5xHOo5jUqW7Stc/VxNJyA=
+github.com/SkyAPM/clock v1.3.1-0.20220416123716-97dcb111a8d8/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
@@ -59,7 +61,6 @@ github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hC
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
-github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg=
github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
diff --git a/pkg/timestamp/clock.go b/pkg/timestamp/clock.go
new file mode 100644
index 0000000..feba400
--- /dev/null
+++ b/pkg/timestamp/clock.go
@@ -0,0 +1,67 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package timestamp
+
+import (
+ "context"
+ "time"
+
+ "github.com/benbjohnson/clock"
+)
+
+// Clock represents an interface to the functions in the standard library time package.
+type Clock interface {
+ clock.Clock
+}
+
+type MockClock interface {
+ clock.Clock
+ // Add moves the current time of the mock clock forward by the specified duration.
+ Add(d time.Duration)
+}
+
+// NewClock returns an instance of a real-time clock.
+func NewClock() Clock {
+ return clock.New()
+}
+
+// NewMockClock returns an instance of a mock clock.
+func NewMockClock() MockClock {
+ return clock.NewMock()
+}
+
+var clockKey = contextClockKey{}
+
+type contextClockKey struct{}
+
+// GetClock returns a Clock from the context. If the context doesn't contain
+// a Clock, a real-time one will be created and returned with a child context
+// which contains the new one.
+func GetClock(ctx context.Context) (Clock, context.Context) {
+ c := ctx.Value(clockKey)
+ if c == nil {
+ realClock := NewClock()
+ return realClock, SetClock(ctx, realClock)
+ }
+ return c.(Clock), ctx
+}
+
+// SetClock returns a sub context with the passed Clock.
+func SetClock(ctx context.Context, clock Clock) context.Context {
+ return context.WithValue(ctx, clockKey, clock)
+}