You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/01/22 12:23:44 UTC

[skywalking-banyandb] 01/01: init bucket Add segment controller

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch tsdb-bucket
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 90b75f214e0ad7de59b22faac7f311b5b50e4667
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Mon Jan 17 11:56:58 2022 +0000

    init bucket
    Add segment controller
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/measure/query_test.go                      |   2 +-
 banyand/measure/write.go                           |   2 +-
 banyand/query/processor_test.go                    |   9 +-
 banyand/stream/stream_query.go                     |   2 +-
 banyand/stream/stream_query_test.go                |  21 +-
 banyand/stream/stream_write.go                     |   2 +-
 banyand/tsdb/block.go                              |  14 +-
 .../logger.go => banyand/tsdb/bucket/bucket.go     |  39 +--
 .../tsdb/bucket/bucket_suite_test.go               |  47 ++--
 banyand/tsdb/bucket/strategy.go                    | 131 ++++++++++
 banyand/tsdb/bucket/strategy_test.go               | 141 +++++++++++
 banyand/tsdb/indexdb.go                            |  45 ++--
 banyand/tsdb/segment.go                            |  74 ++++--
 banyand/tsdb/series.go                             |  59 ++++-
 banyand/tsdb/series_seek_sort.go                   |   2 +-
 banyand/tsdb/series_test.go                        | 132 ++++++++++
 banyand/tsdb/seriesdb.go                           |  35 ++-
 banyand/tsdb/shard.go                              | 266 +++++++++++++++++----
 banyand/tsdb/shard_test.go                         |  70 ++++++
 banyand/tsdb/tsdb.go                               |  55 +++--
 .../logger.go => banyand/tsdb/tsdb_suite_test.go   |  46 ++--
 banyand/tsdb/tsdb_test.go                          |   2 +-
 pkg/logger/logger.go                               |  12 +
 pkg/query/logical/common.go                        |   4 +-
 pkg/query/logical/plan_indexscan_local.go          |   2 +-
 25 files changed, 948 insertions(+), 266 deletions(-)

diff --git a/banyand/measure/query_test.go b/banyand/measure/query_test.go
index 31638db..60b8bf2 100644
--- a/banyand/measure/query_test.go
+++ b/banyand/measure/query_test.go
@@ -37,7 +37,7 @@ func Test_ParseTag_And_ParseField(t *testing.T) {
 	r.NoError(err)
 	series, err := shard.Series().Get(tsdb.Entity{tsdb.Entry("1")})
 	r.NoError(err)
-	seriesSpan, err := series.Span(tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour))
+	seriesSpan, err := series.Span(tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour))
 	defer func(seriesSpan tsdb.SeriesSpan) {
 		_ = seriesSpan.Close()
 	}(seriesSpan)
diff --git a/banyand/measure/write.go b/banyand/measure/write.go
index b1dc724..b0f9854 100644
--- a/banyand/measure/write.go
+++ b/banyand/measure/write.go
@@ -78,7 +78,7 @@ func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *mea
 		return err
 	}
 	t := value.GetTimestamp().AsTime()
