You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/08/03 13:53:17 UTC

[skywalking-banyandb] 01/05: Add mem table

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch index-mem
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit d5cce85d98809afb5ebfd6eee5d737e73c1a6ad4
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Mon Jul 26 11:45:59 2021 +0800

    Add mem table
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/index/tsdb/field_map.go |  64 ++++++++++
 banyand/index/tsdb/mem.go       |  91 ++++++++++++++
 banyand/index/tsdb/mem_test.go  | 261 ++++++++++++++++++++++++++++++++++++++++
 banyand/index/tsdb/term_map.go  | 141 ++++++++++++++++++++++
 banyand/index/tsdb/tsdb.go      |  37 ++++++
 5 files changed, 594 insertions(+)

diff --git a/banyand/index/tsdb/field_map.go b/banyand/index/tsdb/field_map.go
new file mode 100644
index 0000000..cc80bad
--- /dev/null
+++ b/banyand/index/tsdb/field_map.go
@@ -0,0 +1,64 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package tsdb
+
+import (
+	"github.com/pkg/errors"
+
+	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+)
+
+var ErrFieldAbsent = errors.New("field doesn't exist") // wrapped by fieldMap.put when the field name has no entry in the map
+
+type fieldHashID uint64 // map key built from convert.Hash of a field name
+
+type fieldMap struct { // registry of fields, each holding its own postingMap
+	repo map[fieldHashID]*fieldValue // hashed field name -> field entry
+}
+
+// newFieldMap builds an empty fieldMap whose backing map is pre-sized for
+// initialSize entries.
+func newFieldMap(initialSize int) *fieldMap {
+	fm := new(fieldMap)
+	fm.repo = make(map[fieldHashID]*fieldValue, initialSize)
+	return fm
+}
+
+// createKey registers key as a field, giving it a fresh, empty postingMap.
+// Any entry already stored under the same hash is replaced.
+func (fm *fieldMap) createKey(key []byte) {
+	entry := &fieldValue{key: key, value: newPostingMap()}
+	id := fieldHashID(convert.Hash(key))
+	fm.repo[id] = entry
+}
+
+// get looks up the entry registered for key; the boolean reports whether
+// such an entry exists.
+func (fm *fieldMap) get(key []byte) (*fieldValue, bool) {
+	id := fieldHashID(convert.Hash(key))
+	entry, found := fm.repo[id]
+	return entry, found
+}
+
+// put records chunk id under the term fv.value of the field fv.name.
+// The field must have been registered through createKey beforehand;
+// otherwise ErrFieldAbsent is returned, wrapped with the offending name.
+func (fm *fieldMap) put(fv *Field, id common.ChunkID) error {
+	entry, ok := fm.get(fv.name)
+	if !ok {
+		return errors.Wrapf(ErrFieldAbsent, "field name:%s", fv.name)
+	}
+	return entry.value.put(fv.value, id)
+}
+
+type fieldValue struct { // one registered field
+	key   []byte      // field name as passed to createKey
+	value *postingMap // terms of this field and their posting lists
+}
diff --git a/banyand/index/tsdb/mem.go b/banyand/index/tsdb/mem.go
new file mode 100644
index 0000000..39af209
--- /dev/null
+++ b/banyand/index/tsdb/mem.go
@@ -0,0 +1,91 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package tsdb
+
+import (
+	"github.com/pkg/errors"
+
+	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/pkg/posting"
+	"github.com/apache/skywalking-banyandb/pkg/posting/roaring"
+)
+
+var ErrFieldsAbsent = errors.New("fields are absent") // returned by MemTable.Initialize when no field specs are supplied
+
+var emptyPostingList = roaring.NewPostingList() // shared result for lookups that miss; postingMap.getOrCreate compares against it by identity
+
+type MemTable struct { // in-memory index; terms stays nil until Initialize is called
+	terms *fieldMap // registered fields and their postings
+	name  string    // set by NewMemTable; not read by the code visible in this patch
+	group string    // set by NewMemTable; not read by the code visible in this patch
+}
+
+// NewMemTable returns a MemTable labelled with name and group. No fields are
+// registered yet; call Initialize before using the table.
+func NewMemTable(name, group string) *MemTable {
+	table := new(MemTable)
+	table.name, table.group = name, group
+	return table
+}
+
+type Field struct { // a single field/term pair to insert or look up
+	name  []byte // field name, resolved through fieldMap.get
+	value []byte // term value, used as the key inside the field's postingMap
+}
+
+type FieldSpec struct { // declares one field for MemTable.Initialize
+	Name string // field name; converted to []byte when registered
+}
+
+// Initialize allocates the field registry and registers every entry of
+// fields by name. It returns ErrFieldsAbsent when fields is empty.
+func (m *MemTable) Initialize(fields []FieldSpec) error {
+	count := len(fields)
+	if count == 0 {
+		return ErrFieldsAbsent
+	}
+	m.terms = newFieldMap(count)
+	for i := range fields {
+		m.terms.createKey([]byte(fields[i].Name))
+	}
+	return nil
+}
+
+// Insert adds chunkID to the posting list of the term field.value under the
+// field field.name.
+//
+// It returns ErrFieldsAbsent (wrapped) when the table has not been
+// initialized, rather than dereferencing the nil field registry, and
+// ErrFieldAbsent (wrapped) when the field name is not registered.
+func (m *MemTable) Insert(field *Field, chunkID common.ChunkID) error {
+	if m.terms == nil {
+		return errors.Wrap(ErrFieldsAbsent, "mem table is not initialized")
+	}
+	return m.terms.put(field, chunkID)
+}
+
+// MatchField returns the union of the posting lists of every term stored
+// under fieldName, or the shared empty list when the field is unknown.
+func (m *MemTable) MatchField(fieldName []byte) (list posting.List) {
+	if entry, found := m.terms.get(fieldName); found {
+		return entry.value.allValues()
+	}
+	return emptyPostingList
+}
+
+// MatchTerms returns the posting list stored for the term field.value under
+// field.name, or the shared empty list when the field is unknown.
+func (m *MemTable) MatchTerms(field *Field) (list posting.List) {
+	if entry, found := m.terms.get(field.name); found {
+		return entry.value.get(field.value)
+	}
+	return emptyPostingList
+}
+
+// Range returns the postings of every term of fieldName that falls inside
+// the bounds described by opts, or the shared empty list when the field is
+// unknown.
+func (m *MemTable) Range(fieldName []byte, opts *RangeOpts) (list posting.List) {
+	if entry, found := m.terms.get(fieldName); found {
+		return entry.value.getRange(opts)
+	}
+	return emptyPostingList
+}
diff --git a/banyand/index/tsdb/mem_test.go b/banyand/index/tsdb/mem_test.go
new file mode 100644
index 0000000..0ab2af2
--- /dev/null
+++ b/banyand/index/tsdb/mem_test.go
@@ -0,0 +1,261 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package tsdb
+
+import (
+	"reflect"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+	"github.com/apache/skywalking-banyandb/pkg/posting"
+	"github.com/apache/skywalking-banyandb/pkg/posting/roaring"
+)
+
+// TestMemTable_Initialize checks that Initialize registers one entry per
+// field spec and rejects an empty spec list.
+func TestMemTable_Initialize(t *testing.T) {
+	type args struct {
+		fields []FieldSpec
+	}
+	tests := []struct {
+		name    string
+		args    args
+		wantErr bool
+	}{
+		{
+			name: "golden path",
+			args: args{
+				fields: []FieldSpec{
+					{Name: "service_name"},
+					{Name: "duration"},
+				},
+			},
+		},
+		{
+			name:    "fields absent",
+			wantErr: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			m := NewMemTable("sw", "group")
+			err := m.Initialize(tt.args.fields)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("Initialize() error = %v, wantErr %v", err, tt.wantErr)
+			}
+			if err != nil {
+				return
+			}
+			// testify's Equal takes the expected value first; keeping that
+			// order makes the failure output label the values correctly.
+			assert.Equal(t, len(tt.args.fields), len(m.terms.repo))
+		})
+	}
+}
+
+func TestMemTable_Range(t *testing.T) { // verifies MemTable.Range for exclusive/inclusive bound combinations
+	type args struct {
+		fieldName []byte
+		opts      *RangeOpts
+	}
+	m := NewMemTable("sw", "group")
+	setUp(t, m) // seeds "duration" with the terms 50, 200 and 1000
+	tests := []struct {
+		name     string
+		args     args
+		wantList posting.List
+	}{
+		{
+			name: "in range",
+			args: args{
+				fieldName: []byte("duration"),
+				opts: &RangeOpts{
+					Lower: convert.Uint16ToBytes(100),
+					Upper: convert.Uint16ToBytes(500),
+				},
+			},
+			wantList: m.MatchTerms(&Field{ // of the seeded terms only 200 lies between 100 and 500
+				name:  []byte("duration"),
+				value: convert.Uint16ToBytes(200),
+			}),
+		},
+		{
+			name: "excludes edge", // neither Includes flag is set, so 50 and 1000 are left out
+			args: args{
+				fieldName: []byte("duration"),
+				opts: &RangeOpts{
+					Lower: convert.Uint16ToBytes(50),
+					Upper: convert.Uint16ToBytes(1000),
+				},
+			},
+			wantList: union(m,
+				&Field{
+					name:  []byte("duration"),
+					value: convert.Uint16ToBytes(200),
+				},
+			),
+		},
+		{
+			name: "includes lower",
+			args: args{
+				fieldName: []byte("duration"),
+				opts: &RangeOpts{
+					Lower:         convert.Uint16ToBytes(50),
+					Upper:         convert.Uint16ToBytes(1000),
+					IncludesLower: true,
+				},
+			},
+			wantList: union(m,
+				&Field{
+					name:  []byte("duration"),
+					value: convert.Uint16ToBytes(50),
+				},
+				&Field{
+					name:  []byte("duration"),
+					value: convert.Uint16ToBytes(200),
+				},
+			),
+		},
+		{
+			name: "includes upper",
+			args: args{
+				fieldName: []byte("duration"),
+				opts: &RangeOpts{
+					Lower:         convert.Uint16ToBytes(50),
+					Upper:         convert.Uint16ToBytes(1000),
+					IncludesUpper: true,
+				},
+			},
+			wantList: union(m,
+				&Field{
+					name:  []byte("duration"),
+					value: convert.Uint16ToBytes(200),
+				},
+				&Field{
+					name:  []byte("duration"),
+					value: convert.Uint16ToBytes(1000),
+				},
+			),
+		},
+		{
+			name: "includes edges",
+			args: args{
+				fieldName: []byte("duration"),
+				opts: &RangeOpts{
+					Lower:         convert.Uint16ToBytes(50),
+					Upper:         convert.Uint16ToBytes(1000),
+					IncludesUpper: true,
+					IncludesLower: true,
+				},
+			},
+			wantList: union(m,
+				&Field{
+					name:  []byte("duration"),
+					value: convert.Uint16ToBytes(50),
+				},
+				&Field{
+					name:  []byte("duration"),
+					value: convert.Uint16ToBytes(200),
+				},
+				&Field{
+					name:  []byte("duration"),
+					value: convert.Uint16ToBytes(1000),
+				},
+			),
+		},
+		{
+			name: "match one", // Lower == Upper with both bounds inclusive
+			args: args{
+				fieldName: []byte("duration"),
+				opts: &RangeOpts{
+					Lower:         convert.Uint16ToBytes(200),
+					Upper:         convert.Uint16ToBytes(200),
+					IncludesUpper: true,
+					IncludesLower: true,
+				},
+			},
+			wantList: union(m,
+				&Field{
+					name:  []byte("duration"),
+					value: convert.Uint16ToBytes(200),
+				},
+			),
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if gotList := m.Range(tt.args.fieldName, tt.args.opts); !reflect.DeepEqual(gotList, tt.wantList) {
+				t.Errorf("Range() = %v, want %v", gotList.Len(), tt.wantList.Len()) // only the list lengths are reported, not their contents
+			}
+		})
+	}
+}
+
+// union merges the posting lists matched by each of fields into a new list.
+// The value returned by Union is discarded.
+func union(memTable *MemTable, fields ...*Field) posting.List {
+	merged := roaring.NewPostingList()
+	for i := 0; i < len(fields); i++ {
+		_ = merged.Union(memTable.MatchTerms(fields[i]))
+	}
+	return merged
+}
+
+func setUp(t *testing.T, mt *MemTable) {
+	assert.NoError(t, mt.Initialize([]FieldSpec{
+		{
+			Name: "service_name",
+		},
+		{
+			Name: "duration",
+		},
+	}))
+	for i := 0; i < 100; i++ {
+		if i%2 == 0 {
+			assert.NoError(t, mt.Insert(&Field{
+				name:  []byte("service_name"),
+				value: []byte("gateway"),
+			}, common.ChunkID(i)))
+		} else {
+			assert.NoError(t, mt.Insert(&Field{
+				name:  []byte("service_name"),
+				value: []byte("webpage"),
+			}, common.ChunkID(i)))
+		}
+	}
+	for i := 100; i < 200; i++ {
+		switch {
+		case i%3 == 0:
+			assert.NoError(t, mt.Insert(&Field{
+				name:  []byte("duration"),
+				value: convert.Uint16ToBytes(50),
+			}, common.ChunkID(i)))
+		case i%3 == 1:
+			assert.NoError(t, mt.Insert(&Field{
+				name:  []byte("duration"),
+				value: convert.Uint16ToBytes(200),
+			}, common.ChunkID(i)))
+		case i%3 == 2:
+			assert.NoError(t, mt.Insert(&Field{
+				name:  []byte("duration"),
+				value: convert.Uint16ToBytes(1000),
+			}, common.ChunkID(i)))
+		}
+	}
+}
diff --git a/banyand/index/tsdb/term_map.go b/banyand/index/tsdb/term_map.go
new file mode 100644
index 0000000..77d84ef
--- /dev/null
+++ b/banyand/index/tsdb/term_map.go
@@ -0,0 +1,141 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package tsdb
+
+import (
+	"bytes"
+	"sort"
+	"sync"
+
+	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+	"github.com/apache/skywalking-banyandb/pkg/posting"
+	"github.com/apache/skywalking-banyandb/pkg/posting/roaring"
+)
+
+type termHashID uint64 // map key built from convert.Hash of a term value
+
+type postingMap struct { // terms of one field and their posting lists
+	repo  map[termHashID]*postingValue // hashed term -> term entry
+	mutex sync.RWMutex                 // write-locked by getOrCreate while inserting into repo
+}
+
+// newPostingMap returns a postingMap with an empty, ready-to-use repo.
+func newPostingMap() *postingMap {
+	pm := new(postingMap)
+	pm.repo = map[termHashID]*postingValue{}
+	return pm
+}
+
+// put adds id to the posting list of the term key, creating the list on
+// first use. The returned error is always nil.
+func (p *postingMap) put(key []byte, id common.ChunkID) error {
+	p.getOrCreate(key).Insert(id)
+	return nil
+}
+
+// getOrCreate returns the posting list stored for key, creating and
+// registering an empty one when the term has not been seen before.
+//
+// The lookup runs under the read lock, and the existence check is repeated
+// once the write lock is held, so that two goroutines inserting the same new
+// term end up sharing one list instead of the later one replacing the entry
+// (and the IDs) of the earlier one.
+func (p *postingMap) getOrCreate(key []byte) posting.List {
+	hashedKey := termHashID(convert.Hash(key))
+
+	p.mutex.RLock()
+	v, ok := p.repo[hashedKey]
+	p.mutex.RUnlock()
+	if ok {
+		return v.value
+	}
+
+	p.mutex.Lock()
+	defer p.mutex.Unlock()
+	// Another goroutine may have created the entry between the two locks.
+	if v, ok = p.repo[hashedKey]; ok {
+		return v.value
+	}
+	v = &postingValue{
+		key:   key,
+		value: roaring.NewPostingList(),
+	}
+	p.repo[hashedKey] = v
+	return v.value
+}
+
+// get returns the posting list stored for the term key, or the shared
+// emptyPostingList when the term is unknown.
+//
+// The map is read under the read lock because getOrCreate inserts into it
+// under the write lock. The lock is released before returning, so callers
+// must not hold p.mutex themselves when calling get.
+func (p *postingMap) get(key []byte) posting.List {
+	hashedKey := termHashID(convert.Hash(key))
+	p.mutex.RLock()
+	v, ok := p.repo[hashedKey]
+	p.mutex.RUnlock()
+	if !ok {
+		return emptyPostingList
+	}
+	return v.value
+}
+
+// allValues returns a new posting list holding the union of the lists of
+// every term in the map. The value returned by Union is discarded.
+//
+// The map is iterated under the read lock because getOrCreate inserts into
+// it under the write lock.
+func (p *postingMap) allValues() posting.List {
+	p.mutex.RLock()
+	defer p.mutex.RUnlock()
+	result := roaring.NewPostingList()
+	for _, value := range p.repo {
+		_ = result.Union(value.value)
+	}
+	return result
+}
+
+// getRange returns the union of the posting lists of every term lying
+// between opts.Lower and opts.Upper, compared bytewise. A bound is exclusive
+// unless the matching Includes flag is set.
+//
+// When Upper sorts before Lower the shared emptyPostingList is returned.
+// When both bounds are equal, the term's own list is returned if both bounds
+// are inclusive, and emptyPostingList otherwise. In every other case a new
+// list is built.
+func (p *postingMap) getRange(opts *RangeOpts) posting.List {
+	switch bytes.Compare(opts.Upper, opts.Lower) {
+	case -1:
+		return emptyPostingList
+	case 0:
+		if opts.IncludesUpper && opts.IncludesLower {
+			// Called before the read lock below is taken, so get is free to
+			// do its own locking.
+			return p.get(opts.Upper)
+		}
+		return emptyPostingList
+	}
+
+	// getOrCreate inserts into repo under the write lock, so the scan below
+	// holds the read lock.
+	p.mutex.RLock()
+	defer p.mutex.RUnlock()
+
+	keys := make(Asc, 0, len(p.repo))
+	for _, v := range p.repo {
+		keys = append(keys, v.key)
+	}
+	sort.Sort(keys)
+	// First key that is >= Lower.
+	index := sort.Search(len(keys), func(i int) bool {
+		return bytes.Compare(keys[i], opts.Lower) >= 0
+	})
+
+	result := roaring.NewPostingList()
+	for _, k := range keys[index:] {
+		// Keys are sorted, so nothing after the upper bound can match. This
+		// break leaves the loop; the previous breaks sat inside a switch and
+		// only left the switch, so every remaining key was still visited.
+		cmp := bytes.Compare(k, opts.Upper)
+		if cmp > 0 || (cmp == 0 && !opts.IncludesUpper) {
+			break
+		}
+		if !opts.IncludesLower && bytes.Equal(k, opts.Lower) {
+			continue
+		}
+		_ = result.Union(p.repo[termHashID(convert.Hash(k))].value)
+	}
+	return result
+}
+
+// Asc orders byte slices bytewise in ascending order. Its Len, Less and Swap
+// methods let it be passed to sort.Sort.
+type Asc [][]byte
+
+// Len reports the number of byte slices held.
+func (a Asc) Len() int { return len(a) }
+
+// Less reports whether element i sorts strictly before element j.
+func (a Asc) Less(i, j int) bool {
+	return bytes.Compare(a[i], a[j]) == -1
+}
+
+// Swap exchanges elements i and j.
+func (a Asc) Swap(i, j int) {
+	tmp := a[i]
+	a[i] = a[j]
+	a[j] = tmp
+}
+
+type postingValue struct { // one term of a field
+	key   []byte       // raw term bytes; getRange sorts and compares these
+	value posting.List // chunk IDs inserted for this term
+}
diff --git a/banyand/index/tsdb/tsdb.go b/banyand/index/tsdb/tsdb.go
new file mode 100644
index 0000000..06c96af
--- /dev/null
+++ b/banyand/index/tsdb/tsdb.go
@@ -0,0 +1,37 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package tsdb
+
+import (
+	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/pkg/posting"
+)
+
+type RangeOpts struct { // bounds of a term range query; a bound is exclusive unless its Includes flag is set
+	Upper         []byte // upper bound, compared bytewise
+	Lower         []byte // lower bound, compared bytewise
+	IncludesUpper bool   // also match terms equal to Upper
+	IncludesLower bool   // also match terms equal to Lower
+}
+
+type Reader interface { // index operations; *MemTable in this package has methods with these signatures
+	Insert(field *Field, chunkID common.ChunkID) error           // adds chunkID to the postings of the given field/term
+	MatchField(fieldNames []byte) (list posting.List)            // postings of every term of one field
+	MatchTerms(field *Field) (list posting.List)                 // postings of one term of one field
+	Range(fieldName []byte, opts *RangeOpts) (list posting.List) // postings of the terms of a field that fall inside opts
+}