You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2022/01/22 12:32:41 UTC

[GitHub] [skywalking-banyandb] hanahmily opened a new pull request #70: Bucket strategy managment and segment controller

hanahmily opened a new pull request #70:
URL: https://github.com/apache/skywalking-banyandb/pull/70


   These changes tend to introduce a bucket-based strategy manager to move segments and blocks forward. The new manager enhancements the segment in this PR. The block's going to implement it in the following up commits.
   
   Caveat: the manager doesn't clean up segments/blocks, which are not the current reporter. Another resource controller will take it according to the memory state. 
   
   Signed-off-by: Gao Hongtao <ha...@gmail.com>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking-banyandb] lujiajing1126 commented on pull request #70: Bucket strategy managment and segment controller

Posted by GitBox <gi...@apache.org>.
lujiajing1126 commented on pull request #70:
URL: https://github.com/apache/skywalking-banyandb/pull/70#issuecomment-1019700019


   LGTM


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking-banyandb] hanahmily commented on a change in pull request #70: Bucket strategy managment and segment controller

Posted by GitBox <gi...@apache.org>.
hanahmily commented on a change in pull request #70:
URL: https://github.com/apache/skywalking-banyandb/pull/70#discussion_r790384299



##########
File path: banyand/tsdb/segment.go
##########
@@ -19,42 +19,74 @@ package tsdb
 
 import (
 	"context"
+	"strconv"
 	"sync"
 	"time"
 
 	"github.com/apache/skywalking-banyandb/banyand/kv"
+	"github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
 type segment struct {
-	path string
+	id     uint16
+	path   string
+	suffix string
 
 	lst         []*block
 	globalIndex kv.Store
 	sync.Mutex
-	l         *logger.Logger
-	startTime time.Time
-	endTime   time.Time
+	l              *logger.Logger
+	reporterStopCh chan struct{}
+	TimeRange
 }
 
-func (s *segment) contains(ts time.Time) bool {
-	greaterAndEqualStart := s.startTime.Equal(ts) || s.startTime.Before(ts)
-	if s.endTime.IsZero() {
-		return greaterAndEqualStart
+func (s *segment) Report() bucket.Channel {
+	ch := make(bucket.Channel)
+	interval := s.Duration() >> 4

Review comment:
       get rid of setting the interval by hand. The interval = the segment duration / 16




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking-banyandb] lujiajing1126 merged pull request #70: Bucket strategy managment and segment controller

Posted by GitBox <gi...@apache.org>.
lujiajing1126 merged pull request #70:
URL: https://github.com/apache/skywalking-banyandb/pull/70


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking-banyandb] hanahmily commented on a change in pull request #70: Bucket strategy managment and segment controller

Posted by GitBox <gi...@apache.org>.
hanahmily commented on a change in pull request #70:
URL: https://github.com/apache/skywalking-banyandb/pull/70#discussion_r790380383



##########
File path: banyand/tsdb/bucket/strategy.go
##########
@@ -0,0 +1,131 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package bucket
+
+import (
+	"github.com/pkg/errors"
+	"go.uber.org/multierr"
+
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+var (
+	ErrInvalidParameter = errors.New("parameters are invalid")
+	ErrNoMoreBucket     = errors.New("no more buckets")
+)
+
+type Ratio float64
+
+type Strategy struct {
+	optionsErr error
+	ratio      Ratio
+	ctrl       Controller
+	current    Reporter
+	next       Reporter
+	logger     *logger.Logger
+	stopCh     chan struct{}
+}
+
+type StrategyOptions func(*Strategy)
+
+func WithNextThreshold(ratio Ratio) StrategyOptions {
+	return func(s *Strategy) {
+		if ratio > 1.0 {
+			s.optionsErr = multierr.Append(s.optionsErr,
+				errors.Wrapf(ErrInvalidParameter, "ratio %v is more than 1.0", ratio))
+			return
+		}
+		s.ratio = ratio
+	}
+}
+
+func WithLogger(logger *logger.Logger) StrategyOptions {
+	return func(s *Strategy) {
+		s.logger = logger
+	}
+}
+
+func NewStrategy(ctrl Controller, options ...StrategyOptions) (*Strategy, error) {
+	if ctrl == nil {
+		return nil, errors.Wrap(ErrInvalidParameter, "controller is absent")
+	}
+	strategy := &Strategy{
+		ctrl:   ctrl,
+		ratio:  0.8,
+		stopCh: make(chan struct{}),
+	}
+	for _, opt := range options {
+		opt(strategy)
+	}
+	if strategy.optionsErr != nil {
+		return nil, strategy.optionsErr
+	}
+	if strategy.logger == nil {
+		strategy.logger = logger.GetLogger("bucket-strategy")
+	}
+	return strategy, nil
+}
+
+func (s *Strategy) Run() {
+	reset := func() {
+		for s.current == nil {
+			s.current = s.ctrl.Current()
+		}
+		s.next = nil
+	}
+	reset()
+	go func(s *Strategy) {
+		var err error
+		for {
+		bucket:
+			c := s.current.Report()
+			for {
+				select {
+				case status, more := <-c:
+					if !more {
+						reset()
+						goto bucket
+					}
+					ratio := Ratio(status.Volume) / Ratio(status.Capacity)
+					if ratio >= s.ratio && s.next == nil {
+						s.next, err = s.ctrl.Next()
+						if errors.Is(err, ErrNoMoreBucket) {
+							return
+						}
+						if err != nil {

Review comment:
       The loop keeps calling the `next` continually till it successfully. This piece could fix accidental failures.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking-banyandb] hanahmily commented on a change in pull request #70: Bucket strategy managment and segment controller

Posted by GitBox <gi...@apache.org>.
hanahmily commented on a change in pull request #70:
URL: https://github.com/apache/skywalking-banyandb/pull/70#discussion_r790383831



##########
File path: banyand/tsdb/segment.go
##########
@@ -19,42 +19,74 @@ package tsdb
 
 import (
 	"context"
+	"strconv"
 	"sync"
 	"time"
 
 	"github.com/apache/skywalking-banyandb/banyand/kv"
+	"github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
 type segment struct {
-	path string
+	id     uint16
+	path   string
+	suffix string
 
 	lst         []*block
 	globalIndex kv.Store
 	sync.Mutex
-	l         *logger.Logger
-	startTime time.Time
-	endTime   time.Time
+	l              *logger.Logger
+	reporterStopCh chan struct{}
+	TimeRange
 }
 
-func (s *segment) contains(ts time.Time) bool {
-	greaterAndEqualStart := s.startTime.Equal(ts) || s.startTime.Before(ts)
-	if s.endTime.IsZero() {
-		return greaterAndEqualStart
+func (s *segment) Report() bucket.Channel {
+	ch := make(bucket.Channel)
+	interval := s.Duration() >> 4
+	if interval < 100*time.Millisecond {
+		interval = 100 * time.Millisecond
 	}
-	return greaterAndEqualStart && s.endTime.After(ts)
+	go func() {
+		defer close(ch)
+		for {
+			ticker := time.NewTicker(interval)
+			defer ticker.Stop()

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking-banyandb] lujiajing1126 commented on a change in pull request #70: Bucket strategy managment and segment controller

Posted by GitBox <gi...@apache.org>.
lujiajing1126 commented on a change in pull request #70:
URL: https://github.com/apache/skywalking-banyandb/pull/70#discussion_r790235421



##########
File path: banyand/tsdb/segment.go
##########
@@ -19,42 +19,74 @@ package tsdb
 
 import (
 	"context"
+	"strconv"
 	"sync"
 	"time"
 
 	"github.com/apache/skywalking-banyandb/banyand/kv"
+	"github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
 type segment struct {
-	path string
+	id     uint16
+	path   string
+	suffix string
 
 	lst         []*block
 	globalIndex kv.Store
 	sync.Mutex
-	l         *logger.Logger
-	startTime time.Time
-	endTime   time.Time
+	l              *logger.Logger
+	reporterStopCh chan struct{}
+	TimeRange
 }
 
-func (s *segment) contains(ts time.Time) bool {
-	greaterAndEqualStart := s.startTime.Equal(ts) || s.startTime.Before(ts)
-	if s.endTime.IsZero() {
-		return greaterAndEqualStart
+func (s *segment) Report() bucket.Channel {
+	ch := make(bucket.Channel)
+	interval := s.Duration() >> 4

Review comment:
       why we need to divide by 2^4?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking-banyandb] hanahmily commented on a change in pull request #70: Bucket strategy managment and segment controller

Posted by GitBox <gi...@apache.org>.
hanahmily commented on a change in pull request #70:
URL: https://github.com/apache/skywalking-banyandb/pull/70#discussion_r790380581



##########
File path: banyand/tsdb/shard.go
##########
@@ -52,66 +54,234 @@ func (s *shard) Index() IndexDatabase {
 	return s.indexDatabase
 }
 
-func openShard(ctx context.Context, id common.ShardID, location string) (*shard, error) {
-	s := &shard{
-		id:       id,
-		location: location,
-	}
-	parentLogger := ctx.Value(logger.ContextKey)
-	if parentLogger != nil {
-		if pl, ok := parentLogger.(*logger.Logger); ok {
-			s.logger = pl.Named("shard" + strconv.Itoa(int(id)))
-		}
+func OpenShard(ctx context.Context, id common.ShardID, root string, intervalRule IntervalRule) (Shard, error) {
+	path, err := mkdir(shardTemplate, root, int(id))
+	if err != nil {
+		return nil, errors.Wrapf(err, "make the directory of the shard %d ", int(id))
 	}
-	loadSeg := func(path string) error {
-		seg, err := newSegment(ctx, path)
-		if err != nil {
-			return err
-		}
-		{
-			s.Lock()
-			defer s.Unlock()
-			s.lst = append(s.lst, seg)
-		}
-		s.logger.Info().Int("size", len(s.lst)).Msg("seg size")
-		return nil
+	l := logger.Fetch(ctx, "shard"+strconv.Itoa(int(id)))
+	l.Info().Int("shard_id", int(id)).Str("path", path).Msg("creating a shard")
+	s := &shard{
+		id:                id,
+		segmentController: newSegmentController(path, intervalRule),
+		l:                 l,
 	}
-	err := walkDir(location, segPathPrefix, func(_, absolutePath string) error {
-		s.logger.Info().Str("path", absolutePath).Msg("loading a segment")
-		return loadSeg(absolutePath)
-	})
+	shardCtx := context.WithValue(ctx, logger.ContextKey, s.l)
+	err = s.segmentController.open(shardCtx)
 	if err != nil {
 		return nil, err
 	}
-	if len(s.lst) < 1 {
-		var segPath string
-		segPath, err = mkdir(segTemplate, location, time.Now().Format(segFormat))
-		if err != nil {
-			return nil, err
-		}
-		s.logger.Info().Str("path", segPath).Msg("creating a new segment")
-		err = loadSeg(segPath)
-		if err != nil {
-			return nil, err
-		}
-	}
-	seriesPath, err := mkdir(seriesTemplate, s.location)
+	seriesPath, err := mkdir(seriesTemplate, path)
 	if err != nil {
 		return nil, err
 	}
-	sdb, err := newSeriesDataBase(ctx, s.id, seriesPath, s.lst)
+	sdb, err := newSeriesDataBase(shardCtx, s.id, seriesPath, s.segmentController)
 	if err != nil {
 		return nil, err
 	}
 	s.seriesDatabase = sdb
-	idb, err := newIndexDatabase(ctx, s.id, s.lst)
+	idb, err := newIndexDatabase(shardCtx, s.id, s.segmentController)
 	if err != nil {
 		return nil, err
 	}
 	s.indexDatabase = idb
+	s.segmentManageStrategy, err = bucket.NewStrategy(s.segmentController, bucket.WithLogger(s.l))
+	if err != nil {
+		return nil, err
+	}
+	s.segmentManageStrategy.Run()
 	return s, nil
 }
 
 func (s *shard) Close() error {
+	s.segmentManageStrategy.Close()
+	s.segmentController.close()
 	return s.seriesDatabase.Close()
 }
+
+type IntervalUnit int
+
+const (
+	DAY IntervalUnit = iota
+	MONTH
+	YEAR
+	MILLISECOND // only for testing
+)
+
+func (iu IntervalUnit) String() string {
+	switch iu {
+	case DAY:
+		return "day"
+	case MONTH:
+		return "month"
+	case YEAR:
+		return "year"
+	case MILLISECOND:
+		return "millis"
+
+	}
+	panic("invalid interval unit")
+}
+
+func (iu IntervalUnit) Format(tm time.Time) string {
+	switch iu {
+	case DAY:
+		return tm.Format(segDayFormat)
+	case MONTH:
+		return tm.Format(segMonthFormat)
+	case YEAR:
+		return tm.Format(segYearFormat)
+	case MILLISECOND:
+		return tm.Format(segMillisecondFormat)
+	}
+	panic("invalid interval unit")
+}
+
+func (iu IntervalUnit) Parse(value string) (time.Time, error) {
+	switch iu {
+	case DAY:
+		return time.Parse(segDayFormat, value)
+	case MONTH:
+		return time.Parse(segMonthFormat, value)
+	case YEAR:
+		return time.Parse(segYearFormat, value)
+	case MILLISECOND:
+		return time.Parse(segMillisecondFormat, value)
+	}
+	panic("invalid interval unit")
+}
+
+type IntervalRule struct {
+	Unit IntervalUnit
+	Num  int
+}
+
+func (ir IntervalRule) NextTime(current time.Time) time.Time {
+	switch ir.Unit {
+	case DAY:
+		return current.AddDate(0, 0, ir.Num)
+	case MONTH:
+		return current.AddDate(0, ir.Num, 0)
+	case YEAR:
+		return current.AddDate(ir.Num, 0, 0)
+	case MILLISECOND:
+		return current.Add(time.Millisecond * time.Duration(ir.Num))
+	}
+	panic("invalid interval unit")
+}
+
+type segmentController struct {
+	sync.RWMutex
+	location     string
+	intervalRule IntervalRule
+	lst          []*segment
+}
+
+func newSegmentController(location string, intervalRule IntervalRule) *segmentController {
+	return &segmentController{
+		location:     location,
+		intervalRule: intervalRule,
+	}
+}
+
+func (sc *segmentController) get(segID uint16) *segment {
+	sc.RLock()
+	defer sc.RUnlock()
+	last := len(sc.lst) - 1
+	for i := range sc.lst {
+		s := sc.lst[last-i]
+		if s.id == segID {
+			return s
+		}
+	}
+	return nil
+}
+
+func (sc *segmentController) span(timeRange TimeRange) (ss []*segment) {
+	sc.RLock()
+	defer sc.RUnlock()
+	last := len(sc.lst) - 1
+	for i := range sc.lst {
+		s := sc.lst[last-i]
+		if s.Overlapping(timeRange) {
+			ss = append(ss, s)
+		}
+	}
+	return ss
+}
+
+func (sc *segmentController) segments() (ss []*segment) {
+	sc.RLock()
+	defer sc.RUnlock()
+	r := make([]*segment, len(sc.lst))
+	copy(r, sc.lst)
+	return r
+}
+
+func (sc *segmentController) Current() bucket.Reporter {
+	sc.RLock()
+	defer sc.RUnlock()
+	now := time.Now()
+	for _, s := range sc.lst {
+		if s.suffix == sc.intervalRule.Unit.Format(now) {
+			return s
+		}
+	}
+	// return the latest segment before now
+	if len(sc.lst) > 0 {
+		return sc.lst[len(sc.lst)-1]
+	}
+	return nil
+}
+
+func (sc *segmentController) Next() (bucket.Reporter, error) {
+	return sc.create(context.TODO(), sc.intervalRule.Unit.Format(
+		sc.intervalRule.NextTime(time.Now())))
+}
+
+func (sc *segmentController) open(ctx context.Context) error {
+	err := walkDir(
+		sc.location,
+		segPathPrefix,
+		func(suffix, absolutePath string) error {
+			_, err := sc.load(ctx, suffix, absolutePath)
+			return err
+		})
+	if err != nil {
+		return err
+	}
+	if sc.Current() == nil {
+		_, err = sc.create(ctx, sc.intervalRule.Unit.Format(time.Now()))
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (sc *segmentController) create(ctx context.Context, suffix string) (*segment, error) {
+	segPath, err := mkdir(segTemplate, sc.location, suffix)
+	if err != nil {
+		return nil, err
+	}
+	return sc.load(ctx, suffix, segPath)
+}
+
+func (sc *segmentController) load(ctx context.Context, suffix, path string) (seg *segment, err error) {
+	seg, err = openSegment(ctx, suffix, path, sc.intervalRule)
+	if err != nil {
+		return nil, err
+	}
+	{

Review comment:
       it tends to reduce the scope of the lock




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking-banyandb] hanahmily commented on a change in pull request #70: Bucket strategy managment and segment controller

Posted by GitBox <gi...@apache.org>.
hanahmily commented on a change in pull request #70:
URL: https://github.com/apache/skywalking-banyandb/pull/70#discussion_r790379573



##########
File path: banyand/tsdb/bucket/strategy.go
##########
@@ -0,0 +1,131 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package bucket
+
+import (
+	"github.com/pkg/errors"
+	"go.uber.org/multierr"
+
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+var (
+	ErrInvalidParameter = errors.New("parameters are invalid")
+	ErrNoMoreBucket     = errors.New("no more buckets")
+)
+
+type Ratio float64
+
+type Strategy struct {
+	optionsErr error
+	ratio      Ratio
+	ctrl       Controller
+	current    Reporter
+	next       Reporter
+	logger     *logger.Logger
+	stopCh     chan struct{}
+}
+
+type StrategyOptions func(*Strategy)
+
+func WithNextThreshold(ratio Ratio) StrategyOptions {
+	return func(s *Strategy) {
+		if ratio > 1.0 {
+			s.optionsErr = multierr.Append(s.optionsErr,
+				errors.Wrapf(ErrInvalidParameter, "ratio %v is more than 1.0", ratio))
+			return
+		}
+		s.ratio = ratio
+	}
+}
+
+func WithLogger(logger *logger.Logger) StrategyOptions {
+	return func(s *Strategy) {
+		s.logger = logger
+	}
+}
+
+func NewStrategy(ctrl Controller, options ...StrategyOptions) (*Strategy, error) {
+	if ctrl == nil {
+		return nil, errors.Wrap(ErrInvalidParameter, "controller is absent")
+	}
+	strategy := &Strategy{
+		ctrl:   ctrl,
+		ratio:  0.8,
+		stopCh: make(chan struct{}),
+	}
+	for _, opt := range options {
+		opt(strategy)
+	}
+	if strategy.optionsErr != nil {
+		return nil, strategy.optionsErr
+	}
+	if strategy.logger == nil {
+		strategy.logger = logger.GetLogger("bucket-strategy")
+	}
+	return strategy, nil
+}
+
+func (s *Strategy) Run() {
+	reset := func() {
+		for s.current == nil {
+			s.current = s.ctrl.Current()
+		}
+		s.next = nil
+	}
+	reset()
+	go func(s *Strategy) {
+		var err error
+		for {
+		bucket:

Review comment:
       ` s.current.Report()` will return a new channel. We should reuse it by introducing an outer loop. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking-banyandb] lujiajing1126 commented on a change in pull request #70: Bucket strategy managment and segment controller

Posted by GitBox <gi...@apache.org>.
lujiajing1126 commented on a change in pull request #70:
URL: https://github.com/apache/skywalking-banyandb/pull/70#discussion_r790144319



##########
File path: banyand/tsdb/bucket/strategy.go
##########
@@ -0,0 +1,131 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package bucket
+
+import (
+	"github.com/pkg/errors"
+	"go.uber.org/multierr"
+
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+var (
+	ErrInvalidParameter = errors.New("parameters are invalid")
+	ErrNoMoreBucket     = errors.New("no more buckets")
+)
+
+type Ratio float64
+
+type Strategy struct {
+	optionsErr error
+	ratio      Ratio
+	ctrl       Controller
+	current    Reporter
+	next       Reporter
+	logger     *logger.Logger
+	stopCh     chan struct{}
+}
+
+type StrategyOptions func(*Strategy)
+
+func WithNextThreshold(ratio Ratio) StrategyOptions {
+	return func(s *Strategy) {
+		if ratio > 1.0 {
+			s.optionsErr = multierr.Append(s.optionsErr,
+				errors.Wrapf(ErrInvalidParameter, "ratio %v is more than 1.0", ratio))
+			return
+		}
+		s.ratio = ratio
+	}
+}
+
+func WithLogger(logger *logger.Logger) StrategyOptions {
+	return func(s *Strategy) {
+		s.logger = logger
+	}
+}
+
+func NewStrategy(ctrl Controller, options ...StrategyOptions) (*Strategy, error) {
+	if ctrl == nil {
+		return nil, errors.Wrap(ErrInvalidParameter, "controller is absent")
+	}
+	strategy := &Strategy{
+		ctrl:   ctrl,
+		ratio:  0.8,
+		stopCh: make(chan struct{}),
+	}
+	for _, opt := range options {
+		opt(strategy)
+	}
+	if strategy.optionsErr != nil {
+		return nil, strategy.optionsErr
+	}
+	if strategy.logger == nil {
+		strategy.logger = logger.GetLogger("bucket-strategy")
+	}
+	return strategy, nil
+}
+
+func (s *Strategy) Run() {
+	reset := func() {
+		for s.current == nil {
+			s.current = s.ctrl.Current()
+		}
+		s.next = nil
+	}
+	reset()
+	go func(s *Strategy) {
+		var err error
+		for {
+		bucket:
+			c := s.current.Report()
+			for {
+				select {
+				case status, more := <-c:
+					if !more {
+						reset()
+						goto bucket
+					}
+					ratio := Ratio(status.Volume) / Ratio(status.Capacity)
+					if ratio >= s.ratio && s.next == nil {
+						s.next, err = s.ctrl.Next()
+						if errors.Is(err, ErrNoMoreBucket) {
+							return
+						}
+						if err != nil {

Review comment:
       if err is not nil, probably it is dangerous to proceed within the current loop?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking-banyandb] lujiajing1126 commented on a change in pull request #70: Bucket strategy managment and segment controller

Posted by GitBox <gi...@apache.org>.
lujiajing1126 commented on a change in pull request #70:
URL: https://github.com/apache/skywalking-banyandb/pull/70#discussion_r790143814



##########
File path: banyand/tsdb/bucket/strategy.go
##########
@@ -0,0 +1,131 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package bucket
+
+import (
+	"github.com/pkg/errors"
+	"go.uber.org/multierr"
+
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+var (
+	ErrInvalidParameter = errors.New("parameters are invalid")
+	ErrNoMoreBucket     = errors.New("no more buckets")
+)
+
+type Ratio float64
+
+type Strategy struct {
+	optionsErr error
+	ratio      Ratio
+	ctrl       Controller
+	current    Reporter
+	next       Reporter
+	logger     *logger.Logger
+	stopCh     chan struct{}
+}
+
+type StrategyOptions func(*Strategy)
+
+func WithNextThreshold(ratio Ratio) StrategyOptions {
+	return func(s *Strategy) {
+		if ratio > 1.0 {
+			s.optionsErr = multierr.Append(s.optionsErr,
+				errors.Wrapf(ErrInvalidParameter, "ratio %v is more than 1.0", ratio))
+			return
+		}
+		s.ratio = ratio
+	}
+}
+
+func WithLogger(logger *logger.Logger) StrategyOptions {
+	return func(s *Strategy) {
+		s.logger = logger
+	}
+}
+
+func NewStrategy(ctrl Controller, options ...StrategyOptions) (*Strategy, error) {
+	if ctrl == nil {
+		return nil, errors.Wrap(ErrInvalidParameter, "controller is absent")
+	}
+	strategy := &Strategy{
+		ctrl:   ctrl,
+		ratio:  0.8,
+		stopCh: make(chan struct{}),
+	}
+	for _, opt := range options {
+		opt(strategy)
+	}
+	if strategy.optionsErr != nil {
+		return nil, strategy.optionsErr
+	}
+	if strategy.logger == nil {
+		strategy.logger = logger.GetLogger("bucket-strategy")
+	}
+	return strategy, nil
+}
+
+func (s *Strategy) Run() {
+	reset := func() {
+		for s.current == nil {
+			s.current = s.ctrl.Current()
+		}
+		s.next = nil
+	}
+	reset()
+	go func(s *Strategy) {
+		var err error
+		for {
+		bucket:

Review comment:
       I suppose a single `for-select` loop is enough for this case

##########
File path: banyand/tsdb/bucket/strategy.go
##########
@@ -0,0 +1,131 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package bucket
+
+import (
+	"github.com/pkg/errors"
+	"go.uber.org/multierr"
+
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+var (
+	ErrInvalidParameter = errors.New("parameters are invalid")
+	ErrNoMoreBucket     = errors.New("no more buckets")
+)
+
+type Ratio float64
+
+type Strategy struct {
+	optionsErr error
+	ratio      Ratio
+	ctrl       Controller
+	current    Reporter
+	next       Reporter
+	logger     *logger.Logger
+	stopCh     chan struct{}
+}
+
+type StrategyOptions func(*Strategy)
+
+func WithNextThreshold(ratio Ratio) StrategyOptions {
+	return func(s *Strategy) {
+		if ratio > 1.0 {
+			s.optionsErr = multierr.Append(s.optionsErr,
+				errors.Wrapf(ErrInvalidParameter, "ratio %v is more than 1.0", ratio))
+			return
+		}
+		s.ratio = ratio
+	}
+}
+
+func WithLogger(logger *logger.Logger) StrategyOptions {
+	return func(s *Strategy) {
+		s.logger = logger
+	}
+}
+
+func NewStrategy(ctrl Controller, options ...StrategyOptions) (*Strategy, error) {
+	if ctrl == nil {
+		return nil, errors.Wrap(ErrInvalidParameter, "controller is absent")
+	}
+	strategy := &Strategy{
+		ctrl:   ctrl,
+		ratio:  0.8,
+		stopCh: make(chan struct{}),
+	}
+	for _, opt := range options {
+		opt(strategy)
+	}
+	if strategy.optionsErr != nil {
+		return nil, strategy.optionsErr
+	}
+	if strategy.logger == nil {
+		strategy.logger = logger.GetLogger("bucket-strategy")
+	}
+	return strategy, nil
+}
+
+func (s *Strategy) Run() {
+	reset := func() {
+		for s.current == nil {
+			s.current = s.ctrl.Current()
+		}
+		s.next = nil
+	}
+	reset()
+	go func(s *Strategy) {
+		var err error
+		for {
+		bucket:
+			c := s.current.Report()
+			for {
+				select {
+				case status, more := <-c:

Review comment:
       ```suggestion
   				case status, closed := <-c:
   ```

##########
File path: banyand/tsdb/bucket/strategy.go
##########
@@ -0,0 +1,131 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package bucket
+
+import (
+	"github.com/pkg/errors"
+	"go.uber.org/multierr"
+
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+var (
+	ErrInvalidParameter = errors.New("parameters are invalid")
+	ErrNoMoreBucket     = errors.New("no more buckets")
+)
+
+type Ratio float64
+
+type Strategy struct {
+	optionsErr error
+	ratio      Ratio
+	ctrl       Controller
+	current    Reporter
+	next       Reporter
+	logger     *logger.Logger
+	stopCh     chan struct{}
+}
+
+type StrategyOptions func(*Strategy)
+
+func WithNextThreshold(ratio Ratio) StrategyOptions {
+	return func(s *Strategy) {
+		if ratio > 1.0 {
+			s.optionsErr = multierr.Append(s.optionsErr,
+				errors.Wrapf(ErrInvalidParameter, "ratio %v is more than 1.0", ratio))
+			return
+		}
+		s.ratio = ratio
+	}
+}
+
+func WithLogger(logger *logger.Logger) StrategyOptions {
+	return func(s *Strategy) {
+		s.logger = logger
+	}
+}
+
+func NewStrategy(ctrl Controller, options ...StrategyOptions) (*Strategy, error) {
+	if ctrl == nil {
+		return nil, errors.Wrap(ErrInvalidParameter, "controller is absent")
+	}
+	strategy := &Strategy{
+		ctrl:   ctrl,
+		ratio:  0.8,
+		stopCh: make(chan struct{}),
+	}
+	for _, opt := range options {
+		opt(strategy)
+	}
+	if strategy.optionsErr != nil {
+		return nil, strategy.optionsErr
+	}
+	if strategy.logger == nil {
+		strategy.logger = logger.GetLogger("bucket-strategy")
+	}
+	return strategy, nil
+}
+
+func (s *Strategy) Run() {
+	reset := func() {
+		for s.current == nil {
+			s.current = s.ctrl.Current()
+		}
+		s.next = nil
+	}
+	reset()
+	go func(s *Strategy) {
+		var err error
+		for {
+		bucket:
+			c := s.current.Report()
+			for {
+				select {
+				case status, more := <-c:
+					if !more {
+						reset()
+						goto bucket
+					}
+					ratio := Ratio(status.Volume) / Ratio(status.Capacity)
+					if ratio >= s.ratio && s.next == nil {
+						s.next, err = s.ctrl.Next()
+						if errors.Is(err, ErrNoMoreBucket) {
+							return
+						}
+						if err != nil {

Review comment:
       if err is not nil, probably it is dangerous to proceed within the current loop




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking-banyandb] lujiajing1126 commented on a change in pull request #70: Bucket strategy managment and segment controller

Posted by GitBox <gi...@apache.org>.
lujiajing1126 commented on a change in pull request #70:
URL: https://github.com/apache/skywalking-banyandb/pull/70#discussion_r790397122



##########
File path: banyand/tsdb/bucket/strategy.go
##########
@@ -0,0 +1,131 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package bucket
+
+import (
+	"github.com/pkg/errors"
+	"go.uber.org/multierr"
+
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+var (
+	ErrInvalidParameter = errors.New("parameters are invalid")
+	ErrNoMoreBucket     = errors.New("no more buckets")
+)
+
+type Ratio float64
+
+type Strategy struct {
+	optionsErr error
+	ratio      Ratio
+	ctrl       Controller
+	current    Reporter
+	next       Reporter
+	logger     *logger.Logger
+	stopCh     chan struct{}
+}
+
+type StrategyOptions func(*Strategy)
+
+func WithNextThreshold(ratio Ratio) StrategyOptions {
+	return func(s *Strategy) {
+		if ratio > 1.0 {
+			s.optionsErr = multierr.Append(s.optionsErr,
+				errors.Wrapf(ErrInvalidParameter, "ratio %v is more than 1.0", ratio))
+			return
+		}
+		s.ratio = ratio
+	}
+}
+
+func WithLogger(logger *logger.Logger) StrategyOptions {
+	return func(s *Strategy) {
+		s.logger = logger
+	}
+}
+
+func NewStrategy(ctrl Controller, options ...StrategyOptions) (*Strategy, error) {
+	if ctrl == nil {
+		return nil, errors.Wrap(ErrInvalidParameter, "controller is absent")
+	}
+	strategy := &Strategy{
+		ctrl:   ctrl,
+		ratio:  0.8,
+		stopCh: make(chan struct{}),
+	}
+	for _, opt := range options {
+		opt(strategy)
+	}
+	if strategy.optionsErr != nil {
+		return nil, strategy.optionsErr
+	}
+	if strategy.logger == nil {
+		strategy.logger = logger.GetLogger("bucket-strategy")
+	}
+	return strategy, nil
+}
+
+func (s *Strategy) Run() {
+	reset := func() {
+		for s.current == nil {
+			s.current = s.ctrl.Current()
+		}
+		s.next = nil
+	}
+	reset()
+	go func(s *Strategy) {
+		var err error
+		for {
+		bucket:

Review comment:
       But the outer `for-loop` is not necessary?
   
   ```go
   var err error
   bucket:
   c := s.current.Report()
   for {
     select { ... }
   }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking-banyandb] lujiajing1126 commented on a change in pull request #70: Bucket strategy managment and segment controller

Posted by GitBox <gi...@apache.org>.
lujiajing1126 commented on a change in pull request #70:
URL: https://github.com/apache/skywalking-banyandb/pull/70#discussion_r790235172



##########
File path: banyand/tsdb/segment.go
##########
@@ -19,42 +19,74 @@ package tsdb
 
 import (
 	"context"
+	"strconv"
 	"sync"
 	"time"
 
 	"github.com/apache/skywalking-banyandb/banyand/kv"
+	"github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
 type segment struct {
-	path string
+	id     uint16
+	path   string
+	suffix string
 
 	lst         []*block
 	globalIndex kv.Store
 	sync.Mutex
-	l         *logger.Logger
-	startTime time.Time
-	endTime   time.Time
+	l              *logger.Logger
+	reporterStopCh chan struct{}
+	TimeRange
 }
 
-func (s *segment) contains(ts time.Time) bool {
-	greaterAndEqualStart := s.startTime.Equal(ts) || s.startTime.Before(ts)
-	if s.endTime.IsZero() {
-		return greaterAndEqualStart
+func (s *segment) Report() bucket.Channel {
+	ch := make(bucket.Channel)
+	interval := s.Duration() >> 4
+	if interval < 100*time.Millisecond {
+		interval = 100 * time.Millisecond
 	}
-	return greaterAndEqualStart && s.endTime.After(ts)
+	go func() {
+		defer close(ch)
+		for {
+			ticker := time.NewTicker(interval)
+			defer ticker.Stop()

Review comment:
       resource leak here




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking-banyandb] lujiajing1126 commented on a change in pull request #70: Bucket strategy managment and segment controller

Posted by GitBox <gi...@apache.org>.
lujiajing1126 commented on a change in pull request #70:
URL: https://github.com/apache/skywalking-banyandb/pull/70#discussion_r790147292



##########
File path: banyand/tsdb/shard.go
##########
@@ -52,66 +54,234 @@ func (s *shard) Index() IndexDatabase {
 	return s.indexDatabase
 }
 
-func openShard(ctx context.Context, id common.ShardID, location string) (*shard, error) {
-	s := &shard{
-		id:       id,
-		location: location,
-	}
-	parentLogger := ctx.Value(logger.ContextKey)
-	if parentLogger != nil {
-		if pl, ok := parentLogger.(*logger.Logger); ok {
-			s.logger = pl.Named("shard" + strconv.Itoa(int(id)))
-		}
+func OpenShard(ctx context.Context, id common.ShardID, root string, intervalRule IntervalRule) (Shard, error) {
+	path, err := mkdir(shardTemplate, root, int(id))
+	if err != nil {
+		return nil, errors.Wrapf(err, "make the directory of the shard %d ", int(id))
 	}
-	loadSeg := func(path string) error {
-		seg, err := newSegment(ctx, path)
-		if err != nil {
-			return err
-		}
-		{
-			s.Lock()
-			defer s.Unlock()
-			s.lst = append(s.lst, seg)
-		}
-		s.logger.Info().Int("size", len(s.lst)).Msg("seg size")
-		return nil
+	l := logger.Fetch(ctx, "shard"+strconv.Itoa(int(id)))
+	l.Info().Int("shard_id", int(id)).Str("path", path).Msg("creating a shard")
+	s := &shard{
+		id:                id,
+		segmentController: newSegmentController(path, intervalRule),
+		l:                 l,
 	}
-	err := walkDir(location, segPathPrefix, func(_, absolutePath string) error {
-		s.logger.Info().Str("path", absolutePath).Msg("loading a segment")
-		return loadSeg(absolutePath)
-	})
+	shardCtx := context.WithValue(ctx, logger.ContextKey, s.l)
+	err = s.segmentController.open(shardCtx)
 	if err != nil {
 		return nil, err
 	}
-	if len(s.lst) < 1 {
-		var segPath string
-		segPath, err = mkdir(segTemplate, location, time.Now().Format(segFormat))
-		if err != nil {
-			return nil, err
-		}
-		s.logger.Info().Str("path", segPath).Msg("creating a new segment")
-		err = loadSeg(segPath)
-		if err != nil {
-			return nil, err
-		}
-	}
-	seriesPath, err := mkdir(seriesTemplate, s.location)
+	seriesPath, err := mkdir(seriesTemplate, path)
 	if err != nil {
 		return nil, err
 	}
-	sdb, err := newSeriesDataBase(ctx, s.id, seriesPath, s.lst)
+	sdb, err := newSeriesDataBase(shardCtx, s.id, seriesPath, s.segmentController)
 	if err != nil {
 		return nil, err
 	}
 	s.seriesDatabase = sdb
-	idb, err := newIndexDatabase(ctx, s.id, s.lst)
+	idb, err := newIndexDatabase(shardCtx, s.id, s.segmentController)
 	if err != nil {
 		return nil, err
 	}
 	s.indexDatabase = idb
+	s.segmentManageStrategy, err = bucket.NewStrategy(s.segmentController, bucket.WithLogger(s.l))
+	if err != nil {
+		return nil, err
+	}
+	s.segmentManageStrategy.Run()
 	return s, nil
 }
 
 func (s *shard) Close() error {
+	s.segmentManageStrategy.Close()
+	s.segmentController.close()
 	return s.seriesDatabase.Close()
 }
+
+type IntervalUnit int
+
+const (
+	DAY IntervalUnit = iota
+	MONTH
+	YEAR
+	MILLISECOND // only for testing
+)
+
+func (iu IntervalUnit) String() string {
+	switch iu {
+	case DAY:
+		return "day"
+	case MONTH:
+		return "month"
+	case YEAR:
+		return "year"
+	case MILLISECOND:
+		return "millis"
+
+	}
+	panic("invalid interval unit")
+}
+
+func (iu IntervalUnit) Format(tm time.Time) string {
+	switch iu {
+	case DAY:
+		return tm.Format(segDayFormat)
+	case MONTH:
+		return tm.Format(segMonthFormat)
+	case YEAR:
+		return tm.Format(segYearFormat)
+	case MILLISECOND:
+		return tm.Format(segMillisecondFormat)
+	}
+	panic("invalid interval unit")
+}
+
+func (iu IntervalUnit) Parse(value string) (time.Time, error) {
+	switch iu {
+	case DAY:
+		return time.Parse(segDayFormat, value)
+	case MONTH:
+		return time.Parse(segMonthFormat, value)
+	case YEAR:
+		return time.Parse(segYearFormat, value)
+	case MILLISECOND:
+		return time.Parse(segMillisecondFormat, value)
+	}
+	panic("invalid interval unit")
+}
+
+type IntervalRule struct {
+	Unit IntervalUnit
+	Num  int
+}
+
+func (ir IntervalRule) NextTime(current time.Time) time.Time {
+	switch ir.Unit {
+	case DAY:
+		return current.AddDate(0, 0, ir.Num)
+	case MONTH:
+		return current.AddDate(0, ir.Num, 0)
+	case YEAR:
+		return current.AddDate(ir.Num, 0, 0)
+	case MILLISECOND:
+		return current.Add(time.Millisecond * time.Duration(ir.Num))
+	}
+	panic("invalid interval unit")
+}
+
+type segmentController struct {
+	sync.RWMutex
+	location     string
+	intervalRule IntervalRule
+	lst          []*segment
+}
+
+func newSegmentController(location string, intervalRule IntervalRule) *segmentController {
+	return &segmentController{
+		location:     location,
+		intervalRule: intervalRule,
+	}
+}
+
+func (sc *segmentController) get(segID uint16) *segment {
+	sc.RLock()
+	defer sc.RUnlock()
+	last := len(sc.lst) - 1
+	for i := range sc.lst {
+		s := sc.lst[last-i]
+		if s.id == segID {
+			return s
+		}
+	}
+	return nil
+}
+
+func (sc *segmentController) span(timeRange TimeRange) (ss []*segment) {
+	sc.RLock()
+	defer sc.RUnlock()
+	last := len(sc.lst) - 1
+	for i := range sc.lst {
+		s := sc.lst[last-i]
+		if s.Overlapping(timeRange) {
+			ss = append(ss, s)
+		}
+	}
+	return ss
+}
+
+func (sc *segmentController) segments() (ss []*segment) {
+	sc.RLock()
+	defer sc.RUnlock()
+	r := make([]*segment, len(sc.lst))
+	copy(r, sc.lst)
+	return r
+}
+
+func (sc *segmentController) Current() bucket.Reporter {
+	sc.RLock()
+	defer sc.RUnlock()
+	now := time.Now()
+	for _, s := range sc.lst {
+		if s.suffix == sc.intervalRule.Unit.Format(now) {
+			return s
+		}
+	}
+	// return the latest segment before now
+	if len(sc.lst) > 0 {
+		return sc.lst[len(sc.lst)-1]
+	}
+	return nil
+}
+
+func (sc *segmentController) Next() (bucket.Reporter, error) {
+	return sc.create(context.TODO(), sc.intervalRule.Unit.Format(
+		sc.intervalRule.NextTime(time.Now())))
+}
+
+func (sc *segmentController) open(ctx context.Context) error {
+	err := walkDir(
+		sc.location,
+		segPathPrefix,
+		func(suffix, absolutePath string) error {
+			_, err := sc.load(ctx, suffix, absolutePath)
+			return err
+		})
+	if err != nil {
+		return err
+	}
+	if sc.Current() == nil {
+		_, err = sc.create(ctx, sc.intervalRule.Unit.Format(time.Now()))
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (sc *segmentController) create(ctx context.Context, suffix string) (*segment, error) {
+	segPath, err := mkdir(segTemplate, sc.location, suffix)
+	if err != nil {
+		return nil, err
+	}
+	return sc.load(ctx, suffix, segPath)
+}
+
+func (sc *segmentController) load(ctx context.Context, suffix, path string) (seg *segment, err error) {
+	seg, err = openSegment(ctx, suffix, path, sc.intervalRule)
+	if err != nil {
+		return nil, err
+	}
+	{

Review comment:
       unnecessary pair of braces?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org