You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/07/29 14:15:17 UTC
[skywalking-banyandb] branch index-term created (now 7733284)
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a change to branch index-term
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
at 7733284 Remove term metadata store
This branch includes the following new commits:
new 7733284 Remove term metadata store
The 1 revision listed above as "new" is entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[skywalking-banyandb] 01/01: Remove term metadata store
Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch index-term
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 7733284fbb67845aa7057fb6be360d4a0119199b
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Fri Jul 29 14:13:10 2022 +0000
Remove term metadata store
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
banyand/query/processor.go | 2 +-
banyand/tsdb/indexdb.go | 6 +--
pkg/index/index.go | 34 +---------------
pkg/index/inverted/inverted.go | 29 ++++----------
pkg/index/inverted/mem.go | 23 +++++------
pkg/index/iterator.go | 17 ++++----
pkg/index/lsm/lsm.go | 31 ++++-----------
pkg/index/lsm/search.go | 6 +--
pkg/index/metadata/metadata.go | 79 -------------------------------------
pkg/index/testcases/service_name.go | 2 +-
scripts/build/test.mk | 6 +--
11 files changed, 45 insertions(+), 190 deletions(-)
diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index 95a449c..cb7b01f 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -67,7 +67,7 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
p.log.Warn().Msg("invalid event data type")
return
}
- p.log.Info().Msg("received a query event")
+ p.log.Debug().Stringer("criteria", queryCriteria).Msg("received a query request")
meta := queryCriteria.GetMetadata()
ec, err := p.streamService.Stream(meta)
diff --git a/banyand/tsdb/indexdb.go b/banyand/tsdb/indexdb.go
index 4c8b958..261b39e 100644
--- a/banyand/tsdb/indexdb.go
+++ b/banyand/tsdb/indexdb.go
@@ -58,7 +58,7 @@ type indexDB struct {
func (i *indexDB) Seek(field index.Field) ([]GlobalItemID, error) {
result := make([]GlobalItemID, 0)
- f, err := field.MarshalStraight()
+ f, err := field.Marshal()
if err != nil {
return nil, err
}
@@ -154,7 +154,7 @@ func (i *indexWriter) WriteLSMIndex(field index.Field) error {
if i.scope != nil {
field.Key.SeriesID = GlobalSeriesID(i.scope)
}
- key, err := field.MarshalStraight()
+ key, err := field.Marshal()
if err != nil {
return err
}
@@ -165,7 +165,7 @@ func (i *indexWriter) WriteInvertedIndex(field index.Field) error {
if i.scope != nil {
field.Key.SeriesID = GlobalSeriesID(i.scope)
}
- key, err := field.MarshalStraight()
+ key, err := field.Marshal()
if err != nil {
return err
}
diff --git a/pkg/index/index.go b/pkg/index/index.go
index ebedc32..192a94f 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -27,7 +27,6 @@ import (
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/convert"
- "github.com/apache/skywalking-banyandb/pkg/index/metadata"
"github.com/apache/skywalking-banyandb/pkg/index/posting"
)
@@ -68,40 +67,11 @@ type Field struct {
Term []byte
}
-func (f Field) MarshalStraight() ([]byte, error) {
+func (f Field) Marshal() ([]byte, error) {
return bytes.Join([][]byte{f.Key.Marshal(), f.Term}, nil), nil
}
-func (f Field) Marshal(term metadata.Term) ([]byte, error) {
- var t []byte
- if f.Key.EncodeTerm {
- var err error
- t, err = term.ID(f.Term)
- if err != nil {
- return nil, errors.Wrap(err, "get term id")
- }
- f.Term = t
- }
- return f.MarshalStraight()
-}
-
-func (f *Field) Unmarshal(term metadata.Term, raw []byte) error {
- err := f.UnmarshalStraight(raw)
- if err != nil {
- return err
- }
- if !f.Key.EncodeTerm {
- return nil
- }
- t, err := term.Literal(f.Term)
- if err != nil {
- return errors.Wrap(err, "get term literal from metadata store")
- }
- f.Term = t
- return nil
-}
-
-func (f *Field) UnmarshalStraight(raw []byte) error {
+func (f *Field) Unmarshal(raw []byte) error {
fk := &f.Key
err := fk.Unmarshal(raw[:len(raw)-8])
if err != nil {
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 8073d19..c126a6b 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -29,7 +29,6 @@ import (
"github.com/apache/skywalking-banyandb/banyand/kv"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/index"
- "github.com/apache/skywalking-banyandb/pkg/index/metadata"
"github.com/apache/skywalking-banyandb/pkg/index/posting"
"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -38,7 +37,6 @@ import (
var _ index.Store = (*store)(nil)
type store struct {
- termMetadata metadata.Term
diskTable kv.IndexStore
memTable *memTable
immutableMemTable *memTable
@@ -57,23 +55,15 @@ func NewStore(opts StoreOpts) (index.Store, error) {
if err != nil {
return nil, err
}
- var md metadata.Term
- if md, err = metadata.NewTerm(metadata.TermOpts{
- Path: opts.Path + "/tmd",
- Logger: opts.Logger,
- }); err != nil {
- return nil, err
- }
return &store{
- memTable: newMemTable(),
- diskTable: diskTable,
- termMetadata: md,
- l: opts.Logger,
+ memTable: newMemTable(),
+ diskTable: diskTable,
+ l: opts.Logger,
}, nil
}
func (s *store) Close() error {
- return multierr.Combine(s.diskTable.Close(), s.termMetadata.Close())
+ return s.diskTable.Close()
}
func (s *store) Write(field index.Field, chunkID common.ItemID) error {
@@ -92,7 +82,7 @@ func (s *store) Flush() error {
s.memTable = newMemTable()
}
err := s.diskTable.
- Handover(s.immutableMemTable.Iter(s.termMetadata))
+ Handover(s.immutableMemTable.Iter())
if err != nil {
return err
}
@@ -104,9 +94,6 @@ func (s *store) Stats() observability.Statistics {
stat := s.mainStats()
disk := s.diskTable.Stats()
stat.MaxMemBytes = disk.MaxMemBytes
- term := s.termMetadata.Stats()
- stat.MemBytes += term.MemBytes
- stat.MaxMemBytes += term.MaxMemBytes
return stat
}
@@ -127,7 +114,7 @@ func (s *store) MatchField(fieldKey index.FieldKey) (posting.List, error) {
}
func (s *store) MatchTerms(field index.Field) (posting.List, error) {
- f, err := field.Marshal(s.termMetadata)
+ f, err := field.Marshal()
if err != nil {
return nil, err
}
@@ -197,7 +184,7 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts,
}
iters = append(iters, it)
}
- it, err := index.NewFieldIteratorTemplate(s.l, fieldKey, termRange, order, s.diskTable, s.termMetadata,
+ it, err := index.NewFieldIteratorTemplate(s.l, fieldKey, termRange, order, s.diskTable,
func(term, val []byte, delegated kv.Iterator) (*index.PostingValue, error) {
list := roaring.NewPostingList()
err := list.Unmarshall(val)
@@ -214,7 +201,7 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts,
f := index.Field{
Key: fieldKey,
}
- err := f.Unmarshal(s.termMetadata, delegated.Key())
+ err := f.Unmarshal(delegated.Key())
if err != nil {
return nil, err
}
diff --git a/pkg/index/inverted/mem.go b/pkg/index/inverted/mem.go
index 4d78f69..1b70b2c 100644
--- a/pkg/index/inverted/mem.go
+++ b/pkg/index/inverted/mem.go
@@ -28,7 +28,6 @@ import (
"github.com/apache/skywalking-banyandb/banyand/kv"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/index"
- "github.com/apache/skywalking-banyandb/pkg/index/metadata"
"github.com/apache/skywalking-banyandb/pkg/index/posting"
"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
)
@@ -147,14 +146,13 @@ func (m *memTable) MatchTerms(field index.Field) (posting.List, error) {
var _ kv.Iterator = (*flushIterator)(nil)
type flushIterator struct {
- fieldIdx int
- termIdx int
- key []byte
- value []byte
- fields *fieldMap
- valid bool
- err error
- termMetadata metadata.Term
+ fieldIdx int
+ termIdx int
+ key []byte
+ value []byte
+ fields *fieldMap
+ valid bool
+ err error
}
func (i *flushIterator) Next() {
@@ -228,7 +226,7 @@ func (i *flushIterator) setCurr() bool {
Key: term.key,
Term: value.Term,
}
- i.key, err = f.Marshal(i.termMetadata)
+ i.key, err = f.Marshal()
if err != nil {
i.err = multierr.Append(i.err, err)
return false
@@ -236,9 +234,8 @@ func (i *flushIterator) setCurr() bool {
return true
}
-func (m *memTable) Iter(termMetadata metadata.Term) kv.Iterator {
+func (m *memTable) Iter() kv.Iterator {
return &flushIterator{
- fields: m.fields,
- termMetadata: termMetadata,
+ fields: m.fields,
}
}
diff --git a/pkg/index/iterator.go b/pkg/index/iterator.go
index 559802f..fe94b44 100644
--- a/pkg/index/iterator.go
+++ b/pkg/index/iterator.go
@@ -26,7 +26,6 @@ import (
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/kv"
"github.com/apache/skywalking-banyandb/pkg/convert"
- "github.com/apache/skywalking-banyandb/pkg/index/metadata"
"github.com/apache/skywalking-banyandb/pkg/logger"
)
@@ -89,7 +88,7 @@ func (f *FieldIteratorTemplate) Close() error {
}
func NewFieldIteratorTemplate(l *logger.Logger, fieldKey FieldKey, termRange RangeOpts, order modelv1.Sort, iterable kv.Iterable,
- metadata metadata.Term, fn CompositePostingValueFn,
+ fn CompositePostingValueFn,
) (*FieldIteratorTemplate, error) {
if termRange.Upper == nil {
termRange.Upper = DefaultUpper
@@ -118,12 +117,12 @@ func NewFieldIteratorTemplate(l *logger.Logger, fieldKey FieldKey, termRange Ran
Key: fieldKey,
Term: term,
}
- seekKey, err := field.Marshal(metadata)
+ seekKey, err := field.Marshal()
if err != nil {
return nil, err
}
return &FieldIteratorTemplate{
- delegated: newDelegateIterator(iter, fieldKey, metadata, l),
+ delegated: newDelegateIterator(iter, fieldKey, l),
termRange: termRange,
fn: fn,
reverse: reverse,
@@ -131,11 +130,11 @@ func NewFieldIteratorTemplate(l *logger.Logger, fieldKey FieldKey, termRange Ran
}, nil
}
-func parseKey(fieldKey FieldKey, metadata metadata.Term, key []byte) (Field, error) {
+func parseKey(fieldKey FieldKey, key []byte) (Field, error) {
f := &Field{
Key: fieldKey,
}
- err := f.Unmarshal(metadata, key)
+ err := f.Unmarshal(key)
if err != nil {
return *f, err
}
@@ -235,20 +234,18 @@ type delegateIterator struct {
delegated kv.Iterator
fieldKey FieldKey
fieldKeyBytes []byte
- metadata metadata.Term
l *logger.Logger
curField Field
closed bool
}
-func newDelegateIterator(delegated kv.Iterator, fieldKey FieldKey, metadata metadata.Term, l *logger.Logger) *delegateIterator {
+func newDelegateIterator(delegated kv.Iterator, fieldKey FieldKey, l *logger.Logger) *delegateIterator {
fieldKeyBytes := fieldKey.Marshal()
return &delegateIterator{
delegated: delegated,
fieldKey: fieldKey,
fieldKeyBytes: fieldKeyBytes,
- metadata: metadata,
l: l,
}
}
@@ -282,7 +279,7 @@ func (di *delegateIterator) Valid() bool {
return false
}
var err error
- di.curField, err = parseKey(di.fieldKey, di.metadata, di.Key())
+ di.curField, err = parseKey(di.fieldKey, di.Key())
if err != nil {
di.l.Error().Err(err).Msg("fail to parse field from key")
di.Close()
diff --git a/pkg/index/lsm/lsm.go b/pkg/index/lsm/lsm.go
index b2ef18c..53de61b 100644
--- a/pkg/index/lsm/lsm.go
+++ b/pkg/index/lsm/lsm.go
@@ -18,23 +18,19 @@
package lsm
import (
- "go.uber.org/multierr"
-
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/kv"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/index"
- "github.com/apache/skywalking-banyandb/pkg/index/metadata"
"github.com/apache/skywalking-banyandb/pkg/logger"
)
var _ index.Store = (*store)(nil)
type store struct {
- lsm kv.Store
- termMetadata metadata.Term
- l *logger.Logger
+ lsm kv.Store
+ l *logger.Logger
}
func (*store) Flush() error {
@@ -42,20 +38,15 @@ func (*store) Flush() error {
}
func (s *store) Stats() observability.Statistics {
- store := s.lsm.Stats()
- term := s.termMetadata.Stats()
- return observability.Statistics{
- MemBytes: store.MemBytes + term.MemBytes,
- MaxMemBytes: store.MaxMemBytes + term.MaxMemBytes,
- }
+ return s.lsm.Stats()
}
func (s *store) Close() error {
- return multierr.Combine(s.lsm.Close(), s.termMetadata.Close())
+ return s.lsm.Close()
}
func (s *store) Write(field index.Field, itemID common.ItemID) error {
- f, err := field.Marshal(s.termMetadata)
+ f, err := field.Marshal()
if err != nil {
return err
}
@@ -74,16 +65,8 @@ func NewStore(opts StoreOpts) (index.Store, error) {
if lsm, err = kv.OpenStore(0, opts.Path+"/lsm", kv.StoreWithLogger(opts.Logger)); err != nil {
return nil, err
}
- var md metadata.Term
- if md, err = metadata.NewTerm(metadata.TermOpts{
- Path: opts.Path + "/tmd",
- Logger: opts.Logger,
- }); err != nil {
- return nil, err
- }
return &store{
- lsm: lsm,
- termMetadata: md,
- l: opts.Logger,
+ lsm: lsm,
+ l: opts.Logger,
}, nil
}
diff --git a/pkg/index/lsm/search.go b/pkg/index/lsm/search.go
index 43f1948..5e877fa 100644
--- a/pkg/index/lsm/search.go
+++ b/pkg/index/lsm/search.go
@@ -37,7 +37,7 @@ func (s *store) MatchField(fieldKey index.FieldKey) (list posting.List, err erro
}
func (s *store) MatchTerms(field index.Field) (list posting.List, err error) {
- f, err := field.Marshal(s.termMetadata)
+ f, err := field.Marshal()
if err != nil {
return nil, err
}
@@ -66,7 +66,7 @@ func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posti
}
func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, order modelv1.Sort) (index.FieldIterator, error) {
- return index.NewFieldIteratorTemplate(s.l, fieldKey, termRange, order, s.lsm, s.termMetadata,
+ return index.NewFieldIteratorTemplate(s.l, fieldKey, termRange, order, s.lsm,
func(term, value []byte, delegated kv.Iterator) (*index.PostingValue, error) {
pv := &index.PostingValue{
Term: term,
@@ -75,7 +75,7 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, ord
for ; delegated.Valid(); delegated.Next() {
f := index.Field{}
- err := f.Unmarshal(s.termMetadata, delegated.Key())
+ err := f.Unmarshal(delegated.Key())
if err != nil {
return nil, err
}
diff --git a/pkg/index/metadata/metadata.go b/pkg/index/metadata/metadata.go
deleted file mode 100644
index 3e150e0..0000000
--- a/pkg/index/metadata/metadata.go
+++ /dev/null
@@ -1,79 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package metadata
-
-import (
- "io"
-
- "github.com/pkg/errors"
-
- "github.com/apache/skywalking-banyandb/banyand/kv"
- "github.com/apache/skywalking-banyandb/banyand/observability"
- "github.com/apache/skywalking-banyandb/pkg/convert"
- "github.com/apache/skywalking-banyandb/pkg/logger"
-)
-
-type Term interface {
- observability.Observable
- ID(term []byte) (id []byte, err error)
- Literal(id []byte) (term []byte, err error)
- io.Closer
-}
-
-var _ Term = (*term)(nil)
-
-type term struct {
- store kv.Store
-}
-
-type TermOpts struct {
- Path string
- Logger *logger.Logger
-}
-
-func NewTerm(opts TermOpts) (Term, error) {
- var store kv.Store
- var err error
- if store, err = kv.OpenStore(0, opts.Path, kv.StoreWithNamedLogger("term_metadata", opts.Logger)); err != nil {
- return nil, err
- }
- return &term{
- store: store,
- }, nil
-}
-
-func (t *term) ID(term []byte) (id []byte, err error) {
- id = convert.Uint64ToBytes(convert.Hash(term))
- _, err = t.store.Get(id)
- if errors.Is(err, kv.ErrKeyNotFound) {
- return id, t.store.Put(id, term)
- }
- return id, nil
-}
-
-func (t *term) Literal(id []byte) (term []byte, err error) {
- return t.store.Get(id)
-}
-
-func (t *term) Close() error {
- return t.store.Close()
-}
-
-func (t *term) Stats() observability.Statistics {
- return t.store.Stats()
-}
diff --git a/pkg/index/testcases/service_name.go b/pkg/index/testcases/service_name.go
index c6e174d..df5e067 100644
--- a/pkg/index/testcases/service_name.go
+++ b/pkg/index/testcases/service_name.go
@@ -31,7 +31,7 @@ import (
var serviceName = index.FieldKey{
// http_method
IndexRuleID: 6,
- EncodeTerm: true,
+ EncodeTerm: false,
}
func RunServiceName(t *testing.T, store SimpleStore) {
diff --git a/scripts/build/test.mk b/scripts/build/test.mk
index 8a2bdf8..ad65bdf 100644
--- a/scripts/build/test.mk
+++ b/scripts/build/test.mk
@@ -41,13 +41,13 @@ test: $(GINKGO) generate ## Run all the unit tests
$(GINKGO) $(TEST_OPTS) $(TEST_EXTRA_OPTS) -tags "$(TEST_TAGS)" $(TEST_PKG_LIST)
.PHONY: test-race
-test-race: TEST_OPTS=-race
+test-race: TEST_OPTS=--race
test-race: test ## Run all the unit tests with race detector on
.PHONY: test-coverage
-test-coverage: ## Run all the unit tests with coverage analysis on
+test-coverage: $(GINKGO) generate ## Run all the unit tests with coverage analysis on
mkdir -p "$(TEST_COVERAGE_DIR)"
- go test $(TEST_COVERAGE_OPTS) $(TEST_COVERAGE_EXTRA_OPTS) -coverprofile="$(TEST_COVERAGE_PROFILE)" -tags "$(TEST_TAGS)" $(TEST_COVERAGE_PKG_LIST)
+ $(GINKGO) $(TEST_COVERAGE_OPTS) $(TEST_COVERAGE_EXTRA_OPTS) --coverprofile="$(TEST_COVERAGE_PROFILE)" --tags "$(TEST_TAGS)" $(TEST_COVERAGE_PKG_LIST)
go tool cover -html="$(TEST_COVERAGE_PROFILE)" -o "$(TEST_COVERAGE_REPORT)"
@echo "Test coverage report has been saved to $(TEST_COVERAGE_REPORT)"