You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/07/29 14:15:17 UTC

[skywalking-banyandb] branch index-term created (now 7733284)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a change to branch index-term
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


      at 7733284  Remove term metadata store

This branch includes the following new commits:

     new 7733284  Remove term metadata store

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[skywalking-banyandb] 01/01: Remove term metadata store

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch index-term
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 7733284fbb67845aa7057fb6be360d4a0119199b
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Fri Jul 29 14:13:10 2022 +0000

    Remove term metadata store
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/query/processor.go          |  2 +-
 banyand/tsdb/indexdb.go             |  6 +--
 pkg/index/index.go                  | 34 +---------------
 pkg/index/inverted/inverted.go      | 29 ++++----------
 pkg/index/inverted/mem.go           | 23 +++++------
 pkg/index/iterator.go               | 17 ++++----
 pkg/index/lsm/lsm.go                | 31 ++++-----------
 pkg/index/lsm/search.go             |  6 +--
 pkg/index/metadata/metadata.go      | 79 -------------------------------------
 pkg/index/testcases/service_name.go |  2 +-
 scripts/build/test.mk               |  6 +--
 11 files changed, 45 insertions(+), 190 deletions(-)

diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index 95a449c..cb7b01f 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -67,7 +67,7 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
 		p.log.Warn().Msg("invalid event data type")
 		return
 	}
-	p.log.Info().Msg("received a query event")
+	p.log.Debug().Stringer("criteria", queryCriteria).Msg("received a query request")
 
 	meta := queryCriteria.GetMetadata()
 	ec, err := p.streamService.Stream(meta)