-	wp, err := series.Span(tsdb.NewTimeRangeDuration(t, 0))
+	wp, err := series.Span(tsdb.NewInclusiveTimeRangeDuration(t, 0))
 	if err != nil {
 		if wp != nil {
 			_ = wp.Close()
diff --git a/banyand/query/processor_test.go b/banyand/query/processor_test.go
index dae4951..d3a659a 100644
--- a/banyand/query/processor_test.go
+++ b/banyand/query/processor_test.go
@@ -377,8 +377,13 @@ func TestQueryProcessor(t *testing.T) {
 			singleTester.NoError(err)
 			singleTester.NotNil(msg)
 			// TODO: better error response
-			singleTester.NotNil(msg.Data())
-			singleTester.Len(msg.Data(), tt.wantLen)
+			var dataLen int
+			if msg.Data() == nil {
+				dataLen = 0
+			} else {
+				dataLen = len(msg.Data().([]*streamv1.Element))
+			}
+			singleTester.Equal(dataLen, tt.wantLen)
 			if tt.checker != nil {
 				singleTester.True(tt.checker(msg.Data().([]*streamv1.Element)))
 			}
diff --git a/banyand/stream/stream_query.go b/banyand/stream/stream_query.go
index dfca4f4..6913b4a 100644
--- a/banyand/stream/stream_query.go
+++ b/banyand/stream/stream_query.go
@@ -20,8 +20,8 @@ package stream
 import (
 	"io"
 
-	"github.com/golang/protobuf/proto"
 	"github.com/pkg/errors"
+	"google.golang.org/protobuf/proto"
 
 	"github.com/apache/skywalking-banyandb/api/common"
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
diff --git a/banyand/stream/stream_query_test.go b/banyand/stream/stream_query_test.go
index f77cfd6..2533ef4 100644
--- a/banyand/stream/stream_query_test.go
+++ b/banyand/stream/stream_query_test.go
@@ -20,7 +20,6 @@ package stream
 import (
 	"bytes"
 	"embed"
-	_ "embed"
 	"encoding/base64"
 	"encoding/json"
 	"fmt"
@@ -107,7 +106,7 @@ func Test_Stream_Series(t *testing.T) {
 			name: "all",
 			args: queryOpts{
 				entity:    tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
-				timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour),
+				timeRange: tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
 			},
 			want: shardsForTest{
 				{
@@ -137,7 +136,7 @@ func Test_Stream_Series(t *testing.T) {
 			name: "time range",
 			args: queryOpts{
 				entity:    tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
-				timeRange: tsdb.NewTimeRangeDuration(baseTime.Add(1500*time.Millisecond), 1*time.Hour),
+				timeRange: tsdb.NewInclusiveTimeRangeDuration(baseTime.Add(1500*time.Millisecond), 1*time.Hour),
 			},
 			want: shardsForTest{
 				{
@@ -164,7 +163,7 @@ func Test_Stream_Series(t *testing.T) {
 			name: "find series by service_id and instance_id",
 			args: queryOpts{
 				entity:    tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.Entry("10.0.0.1_id"), tsdb.AnyEntry},
-				timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour),
+				timeRange: tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
 			},
 			want: shardsForTest{
 				{
@@ -183,7 +182,7 @@ func Test_Stream_Series(t *testing.T) {
 			name: "find a series",
 			args: queryOpts{
 				entity:    tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.Entry("10.0.0.1_id"), convert.Int64ToBytes(0)},
-				timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour),
+				timeRange: tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
 			},
 			want: shardsForTest{
 				{
@@ -197,7 +196,7 @@ func Test_Stream_Series(t *testing.T) {
 			name: "filter",
 			args: queryOpts{
 				entity:    tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
-				timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour),
+				timeRange: tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
 				buildFn: func(builder tsdb.SeekerBuilder) {
 					builder.Filter(&databasev1.IndexRule{
 						Metadata: &commonv1.Metadata{
@@ -243,7 +242,7 @@ func Test_Stream_Series(t *testing.T) {
 			name: "order by duration",
 			args: queryOpts{
 				entity:    tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
-				timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour),
+				timeRange: tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
 				buildFn: func(builder tsdb.SeekerBuilder) {
 					builder.OrderByIndex(&databasev1.IndexRule{
 						Metadata: &commonv1.Metadata{
@@ -284,7 +283,7 @@ func Test_Stream_Series(t *testing.T) {
 			name: "filter by duration",
 			args: queryOpts{
 				entity:    tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
-				timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour),
+				timeRange: tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
 				buildFn: func(builder tsdb.SeekerBuilder) {
 					rule := &databasev1.IndexRule{
 						Metadata: &commonv1.Metadata{
@@ -331,7 +330,7 @@ func Test_Stream_Series(t *testing.T) {
 			name: "filter and sort by duration",
 			args: queryOpts{
 				entity:    tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
-				timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour),
+				timeRange: tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
 				buildFn: func(builder tsdb.SeekerBuilder) {
 					rule := &databasev1.IndexRule{
 						Metadata: &commonv1.Metadata{
@@ -379,7 +378,7 @@ func Test_Stream_Series(t *testing.T) {
 			name: "filter by several conditions",
 			args: queryOpts{
 				entity:    tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
-				timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour),
+				timeRange: tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
 				buildFn: func(builder tsdb.SeekerBuilder) {
 					rule := &databasev1.IndexRule{
 						Metadata: &commonv1.Metadata{
@@ -442,7 +441,7 @@ func Test_Stream_Series(t *testing.T) {
 			name: "filter by several conditions, sort by duration",
 			args: queryOpts{
 				entity:    tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
-				timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour),
+				timeRange: tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
 				buildFn: func(builder tsdb.SeekerBuilder) {
 					rule := &databasev1.IndexRule{
 						Metadata: &commonv1.Metadata{
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index d35d088..7fef502 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -71,7 +71,7 @@ func (s *stream) write(shardID common.ShardID, seriesHashKey []byte, value *stre
 		return err
 	}
 	t := value.GetTimestamp().AsTime()
-	wp, err := series.Span(tsdb.NewTimeRangeDuration(t, 0))
+	wp, err := series.Span(tsdb.NewInclusiveTimeRangeDuration(t, 0))
 	if err != nil {
 		if wp != nil {
 			_ = wp.Close()
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 956bf13..c5a2b98 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -23,11 +23,11 @@ import (
 	"time"
 
 	"github.com/dgraph-io/ristretto/z"
-	"github.com/pkg/errors"
 
 	"github.com/apache/skywalking-banyandb/api/common"
 	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
 	"github.com/apache/skywalking-banyandb/banyand/kv"
+	"github.com/apache/skywalking-banyandb/pkg/encoding"
 	"github.com/apache/skywalking-banyandb/pkg/index"
 	"github.com/apache/skywalking-banyandb/pkg/index/inverted"
 	"github.com/apache/skywalking-banyandb/pkg/index/lsm"
@@ -63,16 +63,14 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
 		path:      opts.path,
 		ref:       z.NewCloser(1),
 		startTime: time.Now(),
-	}
-	parentLogger := ctx.Value(logger.ContextKey)
-	if parentLogger != nil {
-		if pl, ok := parentLogger.(*logger.Logger); ok {
-			b.l = pl.Named("block")
-		}
+		l:         logger.Fetch(ctx, "block"),
 	}
 	encodingMethodObject := ctx.Value(encodingMethodKey)
 	if encodingMethodObject == nil {
-		return nil, errors.Wrap(ErrEncodingMethodAbsent, "failed to create a block")
+		encodingMethodObject = EncodingMethod{
+			EncoderPool: encoding.NewPlainEncoderPool(0),
+			DecoderPool: encoding.NewPlainDecoderPool(0),
+		}
 	}
 	encodingMethod := encodingMethodObject.(EncodingMethod)
 	if b.store, err = kv.OpenTimeSeriesStore(
diff --git a/pkg/logger/logger.go b/banyand/tsdb/bucket/bucket.go
similarity index 53%
copy from pkg/logger/logger.go
copy to banyand/tsdb/bucket/bucket.go
index 36896ed..82a76ec 100644
--- a/pkg/logger/logger.go
+++ b/banyand/tsdb/bucket/bucket.go
@@ -15,39 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package logger
+package bucket
 
-import (
-	"strings"
-
-	"github.com/pkg/errors"
-	"github.com/rs/zerolog"
-)
-
-var ContextKey = contextKey{}
-var ErrNoLoggerInContext = errors.New("no logger in context")
-
-type contextKey struct{}
-
-// Logging is the config info
-type Logging struct {
-	Env   string
-	Level string
+type Controller interface {
+	Current() Reporter
+	Next() (Reporter, error)
 }
 
-// Logger is wrapper for rs/zerolog logger with module, it is singleton.
-type Logger struct {
-	module string
-	*zerolog.Logger
+type Status struct {
+	Capacity int
+	Volume   int
 }
 
-func (l *Logger) Named(name string) *Logger {
-	module := strings.Join([]string{l.module, name}, ".")
-	subLogger := root.Logger.With().Str("module", module).Logger()
-	return &Logger{module: module, Logger: &subLogger}
-}
+type Channel chan Status
 
-// Loggable indicates the implement supports logging
-type Loggable interface {
-	SetLogger(*Logger)
+type Reporter interface {
+	Report() Channel
 }
diff --git a/pkg/logger/logger.go b/banyand/tsdb/bucket/bucket_suite_test.go
similarity index 53%
copy from pkg/logger/logger.go
copy to banyand/tsdb/bucket/bucket_suite_test.go
index 36896ed..0ab7ff8 100644
--- a/pkg/logger/logger.go
+++ b/banyand/tsdb/bucket/bucket_suite_test.go
@@ -14,40 +14,25 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-
-package logger
+//
+package bucket_test
 
 import (
-	"strings"
-
-	"github.com/pkg/errors"
-	"github.com/rs/zerolog"
-)
-
-var ContextKey = contextKey{}
-var ErrNoLoggerInContext = errors.New("no logger in context")
+	"testing"
 
-type contextKey struct{}
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
 
-// Logging is the config info
-type Logging struct {
-	Env   string
-	Level string
-}
-
-// Logger is wrapper for rs/zerolog logger with module, it is singleton.
-type Logger struct {
-	module string
-	*zerolog.Logger
-}
-
-func (l *Logger) Named(name string) *Logger {
-	module := strings.Join([]string{l.module, name}, ".")
-	subLogger := root.Logger.With().Str("module", module).Logger()
-	return &Logger{module: module, Logger: &subLogger}
-}
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+)
 
-// Loggable indicates the implement supports logging
-type Loggable interface {
-	SetLogger(*Logger)
+func TestBucket(t *testing.T) {
+	RegisterFailHandler(Fail)
+	BeforeSuite(func() {
+		Expect(logger.Init(logger.Logging{
+			Env:   "dev",
+			Level: "debug",
+		})).Should(Succeed())
+	})
+	RunSpecs(t, "Bucket Suite")
 }
diff --git a/banyand/tsdb/bucket/strategy.go b/banyand/tsdb/bucket/strategy.go
new file mode 100644
index 0000000..d1c2ea3
--- /dev/null
+++ b/banyand/tsdb/bucket/strategy.go
@@ -0,0 +1,131 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package bucket
+
+import (
+	"github.com/pkg/errors"
+	"go.uber.org/multierr"
+
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+var (
+	ErrInvalidParameter = errors.New("parameters are invalid")
+	ErrNoMoreBucket     = errors.New("no more buckets")
+)
+
+type Ratio float64
+
+type Strategy struct {
+	optionsErr error
+	ratio      Ratio
+	ctrl       Controller
+	current    Reporter
+	next       Reporter
+	logger     *logger.Logger
+	stopCh     chan struct{}
+}
+
+type StrategyOptions func(*Strategy)
+
+func WithNextThreshold(ratio Ratio) StrategyOptions {
+	return func(s *Strategy) {
+		if ratio > 1.0 {
+			s.optionsErr = multierr.Append(s.optionsErr,
+				errors.Wrapf(ErrInvalidParameter, "ratio %v is more than 1.0", ratio))
+			return
+		}
+		s.ratio = ratio
+	}
+}
+
+func WithLogger(logger *logger.Logger) StrategyOptions {
+	return func(s *Strategy) {
+		s.logger = logger
+	}
+}
+
+func NewStrategy(ctrl Controller, options ...StrategyOptions) (*Strategy, error) {
+	if ctrl == nil {
+		return nil, errors.Wrap(ErrInvalidParameter, "controller is absent")
+	}
+	strategy := &Strategy{
+		ctrl:   ctrl,
+		ratio:  0.8,
+		stopCh: make(chan struct{}),
+	}
+	for _, opt := range options {
+		opt(strategy)
+	}
+	if strategy.optionsErr != nil {
+		return nil, strategy.optionsErr
+	}
+	if strategy.logger == nil {
+		strategy.logger = logger.GetLogger("bucket-strategy")
+	}
+	return strategy, nil
+}
+
+func (s *Strategy) Run() {
+	reset := func() {
+		for s.current == nil {
+			s.current = s.ctrl.Current()
+		}
+		s.next = nil
+	}
+	reset()
+	go func(s *Strategy) {
+		var err error
+		for {
+		bucket:
+			c := s.current.Report()
+			for {
+				select {
+				case status, more := <-c:
+					if !more {
+						reset()
+						goto bucket
+					}
+					ratio := Ratio(status.Volume) / Ratio(status.Capacity)
+					if ratio >= s.ratio && s.next == nil {
+						s.next, err = s.ctrl.Next()
+						if errors.Is(err, ErrNoMoreBucket) {
+							return
+						}
+						if err != nil {
+							s.logger.Err(err).Msg("failed to create the next bucket")
+						}
+					}
+					if ratio >= 1.0 {
+						s.current = s.next
+						s.next = nil
+						goto bucket
+					}
+				case <-s.stopCh:
+					return
+				}
+			}
+
+		}
+
+	}(s)
+}
+
+func (s *Strategy) Close() {
+	close(s.stopCh)
+}
diff --git a/banyand/tsdb/bucket/strategy_test.go b/banyand/tsdb/bucket/strategy_test.go
new file mode 100644
index 0000000..3a302ce
--- /dev/null
+++ b/banyand/tsdb/bucket/strategy_test.go
@@ -0,0 +1,141 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package bucket_test
+
+import (
+	"sync"
+
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+
+	"github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
+)
+
+var _ = Describe("Strategy", func() {
+	Context("Applying the strategy", func() {
+		var strategy *bucket.Strategy
+		It("uses the golden settings", func() {
+			ctrl := newController(2, 1, 10)
+			var err error
+			strategy, err = bucket.NewStrategy(ctrl)
+			Expect(err).NotTo(HaveOccurred())
+			strategy.Run()
+			Eventually(ctrl.isFull).Should(BeTrue())
+		})
+		It("never reaches the limit", func() {
+			ctrl := newController(1, 0, 10)
+			var err error
+			strategy, err = bucket.NewStrategy(ctrl)
+			Expect(err).NotTo(HaveOccurred())
+			strategy.Run()
+			Consistently(ctrl.isFull).ShouldNot(BeTrue())
+		})
+		It("exceeds the limit", func() {
+			ctrl := newController(2, 3, 10)
+			var err error
+			strategy, err = bucket.NewStrategy(ctrl)
+			Expect(err).NotTo(HaveOccurred())
+			strategy.Run()
+			Eventually(ctrl.isFull).Should(BeTrue())
+		})
+		It("'s first step exceeds the limit", func() {
+			ctrl := newController(2, 15, 10)
+			var err error
+			strategy, err = bucket.NewStrategy(ctrl)
+			Expect(err).NotTo(HaveOccurred())
+			strategy.Run()
+			Eventually(ctrl.isFull).Should(BeTrue())
+		})
+		AfterEach(func() {
+			if strategy != nil {
+				strategy.Close()
+			}
+		})
+	})
+	Context("Invalid parameter", func() {
+		It("passes a ratio > 1.0", func() {
+			ctrl := newController(2, 3, 10)
+			_, err := bucket.NewStrategy(ctrl, bucket.WithNextThreshold(1.1))
+			Expect(err).To(MatchError(bucket.ErrInvalidParameter))
+		})
+	})
+})
+
+type controller struct {
+	maxBuckets  int
+	usedBuckets int
+
+	reporter *reporter
+	capacity int
+	step     int
+	mux      sync.RWMutex
+}
+
+func newController(maxBuckets, step, capacity int) *controller {
+	ctrl := &controller{step: step, maxBuckets: maxBuckets, capacity: capacity}
+	ctrl.newReporter()
+	return ctrl
+}
+
+func (c *controller) Next() (bucket.Reporter, error) {
+	c.mux.Lock()
+	defer c.mux.Unlock()
+	if c.usedBuckets >= c.maxBuckets {
+		return nil, bucket.ErrNoMoreBucket
+	}
+	c.usedBuckets++
+	c.newReporter()
+	return c.reporter, nil
+}
+
+func (c *controller) Current() bucket.Reporter {
+	c.mux.RLock()
+	defer c.mux.RUnlock()
+	return c.reporter
+}
+
+func (c *controller) newReporter() {
+	c.reporter = &reporter{step: c.step, capacity: c.capacity}
+}
+
+func (c *controller) isFull() bool {
+	c.mux.RLock()
+	defer c.mux.RUnlock()
+	return c.usedBuckets >= c.maxBuckets
+}
+
+type reporter struct {
+	capacity int
+	volume   int
+	step     int
+}
+
+func (r *reporter) Report() bucket.Channel {
+	ch := make(bucket.Channel)
+	go func() {
+		for i := 0; i < r.capacity; i++ {
+			r.volume += r.step
+			ch <- bucket.Status{
+				Capacity: r.capacity,
+				Volume:   r.volume,
+			}
+		}
+		close(ch)
+	}()
+	return ch
+}
diff --git a/banyand/tsdb/indexdb.go b/banyand/tsdb/indexdb.go
index dcd6123..741ba85 100644
--- a/banyand/tsdb/indexdb.go
+++ b/banyand/tsdb/indexdb.go
@@ -51,7 +51,7 @@ var _ IndexDatabase = (*indexDB)(nil)
 
 type indexDB struct {
 	shardID common.ShardID
-	lst     []*segment
+	segCtrl *segmentController
 }
 
 func (i *indexDB) Seek(field index.Field) ([]GlobalItemID, error) {
@@ -60,36 +60,38 @@ func (i *indexDB) Seek(field index.Field) ([]GlobalItemID, error) {
 	if err != nil {
 		return nil, err
 	}
-	err = i.lst[0].globalIndex.GetAll(f, func(rawBytes []byte) error {
-		id := &GlobalItemID{}
-		errUnMarshal := id.UnMarshal(rawBytes)
-		if errUnMarshal != nil {
-			return errUnMarshal
+	for _, s := range i.segCtrl.segments() {
+		err = s.globalIndex.GetAll(f, func(rawBytes []byte) error {
+			id := &GlobalItemID{}
+			errUnMarshal := id.UnMarshal(rawBytes)
+			if errUnMarshal != nil {
+				return errUnMarshal
+			}
+			result = append(result, *id)
+			return nil
+		})
+		if err == kv.ErrKeyNotFound {
+			return result, nil
 		}
-		result = append(result, *id)
-		return nil
-	})
-	if err == kv.ErrKeyNotFound {
-		return result, nil
 	}
 	return result, err
 }
 
 func (i *indexDB) WriterBuilder() IndexWriterBuilder {
-	return newIndexWriterBuilder(i.lst)
+	return newIndexWriterBuilder(i.segCtrl)
 }
 
-func newIndexDatabase(_ context.Context, id common.ShardID, lst []*segment) (IndexDatabase, error) {
+func newIndexDatabase(_ context.Context, id common.ShardID, segCtrl *segmentController) (IndexDatabase, error) {
 	return &indexDB{
 		shardID: id,
-		lst:     lst,
+		segCtrl: segCtrl,
 	}, nil
 }
 
 var _ IndexWriterBuilder = (*indexWriterBuilder)(nil)
 
 type indexWriterBuilder struct {
-	segments     []*segment
+	segCtrl      *segmentController
 	ts           time.Time
 	seg          *segment
 	globalItemID *GlobalItemID
@@ -97,12 +99,11 @@ type indexWriterBuilder struct {
 
 func (i *indexWriterBuilder) Time(ts time.Time) IndexWriterBuilder {
 	i.ts = ts
-	for _, s := range i.segments {
-		if s.contains(ts) {
-			i.seg = s
-			break
-		}
+	segs := i.segCtrl.span(NewTimeRangeDuration(ts, 0, true, false))
+	if len(segs) != 1 {
+		return i
 	}
+	i.seg = segs[0]
 	return i
 }
 
@@ -125,9 +126,9 @@ func (i *indexWriterBuilder) Build() (IndexWriter, error) {
 	}, nil
 }
 
-func newIndexWriterBuilder(segments []*segment) IndexWriterBuilder {
+func newIndexWriterBuilder(segCtrl *segmentController) IndexWriterBuilder {
 	return &indexWriterBuilder{
-		segments: segments,
+		segCtrl: segCtrl,
 	}
 }
 
diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go
index 6a8305e..9fd89ab 100644
--- a/banyand/tsdb/segment.go
+++ b/banyand/tsdb/segment.go
@@ -19,42 +19,74 @@ package tsdb
 
 import (
 	"context"
+	"strconv"
 	"sync"
 	"time"
 
 	"github.com/apache/skywalking-banyandb/banyand/kv"
+	"github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
 type segment struct {
-	path string
+	id     uint16
+	path   string
+	suffix string
 
 	lst         []*block
 	globalIndex kv.Store
 	sync.Mutex
-	l         *logger.Logger
-	startTime time.Time
-	endTime   time.Time
+	l              *logger.Logger
+	reporterStopCh chan struct{}
+	TimeRange
 }
 
-func (s *segment) contains(ts time.Time) bool {
-	greaterAndEqualStart := s.startTime.Equal(ts) || s.startTime.Before(ts)
-	if s.endTime.IsZero() {
-		return greaterAndEqualStart
+func (s *segment) Report() bucket.Channel {
+	ch := make(bucket.Channel)
+	interval := s.Duration() >> 4
+	if interval < 100*time.Millisecond {
+		interval = 100 * time.Millisecond
 	}
-	return greaterAndEqualStart && s.endTime.After(ts)
+	go func() {
+		defer close(ch)
+		for {
+			ticker := time.NewTicker(interval)
+			defer ticker.Stop()
+			select {
+			case <-ticker.C:
+				status := bucket.Status{
+					Capacity: int(s.End.UnixNano() - s.Start.UnixNano()),
+					Volume:   int(time.Now().UnixNano() - s.Start.UnixNano()),
+				}
+				ch <- status
+				if status.Volume >= status.Capacity {
+					return
+				}
+			case <-s.reporterStopCh:
+				return
+			}
+		}
+	}()
+	return ch
 }
 
-func newSegment(ctx context.Context, path string) (s *segment, err error) {
-	s = &segment{
-		path:      path,
-		startTime: time.Now(),
+func openSegment(ctx context.Context, suffix, path string, intervalRule IntervalRule) (s *segment, err error) {
+	startTime, err := intervalRule.Unit.Parse(suffix)
+	if err != nil {
+		return nil, err
 	}
-	parentLogger := ctx.Value(logger.ContextKey)
-	if parentLogger != nil {
-		if pl, ok := parentLogger.(*logger.Logger); ok {
-			s.l = pl.Named("segment")
-		}
+	suffixInteger, err := strconv.Atoi(suffix)
+	if err != nil {
+		return nil, err
+	}
+	id := uint16(intervalRule.Unit)<<12 | ((uint16(suffixInteger) << 4) >> 4)
+	s = &segment{
+		id:             id,
+		path:           path,
+		suffix:         suffix,
+		l:              logger.Fetch(ctx, "segment"),
+		reporterStopCh: make(chan struct{}),
+		TimeRange:      NewTimeRange(startTime, intervalRule.NextTime(startTime), true, false),
 	}
 	indexPath, err := mkdir(globalIndexTemplate, path)
 	if err != nil {
@@ -66,7 +98,8 @@ func newSegment(ctx context.Context, path string) (s *segment, err error) {
 	loadBlock := func(path string) error {
 		var b *block
 		if b, err = newBlock(context.WithValue(ctx, logger.ContextKey, s.l), blockOpts{
-			path: path,
+			segID: s.id,
+			path:  path,
 		}); err != nil {
 			return err
 		}
@@ -77,7 +110,7 @@ func newSegment(ctx context.Context, path string) (s *segment, err error) {
 		}
 		return nil
 	}
-	err = walkDir(path, blockPathPrefix, func(name, absolutePath string) error {
+	err = walkDir(path, blockPathPrefix, func(_, absolutePath string) error {
 		return loadBlock(absolutePath)
 	})
 	if err != nil {
@@ -103,4 +136,5 @@ func (s *segment) close() {
 		b.close()
 	}
 	s.globalIndex.Close()
+	close(s.reporterStopCh)
 }
diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go
index c9ec07b..5cc9805 100644
--- a/banyand/tsdb/series.go
+++ b/banyand/tsdb/series.go
@@ -34,6 +34,7 @@ import (
 var (
 	ErrEmptySeriesSpan = errors.New("there is no data in such time range")
 	ErrItemIDMalformed = errors.New("serialized item id is malformed")
+	ErrBlockAbsent     = errors.New("block is absent")
 )
 
 type GlobalItemID struct {
@@ -72,32 +73,63 @@ func (i *GlobalItemID) UnMarshal(data []byte) error {
 }
 
 type TimeRange struct {
-	Start time.Time
-	End   time.Time
+	Start        time.Time
+	End          time.Time
+	IncludeStart bool
+	IncludeEnd   bool
 }
 
-func (t TimeRange) contains(unixNano uint64) bool {
+func (t TimeRange) Contains(unixNano uint64) bool {
 	tp := time.Unix(0, int64(unixNano))
-	if tp.Equal(t.End) || tp.After(t.End) {
-		return false
+	if t.Start.Equal(tp) {
+		return t.IncludeStart
 	}
-	return tp.Equal(t.Start) || tp.After(t.Start)
+	if t.End.Equal(tp) {
+		return t.IncludeEnd
+	}
+	return !tp.Before(t.Start) && !tp.After(t.End)
+}
+
+func (t TimeRange) Overlapping(other TimeRange) bool {
+	if t.Start.Equal(other.End) {
+		return t.IncludeStart && other.IncludeEnd
+	}
+	if other.Start.Equal(t.End) {
+		return t.IncludeEnd && other.IncludeStart
+	}
+	return !t.Start.After(other.End) && !other.Start.After(t.End)
+}
+
+func (t TimeRange) Duration() time.Duration {
+	return t.End.Sub(t.Start)
 }
 
-func NewTimeRange(Start, End time.Time) TimeRange {
+func NewInclusiveTimeRange(start, end time.Time) TimeRange {
 	return TimeRange{
-		Start: Start,
-		End:   End,
+		Start:        start,
+		End:          end,
+		IncludeStart: true,
+		IncludeEnd:   true,
 	}
 }
 
-func NewTimeRangeDuration(Start time.Time, Duration time.Duration) TimeRange {
+func NewInclusiveTimeRangeDuration(start time.Time, duration time.Duration) TimeRange {
+	return NewTimeRangeDuration(start, duration, true, true)
+}
+
+func NewTimeRange(start, end time.Time, includeStart, includeEnd bool) TimeRange {
 	return TimeRange{
-		Start: Start,
-		End:   Start.Add(Duration),
+		Start:        start,
+		End:          end,
+		IncludeStart: includeStart,
+		IncludeEnd:   includeEnd,
 	}
 }
 
+func NewTimeRangeDuration(start time.Time, duration time.Duration, includeStart, includeEnd bool) TimeRange {
+	return NewTimeRange(start, start.Add(duration), includeStart, includeEnd)
+}
+
 type Series interface {
 	ID() common.SeriesID
 	Span(timeRange TimeRange) (SeriesSpan, error)
@@ -121,6 +153,9 @@ type series struct {
 
 func (s *series) Get(id GlobalItemID) (Item, io.Closer, error) {
 	b := s.blockDB.block(id)
+	if b == nil {
+		return nil, nil, errors.WithMessagef(ErrBlockAbsent, "id: %v", id)
+	}
 	return &item{
 		data:     b.dataReader(),
 		itemID:   id.ID,
diff --git a/banyand/tsdb/series_seek_sort.go b/banyand/tsdb/series_seek_sort.go
index 6182d96..38b733b 100644
--- a/banyand/tsdb/series_seek_sort.go
+++ b/banyand/tsdb/series_seek_sort.go
@@ -56,7 +56,7 @@ func (s *seekerBuilder) buildSeries(conditions []condWithIRT) ([]Iterator, error
 
 func (s *seekerBuilder) buildSeriesByIndex(conditions []condWithIRT) (series []Iterator, err error) {
 	timeFilter := func(item Item) bool {
-		valid := s.seriesSpan.timeRange.contains(item.Time())
+		valid := s.seriesSpan.timeRange.Contains(item.Time())
 		timeRange := s.seriesSpan.timeRange
 		s.seriesSpan.l.Trace().
 			Times("time_range", []time.Time{timeRange.Start, timeRange.End}).
diff --git a/banyand/tsdb/series_test.go b/banyand/tsdb/series_test.go
new file mode 100644
index 0000000..25a7df6
--- /dev/null
+++ b/banyand/tsdb/series_test.go
@@ -0,0 +1,132 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package tsdb_test
+
+import (
+	"fmt"
+	"time"
+
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+
+	"github.com/apache/skywalking-banyandb/banyand/tsdb"
+)
+
+var _ = Describe("Series", func() {
+	Context("TimeRange", func() {
+		Context("Contains", func() {
+			verifyFn := func(start, end string, includeStart, includeEnd bool, ts string, expected bool) {
+				startTime, _ := time.Parse("20060202", start)
+				endTime, _ := time.Parse("20060202", end)
+				tsTime, _ := time.Parse("20060202", ts)
+				Expect(tsdb.NewTimeRange(startTime, endTime, includeStart, includeEnd).Contains(uint64(tsTime.UnixNano()))).To(Equal(expected))
+			}
+			DescribeTable("It's a exclusive range",
+				func(start, end, ts string, expected bool) {
+					verifyFn(start, end, false, false, ts, expected)
+				},
+				Entry("is in the middle", "20220205", "20220107", "20220106", true),
+				Entry("is at the lower", "20220205", "20220107", "20220105", false),
+				Entry("is at the upper", "20220205", "20220107", "20220107", false),
+				Entry("is before the lower", "20220205", "20220107", "20220104", false),
+				Entry("is after the upper", "20220205", "20220107", "20220108", false),
+			)
+			DescribeTable("It's a inclusive range",
+				func(start, end, ts string, expected bool) {
+					verifyFn(start, end, true, true, ts, expected)
+				},
+				Entry("is in the middle", "20220205", "20220107", "20220106", true),
+				Entry("is at the lower", "20220205", "20220107", "20220105", true),
+				Entry("is at the upper", "20220205", "20220107", "20220107", true),
+				Entry("is before the lower", "20220205", "20220107", "20220104", false),
+				Entry("is after the upper", "20220205", "20220107", "20220108", false),
+			)
+			DescribeTable("It's a inclusive lower and exclusive upper range",
+				func(start, end, ts string, expected bool) {
+					verifyFn(start, end, true, false, ts, expected)
+				},
+				Entry("is in the middle", "20220205", "20220107", "20220106", true),
+				Entry("is at the lower", "20220205", "20220107", "20220105", true),
+				Entry("is at the upper", "20220205", "20220107", "20220107", false),
+				Entry("is before the lower", "20220205", "20220107", "20220104", false),
+				Entry("is after the upper", "20220205", "20220107", "20220108", false),
+			)
+			DescribeTable("It's a exclusive lower and inclusive upper range",
+				func(start, end, ts string, expected bool) {
+					verifyFn(start, end, false, true, ts, expected)
+				},
+				Entry("is in the middle", "20220205", "20220107", "20220106", true),
+				Entry("is at the lower", "20220205", "20220107", "20220105", false),
+				Entry("is at the upper", "20220205", "20220107", "20220107", true),
+				Entry("is before the lower", "20220205", "20220107", "20220104", false),
+				Entry("is after the upper", "20220205", "20220107", "20220108", false),
+			)
+		})
+		Context("Overlapping", func() {
+			verifyFn := func(start1, end1, start2, end2 string, expected bool) {
+				startTime1, _ := time.Parse("20060102", start1)
+				endTime1, _ := time.Parse("20060102", end1)
+				startTime2, _ := time.Parse("20060102", start2)
+				endTime2, _ := time.Parse("20060102", end2)
+				includes := []bool{true, false}
+
+				for _, r1l := range includes {
+					for _, r1u := range includes {
+						for _, r2l := range includes {
+							for _, r2u := range includes {
+								By(fmt.Sprintf("r1 lower:%v upper:%v. r1 lower:%v upper:%v", r1l, r1u, r2l, r2u), func() {
+									r1 := tsdb.NewTimeRange(startTime1, endTime1, r1l, r1u)
+									r2 := tsdb.NewTimeRange(startTime2, endTime2, r2l, r2u)
+									Expect(r1.Overlapping(r2)).To(Equal(expected))
+									Expect(r2.Overlapping(r1)).To(Equal(expected))
+								})
+							}
+						}
+					}
+				}
+			}
+			DescribeTable("Each range type's behavior is identical",
+				verifyFn,
+				Entry("is no overlapping", "20220205", "20220107", "20220108", "20220112", false),
+				Entry("is the two range are identical", "20220105", "20220107", "20220105", "20220107", true),
+				Entry("is the one includes the other", "20220102", "20220107", "20220103", "20220106", true),
+				Entry("is the one includes the other, the upper bounds are identical", "20220102", "20220107", "20220103", "20220107", true),
+				Entry("is the one includes the other, the lower bounds are identical", "20220102", "20220107", "20220102", "20220106", true),
+				Entry("is they have an intersection", "20220102", "20220105", "20220103", "20220106", true),
+			)
+			adjacentVerifyFn := func(include1, include2, expected bool) {
+				startTime1, _ := time.Parse("20060102", "20210105")
+				endTime1, _ := time.Parse("20060102", "20210107")
+				startTime2, _ := time.Parse("20060102", "20210107")
+				endTime2, _ := time.Parse("20060102", "20210109")
+				r1 := tsdb.NewTimeRange(startTime1, endTime1, false, include1)
+				r2 := tsdb.NewTimeRange(startTime2, endTime2, include2, false)
+				Expect(r1.Overlapping(r2)).To(Equal(expected))
+			}
+
+			DescribeTable("They are adjacent",
+				adjacentVerifyFn,
+				Entry("is they are inclusive", true, true, true),
+				Entry("is that range1 includes upper, but range2 excludes lower", true, false, false),
+				Entry("is that range1 excludes upper, but range2 includes lower", false, true, false),
+				Entry("is they are exclusive", false, false, false),
+			)
+		})
+
+	})
+})
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index c2e9d5e..de269e6 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -106,7 +106,7 @@ type seriesDB struct {
 	sync.Mutex
 	l *logger.Logger
 
-	lst            []*segment
+	segCtrl        *segmentController
 	seriesMetadata kv.Store
 	sID            common.ShardID
 }
@@ -134,7 +134,11 @@ func (s *seriesDB) GetByID(id common.SeriesID) (Series, error) {
 }
 
 func (s *seriesDB) block(id GlobalItemID) blockDelegate {
-	return s.lst[id.segID].lst[id.blockID].delegate()
+	seg := s.segCtrl.get(id.segID)
+	if seg == nil {
+		return nil
+	}
+	return seg.lst[id.blockID].delegate()
 }
 
 func (s *seriesDB) shardID() common.ShardID {
@@ -191,11 +195,13 @@ func (s *seriesDB) List(path Path) (SeriesList, error) {
 	return result, err
 }
 
-func (s *seriesDB) span(_ TimeRange) []blockDelegate {
+func (s *seriesDB) span(timeRange TimeRange) []blockDelegate {
 	//TODO: return correct blocks
-	result := make([]blockDelegate, 0, len(s.lst[0].lst))
-	for _, b := range s.lst[0].lst {
-		result = append(result, b.delegate())
+	result := make([]blockDelegate, 0)
+	for _, s := range s.segCtrl.span(timeRange) {
+		for _, b := range s.lst {
+			result = append(result, b.delegate())
+		}
 	}
 	return result
 }
@@ -205,23 +211,14 @@ func (s *seriesDB) context() context.Context {
 }
 
 func (s *seriesDB) Close() error {
-	for _, seg := range s.lst {
-		seg.close()
-	}
 	return s.seriesMetadata.Close()
 }
 
-func newSeriesDataBase(ctx context.Context, shardID common.ShardID, path string, segLst []*segment) (SeriesDatabase, error) {
+func newSeriesDataBase(ctx context.Context, shardID common.ShardID, path string, segCtrl *segmentController) (SeriesDatabase, error) {
 	sdb := &seriesDB{
-		sID: shardID,
-		lst: segLst,
-	}
-	parentLogger := ctx.Value(logger.ContextKey)
-	if parentLogger == nil {
-		return nil, logger.ErrNoLoggerInContext
-	}
-	if pl, ok := parentLogger.(*logger.Logger); ok {
-		sdb.l = pl.Named("series_database")
+		sID:     shardID,
+		segCtrl: segCtrl,
+		l:       logger.Fetch(ctx, "series_database"),
 	}
 	var err error
 	sdb.seriesMetadata, err = kv.OpenStore(0, path+"/md", kv.StoreWithNamedLogger("metadata", sdb.l))
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index 7b1b7b9..7eaf50f 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -23,21 +23,23 @@ import (
 	"sync"
 	"time"
 
+	"github.com/pkg/errors"
+
 	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
 var _ Shard = (*shard)(nil)
 
 type shard struct {
-	sync.Mutex
-	id     common.ShardID
-	logger *logger.Logger
+	l  *logger.Logger
+	id common.ShardID
 
-	location       string
-	seriesDatabase SeriesDatabase
-	indexDatabase  IndexDatabase
-	lst            []*segment
+	seriesDatabase        SeriesDatabase
+	indexDatabase         IndexDatabase
+	segmentController     *segmentController
+	segmentManageStrategy *bucket.Strategy
 }
 
 func (s *shard) ID() common.ShardID {
@@ -52,66 +54,234 @@ func (s *shard) Index() IndexDatabase {
 	return s.indexDatabase
 }
 
-func openShard(ctx context.Context, id common.ShardID, location string) (*shard, error) {
-	s := &shard{
-		id:       id,
-		location: location,
-	}
-	parentLogger := ctx.Value(logger.ContextKey)
-	if parentLogger != nil {
-		if pl, ok := parentLogger.(*logger.Logger); ok {
-			s.logger = pl.Named("shard" + strconv.Itoa(int(id)))
-		}
+func OpenShard(ctx context.Context, id common.ShardID, root string, intervalRule IntervalRule) (Shard, error) {
+	path, err := mkdir(shardTemplate, root, int(id))
+	if err != nil {
+		return nil, errors.Wrapf(err, "make the directory of the shard %d ", int(id))
 	}
-	loadSeg := func(path string) error {
-		seg, err := newSegment(ctx, path)
-		if err != nil {
-			return err
-		}
-		{
-			s.Lock()
-			defer s.Unlock()
-			s.lst = append(s.lst, seg)
-		}
-		s.logger.Info().Int("size", len(s.lst)).Msg("seg size")
-		return nil
+	l := logger.Fetch(ctx, "shard"+strconv.Itoa(int(id)))
+	l.Info().Int("shard_id", int(id)).Str("path", path).Msg("creating a shard")
+	s := &shard{
+		id:                id,
+		segmentController: newSegmentController(path, intervalRule),
+		l:                 l,
 	}
-	err := walkDir(location, segPathPrefix, func(_, absolutePath string) error {
-		s.logger.Info().Str("path", absolutePath).Msg("loading a segment")
-		return loadSeg(absolutePath)
-	})
+	shardCtx := context.WithValue(ctx, logger.ContextKey, s.l)
+	err = s.segmentController.open(shardCtx)
 	if err != nil {
 		return nil, err
 	}
-	if len(s.lst) < 1 {
-		var segPath string
-		segPath, err = mkdir(segTemplate, location, time.Now().Format(segFormat))
-		if err != nil {
-			return nil, err
-		}
-		s.logger.Info().Str("path", segPath).Msg("creating a new segment")
-		err = loadSeg(segPath)
-		if err != nil {
-			return nil, err
-		}
-	}
-	seriesPath, err := mkdir(seriesTemplate, s.location)
+	seriesPath, err := mkdir(seriesTemplate, path)
 	if err != nil {
 		return nil, err
 	}
-	sdb, err := newSeriesDataBase(ctx, s.id, seriesPath, s.lst)
+	sdb, err := newSeriesDataBase(shardCtx, s.id, seriesPath, s.segmentController)
 	if err != nil {
 		return nil, err
 	}
 	s.seriesDatabase = sdb
-	idb, err := newIndexDatabase(ctx, s.id, s.lst)
+	idb, err := newIndexDatabase(shardCtx, s.id, s.segmentController)
 	if err != nil {
 		return nil, err
 	}
 	s.indexDatabase = idb
+	s.segmentManageStrategy, err = bucket.NewStrategy(s.segmentController, bucket.WithLogger(s.l))
+	if err != nil {
+		return nil, err
+	}
+	s.segmentManageStrategy.Run()
 	return s, nil
 }
 
 func (s *shard) Close() error {
+	s.segmentManageStrategy.Close()
+	s.segmentController.close()
 	return s.seriesDatabase.Close()
 }
+
+type IntervalUnit int
+
+const (
+	DAY IntervalUnit = iota
+	MONTH
+	YEAR
+	MILLISECOND // only for testing
+)
+
+func (iu IntervalUnit) String() string {
+	switch iu {
+	case DAY:
+		return "day"
+	case MONTH:
+		return "month"
+	case YEAR:
+		return "year"
+	case MILLISECOND:
+		return "millis"
+
+	}
+	panic("invalid interval unit")
+}
+
+func (iu IntervalUnit) Format(tm time.Time) string {
+	switch iu {
+	case DAY:
+		return tm.Format(segDayFormat)
+	case MONTH:
+		return tm.Format(segMonthFormat)
+	case YEAR:
+		return tm.Format(segYearFormat)
+	case MILLISECOND:
+		return tm.Format(segMillisecondFormat)
+	}
+	panic("invalid interval unit")
+}
+
+func (iu IntervalUnit) Parse(value string) (time.Time, error) {
+	switch iu {
+	case DAY:
+		return time.Parse(segDayFormat, value)
+	case MONTH:
+		return time.Parse(segMonthFormat, value)
+	case YEAR:
+		return time.Parse(segYearFormat, value)
+	case MILLISECOND:
+		return time.Parse(segMillisecondFormat, value)
+	}
+	panic("invalid interval unit")
+}
+
+type IntervalRule struct {
+	Unit IntervalUnit
+	Num  int
+}
+
+func (ir IntervalRule) NextTime(current time.Time) time.Time {
+	switch ir.Unit {
+	case DAY:
+		return current.AddDate(0, 0, ir.Num)
+	case MONTH:
+		return current.AddDate(0, ir.Num, 0)
+	case YEAR:
+		return current.AddDate(ir.Num, 0, 0)
+	case MILLISECOND:
+		return current.Add(time.Millisecond * time.Duration(ir.Num))
+	}
+	panic("invalid interval unit")
+}
+
+type segmentController struct {
+	sync.RWMutex
+	location     string
+	intervalRule IntervalRule
+	lst          []*segment
+}
+
+func newSegmentController(location string, intervalRule IntervalRule) *segmentController {
+	return &segmentController{
+		location:     location,
+		intervalRule: intervalRule,
+	}
+}
+
+func (sc *segmentController) get(segID uint16) *segment {
+	sc.RLock()
+	defer sc.RUnlock()
+	last := len(sc.lst) - 1
+	for i := range sc.lst {
+		s := sc.lst[last-i]
+		if s.id == segID {
+			return s
+		}
+	}
+	return nil
+}
+
+func (sc *segmentController) span(timeRange TimeRange) (ss []*segment) {
+	sc.RLock()
+	defer sc.RUnlock()
+	last := len(sc.lst) - 1
+	for i := range sc.lst {
+		s := sc.lst[last-i]
+		if s.Overlapping(timeRange) {
+			ss = append(ss, s)
+		}
+	}
+	return ss
+}
+
+func (sc *segmentController) segments() (ss []*segment) {
+	sc.RLock()
+	defer sc.RUnlock()
+	r := make([]*segment, len(sc.lst))
+	copy(r, sc.lst)
+	return r
+}
+
+func (sc *segmentController) Current() bucket.Reporter {
+	sc.RLock()
+	defer sc.RUnlock()
+	now := time.Now()
+	for _, s := range sc.lst {
+		if s.suffix == sc.intervalRule.Unit.Format(now) {
+			return s
+		}
+	}
+	// return the latest segment before now
+	if len(sc.lst) > 0 {
+		return sc.lst[len(sc.lst)-1]
+	}
+	return nil
+}
+
+func (sc *segmentController) Next() (bucket.Reporter, error) {
+	return sc.create(context.TODO(), sc.intervalRule.Unit.Format(
+		sc.intervalRule.NextTime(time.Now())))
+}
+
+func (sc *segmentController) open(ctx context.Context) error {
+	err := walkDir(
+		sc.location,
+		segPathPrefix,
+		func(suffix, absolutePath string) error {
+			_, err := sc.load(ctx, suffix, absolutePath)
+			return err
+		})
+	if err != nil {
+		return err
+	}
+	if sc.Current() == nil {
+		_, err = sc.create(ctx, sc.intervalRule.Unit.Format(time.Now()))
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (sc *segmentController) create(ctx context.Context, suffix string) (*segment, error) {
+	segPath, err := mkdir(segTemplate, sc.location, suffix)
+	if err != nil {
+		return nil, err
+	}
+	return sc.load(ctx, suffix, segPath)
+}
+
+func (sc *segmentController) load(ctx context.Context, suffix, path string) (seg *segment, err error) {
+	seg, err = openSegment(ctx, suffix, path, sc.intervalRule)
+	if err != nil {
+		return nil, err
+	}
+	{
+		sc.Lock()
+		defer sc.Unlock()
+		sc.lst = append(sc.lst, seg)
+	}
+	return seg, nil
+}
+
+func (sc *segmentController) close() {
+	for _, s := range sc.lst {
+		s.close()
+	}
+}
diff --git a/banyand/tsdb/shard_test.go b/banyand/tsdb/shard_test.go
new file mode 100644
index 0000000..5609f0a
--- /dev/null
+++ b/banyand/tsdb/shard_test.go
@@ -0,0 +1,70 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package tsdb_test
+
+import (
+	"context"
+	"io/ioutil"
+	"strings"
+	"time"
+
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+
+	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/banyand/tsdb"
+	"github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+var _ = Describe("Shard", func() {
+	Describe("Generate segments", func() {
+		var tmp string
+		var deferFn func()
+		var shard tsdb.Shard
+
+		BeforeEach(func() {
+			var err error
+			tmp, deferFn, err = test.NewSpace()
+			Expect(err).NotTo(HaveOccurred())
+		})
+		AfterEach(func() {
+			shard.Close()
+			deferFn()
+		})
+		It("generates several segments", func() {
+			var err error
+			shard, err = tsdb.OpenShard(context.TODO(), common.ShardID(0), tmp, tsdb.IntervalRule{
+				Unit: tsdb.MILLISECOND,
+				Num:  1000,
+			})
+			Expect(err).NotTo(HaveOccurred())
+			Eventually(func() int {
+				files, err := ioutil.ReadDir(tmp + "/shard-0")
+				Expect(err).NotTo(HaveOccurred())
+				num := 0
+				for _, fi := range files {
+					if fi.IsDir() && strings.HasPrefix(fi.Name(), "seg-") {
+						num++
+					}
+				}
+				return num
+			}).WithTimeout(10 * time.Second).Should(BeNumerically(">=", 3))
+		})
+
+	})
+})
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index 3d740b0..a3aff1c 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -49,8 +49,11 @@ const (
 	blockTemplate       = rootPrefix + blockPathPrefix + "-%s"
 	globalIndexTemplate = rootPrefix + "index"
 
-	segFormat   = "20060102"
-	blockFormat = "1504"
+	segDayFormat         = "20060102"
+	segMonthFormat       = "200601"
+	segYearFormat        = "2006"
+	segMillisecondFormat = "20060102150405000"
+	blockFormat          = "1504"
 
 	dirPerm = 0700
 )
@@ -125,22 +128,17 @@ func (d *database) Close() error {
 }
 
 func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) {
-	db := &database{
-		location: opts.Location,
-		shardNum: opts.ShardNum,
-	}
-	parentLogger := ctx.Value(logger.ContextKey)
-	if parentLogger != nil {
-		if pl, ok := parentLogger.(*logger.Logger); ok {
-			db.logger = pl.Named("tsdb")
-		}
-	}
 	if opts.EncodingMethod.EncoderPool == nil || opts.EncodingMethod.DecoderPool == nil {
 		return nil, errors.Wrap(ErrEncodingMethodAbsent, "failed to open database")
 	}
 	if _, err := mkdir(opts.Location); err != nil {
 		return nil, err
 	}
+	db := &database{
+		location: opts.Location,
+		shardNum: opts.ShardNum,
+		logger:   logger.Fetch(ctx, "tsdb"),
+	}
 	db.logger.Info().Str("path", opts.Location).Msg("initialized")
 	var entries []fs.FileInfo
 	var err error
@@ -165,13 +163,11 @@ func initDatabase(ctx context.Context, db *database) (Database, error) {
 func createDatabase(ctx context.Context, db *database, startID int) (Database, error) {
 	var err error
 	for i := startID; i < int(db.shardNum); i++ {
-		shardLocation, errInternal := mkdir(shardTemplate, db.location, i)
-		if errInternal != nil {
-			err = multierr.Append(err, errInternal)
-			continue
-		}
-		db.logger.Info().Int("shard_id", i).Str("path", shardLocation).Msg("creating a shard")
-		so, errNewShard := openShard(ctx, common.ShardID(i), shardLocation)
+		db.logger.Info().Int("shard_id", i).Msg("creating a shard")
+		so, errNewShard := OpenShard(ctx, common.ShardID(i), db.location, IntervalRule{
+			Unit: DAY,
+			Num:  1,
+		})
 		if errNewShard != nil {
 			err = multierr.Append(err, errNewShard)
 			continue
@@ -186,17 +182,23 @@ func loadDatabase(ctx context.Context, db *database) (Database, error) {
 	//TODO: open the manifest file
 	db.Lock()
 	defer db.Unlock()
-	err := walkDir(db.location, shardPathPrefix, func(name, absolutePath string) error {
-		shardSegs := strings.Split(name, "-")
-		shardID, err := strconv.Atoi(shardSegs[1])
+	err := walkDir(db.location, shardPathPrefix, func(suffix, _ string) error {
+		shardID, err := strconv.Atoi(suffix)
 		if err != nil {
 			return err
 		}
 		if shardID >= int(db.shardNum) {
 			return nil
 		}
-		db.logger.Info().Int("shard_id", shardID).Str("path", absolutePath).Msg("opening a shard")
-		so, errOpenShard := openShard(context.WithValue(ctx, logger.ContextKey, db.logger), common.ShardID(shardID), absolutePath)
+		db.logger.Info().Int("shard_id", shardID).Msg("opening a existing shard")
+		so, errOpenShard := OpenShard(
+			context.WithValue(ctx, logger.ContextKey, db.logger),
+			common.ShardID(shardID),
+			db.location,
+			IntervalRule{
+				Unit: DAY,
+				Num:  1,
+			})
 		if errOpenShard != nil {
 			return errOpenShard
 		}
@@ -218,7 +220,7 @@ func loadDatabase(ctx context.Context, db *database) (Database, error) {
 	return db, nil
 }
 
-type walkFn func(name, absolutePath string) error
+type walkFn func(suffix, absolutePath string) error
 
 func walkDir(root, prefix string, walkFn walkFn) error {
 	files, err := ioutil.ReadDir(root)
@@ -229,7 +231,8 @@ func walkDir(root, prefix string, walkFn walkFn) error {
 		if !f.IsDir() || !strings.HasPrefix(f.Name(), prefix) {
 			continue
 		}
-		errWalk := walkFn(f.Name(), fmt.Sprintf(rootPrefix, root)+f.Name())
+		segs := strings.Split(f.Name(), "-")
+		errWalk := walkFn(segs[len(segs)-1], fmt.Sprintf(rootPrefix, root)+f.Name())
 		if errWalk != nil {
 			return errors.WithMessagef(errWalk, "failed to load: %s", f.Name())
 		}
diff --git a/pkg/logger/logger.go b/banyand/tsdb/tsdb_suite_test.go
similarity index 53%
copy from pkg/logger/logger.go
copy to banyand/tsdb/tsdb_suite_test.go
index 36896ed..c761503 100644
--- a/pkg/logger/logger.go
+++ b/banyand/tsdb/tsdb_suite_test.go
@@ -14,40 +14,26 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-
-package logger
+//
+package tsdb_test
 
 import (
-	"strings"
-
-	"github.com/pkg/errors"
-	"github.com/rs/zerolog"
-)
-
-var ContextKey = contextKey{}
-var ErrNoLoggerInContext = errors.New("no logger in context")
+	"testing"
 
-type contextKey struct{}
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
 
-// Logging is the config info
-type Logging struct {
-	Env   string
-	Level string
-}
-
-// Logger is wrapper for rs/zerolog logger with module, it is singleton.
-type Logger struct {
-	module string
-	*zerolog.Logger
-}
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+)
 
-func (l *Logger) Named(name string) *Logger {
-	module := strings.Join([]string{l.module, name}, ".")
-	subLogger := root.Logger.With().Str("module", module).Logger()
-	return &Logger{module: module, Logger: &subLogger}
+func TestTsdb(t *testing.T) {
+	RegisterFailHandler(Fail)
+	RunSpecs(t, "Tsdb Suite")
 }
 
-// Loggable indicates the implement supports logging
-type Loggable interface {
-	SetLogger(*Logger)
-}
+var _ = BeforeSuite(func() {
+	Expect(logger.Init(logger.Logging{
+		Env:   "dev",
+		Level: "info",
+	})).Should(Succeed())
+})
diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go
index bfb9957..75c1cb8 100644
--- a/banyand/tsdb/tsdb_test.go
+++ b/banyand/tsdb/tsdb_test.go
@@ -60,7 +60,7 @@ func verifyDatabaseStructure(tester *assert.Assertions, tempDir string) {
 	seriesPath := fmt.Sprintf(seriesTemplate, shardPath)
 	validateDirectory(tester, seriesPath)
 	now := time.Now()
-	segPath := fmt.Sprintf(segTemplate, shardPath, now.Format(segFormat))
+	segPath := fmt.Sprintf(segTemplate, shardPath, now.Format(segDayFormat))
 	validateDirectory(tester, segPath)
 	validateDirectory(tester, fmt.Sprintf(blockTemplate, segPath, now.Format(blockFormat)))
 }
diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go
index 36896ed..98a190a 100644
--- a/pkg/logger/logger.go
+++ b/pkg/logger/logger.go
@@ -18,6 +18,7 @@
 package logger
 
 import (
+	"context"
 	"strings"
 
 	"github.com/pkg/errors"
@@ -51,3 +52,14 @@ func (l *Logger) Named(name string) *Logger {
 type Loggable interface {
 	SetLogger(*Logger)
 }
+
+func Fetch(ctx context.Context, name string) *Logger {
+	parentLogger := ctx.Value(ContextKey)
+	if parentLogger == nil {
+		return GetLogger(name)
+	}
+	if pl, ok := parentLogger.(*Logger); ok {
+		return pl.Named(name)
+	}
+	return GetLogger(name)
+}
diff --git a/pkg/query/logical/common.go b/pkg/query/logical/common.go
index bacb9a1..95f3e3e 100644
--- a/pkg/query/logical/common.go
+++ b/pkg/query/logical/common.go
@@ -77,7 +77,9 @@ func executeForShard(series tsdb.SeriesList, timeRange tsdb.TimeRange,
 		itersInSeries, err := func() ([]tsdb.Iterator, error) {
 			sp, errInner := seriesFound.Span(timeRange)
 			defer func(sp tsdb.SeriesSpan) {
-				_ = sp.Close()
+				if sp != nil {
+					_ = sp.Close()
+				}
 			}(sp)
 			if errInner != nil {
 				return nil, errInner
diff --git a/pkg/query/logical/plan_indexscan_local.go b/pkg/query/logical/plan_indexscan_local.go
index bd22fd1..a18e62e 100644
--- a/pkg/query/logical/plan_indexscan_local.go
+++ b/pkg/query/logical/plan_indexscan_local.go
@@ -108,7 +108,7 @@ func (uis *unresolvedIndexScan) Analyze(s Schema) (Plan, error) {
 
 	return &localIndexScan{
 		orderBy:             orderBySubPlan,
-		timeRange:           tsdb.NewTimeRange(uis.startTime, uis.endTime),
+		timeRange:           tsdb.NewInclusiveTimeRange(uis.startTime, uis.endTime),
 		schema:              s,
 		projectionFieldRefs: projFieldsRefs,
 		metadata:            uis.metadata,