You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/08/03 13:53:17 UTC
[skywalking-banyandb] 01/05: Add mem table
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch index-mem
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit d5cce85d98809afb5ebfd6eee5d737e73c1a6ad4
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Mon Jul 26 11:45:59 2021 +0800
Add mem table
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
banyand/index/tsdb/field_map.go | 64 ++++++++++
banyand/index/tsdb/mem.go | 91 ++++++++++++++
banyand/index/tsdb/mem_test.go | 261 ++++++++++++++++++++++++++++++++++++++++
banyand/index/tsdb/term_map.go | 141 ++++++++++++++++++++++
banyand/index/tsdb/tsdb.go | 37 ++++++
5 files changed, 594 insertions(+)
diff --git a/banyand/index/tsdb/field_map.go b/banyand/index/tsdb/field_map.go
new file mode 100644
index 0000000..cc80bad
--- /dev/null
+++ b/banyand/index/tsdb/field_map.go
@@ -0,0 +1,64 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package tsdb
+
+import (
+ "github.com/pkg/errors"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+)
+
+var ErrFieldAbsent = errors.New("field doesn't exist")
+
+type fieldHashID uint64
+
+// fieldMap holds every registered field, keyed by the hash of its name,
+// together with the postings of that field's values.
+type fieldMap struct {
+	repo map[fieldHashID]*fieldValue // key: fieldHashID(convert.Hash(name))
+}
+
+// newFieldMap returns an empty fieldMap whose backing map is pre-sized for
+// initialSize fields.
+func newFieldMap(initialSize int) *fieldMap {
+	fm := new(fieldMap)
+	fm.repo = make(map[fieldHashID]*fieldValue, initialSize)
+	return fm
+}
+
+// createKey registers key as a field with an empty posting map. Registering a
+// name that is already present replaces its postings with a fresh, empty map.
+func (fm *fieldMap) createKey(key []byte) {
+	id := fieldHashID(convert.Hash(key))
+	fm.repo[id] = &fieldValue{key: key, value: newPostingMap()}
+}
+
+// get looks up the field registered under key and reports whether it exists.
+//
+// It is safe to call on a nil *fieldMap, which is what MemTable.terms holds
+// before Initialize runs. In that case it reports the field as absent, so
+// MemTable.Insert returns ErrFieldAbsent and the Match*/Range queries return
+// the empty list, instead of panicking on a nil dereference.
+func (fm *fieldMap) get(key []byte) (*fieldValue, bool) {
+	if fm == nil {
+		return nil, false
+	}
+	v, ok := fm.repo[fieldHashID(convert.Hash(key))]
+	return v, ok
+}
+
+// put records that chunk id holds fv.value for the field fv.name.
+// It returns an error wrapping ErrFieldAbsent when fv.name is not registered;
+// otherwise it returns whatever the field's posting map returns.
+func (fm *fieldMap) put(fv *Field, id common.ChunkID) error {
+	field, ok := fm.get(fv.name)
+	if !ok {
+		return errors.Wrapf(ErrFieldAbsent, "field name: %s", fv.name)
+	}
+	return field.value.put(fv.value, id)
+}
+
+// fieldValue pairs a registered field name with the postings of its values.
+type fieldValue struct {
+	key   []byte      // field name as passed to createKey
+	value *postingMap // term -> chunk ids for this field
+}
diff --git a/banyand/index/tsdb/mem.go b/banyand/index/tsdb/mem.go
new file mode 100644
index 0000000..39af209
--- /dev/null
+++ b/banyand/index/tsdb/mem.go
@@ -0,0 +1,91 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package tsdb
+
+import (
+ "github.com/pkg/errors"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/pkg/posting"
+ "github.com/apache/skywalking-banyandb/pkg/posting/roaring"
+)
+
+var (
+	// ErrFieldsAbsent is returned by MemTable.Initialize when no fields are given.
+	ErrFieldsAbsent = errors.New("fields are absent")
+
+	// emptyPostingList is the shared result of lookups that match nothing.
+	// The same instance is handed out by reference from several lookups, so it
+	// must never be modified.
+	emptyPostingList = roaring.NewPostingList()
+)
+
+// MemTable is an in-memory inverted index from field values to chunk ids.
+// Fields must be registered with Initialize before values can be inserted.
+type MemTable struct {
+	terms       *fieldMap // nil until Initialize succeeds
+	name, group string
+}
+
+// NewMemTable creates an empty MemTable identified by name within group.
+// Call Initialize on it before inserting values.
+func NewMemTable(name, group string) *MemTable {
+	m := &MemTable{}
+	m.name, m.group = name, group
+	return m
+}
+
+// Field is a single (name, value) pair to index or to match against.
+type Field struct {
+	name, value []byte
+}
+
+// FieldSpec declares one field a MemTable indexes.
+type FieldSpec struct {
+	Name string // looked up by the hash of its bytes, like Field names
+}
+
+// Initialize registers the fields this table indexes. It replaces any field
+// map from a previous call, discarding its postings. It returns
+// ErrFieldsAbsent, leaving the table untouched, when fields is empty.
+func (m *MemTable) Initialize(fields []FieldSpec) error {
+	if len(fields) == 0 {
+		return ErrFieldsAbsent
+	}
+	terms := newFieldMap(len(fields))
+	for i := range fields {
+		terms.createKey([]byte(fields[i].Name))
+	}
+	m.terms = terms
+	return nil
+}
+
+// Insert indexes field.value under field.name for chunkID. It returns an
+// error wrapping ErrFieldAbsent when field.name was not registered by
+// Initialize.
+func (m *MemTable) Insert(field *Field, chunkID common.ChunkID) error {
+	terms := m.terms
+	return terms.put(field, chunkID)
+}
+
+// MatchField returns a new list of the chunk ids holding any value of
+// fieldName, or the shared empty list when the field is not registered.
+func (m *MemTable) MatchField(fieldName []byte) (list posting.List) {
+	if fv, ok := m.terms.get(fieldName); ok {
+		return fv.value.allValues()
+	}
+	return emptyPostingList
+}
+
+// MatchTerms returns the chunk ids indexed under field.value for field.name,
+// or the shared empty list when the field is not registered. The list is the
+// one the table stores (no copy is made), so callers must not modify it.
+func (m *MemTable) MatchTerms(field *Field) (list posting.List) {
+	fv, found := m.terms.get(field.name)
+	if !found {
+		return emptyPostingList
+	}
+	list = fv.value.get(field.value)
+	return list
+}
+
+// Range returns the chunk ids whose value of fieldName lies within the bounds
+// described by opts, or the shared empty list when the field is not registered.
+func (m *MemTable) Range(fieldName []byte, opts *RangeOpts) (list posting.List) {
+	if fv, found := m.terms.get(fieldName); found {
+		return fv.value.getRange(opts)
+	}
+	return emptyPostingList
+}
diff --git a/banyand/index/tsdb/mem_test.go b/banyand/index/tsdb/mem_test.go
new file mode 100644
index 0000000..0ab2af2
--- /dev/null
+++ b/banyand/index/tsdb/mem_test.go
@@ -0,0 +1,261 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package tsdb
+
+import (
+ "reflect"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/posting"
+ "github.com/apache/skywalking-banyandb/pkg/posting/roaring"
+)
+
+func TestMemTable_Initialize(t *testing.T) {
+ type args struct {
+ fields []FieldSpec
+ }
+ tests := []struct {
+ name string
+ args args
+ wantErr bool
+ }{
+ {
+ name: "golden path",
+ args: args{
+ fields: []FieldSpec{
+ {
+ Name: "service_name",
+ },
+ {
+ Name: "duration",
+ },
+ },
+ },
+ },
+ {
+ name: "fields absent",
+ wantErr: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ m := NewMemTable("sw", "group")
+ var err error
+ if err = m.Initialize(tt.args.fields); (err != nil) != tt.wantErr {
+ t.Errorf("Initialize() error = %v, wantErr %v", err, tt.wantErr)
+ }
+ if err != nil {
+ return
+ }
+ assert.Equal(t, len(m.terms.repo), len(tt.args.fields))
+ })
+ }
+}
+
+func TestMemTable_Range(t *testing.T) {
+ type args struct {
+ fieldName []byte
+ opts *RangeOpts
+ }
+ m := NewMemTable("sw", "group")
+ setUp(t, m)
+ tests := []struct {
+ name string
+ args args
+ wantList posting.List
+ }{
+ {
+ name: "in range",
+ args: args{
+ fieldName: []byte("duration"),
+ opts: &RangeOpts{
+ Lower: convert.Uint16ToBytes(100),
+ Upper: convert.Uint16ToBytes(500),
+ },
+ },
+ wantList: m.MatchTerms(&Field{
+ name: []byte("duration"),
+ value: convert.Uint16ToBytes(200),
+ }),
+ },
+ {
+ name: "excludes edge",
+ args: args{
+ fieldName: []byte("duration"),
+ opts: &RangeOpts{
+ Lower: convert.Uint16ToBytes(50),
+ Upper: convert.Uint16ToBytes(1000),
+ },
+ },
+ wantList: union(m,
+ &Field{
+ name: []byte("duration"),
+ value: convert.Uint16ToBytes(200),
+ },
+ ),
+ },
+ {
+ name: "includes lower",
+ args: args{
+ fieldName: []byte("duration"),
+ opts: &RangeOpts{
+ Lower: convert.Uint16ToBytes(50),
+ Upper: convert.Uint16ToBytes(1000),
+ IncludesLower: true,
+ },
+ },
+ wantList: union(m,
+ &Field{
+ name: []byte("duration"),
+ value: convert.Uint16ToBytes(50),
+ },
+ &Field{
+ name: []byte("duration"),
+ value: convert.Uint16ToBytes(200),
+ },
+ ),
+ },
+ {
+ name: "includes upper",
+ args: args{
+ fieldName: []byte("duration"),
+ opts: &RangeOpts{
+ Lower: convert.Uint16ToBytes(50),
+ Upper: convert.Uint16ToBytes(1000),
+ IncludesUpper: true,
+ },
+ },
+ wantList: union(m,
+ &Field{
+ name: []byte("duration"),
+ value: convert.Uint16ToBytes(200),
+ },
+ &Field{
+ name: []byte("duration"),
+ value: convert.Uint16ToBytes(1000),
+ },
+ ),
+ },
+ {
+ name: "includes edges",
+ args: args{
+ fieldName: []byte("duration"),
+ opts: &RangeOpts{
+ Lower: convert.Uint16ToBytes(50),
+ Upper: convert.Uint16ToBytes(1000),
+ IncludesUpper: true,
+ IncludesLower: true,
+ },
+ },
+ wantList: union(m,
+ &Field{
+ name: []byte("duration"),
+ value: convert.Uint16ToBytes(50),
+ },
+ &Field{
+ name: []byte("duration"),
+ value: convert.Uint16ToBytes(200),
+ },
+ &Field{
+ name: []byte("duration"),
+ value: convert.Uint16ToBytes(1000),
+ },
+ ),
+ },
+ {
+ name: "match one",
+ args: args{
+ fieldName: []byte("duration"),
+ opts: &RangeOpts{
+ Lower: convert.Uint16ToBytes(200),
+ Upper: convert.Uint16ToBytes(200),
+ IncludesUpper: true,
+ IncludesLower: true,
+ },
+ },
+ wantList: union(m,
+ &Field{
+ name: []byte("duration"),
+ value: convert.Uint16ToBytes(200),
+ },
+ ),
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if gotList := m.Range(tt.args.fieldName, tt.args.opts); !reflect.DeepEqual(gotList, tt.wantList) {
+ t.Errorf("Range() = %v, want %v", gotList.Len(), tt.wantList.Len())
+ }
+ })
+ }
+}
+
+// union merges the postings matched by each field into a freshly allocated
+// list. The return value of Union is discarded, as in the rest of this package.
+func union(memTable *MemTable, fields ...*Field) posting.List {
+	merged := roaring.NewPostingList()
+	for i := range fields {
+		_ = merged.Union(memTable.MatchTerms(fields[i]))
+	}
+	return merged
+}
+
+func setUp(t *testing.T, mt *MemTable) {
+ assert.NoError(t, mt.Initialize([]FieldSpec{
+ {
+ Name: "service_name",
+ },
+ {
+ Name: "duration",
+ },
+ }))
+ for i := 0; i < 100; i++ {
+ if i%2 == 0 {
+ assert.NoError(t, mt.Insert(&Field{
+ name: []byte("service_name"),
+ value: []byte("gateway"),
+ }, common.ChunkID(i)))
+ } else {
+ assert.NoError(t, mt.Insert(&Field{
+ name: []byte("service_name"),
+ value: []byte("webpage"),
+ }, common.ChunkID(i)))
+ }
+ }
+ for i := 100; i < 200; i++ {
+ switch {
+ case i%3 == 0:
+ assert.NoError(t, mt.Insert(&Field{
+ name: []byte("duration"),
+ value: convert.Uint16ToBytes(50),
+ }, common.ChunkID(i)))
+ case i%3 == 1:
+ assert.NoError(t, mt.Insert(&Field{
+ name: []byte("duration"),
+ value: convert.Uint16ToBytes(200),
+ }, common.ChunkID(i)))
+ case i%3 == 2:
+ assert.NoError(t, mt.Insert(&Field{
+ name: []byte("duration"),
+ value: convert.Uint16ToBytes(1000),
+ }, common.ChunkID(i)))
+ }
+ }
+}
diff --git a/banyand/index/tsdb/term_map.go b/banyand/index/tsdb/term_map.go
new file mode 100644
index 0000000..77d84ef
--- /dev/null
+++ b/banyand/index/tsdb/term_map.go
@@ -0,0 +1,141 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package tsdb
+
+import (
+ "bytes"
+ "sort"
+ "sync"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/posting"
+ "github.com/apache/skywalking-banyandb/pkg/posting/roaring"
+)
+
+type termHashID uint64
+
+// postingMap maps each term, keyed by the hash of its bytes, to the list of
+// chunk ids that contain it.
+type postingMap struct {
+	repo  map[termHashID]*postingValue
+	mutex sync.RWMutex // must be held for writing while repo is modified
+}
+
+// newPostingMap returns a postingMap with no terms.
+func newPostingMap() *postingMap {
+	pm := new(postingMap)
+	pm.repo = make(map[termHashID]*postingValue)
+	return pm
+}
+
+// put adds id to the posting list of key, creating the list on first use.
+// It always returns nil.
+//
+// NOTE(review): Insert runs without holding p.mutex, so concurrent puts on
+// the same key are not synchronized here. Confirm that callers serialize them.
+func (p *postingMap) put(key []byte, id common.ChunkID) error {
+	p.getOrCreate(key).Insert(id)
+	return nil
+}
+
+// getOrCreate returns the posting list stored for key, creating and storing
+// an empty one when key has not been seen yet.
+//
+// The lookup is repeated after taking the write lock. Between the read and
+// the write another goroutine may have created the list for the same key,
+// and overwriting it would silently drop the ids already inserted into it.
+func (p *postingMap) getOrCreate(key []byte) posting.List {
+	hashedKey := termHashID(convert.Hash(key))
+	p.mutex.RLock()
+	v, ok := p.repo[hashedKey]
+	p.mutex.RUnlock()
+	if ok {
+		return v.value
+	}
+	p.mutex.Lock()
+	defer p.mutex.Unlock()
+	if existing, ok := p.repo[hashedKey]; ok {
+		return existing.value
+	}
+	v = &postingValue{
+		// Copy the key. It is kept for range scans, and the caller may reuse
+		// or mutate its buffer after this call returns.
+		key:   append([]byte(nil), key...),
+		value: roaring.NewPostingList(),
+	}
+	p.repo[hashedKey] = v
+	return v.value
+}
+
+// get returns the posting list stored for key, or the shared emptyPostingList
+// when key has no postings. The stored list is returned itself, not a copy.
+//
+// The map read happens under the read lock, because getOrCreate writes to
+// repo concurrently. Go maps are not safe for unsynchronized read/write.
+func (p *postingMap) get(key []byte) posting.List {
+	hashedKey := termHashID(convert.Hash(key))
+	p.mutex.RLock()
+	v, ok := p.repo[hashedKey]
+	p.mutex.RUnlock()
+	if !ok {
+		return emptyPostingList
+	}
+	return v.value
+}
+
+// allValues returns a new list holding the union of every term's postings.
+// repo is iterated under the read lock so concurrent term creation in
+// getOrCreate cannot race with the iteration.
+func (p *postingMap) allValues() posting.List {
+	result := roaring.NewPostingList()
+	p.mutex.RLock()
+	defer p.mutex.RUnlock()
+	for _, v := range p.repo {
+		_ = result.Union(v.value)
+	}
+	return result
+}
+
+// getRange returns the union of the postings of every term k with
+// Lower < k < Upper, compared byte-wise with bytes.Compare. Lower and Upper
+// themselves are included when IncludesLower and IncludesUpper are set.
+//
+// An inverted range (Upper < Lower) yields the shared empty list. When
+// Lower == Upper, the stored list for that term is returned directly (not a
+// copy) if both bounds are inclusive, and the empty list otherwise. In all
+// other cases a new list is built.
+func (p *postingMap) getRange(opts *RangeOpts) posting.List {
+	switch bytes.Compare(opts.Upper, opts.Lower) {
+	case -1:
+		return emptyPostingList
+	case 0:
+		if opts.IncludesUpper && opts.IncludesLower {
+			return p.get(opts.Upper)
+		}
+		return emptyPostingList
+	}
+
+	p.mutex.RLock()
+	defer p.mutex.RUnlock()
+	// Collect the stored values, not only their keys, so matching lists can
+	// be merged without re-hashing each key to find it again.
+	values := make([]*postingValue, 0, len(p.repo))
+	for _, v := range p.repo {
+		values = append(values, v)
+	}
+	sort.Slice(values, func(i, j int) bool {
+		return bytes.Compare(values[i].key, values[j].key) < 0
+	})
+	start := sort.Search(len(values), func(i int) bool {
+		return bytes.Compare(values[i].key, opts.Lower) >= 0
+	})
+	result := roaring.NewPostingList()
+	for _, v := range values[start:] {
+		if c := bytes.Compare(v.key, opts.Upper); c > 0 || (c == 0 && !opts.IncludesUpper) {
+			// Keys are sorted ascending, so no remaining key can be in range.
+			// A labeled or loop-level break is required here: a `break`
+			// inside a switch would only leave the switch.
+			break
+		}
+		if !opts.IncludesLower && bytes.Equal(v.key, opts.Lower) {
+			continue
+		}
+		_ = result.Union(v.value)
+	}
+	return result
+}
+
+// Asc orders byte slices ascending by bytes.Compare. It implements
+// sort.Interface.
+type Asc [][]byte
+
+// Len reports the number of slices.
+func (a Asc) Len() int { return len(a) }
+
+// Less reports whether a[i] sorts strictly before a[j].
+func (a Asc) Less(i, j int) bool { return bytes.Compare(a[i], a[j]) == -1 }
+
+// Swap exchanges a[i] and a[j].
+func (a Asc) Swap(i, j int) {
+	tmp := a[i]
+	a[i] = a[j]
+	a[j] = tmp
+}
+
+// postingValue pairs a term with the list of chunk ids that contain it.
+type postingValue struct {
+	key   []byte       // the term's raw bytes, used to order range scans
+	value posting.List // chunk ids holding this term
+}
diff --git a/banyand/index/tsdb/tsdb.go b/banyand/index/tsdb/tsdb.go
new file mode 100644
index 0000000..06c96af
--- /dev/null
+++ b/banyand/index/tsdb/tsdb.go
@@ -0,0 +1,37 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package tsdb
+
+import (
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/pkg/posting"
+)
+
+// RangeOpts bounds a range query over terms, compared byte-wise. Each bound
+// is exclusive unless its Includes* flag is set.
+type RangeOpts struct {
+	Upper, Lower                 []byte
+	IncludesUpper, IncludesLower bool
+}
+
+// Reader is the query surface of an index table such as MemTable. Despite its
+// name it also exposes Insert.
+type Reader interface {
+	// Insert indexes field.value under field.name for chunkID.
+	Insert(field *Field, chunkID common.ChunkID) error
+	// MatchField returns the chunk ids holding any value of the named field.
+	MatchField(fieldName []byte) (list posting.List)
+	// MatchTerms returns the chunk ids holding exactly field.value for field.name.
+	MatchTerms(field *Field) (list posting.List)
+	// Range returns the chunk ids whose value of fieldName lies within opts.
+	Range(fieldName []byte, opts *RangeOpts) (list posting.List)
+}
+
+// MemTable must keep satisfying Reader; compilation fails here otherwise.
+var _ Reader = (*MemTable)(nil)