diff --git a/banyand/tsdb/indexdb.go b/banyand/tsdb/indexdb.go
index 4c8b958..261b39e 100644
--- a/banyand/tsdb/indexdb.go
+++ b/banyand/tsdb/indexdb.go
@@ -58,7 +58,7 @@ type indexDB struct {
 
 func (i *indexDB) Seek(field index.Field) ([]GlobalItemID, error) {
 	result := make([]GlobalItemID, 0)
-	f, err := field.MarshalStraight()
+	f, err := field.Marshal()
 	if err != nil {
 		return nil, err
 	}
@@ -154,7 +154,7 @@ func (i *indexWriter) WriteLSMIndex(field index.Field) error {
 	if i.scope != nil {
 		field.Key.SeriesID = GlobalSeriesID(i.scope)
 	}
-	key, err := field.MarshalStraight()
+	key, err := field.Marshal()
 	if err != nil {
 		return err
 	}
@@ -165,7 +165,7 @@ func (i *indexWriter) WriteInvertedIndex(field index.Field) error {
 	if i.scope != nil {
 		field.Key.SeriesID = GlobalSeriesID(i.scope)
 	}
-	key, err := field.MarshalStraight()
+	key, err := field.Marshal()
 	if err != nil {
 		return err
 	}
diff --git a/pkg/index/index.go b/pkg/index/index.go
index ebedc32..192a94f 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -27,7 +27,6 @@ import (
 	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
 	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
-	"github.com/apache/skywalking-banyandb/pkg/index/metadata"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting"
 )
 
@@ -68,40 +67,11 @@ type Field struct {
 	Term []byte
 }
 
-func (f Field) MarshalStraight() ([]byte, error) {
+func (f Field) Marshal() ([]byte, error) {
 	return bytes.Join([][]byte{f.Key.Marshal(), f.Term}, nil), nil
 }
 
-func (f Field) Marshal(term metadata.Term) ([]byte, error) {
-	var t []byte
-	if f.Key.EncodeTerm {
-		var err error
-		t, err = term.ID(f.Term)
-		if err != nil {
-			return nil, errors.Wrap(err, "get term id")
-		}
-		f.Term = t
-	}
-	return f.MarshalStraight()
-}
-
-func (f *Field) Unmarshal(term metadata.Term, raw []byte) error {
-	err := f.UnmarshalStraight(raw)
-	if err != nil {
-		return err
-	}
-	if !f.Key.EncodeTerm {
-		return nil
-	}
-	t, err := term.Literal(f.Term)
-	if err != nil {
-		return errors.Wrap(err, "get term literal from metadata store")
-	}
-	f.Term = t
-	return nil
-}
-
-func (f *Field) UnmarshalStraight(raw []byte) error {
+func (f *Field) Unmarshal(raw []byte) error {
 	fk := &f.Key
 	err := fk.Unmarshal(raw[:len(raw)-8])
 	if err != nil {
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 8073d19..c126a6b 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -29,7 +29,6 @@ import (
 	"github.com/apache/skywalking-banyandb/banyand/kv"
 	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/pkg/index"
-	"github.com/apache/skywalking-banyandb/pkg/index/metadata"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -38,7 +37,6 @@ import (
 var _ index.Store = (*store)(nil)
 
 type store struct {
-	termMetadata      metadata.Term
 	diskTable         kv.IndexStore
 	memTable          *memTable
 	immutableMemTable *memTable
@@ -57,23 +55,15 @@ func NewStore(opts StoreOpts) (index.Store, error) {
 	if err != nil {
 		return nil, err
 	}
-	var md metadata.Term
-	if md, err = metadata.NewTerm(metadata.TermOpts{
-		Path:   opts.Path + "/tmd",
-		Logger: opts.Logger,
-	}); err != nil {
-		return nil, err
-	}
 	return &store{
-		memTable:     newMemTable(),
-		diskTable:    diskTable,
-		termMetadata: md,
-		l:            opts.Logger,
+		memTable:  newMemTable(),
+		diskTable: diskTable,
+		l:         opts.Logger,
 	}, nil
 }
 
 func (s *store) Close() error {
-	return multierr.Combine(s.diskTable.Close(), s.termMetadata.Close())
+	return s.diskTable.Close()
 }
 
 func (s *store) Write(field index.Field, chunkID common.ItemID) error {
@@ -92,7 +82,7 @@ func (s *store) Flush() error {
 		s.memTable = newMemTable()
 	}
 	err := s.diskTable.
-		Handover(s.immutableMemTable.Iter(s.termMetadata))
+		Handover(s.immutableMemTable.Iter())
 	if err != nil {
 		return err
 	}
@@ -104,9 +94,6 @@ func (s *store) Stats() observability.Statistics {
 	stat := s.mainStats()
 	disk := s.diskTable.Stats()
 	stat.MaxMemBytes = disk.MaxMemBytes
-	term := s.termMetadata.Stats()
-	stat.MemBytes += term.MemBytes
-	stat.MaxMemBytes += term.MaxMemBytes
 	return stat
 }
 
@@ -127,7 +114,7 @@ func (s *store) MatchField(fieldKey index.FieldKey) (posting.List, error) {
 }
 
 func (s *store) MatchTerms(field index.Field) (posting.List, error) {
-	f, err := field.Marshal(s.termMetadata)
+	f, err := field.Marshal()
 	if err != nil {
 		return nil, err
 	}
@@ -197,7 +184,7 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts,
 		}
 		iters = append(iters, it)
 	}
-	it, err := index.NewFieldIteratorTemplate(s.l, fieldKey, termRange, order, s.diskTable, s.termMetadata,
+	it, err := index.NewFieldIteratorTemplate(s.l, fieldKey, termRange, order, s.diskTable,
 		func(term, val []byte, delegated kv.Iterator) (*index.PostingValue, error) {
 			list := roaring.NewPostingList()
 			err := list.Unmarshall(val)
@@ -214,7 +201,7 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts,
 				f := index.Field{
 					Key: fieldKey,
 				}
-				err := f.Unmarshal(s.termMetadata, delegated.Key())
+				err := f.Unmarshal(delegated.Key())
 				if err != nil {
 					return nil, err
 				}
diff --git a/pkg/index/inverted/mem.go b/pkg/index/inverted/mem.go
index 4d78f69..1b70b2c 100644
--- a/pkg/index/inverted/mem.go
+++ b/pkg/index/inverted/mem.go
@@ -28,7 +28,6 @@ import (
 	"github.com/apache/skywalking-banyandb/banyand/kv"
 	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/pkg/index"
-	"github.com/apache/skywalking-banyandb/pkg/index/metadata"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
 )
@@ -147,14 +146,13 @@ func (m *memTable) MatchTerms(field index.Field) (posting.List, error) {
 var _ kv.Iterator = (*flushIterator)(nil)
 
 type flushIterator struct {
-	fieldIdx     int
-	termIdx      int
-	key          []byte
-	value        []byte
-	fields       *fieldMap
-	valid        bool
-	err          error
-	termMetadata metadata.Term
+	fieldIdx int
+	termIdx  int
+	key      []byte
+	value    []byte
+	fields   *fieldMap
+	valid    bool
+	err      error
 }
 
 func (i *flushIterator) Next() {
@@ -228,7 +226,7 @@ func (i *flushIterator) setCurr() bool {
 		Key:  term.key,
 		Term: value.Term,
 	}
-	i.key, err = f.Marshal(i.termMetadata)
+	i.key, err = f.Marshal()
 	if err != nil {
 		i.err = multierr.Append(i.err, err)
 		return false
@@ -236,9 +234,8 @@ func (i *flushIterator) setCurr() bool {
 	return true
 }
 
-func (m *memTable) Iter(termMetadata metadata.Term) kv.Iterator {
+func (m *memTable) Iter() kv.Iterator {
 	return &flushIterator{
-		fields:       m.fields,
-		termMetadata: termMetadata,
+		fields: m.fields,
 	}
 }
diff --git a/pkg/index/iterator.go b/pkg/index/iterator.go
index 559802f..fe94b44 100644
--- a/pkg/index/iterator.go
+++ b/pkg/index/iterator.go
@@ -26,7 +26,6 @@ import (
 	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
 	"github.com/apache/skywalking-banyandb/banyand/kv"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
-	"github.com/apache/skywalking-banyandb/pkg/index/metadata"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
@@ -89,7 +88,7 @@ func (f *FieldIteratorTemplate) Close() error {
 }
 
 func NewFieldIteratorTemplate(l *logger.Logger, fieldKey FieldKey, termRange RangeOpts, order modelv1.Sort, iterable kv.Iterable,
-	metadata metadata.Term, fn CompositePostingValueFn,
+	fn CompositePostingValueFn,
 ) (*FieldIteratorTemplate, error) {
 	if termRange.Upper == nil {
 		termRange.Upper = DefaultUpper
@@ -118,12 +117,12 @@ func NewFieldIteratorTemplate(l *logger.Logger, fieldKey FieldKey, termRange Ran
 		Key:  fieldKey,
 		Term: term,
 	}
-	seekKey, err := field.Marshal(metadata)
+	seekKey, err := field.Marshal()
 	if err != nil {
 		return nil, err
 	}
 	return &FieldIteratorTemplate{
-		delegated: newDelegateIterator(iter, fieldKey, metadata, l),
+		delegated: newDelegateIterator(iter, fieldKey, l),
 		termRange: termRange,
 		fn:        fn,
 		reverse:   reverse,
@@ -131,11 +130,11 @@ func NewFieldIteratorTemplate(l *logger.Logger, fieldKey FieldKey, termRange Ran
 	}, nil
 }
 
-func parseKey(fieldKey FieldKey, metadata metadata.Term, key []byte) (Field, error) {
+func parseKey(fieldKey FieldKey, key []byte) (Field, error) {
 	f := &Field{
 		Key: fieldKey,
 	}
-	err := f.Unmarshal(metadata, key)
+	err := f.Unmarshal(key)
 	if err != nil {
 		return *f, err
 	}
@@ -235,20 +234,18 @@ type delegateIterator struct {
 	delegated     kv.Iterator
 	fieldKey      FieldKey
 	fieldKeyBytes []byte
-	metadata      metadata.Term
 	l             *logger.Logger
 
 	curField Field
 	closed   bool
 }
 
-func newDelegateIterator(delegated kv.Iterator, fieldKey FieldKey, metadata metadata.Term, l *logger.Logger) *delegateIterator {
+func newDelegateIterator(delegated kv.Iterator, fieldKey FieldKey, l *logger.Logger) *delegateIterator {
 	fieldKeyBytes := fieldKey.Marshal()
 	return &delegateIterator{
 		delegated:     delegated,
 		fieldKey:      fieldKey,
 		fieldKeyBytes: fieldKeyBytes,
-		metadata:      metadata,
 		l:             l,
 	}
 }
@@ -282,7 +279,7 @@ func (di *delegateIterator) Valid() bool {
 		return false
 	}
 	var err error
-	di.curField, err = parseKey(di.fieldKey, di.metadata, di.Key())
+	di.curField, err = parseKey(di.fieldKey, di.Key())
 	if err != nil {
 		di.l.Error().Err(err).Msg("fail to parse field from key")
 		di.Close()
diff --git a/pkg/index/lsm/lsm.go b/pkg/index/lsm/lsm.go
index b2ef18c..53de61b 100644
--- a/pkg/index/lsm/lsm.go
+++ b/pkg/index/lsm/lsm.go
@@ -18,23 +18,19 @@
 package lsm
 
 import (
-	"go.uber.org/multierr"
-
 	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/banyand/kv"
 	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/index"
-	"github.com/apache/skywalking-banyandb/pkg/index/metadata"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
 var _ index.Store = (*store)(nil)
 
 type store struct {
-	lsm          kv.Store
-	termMetadata metadata.Term
-	l            *logger.Logger
+	lsm kv.Store
+	l   *logger.Logger
 }
 
 func (*store) Flush() error {
@@ -42,20 +38,15 @@ func (*store) Flush() error {
 }
 
 func (s *store) Stats() observability.Statistics {
-	store := s.lsm.Stats()
-	term := s.termMetadata.Stats()
-	return observability.Statistics{
-		MemBytes:    store.MemBytes + term.MemBytes,
-		MaxMemBytes: store.MaxMemBytes + term.MaxMemBytes,
-	}
+	return s.lsm.Stats()
 }
 
 func (s *store) Close() error {
-	return multierr.Combine(s.lsm.Close(), s.termMetadata.Close())
+	return s.lsm.Close()
 }
 
 func (s *store) Write(field index.Field, itemID common.ItemID) error {
-	f, err := field.Marshal(s.termMetadata)
+	f, err := field.Marshal()
 	if err != nil {
 		return err
 	}
@@ -74,16 +65,8 @@ func NewStore(opts StoreOpts) (index.Store, error) {
 	if lsm, err = kv.OpenStore(0, opts.Path+"/lsm", kv.StoreWithLogger(opts.Logger)); err != nil {
 		return nil, err
 	}
-	var md metadata.Term
-	if md, err = metadata.NewTerm(metadata.TermOpts{
-		Path:   opts.Path + "/tmd",
-		Logger: opts.Logger,
-	}); err != nil {
-		return nil, err
-	}
 	return &store{
-		lsm:          lsm,
-		termMetadata: md,
-		l:            opts.Logger,
+		lsm: lsm,
+		l:   opts.Logger,
 	}, nil
 }
diff --git a/pkg/index/lsm/search.go b/pkg/index/lsm/search.go
index 43f1948..5e877fa 100644
--- a/pkg/index/lsm/search.go
+++ b/pkg/index/lsm/search.go
@@ -37,7 +37,7 @@ func (s *store) MatchField(fieldKey index.FieldKey) (list posting.List, err erro
 }
 
 func (s *store) MatchTerms(field index.Field) (list posting.List, err error) {
-	f, err := field.Marshal(s.termMetadata)
+	f, err := field.Marshal()
 	if err != nil {
 		return nil, err
 	}
@@ -66,7 +66,7 @@ func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posti
 }
 
 func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, order modelv1.Sort) (index.FieldIterator, error) {
-	return index.NewFieldIteratorTemplate(s.l, fieldKey, termRange, order, s.lsm, s.termMetadata,
+	return index.NewFieldIteratorTemplate(s.l, fieldKey, termRange, order, s.lsm,
 		func(term, value []byte, delegated kv.Iterator) (*index.PostingValue, error) {
 			pv := &index.PostingValue{
 				Term:  term,
@@ -75,7 +75,7 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, ord
 
 			for ; delegated.Valid(); delegated.Next() {
 				f := index.Field{}
-				err := f.Unmarshal(s.termMetadata, delegated.Key())
+				err := f.Unmarshal(delegated.Key())
 				if err != nil {
 					return nil, err
 				}
diff --git a/pkg/index/metadata/metadata.go b/pkg/index/metadata/metadata.go
deleted file mode 100644
index 3e150e0..0000000
--- a/pkg/index/metadata/metadata.go
+++ /dev/null
@@ -1,79 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package metadata
-
-import (
-	"io"
-
-	"github.com/pkg/errors"
-
-	"github.com/apache/skywalking-banyandb/banyand/kv"
-	"github.com/apache/skywalking-banyandb/banyand/observability"
-	"github.com/apache/skywalking-banyandb/pkg/convert"
-	"github.com/apache/skywalking-banyandb/pkg/logger"
-)
-
-type Term interface {
-	observability.Observable
-	ID(term []byte) (id []byte, err error)
-	Literal(id []byte) (term []byte, err error)
-	io.Closer
-}
-
-var _ Term = (*term)(nil)
-
-type term struct {
-	store kv.Store
-}
-
-type TermOpts struct {
-	Path   string
-	Logger *logger.Logger
-}
-
-func NewTerm(opts TermOpts) (Term, error) {
-	var store kv.Store
-	var err error
-	if store, err = kv.OpenStore(0, opts.Path, kv.StoreWithNamedLogger("term_metadata", opts.Logger)); err != nil {
-		return nil, err
-	}
-	return &term{
-		store: store,
-	}, nil
-}
-
-func (t *term) ID(term []byte) (id []byte, err error) {
-	id = convert.Uint64ToBytes(convert.Hash(term))
-	_, err = t.store.Get(id)
-	if errors.Is(err, kv.ErrKeyNotFound) {
-		return id, t.store.Put(id, term)
-	}
-	return id, nil
-}
-
-func (t *term) Literal(id []byte) (term []byte, err error) {
-	return t.store.Get(id)
-}
-
-func (t *term) Close() error {
-	return t.store.Close()
-}
-
-func (t *term) Stats() observability.Statistics {
-	return t.store.Stats()
-}
diff --git a/pkg/index/testcases/service_name.go b/pkg/index/testcases/service_name.go
index c6e174d..df5e067 100644
--- a/pkg/index/testcases/service_name.go
+++ b/pkg/index/testcases/service_name.go
@@ -31,7 +31,7 @@ import (
 var serviceName = index.FieldKey{
 	// http_method
 	IndexRuleID: 6,
-	EncodeTerm:  true,
+	EncodeTerm:  false,
 }
 
 func RunServiceName(t *testing.T, store SimpleStore) {
diff --git a/scripts/build/test.mk b/scripts/build/test.mk
index 8a2bdf8..ad65bdf 100644
--- a/scripts/build/test.mk
+++ b/scripts/build/test.mk
@@ -41,13 +41,13 @@ test: $(GINKGO) generate ## Run all the unit tests
 	$(GINKGO) $(TEST_OPTS) $(TEST_EXTRA_OPTS) -tags "$(TEST_TAGS)" $(TEST_PKG_LIST)
 
 .PHONY: test-race
-test-race: TEST_OPTS=-race
+test-race: TEST_OPTS=--race
 test-race: test  ## Run all the unit tests with race detector on
 
 .PHONY: test-coverage
-test-coverage: ## Run all the unit tests with coverage analysis on
+test-coverage: $(GINKGO) generate ## Run all the unit tests with coverage analysis on
 	mkdir -p "$(TEST_COVERAGE_DIR)"
-	go test $(TEST_COVERAGE_OPTS) $(TEST_COVERAGE_EXTRA_OPTS) -coverprofile="$(TEST_COVERAGE_PROFILE)" -tags "$(TEST_TAGS)" $(TEST_COVERAGE_PKG_LIST)
+	$(GINKGO) $(TEST_COVERAGE_OPTS) $(TEST_COVERAGE_EXTRA_OPTS) --coverprofile="$(TEST_COVERAGE_PROFILE)" --tags "$(TEST_TAGS)" $(TEST_COVERAGE_PKG_LIST)
 	go tool cover -html="$(TEST_COVERAGE_PROFILE)" -o "$(TEST_COVERAGE_REPORT)"
 	@echo "Test coverage report has been saved to $(TEST_COVERAGE_REPORT)"