You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/11/02 04:22:13 UTC
[skywalking-banyandb] 01/01: Fix bugs of tsdb
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch tsdb
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit c875b64d7666db702ae2e35082f362d966311031
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Wed Nov 2 04:16:18 2022 +0000
Fix bugs of tsdb
* Strategy manager failed to create the current block if there are
blocks existed
* A non-aligned block overflows the segment
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
Makefile | 2 +-
banyand/tsdb/block.go | 7 +-
banyand/tsdb/bucket/bucket.go | 4 +-
banyand/tsdb/bucket/strategy.go | 14 +-
banyand/tsdb/bucket/strategy_test.go | 4 +-
banyand/tsdb/segment.go | 71 ++++----
banyand/tsdb/series.go | 2 +-
banyand/tsdb/seriesdb.go | 6 +-
banyand/tsdb/shard.go | 47 +++---
banyand/tsdb/shard_test.go | 62 +++++++
banyand/tsdb/tsdb.go | 4 +
banyand/tsdb/tsdb_test.go | 52 ++++--
pkg/timestamp/range.go | 4 +
.../data/testdata/service_cpm_minute_data.json | 182 ---------------------
14 files changed, 190 insertions(+), 271 deletions(-)
diff --git a/Makefile b/Makefile
index db19f92..37eee1c 100644
--- a/Makefile
+++ b/Makefile
@@ -65,7 +65,7 @@ test-coverage: default ## Run the unit tests in all projects with coverage analy
include scripts/build/ginkgo.mk
test-ci: $(GINKGO) ## Run the unit tests in CI
- $(GINKGO) -v --race --cover --covermode atomic --coverprofile=coverage.out ./...
+ $(GINKGO) --race --cover --covermode atomic --coverprofile=coverage.out ./...
##@ Code quality targets
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 0043528..3bc87ea 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -80,7 +80,7 @@ type block struct {
type blockOpts struct {
segID uint16
blockSize IntervalRule
- startTime time.Time
+ timeRange timestamp.TimeRange
suffix string
path string
queue bucket.Queue
@@ -92,15 +92,14 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
return nil, err
}
id := GenerateInternalID(opts.blockSize.Unit, suffixInteger)
- timeRange := timestamp.NewTimeRange(opts.startTime, opts.blockSize.NextTime(opts.startTime), true, false)
clock, _ := timestamp.GetClock(ctx)
b = &block{
segID: opts.segID,
blockID: id,
path: opts.path,
l: logger.Fetch(ctx, "block"),
- TimeRange: timeRange,
- Reporter: bucket.NewTimeBasedReporter(timeRange, clock),
+ TimeRange: opts.timeRange,
+ Reporter: bucket.NewTimeBasedReporter(opts.timeRange, clock),
flushCh: make(chan struct{}, 1),
ref: &atomic.Int32{},
closed: &atomic.Bool{},
diff --git a/banyand/tsdb/bucket/bucket.go b/banyand/tsdb/bucket/bucket.go
index ad5b051..5838c26 100644
--- a/banyand/tsdb/bucket/bucket.go
+++ b/banyand/tsdb/bucket/bucket.go
@@ -24,7 +24,7 @@ import (
)
type Controller interface {
- Current() Reporter
+ Current() (Reporter, error)
Next() (Reporter, error)
OnMove(prev, next Reporter)
}
@@ -42,6 +42,8 @@ type Reporter interface {
String() string
}
+var _ Reporter = (*timeBasedReporter)(nil)
+
type timeBasedReporter struct {
timestamp.TimeRange
reporterStopCh chan struct{}
diff --git a/banyand/tsdb/bucket/strategy.go b/banyand/tsdb/bucket/strategy.go
index de8011c..4a8d14e 100644
--- a/banyand/tsdb/bucket/strategy.go
+++ b/banyand/tsdb/bucket/strategy.go
@@ -82,16 +82,17 @@ func NewStrategy(ctrl Controller, options ...StrategyOptions) (*Strategy, error)
if strategy.logger == nil {
strategy.logger = logger.GetLogger("bucket-strategy")
}
+ c, err := ctrl.Current()
+ if err != nil {
+ return nil, err
+ }
+ strategy.current.Store(c)
return strategy, nil
}
func (s *Strategy) Run() {
- for s.current.Load() == nil {
- s.current.Store(s.ctrl.Current())
- }
go func(s *Strategy) {
for {
-
c := s.current.Load().(Reporter).Report()
if !s.observe(c) {
return
@@ -110,7 +111,6 @@ func (s *Strategy) String() string {
}
func (s *Strategy) observe(c Channel) bool {
- var err error
var next Reporter
moreBucket := true
for {
@@ -122,11 +122,13 @@ func (s *Strategy) observe(c Channel) bool {
ratio := Ratio(status.Volume) / Ratio(status.Capacity)
atomic.StoreUint64(&s.currentRatio, math.Float64bits(float64(ratio)))
if ratio >= s.ratio && next == nil && moreBucket {
- next, err = s.ctrl.Next()
+ n, err := s.ctrl.Next()
if errors.Is(err, ErrNoMoreBucket) {
moreBucket = false
} else if err != nil {
s.logger.Err(err).Msg("failed to create the next bucket")
+ } else {
+ next = n
}
}
if ratio >= 1.0 {
diff --git a/banyand/tsdb/bucket/strategy_test.go b/banyand/tsdb/bucket/strategy_test.go
index aca1022..00f6ef5 100644
--- a/banyand/tsdb/bucket/strategy_test.go
+++ b/banyand/tsdb/bucket/strategy_test.go
@@ -109,10 +109,10 @@ func (c *controller) Next() (bucket.Reporter, error) {
return c.reporter, nil
}
-func (c *controller) Current() bucket.Reporter {
+func (c *controller) Current() (bucket.Reporter, error) {
c.mux.RLock()
defer c.mux.RUnlock()
- return c.reporter
+ return c.reporter, nil
}
func (c *controller) OnMove(prev bucket.Reporter, next bucket.Reporter) {
diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go
index b516bd2..3393f67 100644
--- a/banyand/tsdb/segment.go
+++ b/banyand/tsdb/segment.go
@@ -60,7 +60,7 @@ func openSegment(ctx context.Context, startTime time.Time, path, suffix string,
return nil, err
}
id := GenerateInternalID(segmentSize.Unit, suffixInteger)
- timeRange := timestamp.NewTimeRange(startTime, segmentSize.NextTime(startTime), true, false)
+ timeRange := timestamp.NewSectionTimeRange(startTime, segmentSize.NextTime(startTime))
l := logger.Fetch(ctx, "segment")
segCtx := context.WithValue(ctx, logger.ContextKey, l)
clock, segCtx := timestamp.GetClock(segCtx)
@@ -73,8 +73,7 @@ func openSegment(ctx context.Context, startTime time.Time, path, suffix string,
TimeRange: timeRange,
Reporter: bucket.NewTimeBasedReporter(timeRange, clock),
}
- isHead := s.End.After(clock.Now())
- err = s.blockController.open(isHead)
+ err = s.blockController.open()
if err != nil {
return nil, err
}
@@ -101,7 +100,7 @@ func openSegment(ctx context.Context, startTime time.Time, path, suffix string,
}
}
}
- if !isHead {
+ if !s.End.After(clock.Now()) {
return
}
s.blockManageStrategy, err = bucket.NewStrategy(s.blockController, bucket.WithLogger(s.l))
@@ -175,33 +174,31 @@ func newBlockController(segCtx context.Context, segID uint16, location string, s
}
}
-func (bc *blockController) Current() bucket.Reporter {
- bc.RLock()
- defer bc.RUnlock()
+func (bc *blockController) Current() (bucket.Reporter, error) {
now := bc.clock.Now()
- for _, s := range bc.lst {
- if s.suffix == bc.Format(now) {
- return s
+ ns := uint64(now.UnixNano())
+ if b := func() bucket.Reporter {
+ bc.RLock()
+ defer bc.RUnlock()
+ for _, s := range bc.lst {
+ if s.Contains(ns) {
+ return s
+ }
}
+ return nil
+ }(); b != nil {
+ return b, nil
}
- // return the latest segment before now
- if len(bc.lst) > 0 {
- return bc.lst[len(bc.lst)-1]
- }
- return nil
+ return bc.create(now)
}
func (bc *blockController) Next() (bucket.Reporter, error) {
- b := bc.Current().(*block)
- reporter, err := bc.create(
- bc.blockSize.NextTime(b.Start))
- if errors.Is(err, ErrEndOfSegment) {
- return nil, bucket.ErrNoMoreBucket
- }
+ c, err := bc.Current()
if err != nil {
return nil, err
}
- return reporter, err
+ b := c.(*block)
+ return bc.create(bc.blockSize.NextTime(b.Start))
}
func (bc *blockController) OnMove(prev bucket.Reporter, next bucket.Reporter) {
@@ -327,8 +324,8 @@ func (bc *blockController) startTime(suffix string) (time.Time, error) {
panic("invalid interval unit")
}
-func (bc *blockController) open(createIfEmpty bool) error {
- err := WalkDir(
+func (bc *blockController) open() error {
+ return WalkDir(
bc.location,
segPathPrefix,
func(suffix, absolutePath string) error {
@@ -337,19 +334,6 @@ func (bc *blockController) open(createIfEmpty bool) error {
_, err := bc.load(suffix, absolutePath)
return err
})
- if err != nil {
- return err
- }
- if !createIfEmpty {
- return nil
- }
- if bc.Current() == nil {
- _, err := bc.create(bc.clock.Now())
- if err != nil {
- return err
- }
- }
- return nil
}
func (bc *blockController) create(startTime time.Time) (*block, error) {
@@ -357,11 +341,16 @@ func (bc *blockController) create(startTime time.Time) (*block, error) {
startTime = bc.segTimeRange.Start
}
if !startTime.Before(bc.segTimeRange.End) {
- return nil, ErrEndOfSegment
+ return nil, bucket.ErrNoMoreBucket
}
bc.Lock()
defer bc.Unlock()
suffix := bc.Format(startTime)
+ for _, b := range bc.lst {
+ if b.suffix == suffix {
+ return b, nil
+ }
+ }
segPath, err := mkdir(blockTemplate, bc.location, suffix)
if err != nil {
return nil, err
@@ -374,6 +363,10 @@ func (bc *blockController) load(suffix, path string) (b *block, err error) {
if err != nil {
return nil, err
}
+ endTime := bc.blockSize.NextTime(starTime)
+ if endTime.After(bc.segTimeRange.End) {
+ endTime = bc.segTimeRange.End
+ }
if b, err = newBlock(
common.SetPosition(bc.segCtx, func(p common.Position) common.Position {
p.Block = suffix
@@ -382,7 +375,7 @@ func (bc *blockController) load(suffix, path string) (b *block, err error) {
blockOpts{
segID: bc.segID,
path: path,
- startTime: starTime,
+ timeRange: timestamp.NewSectionTimeRange(starTime, endTime),
suffix: suffix,
blockSize: bc.blockSize,
queue: bc.blockQueue,
diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go
index 2b9c355..c0956de 100644
--- a/banyand/tsdb/series.go
+++ b/banyand/tsdb/series.go
@@ -140,7 +140,7 @@ func (s *series) Create(t time.Time) (SeriesSpan, error) {
Msg("load a series span")
return newSeriesSpan(context.WithValue(context.Background(), logger.ContextKey, s.l), tr, blocks, s.id, s.shardID), nil
}
- b, err := s.blockDB.create(tr)
+ b, err := s.blockDB.create(t)
if err != nil {
return nil, err
}
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index 61332f7..7825338 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -24,6 +24,7 @@ import (
"math"
"sort"
"sync"
+ "time"
"go.uber.org/multierr"
@@ -148,7 +149,7 @@ type SeriesDatabase interface {
type blockDatabase interface {
shardID() common.ShardID
span(timeRange timestamp.TimeRange) ([]BlockDelegate, error)
- create(timeRange timestamp.TimeRange) (BlockDelegate, error)
+ create(ts time.Time) (BlockDelegate, error)
block(id GlobalItemID) (BlockDelegate, error)
}
@@ -266,9 +267,10 @@ func (s *seriesDB) span(timeRange timestamp.TimeRange) ([]BlockDelegate, error)
return result, nil
}
-func (s *seriesDB) create(timeRange timestamp.TimeRange) (BlockDelegate, error) {
+func (s *seriesDB) create(ts time.Time) (BlockDelegate, error) {
s.Lock()
defer s.Unlock()
+ timeRange := timestamp.NewInclusiveTimeRange(ts, ts)
ss := s.segCtrl.span(timeRange)
if len(ss) > 0 {
s := ss[0]
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index 4f3c168..277ee03 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -293,24 +293,30 @@ func (sc *segmentController) segments() (ss []*segment) {
return r
}
-func (sc *segmentController) Current() bucket.Reporter {
- sc.RLock()
- defer sc.RUnlock()
+func (sc *segmentController) Current() (bucket.Reporter, error) {
now := sc.clock.Now()
- for _, s := range sc.lst {
- if s.suffix == sc.Format(now) {
- return s
+ ns := uint64(now.UnixNano())
+ if b := func() bucket.Reporter {
+ sc.RLock()
+ defer sc.RUnlock()
+ for _, s := range sc.lst {
+ if s.Contains(ns) {
+ return s
+ }
}
+ return nil
+ }(); b != nil {
+ return b, nil
}
- // return the latest segment before now
- if len(sc.lst) > 0 {
- return sc.lst[len(sc.lst)-1]
- }
- return nil
+ return sc.create(sc.Format(now), true)
}
func (sc *segmentController) Next() (bucket.Reporter, error) {
- seg := sc.Current().(*segment)
+ c, err := sc.Current()
+ if err != nil {
+ return nil, err
+ }
+ seg := c.(*segment)
reporter, err := sc.create(sc.Format(
sc.segmentSize.NextTime(seg.Start)), true)
if errors.Is(err, ErrEndOfSegment) {
@@ -351,7 +357,7 @@ func (sc *segmentController) Parse(value string) (time.Time, error) {
}
func (sc *segmentController) open() error {
- err := WalkDir(
+ return WalkDir(
sc.location,
segPathPrefix,
func(suffix, absolutePath string) error {
@@ -363,21 +369,16 @@ func (sc *segmentController) open() error {
}
return err
})
- if err != nil {
- return err
- }
- if sc.Current() == nil {
- _, err = sc.create(sc.Format(sc.clock.Now()), true)
- if err != nil {
- return err
- }
- }
- return nil
}
func (sc *segmentController) create(suffix string, createBlockIfEmpty bool) (*segment, error) {
sc.Lock()
defer sc.Unlock()
+ for _, s := range sc.lst {
+ if s.suffix == suffix {
+ return s, nil
+ }
+ }
segPath, err := mkdir(segTemplate, sc.location, suffix)
if err != nil {
return nil, err
diff --git a/banyand/tsdb/shard_test.go b/banyand/tsdb/shard_test.go
index 350085a..1e1197e 100644
--- a/banyand/tsdb/shard_test.go
+++ b/banyand/tsdb/shard_test.go
@@ -540,5 +540,67 @@ var _ = Describe("Shard", func() {
},
}))
})
+ It("creates arbitrary blocks", func() {
+ clock.Set(time.Date(1970, 0o1, 0o1, 1, 0, 0, 0, time.Local))
+ By("open 1 block")
+ var err error
+ shard, err = tsdb.OpenShard(timestamp.SetClock(context.Background(), clock), common.ShardID(0), tmp,
+ tsdb.IntervalRule{
+ Unit: tsdb.DAY,
+ Num: 1,
+ },
+ tsdb.IntervalRule{
+ Unit: tsdb.HOUR,
+ Num: 12,
+ },
+ tsdb.IntervalRule{
+ Unit: tsdb.DAY,
+ Num: 7,
+ },
+ 2,
+ )
+ Expect(err).NotTo(HaveOccurred())
+ By("01/01 00:01 1st block is opened")
+ t1 := clock.Now()
+ Eventually(func() []tsdb.BlockState {
+ return shard.State().Blocks
+ }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+ {
+ ID: tsdb.BlockID{
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o1),
+ },
+ TimeRange: timestamp.NewTimeRangeDuration(t1, 12*time.Hour, true, false),
+ },
+ }))
+ By("01/01 11:00 2nd block is opened")
+ clock.Add(10 * time.Hour)
+ t2 := clock.Now().Add(2 * time.Hour)
+ Eventually(func() []tsdb.BlockState {
+ if clock.TriggerTimer() {
+ GinkgoWriter.Println("01/01 10:00 has been triggered")
+ }
+ return shard.State().Blocks
+ }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+ {
+ ID: tsdb.BlockID{
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o1),
+ },
+ TimeRange: timestamp.NewTimeRangeDuration(t1, 12*time.Hour, true, false),
+ },
+ {
+ ID: tsdb.BlockID{
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 13),
+ },
+ // The last block only takes 11 hours to align the segment's size
+ TimeRange: timestamp.NewTimeRangeDuration(t2, 11*time.Hour, true, false),
+ },
+ }))
+ Eventually(func() []tsdb.BlockID {
+ return shard.State().OpenBlocks
+ }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{}))
+ })
})
})
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index dc6b62a..5710837 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -25,6 +25,7 @@ import (
"strconv"
"strings"
"sync"
+ "time"
"github.com/pkg/errors"
"go.uber.org/multierr"
@@ -174,6 +175,9 @@ func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) {
if opts.TTL.Num == 0 {
return nil, errors.Wrap(ErrOpenDatabase, "ttl is absent")
}
+ if opts.SegmentInterval.EstimatedDuration() > 24*time.Hour {
+ return nil, errors.Wrap(ErrOpenDatabase, "segment interval should not be greater than 24 hours")
+ }
db := &database{
location: opts.Location,
shardNum: opts.ShardNum,
diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go
index fe6c9a6..684269f 100644
--- a/banyand/tsdb/tsdb_test.go
+++ b/banyand/tsdb/tsdb_test.go
@@ -30,15 +30,16 @@ import (
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/test"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
)
func TestOpenDatabase(t *testing.T) {
tester := assert.New(t)
req := require.New(t)
tempDir, deferFunc := test.Space(req)
- openDatabase(req, tempDir)
+ openDatabase(context.Background(), req, tempDir)
defer deferFunc()
- verifyDatabaseStructure(tester, tempDir)
+ verifyDatabaseStructure(tester, tempDir, time.Now())
}
func TestReOpenDatabase(t *testing.T) {
@@ -46,32 +47,63 @@ func TestReOpenDatabase(t *testing.T) {
req := require.New(t)
tempDir, deferFunc := test.Space(req)
defer deferFunc()
- db := openDatabase(req, tempDir)
+ db := openDatabase(context.Background(), req, tempDir)
req.NoError(db.Close())
- verifyDatabaseStructure(tester, tempDir)
- db = openDatabase(req, tempDir)
+ verifyDatabaseStructure(tester, tempDir, time.Now())
+ db = openDatabase(context.Background(), req, tempDir)
req.NoError(db.Close())
- verifyDatabaseStructure(tester, tempDir)
+ verifyDatabaseStructure(tester, tempDir, time.Now())
}
-func verifyDatabaseStructure(tester *assert.Assertions, tempDir string) {
+func TestReOpenDatabaseNextBlock(t *testing.T) {
+ tester := assert.New(t)
+ req := require.New(t)
+ tempDir, deferFunc := test.Space(req)
+ defer deferFunc()
+ clock := timestamp.NewMockClock()
+ clock.Set(time.Date(1970, 0o1, 0o1, 0, 0, 0, 0, time.Local))
+ db := openDatabase(timestamp.SetClock(context.Background(), clock), req, tempDir)
+ req.NoError(db.Close())
+ verifyDatabaseStructure(tester, tempDir, clock.Now())
+ clock.Add(5 * time.Hour)
+ db = openDatabase(timestamp.SetClock(context.Background(), clock), req, tempDir)
+ req.NoError(db.Close())
+ verifyDatabaseStructure(tester, tempDir, clock.Now())
+}
+
+func TestReOpenDatabaseNextDay(t *testing.T) {
+ tester := assert.New(t)
+ req := require.New(t)
+ tempDir, deferFunc := test.Space(req)
+ defer deferFunc()
+ clock := timestamp.NewMockClock()
+ clock.Set(time.Date(1970, 0o1, 0o1, 0, 0, 0, 0, time.Local))
+ db := openDatabase(timestamp.SetClock(context.Background(), clock), req, tempDir)
+ req.NoError(db.Close())
+ verifyDatabaseStructure(tester, tempDir, clock.Now())
+ clock.Add(26 * time.Hour)
+ db = openDatabase(timestamp.SetClock(context.Background(), clock), req, tempDir)
+ req.NoError(db.Close())
+ verifyDatabaseStructure(tester, tempDir, clock.Now())
+}
+
+func verifyDatabaseStructure(tester *assert.Assertions, tempDir string, now time.Time) {
shardPath := fmt.Sprintf(shardTemplate, tempDir, 0)
validateDirectory(tester, shardPath)
seriesPath := fmt.Sprintf(seriesTemplate, shardPath)
validateDirectory(tester, seriesPath)
- now := time.Now()
segPath := fmt.Sprintf(segTemplate, shardPath, now.Format(segDayFormat))
validateDirectory(tester, segPath)
validateDirectory(tester, fmt.Sprintf(blockTemplate, segPath, now.Format(blockHourFormat)))
}
-func openDatabase(t *require.Assertions, path string) (db Database) {
+func openDatabase(ctx context.Context, t *require.Assertions, path string) (db Database) {
t.NoError(logger.Init(logger.Logging{
Env: "dev",
Level: "warn",
}))
db, err := OpenDatabase(
- context.WithValue(context.Background(), logger.ContextKey, logger.GetLogger("test")),
+ context.WithValue(ctx, logger.ContextKey, logger.GetLogger("test")),
DatabaseOpts{
Location: path,
ShardNum: 1,
diff --git a/pkg/timestamp/range.go b/pkg/timestamp/range.go
index 3c5b77b..46ac360 100644
--- a/pkg/timestamp/range.go
+++ b/pkg/timestamp/range.go
@@ -84,6 +84,10 @@ func NewInclusiveTimeRangeDuration(start time.Time, duration time.Duration) Time
return NewTimeRangeDuration(start, duration, true, true)
}
+func NewSectionTimeRange(start, end time.Time) TimeRange {
+ return NewTimeRange(start, end, true, false)
+}
+
func NewTimeRange(start, end time.Time, includeStart, includeEnd bool) TimeRange {
return TimeRange{
Start: start,
diff --git a/test/cases/stream/data/testdata/service_cpm_minute_data.json b/test/cases/stream/data/testdata/service_cpm_minute_data.json
deleted file mode 100644
index 877290c..0000000
--- a/test/cases/stream/data/testdata/service_cpm_minute_data.json
+++ /dev/null
@@ -1,182 +0,0 @@
-[
- {
- "tag_families": [
- {
- "tags": [
- {
- "id": {
- "value": "1"
- }
- },
- {
- "str": {
- "value": "entity_1"
- }
- }
- ]
- }
- ],
- "fields": [
- {
- "int": {
- "value": 100
- }
- },
- {
- "int": {
- "value": 1
- }
- }
- ]
- },
- {
- "tag_families": [
- {
- "tags": [
- {
- "id": {
- "value": "4"
- }
- },
- {
- "str": {
- "value": "entity_2"
- }
- }
- ]
- }
- ],
- "fields": [
- {
- "int": {
- "value": 100
- }
- },
- {
- "int": {
- "value": 1
- }
- }
- ]
- },
- {
- "tag_families": [
- {
- "tags": [
- {
- "id": {
- "value": "5"
- }
- },
- {
- "str": {
- "value": "entity_2"
- }
- }
- ]
- }
- ],
- "fields": [
- {
- "int": {
- "value": 100
- }
- },
- {
- "int": {
- "value": 1
- }
- }
- ]
- },
- {
- "tag_families": [
- {
- "tags": [
- {
- "id": {
- "value": "6"
- }
- },
- {
- "str": {
- "value": "entity_3"
- }
- }
- ]
- }
- ],
- "fields": [
- {
- "int": {
- "value": 100
- }
- },
- {
- "int": {
- "value": 5
- }
- }
- ]
- },
- {
- "tag_families": [
- {
- "tags": [
- {
- "id": {
- "value": "2"
- }
- },
- {
- "str": {
- "value": "entity_1"
- }
- }
- ]
- }
- ],
- "fields": [
- {
- "int": {
- "value": 50
- }
- },
- {
- "int": {
- "value": 4
- }
- }
- ]
- },
- {
- "tag_families": [
- {
- "tags": [
- {
- "id": {
- "value": "3"
- }
- },
- {
- "str": {
- "value": "entity_1"
- }
- }
- ]
- }
- ],
- "fields": [
- {
- "int": {
- "value": 300
- }
- },
- {
- "int": {
- "value": 5
- }
- }
- ]
- }
-]