You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2022/01/24 04:16:23 UTC
[skywalking-banyandb] branch main updated: Bucket strategy management and segment controller (#70)
This is an automated email from the ASF dual-hosted git repository.
lujiajing pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new b61ab29 Bucket strategy management and segment controller (#70)
b61ab29 is described below
commit b61ab295c38c672ca7aa60e308d0d64037272766
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Mon Jan 24 12:16:18 2022 +0800
Bucket strategy management and segment controller (#70)
* init bucket
Add segment controller
Signed-off-by: Gao Hongtao <ha...@gmail.com>
* Update banyand/tsdb/bucket/strategy.go
Co-authored-by: Jiajing LU <lu...@gmail.com>
* Rename variable
Signed-off-by: Gao Hongtao <ha...@gmail.com>
* Remove outer loop from strategy
Signed-off-by: Gao Hongtao <ha...@gmail.com>
Co-authored-by: Jiajing LU <lu...@gmail.com>
---
banyand/measure/query_test.go | 2 +-
banyand/measure/write.go | 2 +-
banyand/query/processor_test.go | 9 +-
banyand/stream/stream_query.go | 2 +-
banyand/stream/stream_query_test.go | 21 +-
banyand/stream/stream_write.go | 2 +-
banyand/tsdb/block.go | 14 +-
.../logger.go => banyand/tsdb/bucket/bucket.go | 39 +--
.../tsdb/bucket/bucket_suite_test.go | 47 ++--
banyand/tsdb/bucket/strategy.go | 127 ++++++++++
banyand/tsdb/bucket/strategy_test.go | 141 +++++++++++
banyand/tsdb/indexdb.go | 45 ++--
banyand/tsdb/segment.go | 74 ++++--
banyand/tsdb/series.go | 59 ++++-
banyand/tsdb/series_seek_sort.go | 2 +-
banyand/tsdb/series_test.go | 132 ++++++++++
banyand/tsdb/seriesdb.go | 35 ++-
banyand/tsdb/shard.go | 266 +++++++++++++++++----
banyand/tsdb/shard_test.go | 70 ++++++
banyand/tsdb/tsdb.go | 55 +++--
.../logger.go => banyand/tsdb/tsdb_suite_test.go | 46 ++--
banyand/tsdb/tsdb_test.go | 2 +-
pkg/logger/logger.go | 12 +
pkg/query/logical/common.go | 4 +-
pkg/query/logical/plan_indexscan_local.go | 2 +-
25 files changed, 944 insertions(+), 266 deletions(-)
diff --git a/banyand/measure/query_test.go b/banyand/measure/query_test.go
index 31638db..60b8bf2 100644
--- a/banyand/measure/query_test.go
+++ b/banyand/measure/query_test.go
@@ -37,7 +37,7 @@ func Test_ParseTag_And_ParseField(t *testing.T) {
r.NoError(err)
series, err := shard.Series().Get(tsdb.Entity{tsdb.Entry("1")})
r.NoError(err)
- seriesSpan, err := series.Span(tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour))
+ seriesSpan, err := series.Span(tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour))
defer func(seriesSpan tsdb.SeriesSpan) {
_ = seriesSpan.Close()
}(seriesSpan)
diff --git a/banyand/measure/write.go b/banyand/measure/write.go
index b1dc724..b0f9854 100644
--- a/banyand/measure/write.go
+++ b/banyand/measure/write.go
@@ -78,7 +78,7 @@ func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *mea
return err
}
t := value.GetTimestamp().AsTime()
- wp, err := series.Span(tsdb.NewTimeRangeDuration(t, 0))
+ wp, err := series.Span(tsdb.NewInclusiveTimeRangeDuration(t, 0))
if err != nil {
if wp != nil {
_ = wp.Close()
diff --git a/banyand/query/processor_test.go b/banyand/query/processor_test.go
index dae4951..d3a659a 100644
--- a/banyand/query/processor_test.go
+++ b/banyand/query/processor_test.go
@@ -377,8 +377,13 @@ func TestQueryProcessor(t *testing.T) {
singleTester.NoError(err)
singleTester.NotNil(msg)
// TODO: better error response
- singleTester.NotNil(msg.Data())
- singleTester.Len(msg.Data(), tt.wantLen)
+ var dataLen int
+ if msg.Data() == nil {
+ dataLen = 0
+ } else {
+ dataLen = len(msg.Data().([]*streamv1.Element))
+ }
+ singleTester.Equal(dataLen, tt.wantLen)
if tt.checker != nil {
singleTester.True(tt.checker(msg.Data().([]*streamv1.Element)))
}
diff --git a/banyand/stream/stream_query.go b/banyand/stream/stream_query.go
index dfca4f4..6913b4a 100644
--- a/banyand/stream/stream_query.go
+++ b/banyand/stream/stream_query.go
@@ -20,8 +20,8 @@ package stream
import (
"io"
- "github.com/golang/protobuf/proto"
"github.com/pkg/errors"
+ "google.golang.org/protobuf/proto"
"github.com/apache/skywalking-banyandb/api/common"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
diff --git a/banyand/stream/stream_query_test.go b/banyand/stream/stream_query_test.go
index f77cfd6..2533ef4 100644
--- a/banyand/stream/stream_query_test.go
+++ b/banyand/stream/stream_query_test.go
@@ -20,7 +20,6 @@ package stream
import (
"bytes"
"embed"
- _ "embed"
"encoding/base64"
"encoding/json"
"fmt"
@@ -107,7 +106,7 @@ func Test_Stream_Series(t *testing.T) {
name: "all",
args: queryOpts{
entity: tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
- timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour),
+ timeRange: tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
},
want: shardsForTest{
{
@@ -137,7 +136,7 @@ func Test_Stream_Series(t *testing.T) {
name: "time range",
args: queryOpts{
entity: tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
- timeRange: tsdb.NewTimeRangeDuration(baseTime.Add(1500*time.Millisecond), 1*time.Hour),
+ timeRange: tsdb.NewInclusiveTimeRangeDuration(baseTime.Add(1500*time.Millisecond), 1*time.Hour),
},
want: shardsForTest{
{
@@ -164,7 +163,7 @@ func Test_Stream_Series(t *testing.T) {
name: "find series by service_id and instance_id",
args: queryOpts{
entity: tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.Entry("10.0.0.1_id"), tsdb.AnyEntry},
- timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour),
+ timeRange: tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
},
want: shardsForTest{
{
@@ -183,7 +182,7 @@ func Test_Stream_Series(t *testing.T) {
name: "find a series",
args: queryOpts{
entity: tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.Entry("10.0.0.1_id"), convert.Int64ToBytes(0)},
- timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour),
+ timeRange: tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
},
want: shardsForTest{
{
@@ -197,7 +196,7 @@ func Test_Stream_Series(t *testing.T) {
name: "filter",
args: queryOpts{
entity: tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
- timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour),
+ timeRange: tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
buildFn: func(builder tsdb.SeekerBuilder) {
builder.Filter(&databasev1.IndexRule{
Metadata: &commonv1.Metadata{
@@ -243,7 +242,7 @@ func Test_Stream_Series(t *testing.T) {
name: "order by duration",
args: queryOpts{
entity: tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
- timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour),
+ timeRange: tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
buildFn: func(builder tsdb.SeekerBuilder) {
builder.OrderByIndex(&databasev1.IndexRule{
Metadata: &commonv1.Metadata{
@@ -284,7 +283,7 @@ func Test_Stream_Series(t *testing.T) {
name: "filter by duration",
args: queryOpts{
entity: tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
- timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour),
+ timeRange: tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
buildFn: func(builder tsdb.SeekerBuilder) {
rule := &databasev1.IndexRule{
Metadata: &commonv1.Metadata{
@@ -331,7 +330,7 @@ func Test_Stream_Series(t *testing.T) {
name: "filter and sort by duration",
args: queryOpts{
entity: tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
- timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour),
+ timeRange: tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
buildFn: func(builder tsdb.SeekerBuilder) {
rule := &databasev1.IndexRule{
Metadata: &commonv1.Metadata{
@@ -379,7 +378,7 @@ func Test_Stream_Series(t *testing.T) {
name: "filter by several conditions",
args: queryOpts{
entity: tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
- timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour),
+ timeRange: tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
buildFn: func(builder tsdb.SeekerBuilder) {
rule := &databasev1.IndexRule{
Metadata: &commonv1.Metadata{
@@ -442,7 +441,7 @@ func Test_Stream_Series(t *testing.T) {
name: "filter by several conditions, sort by duration",
args: queryOpts{
entity: tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
- timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour),
+ timeRange: tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
buildFn: func(builder tsdb.SeekerBuilder) {
rule := &databasev1.IndexRule{
Metadata: &commonv1.Metadata{
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index d35d088..7fef502 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -71,7 +71,7 @@ func (s *stream) write(shardID common.ShardID, seriesHashKey []byte, value *stre
return err
}
t := value.GetTimestamp().AsTime()
- wp, err := series.Span(tsdb.NewTimeRangeDuration(t, 0))
+ wp, err := series.Span(tsdb.NewInclusiveTimeRangeDuration(t, 0))
if err != nil {
if wp != nil {
_ = wp.Close()
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 956bf13..c5a2b98 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -23,11 +23,11 @@ import (
"time"
"github.com/dgraph-io/ristretto/z"
- "github.com/pkg/errors"
"github.com/apache/skywalking-banyandb/api/common"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/kv"
+ "github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/index/inverted"
"github.com/apache/skywalking-banyandb/pkg/index/lsm"
@@ -63,16 +63,14 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
path: opts.path,
ref: z.NewCloser(1),
startTime: time.Now(),
- }
- parentLogger := ctx.Value(logger.ContextKey)
- if parentLogger != nil {
- if pl, ok := parentLogger.(*logger.Logger); ok {
- b.l = pl.Named("block")
- }
+ l: logger.Fetch(ctx, "block"),
}
encodingMethodObject := ctx.Value(encodingMethodKey)
if encodingMethodObject == nil {
- return nil, errors.Wrap(ErrEncodingMethodAbsent, "failed to create a block")
+ encodingMethodObject = EncodingMethod{
+ EncoderPool: encoding.NewPlainEncoderPool(0),
+ DecoderPool: encoding.NewPlainDecoderPool(0),
+ }
}
encodingMethod := encodingMethodObject.(EncodingMethod)
if b.store, err = kv.OpenTimeSeriesStore(
diff --git a/pkg/logger/logger.go b/banyand/tsdb/bucket/bucket.go
similarity index 53%
copy from pkg/logger/logger.go
copy to banyand/tsdb/bucket/bucket.go
index 36896ed..82a76ec 100644
--- a/pkg/logger/logger.go
+++ b/banyand/tsdb/bucket/bucket.go
@@ -15,39 +15,20 @@
// specific language governing permissions and limitations
// under the License.
-package logger
+package bucket
-import (
- "strings"
-
- "github.com/pkg/errors"
- "github.com/rs/zerolog"
-)
-
-var ContextKey = contextKey{}
-var ErrNoLoggerInContext = errors.New("no logger in context")
-
-type contextKey struct{}
-
-// Logging is the config info
-type Logging struct {
- Env string
- Level string
+type Controller interface {
+ Current() Reporter
+ Next() (Reporter, error)
}
-// Logger is wrapper for rs/zerolog logger with module, it is singleton.
-type Logger struct {
- module string
- *zerolog.Logger
+type Status struct {
+ Capacity int
+ Volume int
}
-func (l *Logger) Named(name string) *Logger {
- module := strings.Join([]string{l.module, name}, ".")
- subLogger := root.Logger.With().Str("module", module).Logger()
- return &Logger{module: module, Logger: &subLogger}
-}
+type Channel chan Status
-// Loggable indicates the implement supports logging
-type Loggable interface {
- SetLogger(*Logger)
+type Reporter interface {
+ Report() Channel
}
diff --git a/pkg/logger/logger.go b/banyand/tsdb/bucket/bucket_suite_test.go
similarity index 53%
copy from pkg/logger/logger.go
copy to banyand/tsdb/bucket/bucket_suite_test.go
index 36896ed..0ab7ff8 100644
--- a/pkg/logger/logger.go
+++ b/banyand/tsdb/bucket/bucket_suite_test.go
@@ -14,40 +14,25 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-
-package logger
+//
+package bucket_test
import (
- "strings"
-
- "github.com/pkg/errors"
- "github.com/rs/zerolog"
-)
-
-var ContextKey = contextKey{}
-var ErrNoLoggerInContext = errors.New("no logger in context")
+ "testing"
-type contextKey struct{}
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
-// Logging is the config info
-type Logging struct {
- Env string
- Level string
-}
-
-// Logger is wrapper for rs/zerolog logger with module, it is singleton.
-type Logger struct {
- module string
- *zerolog.Logger
-}
-
-func (l *Logger) Named(name string) *Logger {
- module := strings.Join([]string{l.module, name}, ".")
- subLogger := root.Logger.With().Str("module", module).Logger()
- return &Logger{module: module, Logger: &subLogger}
-}
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
-// Loggable indicates the implement supports logging
-type Loggable interface {
- SetLogger(*Logger)
+func TestBucket(t *testing.T) {
+ RegisterFailHandler(Fail)
+ BeforeSuite(func() {
+ Expect(logger.Init(logger.Logging{
+ Env: "dev",
+ Level: "debug",
+ })).Should(Succeed())
+ })
+ RunSpecs(t, "Bucket Suite")
}
diff --git a/banyand/tsdb/bucket/strategy.go b/banyand/tsdb/bucket/strategy.go
new file mode 100644
index 0000000..339aa4d
--- /dev/null
+++ b/banyand/tsdb/bucket/strategy.go
@@ -0,0 +1,127 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package bucket
+
+import (
+ "github.com/pkg/errors"
+ "go.uber.org/multierr"
+
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+var (
+ ErrInvalidParameter = errors.New("parameters are invalid")
+ ErrNoMoreBucket = errors.New("no more buckets")
+)
+
+type Ratio float64
+
+type Strategy struct {
+ optionsErr error
+ ratio Ratio
+ ctrl Controller
+ current Reporter
+ next Reporter
+ logger *logger.Logger
+ stopCh chan struct{}
+}
+
+type StrategyOptions func(*Strategy)
+
+func WithNextThreshold(ratio Ratio) StrategyOptions {
+ return func(s *Strategy) {
+ if ratio > 1.0 {
+ s.optionsErr = multierr.Append(s.optionsErr,
+ errors.Wrapf(ErrInvalidParameter, "ratio %v is more than 1.0", ratio))
+ return
+ }
+ s.ratio = ratio
+ }
+}
+
+func WithLogger(logger *logger.Logger) StrategyOptions {
+ return func(s *Strategy) {
+ s.logger = logger
+ }
+}
+
+func NewStrategy(ctrl Controller, options ...StrategyOptions) (*Strategy, error) {
+ if ctrl == nil {
+ return nil, errors.Wrap(ErrInvalidParameter, "controller is absent")
+ }
+ strategy := &Strategy{
+ ctrl: ctrl,
+ ratio: 0.8,
+ stopCh: make(chan struct{}),
+ }
+ for _, opt := range options {
+ opt(strategy)
+ }
+ if strategy.optionsErr != nil {
+ return nil, strategy.optionsErr
+ }
+ if strategy.logger == nil {
+ strategy.logger = logger.GetLogger("bucket-strategy")
+ }
+ return strategy, nil
+}
+
+func (s *Strategy) Run() {
+ reset := func() {
+ for s.current == nil {
+ s.current = s.ctrl.Current()
+ }
+ s.next = nil
+ }
+ reset()
+ go func(s *Strategy) {
+ var err error
+ bucket:
+ c := s.current.Report()
+ for {
+ select {
+ case status, closed := <-c:
+ if !closed {
+ reset()
+ goto bucket
+ }
+ ratio := Ratio(status.Volume) / Ratio(status.Capacity)
+ if ratio >= s.ratio && s.next == nil {
+ s.next, err = s.ctrl.Next()
+ if errors.Is(err, ErrNoMoreBucket) {
+ return
+ }
+ if err != nil {
+ s.logger.Err(err).Msg("failed to create the next bucket")
+ }
+ }
+ if ratio >= 1.0 {
+ s.current = s.next
+ s.next = nil
+ goto bucket
+ }
+ case <-s.stopCh:
+ return
+ }
+ }
+ }(s)
+}
+
+func (s *Strategy) Close() {
+ close(s.stopCh)
+}
diff --git a/banyand/tsdb/bucket/strategy_test.go b/banyand/tsdb/bucket/strategy_test.go
new file mode 100644
index 0000000..3a302ce
--- /dev/null
+++ b/banyand/tsdb/bucket/strategy_test.go
@@ -0,0 +1,141 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package bucket_test
+
+import (
+ "sync"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+
+ "github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
+)
+
+var _ = Describe("Strategy", func() {
+ Context("Applying the strategy", func() {
+ var strategy *bucket.Strategy
+ It("uses the golden settings", func() {
+ ctrl := newController(2, 1, 10)
+ var err error
+ strategy, err = bucket.NewStrategy(ctrl)
+ Expect(err).NotTo(HaveOccurred())
+ strategy.Run()
+ Eventually(ctrl.isFull).Should(BeTrue())
+ })
+ It("never reaches the limit", func() {
+ ctrl := newController(1, 0, 10)
+ var err error
+ strategy, err = bucket.NewStrategy(ctrl)
+ Expect(err).NotTo(HaveOccurred())
+ strategy.Run()
+ Consistently(ctrl.isFull).ShouldNot(BeTrue())
+ })
+ It("exceeds the limit", func() {
+ ctrl := newController(2, 3, 10)
+ var err error
+ strategy, err = bucket.NewStrategy(ctrl)
+ Expect(err).NotTo(HaveOccurred())
+ strategy.Run()
+ Eventually(ctrl.isFull).Should(BeTrue())
+ })
+ It("'s first step exceeds the limit", func() {
+ ctrl := newController(2, 15, 10)
+ var err error
+ strategy, err = bucket.NewStrategy(ctrl)
+ Expect(err).NotTo(HaveOccurred())
+ strategy.Run()
+ Eventually(ctrl.isFull).Should(BeTrue())
+ })
+ AfterEach(func() {
+ if strategy != nil {
+ strategy.Close()
+ }
+ })
+ })
+ Context("Invalid parameter", func() {
+ It("passes a ratio > 1.0", func() {
+ ctrl := newController(2, 3, 10)
+ _, err := bucket.NewStrategy(ctrl, bucket.WithNextThreshold(1.1))
+ Expect(err).To(MatchError(bucket.ErrInvalidParameter))
+ })
+ })
+})
+
+type controller struct {
+ maxBuckets int
+ usedBuckets int
+
+ reporter *reporter
+ capacity int
+ step int
+ mux sync.RWMutex
+}
+
+func newController(maxBuckets, step, capacity int) *controller {
+ ctrl := &controller{step: step, maxBuckets: maxBuckets, capacity: capacity}
+ ctrl.newReporter()
+ return ctrl
+}
+
+func (c *controller) Next() (bucket.Reporter, error) {
+ c.mux.Lock()
+ defer c.mux.Unlock()
+ if c.usedBuckets >= c.maxBuckets {
+ return nil, bucket.ErrNoMoreBucket
+ }
+ c.usedBuckets++
+ c.newReporter()
+ return c.reporter, nil
+}
+
+func (c *controller) Current() bucket.Reporter {
+ c.mux.RLock()
+ defer c.mux.RUnlock()
+ return c.reporter
+}
+
+func (c *controller) newReporter() {
+ c.reporter = &reporter{step: c.step, capacity: c.capacity}
+}
+
+func (c *controller) isFull() bool {
+ c.mux.RLock()
+ defer c.mux.RUnlock()
+ return c.usedBuckets >= c.maxBuckets
+}
+
+type reporter struct {
+ capacity int
+ volume int
+ step int
+}
+
+func (r *reporter) Report() bucket.Channel {
+ ch := make(bucket.Channel)
+ go func() {
+ for i := 0; i < r.capacity; i++ {
+ r.volume += r.step
+ ch <- bucket.Status{
+ Capacity: r.capacity,
+ Volume: r.volume,
+ }
+ }
+ close(ch)
+ }()
+ return ch
+}
diff --git a/banyand/tsdb/indexdb.go b/banyand/tsdb/indexdb.go
index dcd6123..741ba85 100644
--- a/banyand/tsdb/indexdb.go
+++ b/banyand/tsdb/indexdb.go
@@ -51,7 +51,7 @@ var _ IndexDatabase = (*indexDB)(nil)
type indexDB struct {
shardID common.ShardID
- lst []*segment
+ segCtrl *segmentController
}
func (i *indexDB) Seek(field index.Field) ([]GlobalItemID, error) {
@@ -60,36 +60,38 @@ func (i *indexDB) Seek(field index.Field) ([]GlobalItemID, error) {
if err != nil {
return nil, err
}
- err = i.lst[0].globalIndex.GetAll(f, func(rawBytes []byte) error {
- id := &GlobalItemID{}
- errUnMarshal := id.UnMarshal(rawBytes)
- if errUnMarshal != nil {
- return errUnMarshal
+ for _, s := range i.segCtrl.segments() {
+ err = s.globalIndex.GetAll(f, func(rawBytes []byte) error {
+ id := &GlobalItemID{}
+ errUnMarshal := id.UnMarshal(rawBytes)
+ if errUnMarshal != nil {
+ return errUnMarshal
+ }
+ result = append(result, *id)
+ return nil
+ })
+ if err == kv.ErrKeyNotFound {
+ return result, nil
}
- result = append(result, *id)
- return nil
- })
- if err == kv.ErrKeyNotFound {
- return result, nil
}
return result, err
}
func (i *indexDB) WriterBuilder() IndexWriterBuilder {
- return newIndexWriterBuilder(i.lst)
+ return newIndexWriterBuilder(i.segCtrl)
}
-func newIndexDatabase(_ context.Context, id common.ShardID, lst []*segment) (IndexDatabase, error) {
+func newIndexDatabase(_ context.Context, id common.ShardID, segCtrl *segmentController) (IndexDatabase, error) {
return &indexDB{
shardID: id,
- lst: lst,
+ segCtrl: segCtrl,
}, nil
}
var _ IndexWriterBuilder = (*indexWriterBuilder)(nil)
type indexWriterBuilder struct {
- segments []*segment
+ segCtrl *segmentController
ts time.Time
seg *segment
globalItemID *GlobalItemID
@@ -97,12 +99,11 @@ type indexWriterBuilder struct {
func (i *indexWriterBuilder) Time(ts time.Time) IndexWriterBuilder {
i.ts = ts
- for _, s := range i.segments {
- if s.contains(ts) {
- i.seg = s
- break
- }
+ segs := i.segCtrl.span(NewTimeRangeDuration(ts, 0, true, false))
+ if len(segs) != 1 {
+ return i
}
+ i.seg = segs[0]
return i
}
@@ -125,9 +126,9 @@ func (i *indexWriterBuilder) Build() (IndexWriter, error) {
}, nil
}
-func newIndexWriterBuilder(segments []*segment) IndexWriterBuilder {
+func newIndexWriterBuilder(segCtrl *segmentController) IndexWriterBuilder {
return &indexWriterBuilder{
- segments: segments,
+ segCtrl: segCtrl,
}
}
diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go
index 6a8305e..e82767a 100644
--- a/banyand/tsdb/segment.go
+++ b/banyand/tsdb/segment.go
@@ -19,42 +19,74 @@ package tsdb
import (
"context"
+ "strconv"
"sync"
"time"
"github.com/apache/skywalking-banyandb/banyand/kv"
+ "github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
"github.com/apache/skywalking-banyandb/pkg/logger"
)
type segment struct {
- path string
+ id uint16
+ path string
+ suffix string
lst []*block
globalIndex kv.Store
sync.Mutex
- l *logger.Logger
- startTime time.Time
- endTime time.Time
+ l *logger.Logger
+ reporterStopCh chan struct{}
+ TimeRange
}
-func (s *segment) contains(ts time.Time) bool {
- greaterAndEqualStart := s.startTime.Equal(ts) || s.startTime.Before(ts)
- if s.endTime.IsZero() {
- return greaterAndEqualStart
+func (s *segment) Report() bucket.Channel {
+ ch := make(bucket.Channel)
+ interval := s.Duration() >> 4
+ if interval < 100*time.Millisecond {
+ interval = 100 * time.Millisecond
}
- return greaterAndEqualStart && s.endTime.After(ts)
+ go func() {
+ defer close(ch)
+ ticker := time.NewTicker(interval)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ticker.C:
+ status := bucket.Status{
+ Capacity: int(s.End.UnixNano() - s.Start.UnixNano()),
+ Volume: int(time.Now().UnixNano() - s.Start.UnixNano()),
+ }
+ ch <- status
+ if status.Volume >= status.Capacity {
+ return
+ }
+ case <-s.reporterStopCh:
+ return
+ }
+ }
+ }()
+ return ch
}
-func newSegment(ctx context.Context, path string) (s *segment, err error) {
- s = &segment{
- path: path,
- startTime: time.Now(),
+func openSegment(ctx context.Context, suffix, path string, intervalRule IntervalRule) (s *segment, err error) {
+ startTime, err := intervalRule.Unit.Parse(suffix)
+ if err != nil {
+ return nil, err
}
- parentLogger := ctx.Value(logger.ContextKey)
- if parentLogger != nil {
- if pl, ok := parentLogger.(*logger.Logger); ok {
- s.l = pl.Named("segment")
- }
+ suffixInteger, err := strconv.Atoi(suffix)
+ if err != nil {
+ return nil, err
+ }
+ id := uint16(intervalRule.Unit)<<12 | ((uint16(suffixInteger) << 4) >> 4)
+ s = &segment{
+ id: id,
+ path: path,
+ suffix: suffix,
+ l: logger.Fetch(ctx, "segment"),
+ reporterStopCh: make(chan struct{}),
+ TimeRange: NewTimeRange(startTime, intervalRule.NextTime(startTime), true, false),
}
indexPath, err := mkdir(globalIndexTemplate, path)
if err != nil {
@@ -66,7 +98,8 @@ func newSegment(ctx context.Context, path string) (s *segment, err error) {
loadBlock := func(path string) error {
var b *block
if b, err = newBlock(context.WithValue(ctx, logger.ContextKey, s.l), blockOpts{
- path: path,
+ segID: s.id,
+ path: path,
}); err != nil {
return err
}
@@ -77,7 +110,7 @@ func newSegment(ctx context.Context, path string) (s *segment, err error) {
}
return nil
}
- err = walkDir(path, blockPathPrefix, func(name, absolutePath string) error {
+ err = walkDir(path, blockPathPrefix, func(_, absolutePath string) error {
return loadBlock(absolutePath)
})
if err != nil {
@@ -103,4 +136,5 @@ func (s *segment) close() {
b.close()
}
s.globalIndex.Close()
+ close(s.reporterStopCh)
}
diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go
index c9ec07b..5cc9805 100644
--- a/banyand/tsdb/series.go
+++ b/banyand/tsdb/series.go
@@ -34,6 +34,7 @@ import (
var (
ErrEmptySeriesSpan = errors.New("there is no data in such time range")
ErrItemIDMalformed = errors.New("serialized item id is malformed")
+ ErrBlockAbsent = errors.New("block is absent")
)
type GlobalItemID struct {
@@ -72,32 +73,63 @@ func (i *GlobalItemID) UnMarshal(data []byte) error {
}
type TimeRange struct {
- Start time.Time
- End time.Time
+ Start time.Time
+ End time.Time
+ IncludeStart bool
+ IncludeEnd bool
}
-func (t TimeRange) contains(unixNano uint64) bool {
+func (t TimeRange) Contains(unixNano uint64) bool {
tp := time.Unix(0, int64(unixNano))
- if tp.Equal(t.End) || tp.After(t.End) {
- return false
+ if t.Start.Equal(tp) {
+ return t.IncludeStart
}
- return tp.Equal(t.Start) || tp.After(t.Start)
+ if t.End.Equal(tp) {
+ return t.IncludeEnd
+ }
+ return !tp.Before(t.Start) && !tp.After(t.End)
+}
+
+func (t TimeRange) Overlapping(other TimeRange) bool {
+ if t.Start.Equal(other.End) {
+ return t.IncludeStart && other.IncludeEnd
+ }
+ if other.Start.Equal(t.End) {
+ return t.IncludeEnd && other.IncludeStart
+ }
+ return !t.Start.After(other.End) && !other.Start.After(t.End)
+}
+
+func (t TimeRange) Duration() time.Duration {
+ return t.End.Sub(t.Start)
}
-func NewTimeRange(Start, End time.Time) TimeRange {
+func NewInclusiveTimeRange(start, end time.Time) TimeRange {
return TimeRange{
- Start: Start,
- End: End,
+ Start: start,
+ End: end,
+ IncludeStart: true,
+ IncludeEnd: true,
}
}
-func NewTimeRangeDuration(Start time.Time, Duration time.Duration) TimeRange {
+func NewInclusiveTimeRangeDuration(start time.Time, duration time.Duration) TimeRange {
+ return NewTimeRangeDuration(start, duration, true, true)
+}
+
+func NewTimeRange(start, end time.Time, includeStart, includeEnd bool) TimeRange {
return TimeRange{
- Start: Start,
- End: Start.Add(Duration),
+ Start: start,
+ End: end,
+ IncludeStart: includeStart,
+ IncludeEnd: includeEnd,
}
}
+func NewTimeRangeDuration(start time.Time, duration time.Duration, includeStart, includeEnd bool) TimeRange {
+ return NewTimeRange(start, start.Add(duration), includeStart, includeEnd)
+}
+
type Series interface {
ID() common.SeriesID
Span(timeRange TimeRange) (SeriesSpan, error)
@@ -121,6 +153,9 @@ type series struct {
func (s *series) Get(id GlobalItemID) (Item, io.Closer, error) {
b := s.blockDB.block(id)
+ if b == nil {
+ return nil, nil, errors.WithMessagef(ErrBlockAbsent, "id: %v", id)
+ }
return &item{
data: b.dataReader(),
itemID: id.ID,
diff --git a/banyand/tsdb/series_seek_sort.go b/banyand/tsdb/series_seek_sort.go
index 6182d96..38b733b 100644
--- a/banyand/tsdb/series_seek_sort.go
+++ b/banyand/tsdb/series_seek_sort.go
@@ -56,7 +56,7 @@ func (s *seekerBuilder) buildSeries(conditions []condWithIRT) ([]Iterator, error
func (s *seekerBuilder) buildSeriesByIndex(conditions []condWithIRT) (series []Iterator, err error) {
timeFilter := func(item Item) bool {
- valid := s.seriesSpan.timeRange.contains(item.Time())
+ valid := s.seriesSpan.timeRange.Contains(item.Time())
timeRange := s.seriesSpan.timeRange
s.seriesSpan.l.Trace().
Times("time_range", []time.Time{timeRange.Start, timeRange.End}).
diff --git a/banyand/tsdb/series_test.go b/banyand/tsdb/series_test.go
new file mode 100644
index 0000000..25a7df6
--- /dev/null
+++ b/banyand/tsdb/series_test.go
@@ -0,0 +1,132 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package tsdb_test
+
+import (
+ "fmt"
+ "time"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+
+ "github.com/apache/skywalking-banyandb/banyand/tsdb"
+)
+
+var _ = Describe("Series", func() {
+ Context("TimeRange", func() {
+ Context("Contains", func() {
+			// verifyFn parses yyyymmdd dates and checks TimeRange.Contains.
+			// NOTE: the layout must be Go's reference date "20060102". The previous
+			// layout "20060202" repeated the day token and dropped the month, so every
+			// date parsed as January; the start dates written as "20220205" only
+			// worked because of that mis-parse and are corrected to "20220105" here.
+			verifyFn := func(start, end string, includeStart, includeEnd bool, ts string, expected bool) {
+				startTime, err := time.Parse("20060102", start)
+				Expect(err).NotTo(HaveOccurred())
+				endTime, err := time.Parse("20060102", end)
+				Expect(err).NotTo(HaveOccurred())
+				tsTime, err := time.Parse("20060102", ts)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(tsdb.NewTimeRange(startTime, endTime, includeStart, includeEnd).Contains(uint64(tsTime.UnixNano()))).To(Equal(expected))
+			}
+			DescribeTable("It's a exclusive range",
+				func(start, end, ts string, expected bool) {
+					verifyFn(start, end, false, false, ts, expected)
+				},
+				Entry("is in the middle", "20220105", "20220107", "20220106", true),
+				Entry("is at the lower", "20220105", "20220107", "20220105", false),
+				Entry("is at the upper", "20220105", "20220107", "20220107", false),
+				Entry("is before the lower", "20220105", "20220107", "20220104", false),
+				Entry("is after the upper", "20220105", "20220107", "20220108", false),
+			)
+			DescribeTable("It's a inclusive range",
+				func(start, end, ts string, expected bool) {
+					verifyFn(start, end, true, true, ts, expected)
+				},
+				Entry("is in the middle", "20220105", "20220107", "20220106", true),
+				Entry("is at the lower", "20220105", "20220107", "20220105", true),
+				Entry("is at the upper", "20220105", "20220107", "20220107", true),
+				Entry("is before the lower", "20220105", "20220107", "20220104", false),
+				Entry("is after the upper", "20220105", "20220107", "20220108", false),
+			)
+			DescribeTable("It's a inclusive lower and exclusive upper range",
+				func(start, end, ts string, expected bool) {
+					verifyFn(start, end, true, false, ts, expected)
+				},
+				Entry("is in the middle", "20220105", "20220107", "20220106", true),
+				Entry("is at the lower", "20220105", "20220107", "20220105", true),
+				Entry("is at the upper", "20220105", "20220107", "20220107", false),
+				Entry("is before the lower", "20220105", "20220107", "20220104", false),
+				Entry("is after the upper", "20220105", "20220107", "20220108", false),
+			)
+			DescribeTable("It's a exclusive lower and inclusive upper range",
+				func(start, end, ts string, expected bool) {
+					verifyFn(start, end, false, true, ts, expected)
+				},
+				Entry("is in the middle", "20220105", "20220107", "20220106", true),
+				Entry("is at the lower", "20220105", "20220107", "20220105", false),
+				Entry("is at the upper", "20220105", "20220107", "20220107", true),
+				Entry("is before the lower", "20220105", "20220107", "20220104", false),
+				Entry("is after the upper", "20220105", "20220107", "20220108", false),
+			)
+		})
+		Context("Overlapping", func() {
+			// verifyFn checks Overlapping for every inclusive/exclusive
+			// combination of both ranges' bounds; away from the boundaries the
+			// verdict must not depend on those flags.
+			verifyFn := func(start1, end1, start2, end2 string, expected bool) {
+				startTime1, _ := time.Parse("20060102", start1)
+				endTime1, _ := time.Parse("20060102", end1)
+				startTime2, _ := time.Parse("20060102", start2)
+				endTime2, _ := time.Parse("20060102", end2)
+				includes := []bool{true, false}
+
+				for _, r1l := range includes {
+					for _, r1u := range includes {
+						for _, r2l := range includes {
+							for _, r2u := range includes {
+								// BUGFIX: the second pair of flags belongs to r2, not r1.
+								By(fmt.Sprintf("r1 lower:%v upper:%v. r2 lower:%v upper:%v", r1l, r1u, r2l, r2u), func() {
+									r1 := tsdb.NewTimeRange(startTime1, endTime1, r1l, r1u)
+									r2 := tsdb.NewTimeRange(startTime2, endTime2, r2l, r2u)
+									// Overlapping must be symmetric.
+									Expect(r1.Overlapping(r2)).To(Equal(expected))
+									Expect(r2.Overlapping(r1)).To(Equal(expected))
+								})
+							}
+						}
+					}
+				}
+			}
+			DescribeTable("Each range type's behavior is identical",
+				verifyFn,
+				// "20220105" (was the typo "20220205", which made the first range inverted).
+				Entry("is no overlapping", "20220105", "20220107", "20220108", "20220112", false),
+				Entry("is the two range are identical", "20220105", "20220107", "20220105", "20220107", true),
+				Entry("is the one includes the other", "20220102", "20220107", "20220103", "20220106", true),
+				Entry("is the one includes the other, the upper bounds are identical", "20220102", "20220107", "20220103", "20220107", true),
+				Entry("is the one includes the other, the lower bounds are identical", "20220102", "20220107", "20220102", "20220106", true),
+				Entry("is they have an intersection", "20220102", "20220105", "20220103", "20220106", true),
+			)
+			// adjacentVerifyFn builds two ranges sharing the boundary 20210107 and
+			// checks that they overlap only when both sides include that instant.
+			adjacentVerifyFn := func(include1, include2, expected bool) {
+				startTime1, _ := time.Parse("20060102", "20210105")
+				endTime1, _ := time.Parse("20060102", "20210107")
+				startTime2, _ := time.Parse("20060102", "20210107")
+				endTime2, _ := time.Parse("20060102", "20210109")
+				r1 := tsdb.NewTimeRange(startTime1, endTime1, false, include1)
+				r2 := tsdb.NewTimeRange(startTime2, endTime2, include2, false)
+				Expect(r1.Overlapping(r2)).To(Equal(expected))
+			}
+
+			DescribeTable("They are adjacent",
+				adjacentVerifyFn,
+				Entry("is they are inclusive", true, true, true),
+				Entry("is that range1 includes upper, but range2 excludes lower", true, false, false),
+				Entry("is that range1 excludes upper, but range2 includes lower", false, true, false),
+				Entry("is they are exclusive", false, false, false),
+			)
+		})
+
+ })
+})
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index c2e9d5e..de269e6 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -106,7 +106,7 @@ type seriesDB struct {
sync.Mutex
l *logger.Logger
- lst []*segment
+ segCtrl *segmentController
seriesMetadata kv.Store
sID common.ShardID
}
@@ -134,7 +134,11 @@ func (s *seriesDB) GetByID(id common.SeriesID) (Series, error) {
}
func (s *seriesDB) block(id GlobalItemID) blockDelegate {
- return s.lst[id.segID].lst[id.blockID].delegate()
+ seg := s.segCtrl.get(id.segID)
+ if seg == nil {
+ return nil
+ }
+ return seg.lst[id.blockID].delegate()
}
func (s *seriesDB) shardID() common.ShardID {
@@ -191,11 +195,13 @@ func (s *seriesDB) List(path Path) (SeriesList, error) {
return result, err
}
-func (s *seriesDB) span(_ TimeRange) []blockDelegate {
+func (s *seriesDB) span(timeRange TimeRange) []blockDelegate {
//TODO: return correct blocks
- result := make([]blockDelegate, 0, len(s.lst[0].lst))
- for _, b := range s.lst[0].lst {
- result = append(result, b.delegate())
+ result := make([]blockDelegate, 0)
+ for _, s := range s.segCtrl.span(timeRange) {
+ for _, b := range s.lst {
+ result = append(result, b.delegate())
+ }
}
return result
}
@@ -205,23 +211,14 @@ func (s *seriesDB) context() context.Context {
}
func (s *seriesDB) Close() error {
- for _, seg := range s.lst {
- seg.close()
- }
return s.seriesMetadata.Close()
}
-func newSeriesDataBase(ctx context.Context, shardID common.ShardID, path string, segLst []*segment) (SeriesDatabase, error) {
+func newSeriesDataBase(ctx context.Context, shardID common.ShardID, path string, segCtrl *segmentController) (SeriesDatabase, error) {
sdb := &seriesDB{
- sID: shardID,
- lst: segLst,
- }
- parentLogger := ctx.Value(logger.ContextKey)
- if parentLogger == nil {
- return nil, logger.ErrNoLoggerInContext
- }
- if pl, ok := parentLogger.(*logger.Logger); ok {
- sdb.l = pl.Named("series_database")
+ sID: shardID,
+ segCtrl: segCtrl,
+ l: logger.Fetch(ctx, "series_database"),
}
var err error
sdb.seriesMetadata, err = kv.OpenStore(0, path+"/md", kv.StoreWithNamedLogger("metadata", sdb.l))
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index 7b1b7b9..7eaf50f 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -23,21 +23,23 @@ import (
"sync"
"time"
+ "github.com/pkg/errors"
+
"github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
"github.com/apache/skywalking-banyandb/pkg/logger"
)
var _ Shard = (*shard)(nil)
type shard struct {
- sync.Mutex
- id common.ShardID
- logger *logger.Logger
+ l *logger.Logger
+ id common.ShardID
- location string
- seriesDatabase SeriesDatabase
- indexDatabase IndexDatabase
- lst []*segment
+ seriesDatabase SeriesDatabase
+ indexDatabase IndexDatabase
+ segmentController *segmentController
+ segmentManageStrategy *bucket.Strategy
}
func (s *shard) ID() common.ShardID {
@@ -52,66 +54,234 @@ func (s *shard) Index() IndexDatabase {
return s.indexDatabase
}
-func openShard(ctx context.Context, id common.ShardID, location string) (*shard, error) {
- s := &shard{
- id: id,
- location: location,
- }
- parentLogger := ctx.Value(logger.ContextKey)
- if parentLogger != nil {
- if pl, ok := parentLogger.(*logger.Logger); ok {
- s.logger = pl.Named("shard" + strconv.Itoa(int(id)))
- }
+func OpenShard(ctx context.Context, id common.ShardID, root string, intervalRule IntervalRule) (Shard, error) {
+ path, err := mkdir(shardTemplate, root, int(id))
+ if err != nil {
+ return nil, errors.Wrapf(err, "make the directory of the shard %d ", int(id))
}
- loadSeg := func(path string) error {
- seg, err := newSegment(ctx, path)
- if err != nil {
- return err
- }
- {
- s.Lock()
- defer s.Unlock()
- s.lst = append(s.lst, seg)
- }
- s.logger.Info().Int("size", len(s.lst)).Msg("seg size")
- return nil
+ l := logger.Fetch(ctx, "shard"+strconv.Itoa(int(id)))
+ l.Info().Int("shard_id", int(id)).Str("path", path).Msg("creating a shard")
+ s := &shard{
+ id: id,
+ segmentController: newSegmentController(path, intervalRule),
+ l: l,
}
- err := walkDir(location, segPathPrefix, func(_, absolutePath string) error {
- s.logger.Info().Str("path", absolutePath).Msg("loading a segment")
- return loadSeg(absolutePath)
- })
+ shardCtx := context.WithValue(ctx, logger.ContextKey, s.l)
+ err = s.segmentController.open(shardCtx)
if err != nil {
return nil, err
}
- if len(s.lst) < 1 {
- var segPath string
- segPath, err = mkdir(segTemplate, location, time.Now().Format(segFormat))
- if err != nil {
- return nil, err
- }
- s.logger.Info().Str("path", segPath).Msg("creating a new segment")
- err = loadSeg(segPath)
- if err != nil {
- return nil, err
- }
- }
- seriesPath, err := mkdir(seriesTemplate, s.location)
+ seriesPath, err := mkdir(seriesTemplate, path)
if err != nil {
return nil, err
}
- sdb, err := newSeriesDataBase(ctx, s.id, seriesPath, s.lst)
+ sdb, err := newSeriesDataBase(shardCtx, s.id, seriesPath, s.segmentController)
if err != nil {
return nil, err
}
s.seriesDatabase = sdb
- idb, err := newIndexDatabase(ctx, s.id, s.lst)
+ idb, err := newIndexDatabase(shardCtx, s.id, s.segmentController)
if err != nil {
return nil, err
}
s.indexDatabase = idb
+ s.segmentManageStrategy, err = bucket.NewStrategy(s.segmentController, bucket.WithLogger(s.l))
+ if err != nil {
+ return nil, err
+ }
+ s.segmentManageStrategy.Run()
return s, nil
}
func (s *shard) Close() error {
+ s.segmentManageStrategy.Close()
+ s.segmentController.close()
return s.seriesDatabase.Close()
}
+
+// IntervalUnit denotes the granularity at which a shard cuts new segments;
+// it selects the layout used to name segment directories (see the
+// seg*Format constants in tsdb.go).
+type IntervalUnit int
+
+const (
+	DAY IntervalUnit = iota
+	MONTH
+	YEAR
+	MILLISECOND // only for testing
+)
+
+// String returns the lower-case name of the unit; it panics on an
+// undefined IntervalUnit value (programmer error).
+func (iu IntervalUnit) String() string {
+	switch iu {
+	case DAY:
+		return "day"
+	case MONTH:
+		return "month"
+	case YEAR:
+		return "year"
+	case MILLISECOND:
+		return "millis"
+
+	}
+	panic("invalid interval unit")
+}
+
+// Format renders tm as the segment-directory suffix for this unit, using
+// the per-unit layout constants declared in tsdb.go. Panics on an
+// undefined unit.
+func (iu IntervalUnit) Format(tm time.Time) string {
+	switch iu {
+	case DAY:
+		return tm.Format(segDayFormat)
+	case MONTH:
+		return tm.Format(segMonthFormat)
+	case YEAR:
+		return tm.Format(segYearFormat)
+	case MILLISECOND:
+		return tm.Format(segMillisecondFormat)
+	}
+	panic("invalid interval unit")
+}
+
+// Parse is the inverse of Format: it decodes a segment-directory suffix
+// back into a time.Time with the same per-unit layout. Panics on an
+// undefined unit.
+func (iu IntervalUnit) Parse(value string) (time.Time, error) {
+	switch iu {
+	case DAY:
+		return time.Parse(segDayFormat, value)
+	case MONTH:
+		return time.Parse(segMonthFormat, value)
+	case YEAR:
+		return time.Parse(segYearFormat, value)
+	case MILLISECOND:
+		return time.Parse(segMillisecondFormat, value)
+	}
+	panic("invalid interval unit")
+}
+
+// IntervalRule describes how often a new segment is opened: one segment
+// every Num units (e.g. {Unit: DAY, Num: 1} means a segment per day).
+type IntervalRule struct {
+	Unit IntervalUnit
+	Num  int
+}
+
+// NextTime returns the instant at which the segment following one anchored
+// at current should begin, i.e. current advanced by Num units. Panics on
+// an undefined unit.
+func (ir IntervalRule) NextTime(current time.Time) time.Time {
+	switch ir.Unit {
+	case DAY:
+		return current.AddDate(0, 0, ir.Num)
+	case MONTH:
+		return current.AddDate(0, ir.Num, 0)
+	case YEAR:
+		return current.AddDate(ir.Num, 0, 0)
+	case MILLISECOND:
+		return current.Add(time.Millisecond * time.Duration(ir.Num))
+	}
+	panic("invalid interval unit")
+}
+
+// segmentController owns the ordered list of one shard's segments and
+// creates new ones according to its IntervalRule. The embedded RWMutex
+// guards lst; it also implements bucket.Controller (Current/Next) for the
+// segment-management strategy.
+type segmentController struct {
+	sync.RWMutex
+	location     string
+	intervalRule IntervalRule
+	lst          []*segment
+}
+
+// newSegmentController builds a controller rooted at location. No I/O
+// happens here; call open to load existing segments from disk.
+func newSegmentController(location string, intervalRule IntervalRule) *segmentController {
+	return &segmentController{
+		location:     location,
+		intervalRule: intervalRule,
+	}
+}
+
+// get looks up a segment by its id, scanning from the newest segment to
+// the oldest, and returns nil when no segment matches.
+func (sc *segmentController) get(segID uint16) *segment {
+	sc.RLock()
+	defer sc.RUnlock()
+	for i := len(sc.lst) - 1; i >= 0; i-- {
+		if seg := sc.lst[i]; seg.id == segID {
+			return seg
+		}
+	}
+	return nil
+}
+
+// span returns, newest first, every segment whose time range overlaps
+// timeRange.
+func (sc *segmentController) span(timeRange TimeRange) (ss []*segment) {
+	sc.RLock()
+	defer sc.RUnlock()
+	last := len(sc.lst) - 1
+	for i := range sc.lst {
+		s := sc.lst[last-i]
+		if s.Overlapping(timeRange) {
+			ss = append(ss, s)
+		}
+	}
+	return ss
+}
+
+// segments returns a copy of the segment list so callers may iterate it
+// without holding the controller's lock.
+func (sc *segmentController) segments() (ss []*segment) {
+	sc.RLock()
+	defer sc.RUnlock()
+	r := make([]*segment, len(sc.lst))
+	copy(r, sc.lst)
+	return r
+}
+
+// Current implements bucket.Reporter resolution for the strategy: it
+// returns the segment whose directory suffix matches "now" formatted with
+// the rule's unit.
+// NOTE(review): with Num > 1 the formatted "now" may not equal any
+// existing suffix, so the fallback below picks the newest segment —
+// confirm that is intended.
+func (sc *segmentController) Current() bucket.Reporter {
+	sc.RLock()
+	defer sc.RUnlock()
+	now := time.Now()
+	for _, s := range sc.lst {
+		if s.suffix == sc.intervalRule.Unit.Format(now) {
+			return s
+		}
+	}
+	// Fall back to the latest segment when no suffix matches now.
+	if len(sc.lst) > 0 {
+		return sc.lst[len(sc.lst)-1]
+	}
+	return nil
+}
+
+// Next creates the segment that follows "now" according to the interval
+// rule.
+// NOTE(review): context.TODO() drops the shard logger carried by the
+// shard context; consider threading that context through instead.
+func (sc *segmentController) Next() (bucket.Reporter, error) {
+	return sc.create(context.TODO(), sc.intervalRule.Unit.Format(
+		sc.intervalRule.NextTime(time.Now())))
+}
+
+// open loads every existing segment directory found under the
+// controller's location and, when none matches the present moment,
+// creates a fresh segment for "now".
+func (sc *segmentController) open(ctx context.Context) error {
+	err := walkDir(
+		sc.location,
+		segPathPrefix,
+		func(suffix, absolutePath string) error {
+			_, err := sc.load(ctx, suffix, absolutePath)
+			return err
+		})
+	if err != nil {
+		return err
+	}
+	if sc.Current() == nil {
+		_, err = sc.create(ctx, sc.intervalRule.Unit.Format(time.Now()))
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// create makes the on-disk directory for a new segment named by suffix
+// and loads it into the controller.
+func (sc *segmentController) create(ctx context.Context, suffix string) (*segment, error) {
+	segPath, err := mkdir(segTemplate, sc.location, suffix)
+	if err != nil {
+		return nil, err
+	}
+	return sc.load(ctx, suffix, segPath)
+}
+
+// load opens the segment stored at path and appends it to the list.
+// The lock guards only the append; openSegment itself runs unlocked.
+func (sc *segmentController) load(ctx context.Context, suffix, path string) (seg *segment, err error) {
+	seg, err = openSegment(ctx, suffix, path, sc.intervalRule)
+	if err != nil {
+		return nil, err
+	}
+	{
+		sc.Lock()
+		defer sc.Unlock()
+		sc.lst = append(sc.lst, seg)
+	}
+	return seg, nil
+}
+
+// close shuts every segment down. It iterates over a snapshot taken under
+// the read lock (segments()) so a concurrent load appending to sc.lst
+// cannot race with this loop; the original read sc.lst unsynchronized.
+func (sc *segmentController) close() {
+	for _, s := range sc.segments() {
+		s.close()
+	}
+}
diff --git a/banyand/tsdb/shard_test.go b/banyand/tsdb/shard_test.go
new file mode 100644
index 0000000..5609f0a
--- /dev/null
+++ b/banyand/tsdb/shard_test.go
@@ -0,0 +1,70 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package tsdb_test
+
+import (
+ "context"
+ "io/ioutil"
+ "strings"
+ "time"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/banyand/tsdb"
+ "github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+var _ = Describe("Shard", func() {
+	Describe("Generate segments", func() {
+		var tmp string
+		var deferFn func()
+		var shard tsdb.Shard
+
+		BeforeEach(func() {
+			var err error
+			tmp, deferFn, err = test.NewSpace()
+			Expect(err).NotTo(HaveOccurred())
+		})
+		AfterEach(func() {
+			// OpenShard may have failed; guard against a nil shard so the
+			// cleanup cannot panic and mask the spec's real failure.
+			if shard != nil {
+				shard.Close()
+			}
+			deferFn()
+		})
+		It("generates several segments", func() {
+			var err error
+			// A 1000ms interval lets the bucket strategy roll segments fast
+			// enough to observe several of them within the timeout.
+			shard, err = tsdb.OpenShard(context.TODO(), common.ShardID(0), tmp, tsdb.IntervalRule{
+				Unit: tsdb.MILLISECOND,
+				Num:  1000,
+			})
+			Expect(err).NotTo(HaveOccurred())
+			Eventually(func() int {
+				files, err := ioutil.ReadDir(tmp + "/shard-0")
+				Expect(err).NotTo(HaveOccurred())
+				num := 0
+				for _, fi := range files {
+					if fi.IsDir() && strings.HasPrefix(fi.Name(), "seg-") {
+						num++
+					}
+				}
+				return num
+			}).WithTimeout(10 * time.Second).Should(BeNumerically(">=", 3))
+		})
+
+	})
+})
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index 3d740b0..a3aff1c 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -49,8 +49,11 @@ const (
blockTemplate = rootPrefix + blockPathPrefix + "-%s"
globalIndexTemplate = rootPrefix + "index"
- segFormat = "20060102"
- blockFormat = "1504"
+ segDayFormat = "20060102"
+ segMonthFormat = "200601"
+ segYearFormat = "2006"
+ segMillisecondFormat = "20060102150405000"
+ blockFormat = "1504"
dirPerm = 0700
)
@@ -125,22 +128,17 @@ func (d *database) Close() error {
}
func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) {
- db := &database{
- location: opts.Location,
- shardNum: opts.ShardNum,
- }
- parentLogger := ctx.Value(logger.ContextKey)
- if parentLogger != nil {
- if pl, ok := parentLogger.(*logger.Logger); ok {
- db.logger = pl.Named("tsdb")
- }
- }
if opts.EncodingMethod.EncoderPool == nil || opts.EncodingMethod.DecoderPool == nil {
return nil, errors.Wrap(ErrEncodingMethodAbsent, "failed to open database")
}
if _, err := mkdir(opts.Location); err != nil {
return nil, err
}
+ db := &database{
+ location: opts.Location,
+ shardNum: opts.ShardNum,
+ logger: logger.Fetch(ctx, "tsdb"),
+ }
db.logger.Info().Str("path", opts.Location).Msg("initialized")
var entries []fs.FileInfo
var err error
@@ -165,13 +163,11 @@ func initDatabase(ctx context.Context, db *database) (Database, error) {
func createDatabase(ctx context.Context, db *database, startID int) (Database, error) {
var err error
for i := startID; i < int(db.shardNum); i++ {
- shardLocation, errInternal := mkdir(shardTemplate, db.location, i)
- if errInternal != nil {
- err = multierr.Append(err, errInternal)
- continue
- }
- db.logger.Info().Int("shard_id", i).Str("path", shardLocation).Msg("creating a shard")
- so, errNewShard := openShard(ctx, common.ShardID(i), shardLocation)
+ db.logger.Info().Int("shard_id", i).Msg("creating a shard")
+ so, errNewShard := OpenShard(ctx, common.ShardID(i), db.location, IntervalRule{
+ Unit: DAY,
+ Num: 1,
+ })
if errNewShard != nil {
err = multierr.Append(err, errNewShard)
continue
@@ -186,17 +182,23 @@ func loadDatabase(ctx context.Context, db *database) (Database, error) {
//TODO: open the manifest file
db.Lock()
defer db.Unlock()
- err := walkDir(db.location, shardPathPrefix, func(name, absolutePath string) error {
- shardSegs := strings.Split(name, "-")
- shardID, err := strconv.Atoi(shardSegs[1])
+ err := walkDir(db.location, shardPathPrefix, func(suffix, _ string) error {
+ shardID, err := strconv.Atoi(suffix)
if err != nil {
return err
}
if shardID >= int(db.shardNum) {
return nil
}
- db.logger.Info().Int("shard_id", shardID).Str("path", absolutePath).Msg("opening a shard")
- so, errOpenShard := openShard(context.WithValue(ctx, logger.ContextKey, db.logger), common.ShardID(shardID), absolutePath)
+ db.logger.Info().Int("shard_id", shardID).Msg("opening a existing shard")
+ so, errOpenShard := OpenShard(
+ context.WithValue(ctx, logger.ContextKey, db.logger),
+ common.ShardID(shardID),
+ db.location,
+ IntervalRule{
+ Unit: DAY,
+ Num: 1,
+ })
if errOpenShard != nil {
return errOpenShard
}
@@ -218,7 +220,7 @@ func loadDatabase(ctx context.Context, db *database) (Database, error) {
return db, nil
}
-type walkFn func(name, absolutePath string) error
+type walkFn func(suffix, absolutePath string) error
func walkDir(root, prefix string, walkFn walkFn) error {
files, err := ioutil.ReadDir(root)
@@ -229,7 +231,8 @@ func walkDir(root, prefix string, walkFn walkFn) error {
if !f.IsDir() || !strings.HasPrefix(f.Name(), prefix) {
continue
}
- errWalk := walkFn(f.Name(), fmt.Sprintf(rootPrefix, root)+f.Name())
+ segs := strings.Split(f.Name(), "-")
+ errWalk := walkFn(segs[len(segs)-1], fmt.Sprintf(rootPrefix, root)+f.Name())
if errWalk != nil {
return errors.WithMessagef(errWalk, "failed to load: %s", f.Name())
}
diff --git a/pkg/logger/logger.go b/banyand/tsdb/tsdb_suite_test.go
similarity index 53%
copy from pkg/logger/logger.go
copy to banyand/tsdb/tsdb_suite_test.go
index 36896ed..c761503 100644
--- a/pkg/logger/logger.go
+++ b/banyand/tsdb/tsdb_suite_test.go
@@ -14,40 +14,26 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-
-package logger
+//
+package tsdb_test
import (
- "strings"
-
- "github.com/pkg/errors"
- "github.com/rs/zerolog"
-)
-
-var ContextKey = contextKey{}
-var ErrNoLoggerInContext = errors.New("no logger in context")
+ "testing"
-type contextKey struct{}
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
-// Logging is the config info
-type Logging struct {
- Env string
- Level string
-}
-
-// Logger is wrapper for rs/zerolog logger with module, it is singleton.
-type Logger struct {
- module string
- *zerolog.Logger
-}
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
-func (l *Logger) Named(name string) *Logger {
- module := strings.Join([]string{l.module, name}, ".")
- subLogger := root.Logger.With().Str("module", module).Logger()
- return &Logger{module: module, Logger: &subLogger}
+func TestTsdb(t *testing.T) {
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "Tsdb Suite")
}
-// Loggable indicates the implement supports logging
-type Loggable interface {
- SetLogger(*Logger)
-}
+var _ = BeforeSuite(func() {
+ Expect(logger.Init(logger.Logging{
+ Env: "dev",
+ Level: "info",
+ })).Should(Succeed())
+})
diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go
index bfb9957..75c1cb8 100644
--- a/banyand/tsdb/tsdb_test.go
+++ b/banyand/tsdb/tsdb_test.go
@@ -60,7 +60,7 @@ func verifyDatabaseStructure(tester *assert.Assertions, tempDir string) {
seriesPath := fmt.Sprintf(seriesTemplate, shardPath)
validateDirectory(tester, seriesPath)
now := time.Now()
- segPath := fmt.Sprintf(segTemplate, shardPath, now.Format(segFormat))
+ segPath := fmt.Sprintf(segTemplate, shardPath, now.Format(segDayFormat))
validateDirectory(tester, segPath)
validateDirectory(tester, fmt.Sprintf(blockTemplate, segPath, now.Format(blockFormat)))
}
diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go
index 36896ed..98a190a 100644
--- a/pkg/logger/logger.go
+++ b/pkg/logger/logger.go
@@ -18,6 +18,7 @@
package logger
import (
+ "context"
"strings"
"github.com/pkg/errors"
@@ -51,3 +52,14 @@ func (l *Logger) Named(name string) *Logger {
type Loggable interface {
SetLogger(*Logger)
}
+
+// Fetch resolves a named logger from ctx: when ctx carries a parent
+// *Logger under ContextKey the result is a child of it, otherwise a
+// logger with the given name is returned.
+func Fetch(ctx context.Context, name string) *Logger {
+	if parent, ok := ctx.Value(ContextKey).(*Logger); ok {
+		return parent.Named(name)
+	}
+	return GetLogger(name)
+}
diff --git a/pkg/query/logical/common.go b/pkg/query/logical/common.go
index bacb9a1..95f3e3e 100644
--- a/pkg/query/logical/common.go
+++ b/pkg/query/logical/common.go
@@ -77,7 +77,9 @@ func executeForShard(series tsdb.SeriesList, timeRange tsdb.TimeRange,
itersInSeries, err := func() ([]tsdb.Iterator, error) {
sp, errInner := seriesFound.Span(timeRange)
defer func(sp tsdb.SeriesSpan) {
- _ = sp.Close()
+ if sp != nil {
+ _ = sp.Close()
+ }
}(sp)
if errInner != nil {
return nil, errInner
diff --git a/pkg/query/logical/plan_indexscan_local.go b/pkg/query/logical/plan_indexscan_local.go
index bd22fd1..a18e62e 100644
--- a/pkg/query/logical/plan_indexscan_local.go
+++ b/pkg/query/logical/plan_indexscan_local.go
@@ -108,7 +108,7 @@ func (uis *unresolvedIndexScan) Analyze(s Schema) (Plan, error) {
return &localIndexScan{
orderBy: orderBySubPlan,
- timeRange: tsdb.NewTimeRange(uis.startTime, uis.endTime),
+ timeRange: tsdb.NewInclusiveTimeRange(uis.startTime, uis.endTime),
schema: s,
projectionFieldRefs: projFieldsRefs,
metadata: uis.metadata,