You are viewing a plain-text version of this content. The canonical link to it (originally given as "here") was not preserved in this copy.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2022/07/29 15:49:00 UTC

[skywalking-banyandb] branch main updated: Remove term metadata store (#148)

This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 8e87c5f  Remove term metadata store (#148)
8e87c5f is described below

commit 8e87c5ff9cad1cc91fa8690cf0fc85981bbfef9d
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Fri Jul 29 23:48:56 2022 +0800

    Remove term metadata store (#148)
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/query/processor.go          |  2 +-
 banyand/tsdb/indexdb.go             |  6 +--
 pkg/index/index.go                  | 34 +---------------
 pkg/index/inverted/inverted.go      | 29 ++++----------
 pkg/index/inverted/mem.go           | 23 +++++------
 pkg/index/iterator.go               | 17 ++++----
 pkg/index/lsm/lsm.go                | 31 ++++-----------
 pkg/index/lsm/search.go             |  6 +--
 pkg/index/metadata/metadata.go      | 79 -------------------------------------
 pkg/index/testcases/service_name.go |  2 +-
 scripts/build/test.mk               |  6 +--
 11 files changed, 45 insertions(+), 190 deletions(-)

diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index 95a449c..cb7b01f 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -67,7 +67,7 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
 		p.log.Warn().Msg("invalid event data type")
 		return
 	}
-	p.log.Info().Msg("received a query event")
+	p.log.Debug().Stringer("criteria", queryCriteria).Msg("received a query request")
 
 	meta := queryCriteria.GetMetadata()
 	ec, err := p.streamService.Stream(meta)
diff --git a/banyand/tsdb/indexdb.go b/banyand/tsdb/indexdb.go
index 4c8b958..261b39e 100644
--- a/banyand/tsdb/indexdb.go
+++ b/banyand/tsdb/indexdb.go
@@ -58,7 +58,7 @@ type indexDB struct {
 
 func (i *indexDB) Seek(field index.Field) ([]GlobalItemID, error) {
 	result := make([]GlobalItemID, 0)
-	f, err := field.MarshalStraight()
+	f, err := field.Marshal()
 	if err != nil {
 		return nil, err
 	}
@@ -154,7 +154,7 @@ func (i *indexWriter) WriteLSMIndex(field index.Field) error {
 	if i.scope != nil {
 		field.Key.SeriesID = GlobalSeriesID(i.scope)
 	}
-	key, err := field.MarshalStraight()
+	key, err := field.Marshal()
 	if err != nil {
 		return err
 	}
@@ -165,7 +165,7 @@ func (i *indexWriter) WriteInvertedIndex(field index.Field) error {
 	if i.scope != nil {
 		field.Key.SeriesID = GlobalSeriesID(i.scope)
 	}
-	key, err := field.MarshalStraight()
+	key, err := field.Marshal()
 	if err != nil {
 		return err
 	}
diff --git a/pkg/index/index.go b/pkg/index/index.go
index ebedc32..192a94f 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -27,7 +27,6 @@ import (
 	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
 	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
-	"github.com/apache/skywalking-banyandb/pkg/index/metadata"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting"
 )
 
@@ -68,40 +67,11 @@ type Field struct {
 	Term []byte
 }
 
-func (f Field) MarshalStraight() ([]byte, error) {
+func (f Field) Marshal() ([]byte, error) {
 	return bytes.Join([][]byte{f.Key.Marshal(), f.Term}, nil), nil
 }
 
-func (f Field) Marshal(term metadata.Term) ([]byte, error) {
-	var t []byte
-	if f.Key.EncodeTerm {
-		var err error
-		t, err = term.ID(f.Term)
-		if err != nil {
-			return nil, errors.Wrap(err, "get term id")
-		}
-		f.Term = t
-	}
-	return f.MarshalStraight()
-}
-
-func (f *Field) Unmarshal(term metadata.Term, raw []byte) error {
-	err := f.UnmarshalStraight(raw)
-	if err != nil {
-		return err
-	}
-	if !f.Key.EncodeTerm {
-		return nil
-	}
-	t, err := term.Literal(f.Term)
-	if err != nil {
-		return errors.Wrap(err, "get term literal from metadata store")
-	}
-	f.Term = t
-	return nil
-}
-
-func (f *Field) UnmarshalStraight(raw []byte) error {
+func (f *Field) Unmarshal(raw []byte) error {
 	fk := &f.Key
 	err := fk.Unmarshal(raw[:len(raw)-8])
 	if err != nil {
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 8073d19..c126a6b 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -29,7 +29,6 @@ import (
 	"github.com/apache/skywalking-banyandb/banyand/kv"
 	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/pkg/index"
-	"github.com/apache/skywalking-banyandb/pkg/index/metadata"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -38,7 +37,6 @@ import (
 var _ index.Store = (*store)(nil)
 
 type store struct {
-	termMetadata      metadata.Term
 	diskTable         kv.IndexStore
 	memTable          *memTable
 	immutableMemTable *memTable
@@ -57,23 +55,15 @@ func NewStore(opts StoreOpts) (index.Store, error) {
 	if err != nil {
 		return nil, err
 	}
-	var md metadata.Term
-	if md, err = metadata.NewTerm(metadata.TermOpts{
-		Path:   opts.Path + "/tmd",
-		Logger: opts.Logger,
-	}); err != nil {
-		return nil, err
-	}
 	return &store{
-		memTable:     newMemTable(),
-		diskTable:    diskTable,
-		termMetadata: md,
-		l:            opts.Logger,
+		memTable:  newMemTable(),
+		diskTable: diskTable,
+		l:         opts.Logger,
 	}, nil
 }
 
 func (s *store) Close() error {
-	return multierr.Combine(s.diskTable.Close(), s.termMetadata.Close())
+	return s.diskTable.Close()
 }
 
 func (s *store) Write(field index.Field, chunkID common.ItemID) error {
@@ -92,7 +82,7 @@ func (s *store) Flush() error {
 		s.memTable = newMemTable()
 	}
 	err := s.diskTable.
-		Handover(s.immutableMemTable.Iter(s.termMetadata))
+		Handover(s.immutableMemTable.Iter())
 	if err != nil {
 		return err
 	}
@@ -104,9 +94,6 @@ func (s *store) Stats() observability.Statistics {
 	stat := s.mainStats()
 	disk := s.diskTable.Stats()
 	stat.MaxMemBytes = disk.MaxMemBytes
-	term := s.termMetadata.Stats()
-	stat.MemBytes += term.MemBytes
-	stat.MaxMemBytes += term.MaxMemBytes
 	return stat
 }
 
@@ -127,7 +114,7 @@ func (s *store) MatchField(fieldKey index.FieldKey) (posting.List, error) {
 }
 
 func (s *store) MatchTerms(field index.Field) (posting.List, error) {
-	f, err := field.Marshal(s.termMetadata)
+	f, err := field.Marshal()
 	if err != nil {
 		return nil, err
 	}
@@ -197,7 +184,7 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts,
 		}
 		iters = append(iters, it)
 	}
-	it, err := index.NewFieldIteratorTemplate(s.l, fieldKey, termRange, order, s.diskTable, s.termMetadata,
+	it, err := index.NewFieldIteratorTemplate(s.l, fieldKey, termRange, order, s.diskTable,
 		func(term, val []byte, delegated kv.Iterator) (*index.PostingValue, error) {
 			list := roaring.NewPostingList()
 			err := list.Unmarshall(val)
@@ -214,7 +201,7 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts,
 				f := index.Field{
 					Key: fieldKey,
 				}
-				err := f.Unmarshal(s.termMetadata, delegated.Key())
+				err := f.Unmarshal(delegated.Key())
 				if err != nil {
 					return nil, err
 				}
diff --git a/pkg/index/inverted/mem.go b/pkg/index/inverted/mem.go
index 4d78f69..1b70b2c 100644
--- a/pkg/index/inverted/mem.go
+++ b/pkg/index/inverted/mem.go
@@ -28,7 +28,6 @@ import (
 	"github.com/apache/skywalking-banyandb/banyand/kv"
 	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/pkg/index"
-	"github.com/apache/skywalking-banyandb/pkg/index/metadata"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
 )
@@ -147,14 +146,13 @@ func (m *memTable) MatchTerms(field index.Field) (posting.List, error) {
 var _ kv.Iterator = (*flushIterator)(nil)
 
 type flushIterator struct {
-	fieldIdx     int
-	termIdx      int
-	key          []byte
-	value        []byte
-	fields       *fieldMap
-	valid        bool
-	err          error
-	termMetadata metadata.Term
+	fieldIdx int
+	termIdx  int
+	key      []byte
+	value    []byte
+	fields   *fieldMap
+	valid    bool
+	err      error
 }
 
 func (i *flushIterator) Next() {
@@ -228,7 +226,7 @@ func (i *flushIterator) setCurr() bool {
 		Key:  term.key,
 		Term: value.Term,
 	}
-	i.key, err = f.Marshal(i.termMetadata)
+	i.key, err = f.Marshal()
 	if err != nil {
 		i.err = multierr.Append(i.err, err)
 		return false
@@ -236,9 +234,8 @@ func (i *flushIterator) setCurr() bool {
 	return true
 }
 
-func (m *memTable) Iter(termMetadata metadata.Term) kv.Iterator {
+func (m *memTable) Iter() kv.Iterator {
 	return &flushIterator{
-		fields:       m.fields,
-		termMetadata: termMetadata,
+		fields: m.fields,
 	}
 }
diff --git a/pkg/index/iterator.go b/pkg/index/iterator.go
index 559802f..fe94b44 100644
--- a/pkg/index/iterator.go
+++ b/pkg/index/iterator.go
@@ -26,7 +26,6 @@ import (
 	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
 	"github.com/apache/skywalking-banyandb/banyand/kv"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
-	"github.com/apache/skywalking-banyandb/pkg/index/metadata"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
@@ -89,7 +88,7 @@ func (f *FieldIteratorTemplate) Close() error {
 }
 
 func NewFieldIteratorTemplate(l *logger.Logger, fieldKey FieldKey, termRange RangeOpts, order modelv1.Sort, iterable kv.Iterable,
-	metadata metadata.Term, fn CompositePostingValueFn,
+	fn CompositePostingValueFn,
 ) (*FieldIteratorTemplate, error) {
 	if termRange.Upper == nil {
 		termRange.Upper = DefaultUpper
@@ -118,12 +117,12 @@ func NewFieldIteratorTemplate(l *logger.Logger, fieldKey FieldKey, termRange Ran
 		Key:  fieldKey,
 		Term: term,
 	}
-	seekKey, err := field.Marshal(metadata)
+	seekKey, err := field.Marshal()
 	if err != nil {
 		return nil, err
 	}
 	return &FieldIteratorTemplate{
-		delegated: newDelegateIterator(iter, fieldKey, metadata, l),
+		delegated: newDelegateIterator(iter, fieldKey, l),
 		termRange: termRange,
 		fn:        fn,
 		reverse:   reverse,
@@ -131,11 +130,11 @@ func NewFieldIteratorTemplate(l *logger.Logger, fieldKey FieldKey, termRange Ran
 	}, nil
 }
 
-func parseKey(fieldKey FieldKey, metadata metadata.Term, key []byte) (Field, error) {
+func parseKey(fieldKey FieldKey, key []byte) (Field, error) {
 	f := &Field{
 		Key: fieldKey,
 	}
-	err := f.Unmarshal(metadata, key)
+	err := f.Unmarshal(key)
 	if err != nil {
 		return *f, err
 	}
@@ -235,20 +234,18 @@ type delegateIterator struct {
 	delegated     kv.Iterator
 	fieldKey      FieldKey
 	fieldKeyBytes []byte
-	metadata      metadata.Term
 	l             *logger.Logger
 
 	curField Field
 	closed   bool
 }
 
-func newDelegateIterator(delegated kv.Iterator, fieldKey FieldKey, metadata metadata.Term, l *logger.Logger) *delegateIterator {
+func newDelegateIterator(delegated kv.Iterator, fieldKey FieldKey, l *logger.Logger) *delegateIterator {
 	fieldKeyBytes := fieldKey.Marshal()
 	return &delegateIterator{
 		delegated:     delegated,
 		fieldKey:      fieldKey,
 		fieldKeyBytes: fieldKeyBytes,
-		metadata:      metadata,
 		l:             l,
 	}
 }
@@ -282,7 +279,7 @@ func (di *delegateIterator) Valid() bool {
 		return false
 	}
 	var err error
-	di.curField, err = parseKey(di.fieldKey, di.metadata, di.Key())
+	di.curField, err = parseKey(di.fieldKey, di.Key())
 	if err != nil {
 		di.l.Error().Err(err).Msg("fail to parse field from key")
 		di.Close()
diff --git a/pkg/index/lsm/lsm.go b/pkg/index/lsm/lsm.go
index b2ef18c..53de61b 100644
--- a/pkg/index/lsm/lsm.go
+++ b/pkg/index/lsm/lsm.go
@@ -18,23 +18,19 @@
 package lsm
 
 import (
-	"go.uber.org/multierr"
-
 	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/banyand/kv"
 	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/index"
-	"github.com/apache/skywalking-banyandb/pkg/index/metadata"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
 var _ index.Store = (*store)(nil)
 
 type store struct {
-	lsm          kv.Store
-	termMetadata metadata.Term
-	l            *logger.Logger
+	lsm kv.Store
+	l   *logger.Logger
 }
 
 func (*store) Flush() error {
@@ -42,20 +38,15 @@ func (*store) Flush() error {
 }
 
 func (s *store) Stats() observability.Statistics {
-	store := s.lsm.Stats()
-	term := s.termMetadata.Stats()
-	return observability.Statistics{
-		MemBytes:    store.MemBytes + term.MemBytes,
-		MaxMemBytes: store.MaxMemBytes + term.MaxMemBytes,
-	}
+	return s.lsm.Stats()
 }
 
 func (s *store) Close() error {
-	return multierr.Combine(s.lsm.Close(), s.termMetadata.Close())
+	return s.lsm.Close()
 }
 
 func (s *store) Write(field index.Field, itemID common.ItemID) error {
-	f, err := field.Marshal(s.termMetadata)
+	f, err := field.Marshal()
 	if err != nil {
 		return err
 	}
@@ -74,16 +65,8 @@ func NewStore(opts StoreOpts) (index.Store, error) {
 	if lsm, err = kv.OpenStore(0, opts.Path+"/lsm", kv.StoreWithLogger(opts.Logger)); err != nil {
 		return nil, err
 	}
-	var md metadata.Term
-	if md, err = metadata.NewTerm(metadata.TermOpts{
-		Path:   opts.Path + "/tmd",
-		Logger: opts.Logger,
-	}); err != nil {
-		return nil, err
-	}
 	return &store{
-		lsm:          lsm,
-		termMetadata: md,
-		l:            opts.Logger,
+		lsm: lsm,
+		l:   opts.Logger,
 	}, nil
 }
diff --git a/pkg/index/lsm/search.go b/pkg/index/lsm/search.go
index 43f1948..5e877fa 100644
--- a/pkg/index/lsm/search.go
+++ b/pkg/index/lsm/search.go
@@ -37,7 +37,7 @@ func (s *store) MatchField(fieldKey index.FieldKey) (list posting.List, err erro
 }
 
 func (s *store) MatchTerms(field index.Field) (list posting.List, err error) {
-	f, err := field.Marshal(s.termMetadata)
+	f, err := field.Marshal()
 	if err != nil {
 		return nil, err
 	}
@@ -66,7 +66,7 @@ func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posti
 }
 
 func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, order modelv1.Sort) (index.FieldIterator, error) {
-	return index.NewFieldIteratorTemplate(s.l, fieldKey, termRange, order, s.lsm, s.termMetadata,
+	return index.NewFieldIteratorTemplate(s.l, fieldKey, termRange, order, s.lsm,
 		func(term, value []byte, delegated kv.Iterator) (*index.PostingValue, error) {
 			pv := &index.PostingValue{
 				Term:  term,
@@ -75,7 +75,7 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, ord
 
 			for ; delegated.Valid(); delegated.Next() {
 				f := index.Field{}
-				err := f.Unmarshal(s.termMetadata, delegated.Key())
+				err := f.Unmarshal(delegated.Key())
 				if err != nil {
 					return nil, err
 				}
diff --git a/pkg/index/metadata/metadata.go b/pkg/index/metadata/metadata.go
deleted file mode 100644
index 3e150e0..0000000
--- a/pkg/index/metadata/metadata.go
+++ /dev/null
@@ -1,79 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package metadata
-
-import (
-	"io"
-
-	"github.com/pkg/errors"
-
-	"github.com/apache/skywalking-banyandb/banyand/kv"
-	"github.com/apache/skywalking-banyandb/banyand/observability"
-	"github.com/apache/skywalking-banyandb/pkg/convert"
-	"github.com/apache/skywalking-banyandb/pkg/logger"
-)
-
-type Term interface {
-	observability.Observable
-	ID(term []byte) (id []byte, err error)
-	Literal(id []byte) (term []byte, err error)
-	io.Closer
-}
-
-var _ Term = (*term)(nil)
-
-type term struct {
-	store kv.Store
-}
-
-type TermOpts struct {
-	Path   string
-	Logger *logger.Logger
-}
-
-func NewTerm(opts TermOpts) (Term, error) {
-	var store kv.Store
-	var err error
-	if store, err = kv.OpenStore(0, opts.Path, kv.StoreWithNamedLogger("term_metadata", opts.Logger)); err != nil {
-		return nil, err
-	}
-	return &term{
-		store: store,
-	}, nil
-}
-
-func (t *term) ID(term []byte) (id []byte, err error) {
-	id = convert.Uint64ToBytes(convert.Hash(term))
-	_, err = t.store.Get(id)
-	if errors.Is(err, kv.ErrKeyNotFound) {
-		return id, t.store.Put(id, term)
-	}
-	return id, nil
-}
-
-func (t *term) Literal(id []byte) (term []byte, err error) {
-	return t.store.Get(id)
-}
-
-func (t *term) Close() error {
-	return t.store.Close()
-}
-
-func (t *term) Stats() observability.Statistics {
-	return t.store.Stats()
-}
diff --git a/pkg/index/testcases/service_name.go b/pkg/index/testcases/service_name.go
index c6e174d..df5e067 100644
--- a/pkg/index/testcases/service_name.go
+++ b/pkg/index/testcases/service_name.go
@@ -31,7 +31,7 @@ import (
 var serviceName = index.FieldKey{
 	// http_method
 	IndexRuleID: 6,
-	EncodeTerm:  true,
+	EncodeTerm:  false,
 }
 
 func RunServiceName(t *testing.T, store SimpleStore) {
diff --git a/scripts/build/test.mk b/scripts/build/test.mk
index 8a2bdf8..ad65bdf 100644
--- a/scripts/build/test.mk
+++ b/scripts/build/test.mk
@@ -41,13 +41,13 @@ test: $(GINKGO) generate ## Run all the unit tests
 	$(GINKGO) $(TEST_OPTS) $(TEST_EXTRA_OPTS) -tags "$(TEST_TAGS)" $(TEST_PKG_LIST)
 
 .PHONY: test-race
-test-race: TEST_OPTS=-race
+test-race: TEST_OPTS=--race
 test-race: test  ## Run all the unit tests with race detector on
 
 .PHONY: test-coverage
-test-coverage: ## Run all the unit tests with coverage analysis on
+test-coverage: $(GINKGO) generate ## Run all the unit tests with coverage analysis on
 	mkdir -p "$(TEST_COVERAGE_DIR)"
-	go test $(TEST_COVERAGE_OPTS) $(TEST_COVERAGE_EXTRA_OPTS) -coverprofile="$(TEST_COVERAGE_PROFILE)" -tags "$(TEST_TAGS)" $(TEST_COVERAGE_PKG_LIST)
+	$(GINKGO) $(TEST_COVERAGE_OPTS) $(TEST_COVERAGE_EXTRA_OPTS) --coverprofile="$(TEST_COVERAGE_PROFILE)" --tags "$(TEST_TAGS)" $(TEST_COVERAGE_PKG_LIST)
 	go tool cover -html="$(TEST_COVERAGE_PROFILE)" -o "$(TEST_COVERAGE_REPORT)"
 	@echo "Test coverage report has been saved to $(TEST_COVERAGE_REPORT)"