You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/11/20 02:30:07 UTC
[skywalking-banyandb] branch encoding created (now a9e768b)
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a change to branch encoding
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git.
at a9e768b Merge remote-tracking branch 'origin/main' into encoding
This branch includes the following new commits:
new 08a9b4e Introduce gorilla encoding for integer values
new a9e768b Merge remote-tracking branch 'origin/main' into encoding
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[skywalking-banyandb] 02/02: Merge remote-tracking branch 'origin/main' into encoding
Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch encoding
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit a9e768bb2bdeab423f1df140922e965ae3505b79
Merge: 08a9b4e aa9fb38
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Sat Nov 20 08:37:56 2021 +0800
Merge remote-tracking branch 'origin/main' into encoding
[skywalking-banyandb] 01/02: Introduce gorilla encoding for integer values
Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch encoding
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 08a9b4e815e655de3cef9172bc29f75d2b5ea6a0
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Sat Nov 20 08:31:18 2021 +0800
Introduce gorilla encoding for integer values
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
banyand/kv/badger.go | 31 +++++
banyand/kv/kv.go | 16 ++-
banyand/stream/stream.go | 8 +-
banyand/tsdb/block.go | 2 +-
banyand/tsdb/tsdb.go | 6 +-
banyand/tsdb/tsdb_test.go | 8 +-
go.mod | 2 +-
go.sum | 4 +-
pkg/bit/reader.go | 104 +++++++++++++++
pkg/{bytes/bytes.go => bit/reader_test.go} | 41 ++++--
pkg/bit/writer.go | 92 ++++++++++++++
pkg/{bytes/bytes.go => bit/writer_test.go} | 50 ++++++--
pkg/buffer/writer.go | 72 +++++++++++
pkg/encoding/encoding.go | 14 ++-
pkg/encoding/int.go | 195 +++++++++++++++++++++++++++++
pkg/encoding/{stream_chunk.go => plain.go} | 175 +++++++++++++++-----------
pkg/encoding/xor.go | 177 ++++++++++++++++++++++++++
pkg/encoding/xor_test.go | 60 +++++++++
18 files changed, 932 insertions(+), 125 deletions(-)
diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index 24820f0..728a298 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -259,6 +259,37 @@ func (l *badgerLog) Debugf(f string, v ...interface{}) {
l.delegated.Debug().Msgf(f, v...)
}
+var _ bydb.TSetEncoderPool = (*encoderPoolDelegate)(nil)
+
+type encoderPoolDelegate struct {
+ encoding.SeriesEncoderPool
+}
+
+func (e *encoderPoolDelegate) Get(metadata []byte) bydb.TSetEncoder {
+ return e.SeriesEncoderPool.Get(metadata)
+}
+
+func (e *encoderPoolDelegate) Put(encoder bydb.TSetEncoder) {
+ e.SeriesEncoderPool.Put(encoder)
+}
+
+var _ bydb.TSetDecoderPool = (*decoderPoolDelegate)(nil)
+
+type decoderPoolDelegate struct {
+ encoding.SeriesDecoderPool
+}
+
+func (e *decoderPoolDelegate) Get(metadata []byte) bydb.TSetDecoder {
+ return &decoderDelegate{
+ e.SeriesDecoderPool.Get(metadata),
+ }
+}
+
+func (e *decoderPoolDelegate) Put(decoder bydb.TSetDecoder) {
+ dd := decoder.(*decoderDelegate)
+ e.SeriesDecoderPool.Put(dd.SeriesDecoder)
+}
+
var _ bydb.TSetDecoder = (*decoderDelegate)(nil)
type decoderDelegate struct {
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index ea92d1c..2def4dc 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -23,7 +23,6 @@ import (
"math"
"github.com/dgraph-io/badger/v3"
- "github.com/dgraph-io/badger/v3/bydb"
"github.com/pkg/errors"
"github.com/apache/skywalking-banyandb/pkg/encoding"
@@ -103,16 +102,15 @@ func TSSWithLogger(l *logger.Logger) TimeSeriesOptions {
}
}
-func TSSWithEncoding(encoderFactory encoding.SeriesEncoderFactory, decoderFactory encoding.SeriesDecoderFactory) TimeSeriesOptions {
+func TSSWithEncoding(encoderPool encoding.SeriesEncoderPool, decoderPool encoding.SeriesDecoderPool) TimeSeriesOptions {
return func(store TimeSeriesStore) {
if btss, ok := store.(*badgerTSS); ok {
- btss.dbOpts = btss.dbOpts.WithExternalCompactor(func() bydb.TSetEncoder {
- return encoderFactory()
- }, func() bydb.TSetDecoder {
- return &decoderDelegate{
- SeriesDecoder: decoderFactory(),
- }
- })
+ btss.dbOpts = btss.dbOpts.WithExternalCompactor(
+ &encoderPoolDelegate{
+ encoderPool,
+ }, &decoderPoolDelegate{
+ decoderPool,
+ })
}
}
}
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
index 760687d..444a483 100644
--- a/banyand/stream/stream.go
+++ b/banyand/stream/stream.go
@@ -107,12 +107,8 @@ func openStream(root string, spec streamSpec, l *logger.Logger) (*stream, error)
ShardNum: sm.schema.GetOpts().GetShardNum(),
IndexRules: spec.indexRules,
EncodingMethod: tsdb.EncodingMethod{
- EncoderFactory: func() encoding.SeriesEncoder {
- return encoding.NewStreamChunkEncoder(chunkSize)
- },
- DecoderFactory: func() encoding.SeriesDecoder {
- return encoding.NewStreamChunkDecoder(chunkSize)
- },
+ EncoderPool: encoding.NewPlainEncoderPool(chunkSize),
+ DecoderPool: encoding.NewPlainDecoderPool(chunkSize),
},
})
if err != nil {
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 2f38308..956bf13 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -78,7 +78,7 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
if b.store, err = kv.OpenTimeSeriesStore(
0,
b.path+"/store",
- kv.TSSWithEncoding(encodingMethod.EncoderFactory, encodingMethod.DecoderFactory),
+ kv.TSSWithEncoding(encodingMethod.EncoderPool, encodingMethod.DecoderPool),
kv.TSSWithLogger(b.l),
); err != nil {
return nil, err
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index 8df233b..e9ed19a 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -82,8 +82,8 @@ type DatabaseOpts struct {
}
type EncodingMethod struct {
- EncoderFactory encoding.SeriesEncoderFactory
- DecoderFactory encoding.SeriesDecoderFactory
+ EncoderPool encoding.SeriesEncoderPool
+ DecoderPool encoding.SeriesDecoderPool
}
type database struct {
@@ -124,7 +124,7 @@ func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) {
db.logger = pl.Named("tsdb")
}
}
- if opts.EncodingMethod.EncoderFactory == nil || opts.EncodingMethod.DecoderFactory == nil {
+ if opts.EncodingMethod.EncoderPool == nil || opts.EncodingMethod.DecoderPool == nil {
return nil, errors.Wrap(ErrEncodingMethodAbsent, "failed to open database")
}
if _, err := mkdir(opts.Location); err != nil {
diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go
index 89c56d6..c570fdf 100644
--- a/banyand/tsdb/tsdb_test.go
+++ b/banyand/tsdb/tsdb_test.go
@@ -58,12 +58,8 @@ func setUp(t *require.Assertions) (tempDir string, deferFunc func(), db Database
Location: tempDir,
ShardNum: 1,
EncodingMethod: EncodingMethod{
- EncoderFactory: func() encoding.SeriesEncoder {
- return nil
- },
- DecoderFactory: func() encoding.SeriesDecoder {
- return nil
- },
+ EncoderPool: encoding.NewPlainEncoderPool(0),
+ DecoderPool: encoding.NewPlainDecoderPool(0),
},
})
t.NoError(err)
diff --git a/go.mod b/go.mod
index 24e4ce0..b2815cb 100644
--- a/go.mod
+++ b/go.mod
@@ -101,4 +101,4 @@ require (
sigs.k8s.io/yaml v1.2.0 // indirect
)
-replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20211111092400-7f8fa9a51476
+replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20211119041803-47ac7c51ca6a
diff --git a/go.sum b/go.sum
index 957109b..afb3bff 100644
--- a/go.sum
+++ b/go.sum
@@ -47,8 +47,8 @@ github.com/RoaringBitmap/gocroaring v0.4.0/go.mod h1:NieMwz7ZqwU2DD73/vvYwv7r4eW
github.com/RoaringBitmap/real-roaring-datasets v0.0.0-20190726190000-eb7c87156f76/go.mod h1:oM0MHmQ3nDsq609SS36p+oYbRi16+oVvU2Bw4Ipv0SE=
github.com/RoaringBitmap/roaring v0.9.1 h1:5PRizBmoN/PfV17nPNQou4dHQ7NcJi8FO/bihdYyCEM=
github.com/RoaringBitmap/roaring v0.9.1/go.mod h1:h1B7iIUOmnAeb5ytYMvnHJwxMc6LUrwBnzXWRuqTQUc=
-github.com/SkyAPM/badger/v3 v3.0.0-20211111092400-7f8fa9a51476 h1:MH/Jy2x3WF3RdD+WD25XepG4fzIz3qMOoIUM4Enn+GA=
-github.com/SkyAPM/badger/v3 v3.0.0-20211111092400-7f8fa9a51476/go.mod h1:RHo4/GmYcKKh5Lxu63wLEMHJ70Pac2JqZRYGhlyAo2M=
+github.com/SkyAPM/badger/v3 v3.0.0-20211119041803-47ac7c51ca6a h1:kcUQmdVI0E0J8bfwJpbQhWOOxijKNeoEfLsiIkayf1E=
+github.com/SkyAPM/badger/v3 v3.0.0-20211119041803-47ac7c51ca6a/go.mod h1:RHo4/GmYcKKh5Lxu63wLEMHJ70Pac2JqZRYGhlyAo2M=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
diff --git a/pkg/bit/reader.go b/pkg/bit/reader.go
new file mode 100644
index 0000000..22cb929
--- /dev/null
+++ b/pkg/bit/reader.go
@@ -0,0 +1,104 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package bit
+
+import (
+ "io"
+)
+
+// Reader reads individual bits from an io.ByteReader
+type Reader struct {
+ in io.ByteReader
+ cache byte
+ len byte
+}
+
+// NewReader creates a bit reader that pulls bytes from in
+func NewReader(in io.ByteReader) *Reader {
+ return &Reader{
+ in: in,
+ }
+}
+
+// ReadBool reads a bit, 1 returns true, 0 returns false
+func (r *Reader) ReadBool() (bool, error) {
+ if r.len == 0 {
+ b, err := r.in.ReadByte()
+ if err != nil {
+ return false, err
+ }
+ r.cache = b
+ r.len = 8
+ }
+ r.len--
+ b := r.cache & 0x80
+ r.cache <<= 1
+ return b != 0, nil
+}
+
+// ReadBits reads numBits bits and returns them in the low-order bits of the result
+func (r *Reader) ReadBits(numBits int) (uint64, error) {
+ var result uint64
+
+ for ; numBits >= 8; numBits -= 8 {
+ b, err := r.ReadByte()
+ if err != nil {
+ return 0, err
+ }
+
+ result = (result << 8) | uint64(b)
+ }
+
+ for ; numBits > 0; numBits-- {
+ byt, err := r.ReadBool()
+ if err != nil {
+ return 0, err
+ }
+ result <<= 1
+ if byt {
+ result |= 1
+ }
+ }
+
+ return result, nil
+}
+
+// ReadByte reads a byte
+func (r *Reader) ReadByte() (byte, error) {
+ if r.len == 0 {
+ b, err := r.in.ReadByte()
+ if err != nil {
+ return b, err
+ }
+ r.cache = b
+ return b, err
+ }
+ b, err := r.in.ReadByte()
+ if err != nil {
+ return b, err
+ }
+ result := r.cache | b>>r.len
+ r.cache = b << (8 - r.len)
+ return result, nil
+}
+
+// Reset clears the cached partial byte; it does not replace the underlying io.ByteReader
+func (r *Reader) Reset() {
+ r.len = 0
+ r.cache = 0
+}
diff --git a/pkg/bytes/bytes.go b/pkg/bit/reader_test.go
similarity index 56%
copy from pkg/bytes/bytes.go
copy to pkg/bit/reader_test.go
index b4d051c..6f30bbe 100644
--- a/pkg/bytes/bytes.go
+++ b/pkg/bit/reader_test.go
@@ -15,17 +15,38 @@
// specific language governing permissions and limitations
// under the License.
-package bytes
+package bit
-func Join(s ...[]byte) []byte {
- n := 0
- for _, v := range s {
- n += len(v)
- }
+import (
+ "bytes"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestReader(t *testing.T) {
+ data := []byte{3, 255, 0xcc, 0x1a, 0xbc, 0xde, 0x80}
+
+ r := NewReader(bytes.NewBuffer(data))
+ a := assert.New(t)
+
+ eq(a, byte(3))(r.ReadByte())
+ eq(a, uint64(255))(r.ReadBits(8))
+
+ eq(a, uint64(0xc))(r.ReadBits(4))
+
+ eq(a, uint64(0xc1))(r.ReadBits(8))
+
+ eq(a, uint64(0xabcde))(r.ReadBits(20))
+
+ eq(a, true)(r.ReadBool())
+ eq(a, false)(r.ReadBool())
+
+}
- b, i := make([]byte, n), 0
- for _, v := range s {
- i += copy(b[i:], v)
+func eq(a *assert.Assertions, expected interface{}) func(interface{}, error) {
+ return func(actual interface{}, err error) {
+ a.NoError(err)
+ a.Equal(expected, actual)
}
- return b
}
diff --git a/pkg/bit/writer.go b/pkg/bit/writer.go
new file mode 100644
index 0000000..6302f9e
--- /dev/null
+++ b/pkg/bit/writer.go
@@ -0,0 +1,92 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package bit
+
+import (
+ "bytes"
+)
+
+// Writer writes bits to a bytes.Buffer
+type Writer struct {
+ out *bytes.Buffer
+ cache byte
+ available byte
+}
+
+// NewWriter creates a bit writer that appends to buffer
+func NewWriter(buffer *bytes.Buffer) *Writer {
+ var bw Writer
+ bw.Reset(buffer)
+ return &bw
+}
+
+// Reset points the writer at buffer and discards any partially written byte
+func (w *Writer) Reset(buffer *bytes.Buffer) {
+ w.out = buffer
+ w.cache = 0
+ w.available = 8
+}
+
+// WriteBool writes a boolean value
+// true: 1
+// false: 0
+func (w *Writer) WriteBool(b bool) {
+ if b {
+ w.cache |= 1 << (w.available - 1)
+ }
+
+ w.available--
+
+ if w.available == 0 {
+ // WriteByte never returns error
+ _ = w.out.WriteByte(w.cache)
+ w.cache = 0
+ w.available = 8
+ }
+}
+
+// WriteBits writes the low-order numBits bits of u, most significant bit first
+func (w *Writer) WriteBits(u uint64, numBits int) {
+ u <<= 64 - uint(numBits)
+
+ for ; numBits >= 8; numBits -= 8 {
+ byt := byte(u >> 56)
+ w.WriteByte(byt)
+ u <<= 8
+ }
+
+ remainder := byte(u >> 56)
+ for ; numBits > 0; numBits-- {
+ w.WriteBool((remainder & 0x80) != 0)
+ remainder <<= 1
+ }
+}
+
+// WriteByte writes a byte
+func (w *Writer) WriteByte(b byte) {
+ _ = w.out.WriteByte(w.cache | (b >> (8 - w.available)))
+ w.cache = b << w.available
+}
+
+// Flush flushes the currently in-process byte
+func (w *Writer) Flush() {
+ if w.available != 8 {
+ _ = w.out.WriteByte(w.cache)
+ }
+ w.Reset(w.out)
+}
diff --git a/pkg/bytes/bytes.go b/pkg/bit/writer_test.go
similarity index 57%
rename from pkg/bytes/bytes.go
rename to pkg/bit/writer_test.go
index b4d051c..7624ff8 100644
--- a/pkg/bytes/bytes.go
+++ b/pkg/bit/writer_test.go
@@ -15,17 +15,41 @@
// specific language governing permissions and limitations
// under the License.
-package bytes
-
-func Join(s ...[]byte) []byte {
- n := 0
- for _, v := range s {
- n += len(v)
- }
-
- b, i := make([]byte, n), 0
- for _, v := range s {
- i += copy(b[i:], v)
- }
- return b
+package bit
+
+import (
+ "bytes"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestWriter(t *testing.T) {
+ out := &bytes.Buffer{}
+ w := NewWriter(out)
+
+ a := assert.New(t)
+
+ w.WriteByte(0xc1)
+ w.WriteBool(false)
+ w.WriteBits(0x3f, 6)
+ w.WriteBool(true)
+ w.WriteByte(0xac)
+ w.WriteBits(0x01, 1)
+ w.WriteBits(0x1248f, 20)
+ w.Flush()
+
+ w.WriteByte(0x01)
+ w.WriteByte(0x02)
+
+ w.WriteBits(0x0f, 4)
+
+ w.WriteByte(0x80)
+ w.WriteByte(0x8f)
+ w.Flush()
+
+ w.WriteBits(0x01, 1)
+ w.WriteByte(0xff)
+ w.Flush()
+ a.Equal([]byte{0xc1, 0x7f, 0xac, 0x89, 0x24, 0x78, 0x01, 0x02, 0xf8, 0x08, 0xf0, 0xff, 0x80}, out.Bytes())
}
diff --git a/pkg/buffer/writer.go b/pkg/buffer/writer.go
new file mode 100644
index 0000000..e109d25
--- /dev/null
+++ b/pkg/buffer/writer.go
@@ -0,0 +1,72 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package buffer
+
+import (
+ "bytes"
+ "encoding/binary"
+)
+
+// Writer writes data into a buffer
+type Writer struct {
+ buf *bytes.Buffer
+
+ scratch [binary.MaxVarintLen64]byte
+}
+
+func NewBufferWriter(buf *bytes.Buffer) *Writer {
+ return &Writer{
+ buf: buf,
+ }
+}
+
+func (w *Writer) Write(p []byte) {
+ _, _ = w.buf.Write(p)
+}
+
+func (w *Writer) WriteTo(other *Writer) (n int64) {
+ n, _ = w.buf.WriteTo(other.buf)
+ return n
+}
+
+func (w *Writer) PutUint16(v uint16) {
+ binary.LittleEndian.PutUint16(w.scratch[:], v)
+ _, _ = w.buf.Write(w.scratch[:2])
+}
+
+func (w *Writer) PutUint32(v uint32) {
+ binary.LittleEndian.PutUint32(w.scratch[:], v)
+ _, _ = w.buf.Write(w.scratch[:4])
+}
+
+func (w *Writer) PutUint64(v uint64) {
+ binary.LittleEndian.PutUint64(w.scratch[:], v)
+ _, _ = w.buf.Write(w.scratch[:8])
+}
+
+func (w *Writer) Reset() {
+ w.buf.Reset()
+}
+
+func (w *Writer) Len() int {
+ return w.buf.Len()
+}
+
+func (w *Writer) Bytes() []byte {
+ return w.buf.Bytes()
+}
diff --git a/pkg/encoding/encoding.go b/pkg/encoding/encoding.go
index 5e04e72..05a1f14 100644
--- a/pkg/encoding/encoding.go
+++ b/pkg/encoding/encoding.go
@@ -21,7 +21,10 @@ import "github.com/pkg/errors"
var ErrEncodeEmpty = errors.New("encode an empty value")
-type SeriesEncoderFactory func() SeriesEncoder
+type SeriesEncoderPool interface {
+ Get(metadata []byte) SeriesEncoder
+ Put(encoder SeriesEncoder)
+}
// SeriesEncoder encodes time series data point
type SeriesEncoder interface {
@@ -30,19 +33,22 @@ type SeriesEncoder interface {
// IsFull returns whether the encoded data reached its capacity
IsFull() bool
// Reset the underlying buffer
- Reset()
+ Reset(key []byte)
// Encode the time series data point to a binary
Encode() ([]byte, error)
// StartTime indicates the first entry's time
StartTime() uint64
}
-type SeriesDecoderFactory func() SeriesDecoder
+type SeriesDecoderPool interface {
+ Get(metadata []byte) SeriesDecoder
+ Put(encoder SeriesDecoder)
+}
// SeriesDecoder decodes encoded time series data
type SeriesDecoder interface {
// Decode the time series data
- Decode(data []byte) error
+ Decode(key, data []byte) error
// Len denotes the size of iterator
Len() int
// IsFull returns whether the encoded data reached its capacity
diff --git a/pkg/encoding/int.go b/pkg/encoding/int.go
new file mode 100644
index 0000000..9571ec4
--- /dev/null
+++ b/pkg/encoding/int.go
@@ -0,0 +1,195 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package encoding
+
+import (
+ "bytes"
+ "encoding/binary"
+ "time"
+
+ "github.com/apache/skywalking-banyandb/pkg/bit"
+ "github.com/apache/skywalking-banyandb/pkg/buffer"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+)
+
+var (
+ _ SeriesEncoder = (*intEncoder)(nil)
+)
+
+type ParseInterval = func(key []byte) time.Duration
+
+type intEncoder struct {
+ buff *bytes.Buffer
+ bw *bit.Writer
+ values *XOREncoder
+ fn ParseInterval
+ interval time.Duration
+ startTime uint64
+ num int
+ size int
+}
+
+func NewIntEncoder(size int, fn ParseInterval) SeriesEncoder {
+ buff := &bytes.Buffer{}
+ bw := bit.NewWriter(buff)
+ return &intEncoder{
+ buff: buff,
+ bw: bw,
+ values: NewXOREncoder(bw),
+ fn: fn,
+ size: size,
+ }
+}
+
+func (ie *intEncoder) Append(ts uint64, value []byte) {
+ if len(value) > 8 {
+ return
+ }
+ if ie.startTime == 0 {
+ ie.startTime = ts
+ }
+ gap := int(ts) - int(ie.startTime)
+ if gap < 0 {
+ return
+ }
+ zeroNum := gap/int(ie.interval) - 1
+ for i := 0; i < zeroNum; i++ {
+ ie.bw.WriteBool(false)
+ ie.num++
+ }
+ ie.bw.WriteBool(true)
+ ie.values.Write(binary.LittleEndian.Uint64(value))
+ ie.num++
+}
+
+func (ie *intEncoder) IsFull() bool {
+ return ie.num >= ie.size
+}
+
+func (ie *intEncoder) Reset(key []byte) {
+ ie.bw.Reset(nil)
+ ie.interval = ie.fn(key)
+}
+
+func (ie *intEncoder) Encode() ([]byte, error) {
+ ie.bw.Flush()
+ buffWriter := buffer.NewBufferWriter(ie.buff)
+ buffWriter.PutUint64(ie.startTime)
+ buffWriter.PutUint16(uint16(ie.size))
+ return ie.buff.Bytes(), nil
+}
+
+func (ie *intEncoder) StartTime() uint64 {
+ return ie.startTime
+}
+
+var _ SeriesDecoder = (*intDecoder)(nil)
+
+type intDecoder struct {
+ fn ParseInterval
+ size int
+ interval time.Duration
+ startTime uint64
+ num int
+ area []byte
+}
+
+func NewIntDecoder(size int, fn ParseInterval) SeriesDecoder {
+ return &intDecoder{
+ fn: fn,
+ size: size,
+ }
+}
+
+func (i intDecoder) Decode(key, data []byte) error {
+ i.interval = i.fn(key)
+ i.startTime = binary.LittleEndian.Uint64(data[len(data)-10 : len(data)-2])
+ i.num = int(binary.LittleEndian.Uint16(data[len(data)-2:]))
+ i.area = data[:len(data)-10]
+ return nil
+}
+
+func (i intDecoder) Len() int {
+ return i.num
+}
+
+func (i intDecoder) IsFull() bool {
+ return i.num >= i.size
+}
+
+func (i intDecoder) Get(ts uint64) ([]byte, error) {
+ for iter := i.Iterator(); iter.Next(); {
+ if iter.Time() == ts {
+ return iter.Val(), nil
+ }
+ }
+ return nil, nil
+}
+
+func (i intDecoder) Iterator() SeriesIterator {
+ br := bit.NewReader(bytes.NewReader(i.area))
+ return &intIterator{
+ startTime: i.startTime,
+ interval: int(i.interval),
+ br: br,
+ values: NewXORDecoder(br),
+ }
+}
+
+var _ SeriesIterator = (*intIterator)(nil)
+
+type intIterator struct {
+ startTime uint64
+ interval int
+ br *bit.Reader
+ values *XORDecoder
+
+ currVal uint64
+ currTime uint64
+ index int
+ err error
+}
+
+func (i *intIterator) Next() bool {
+ var b bool
+ b, i.err = i.br.ReadBool()
+ if i.err != nil {
+ return false
+ }
+ if b {
+ if i.values.Next() {
+ i.currVal = i.values.Value()
+ }
+ }
+ i.currVal = 0
+ i.currTime = i.startTime + uint64(i.interval*i.index)
+ i.index++
+ return true
+}
+
+func (i *intIterator) Val() []byte {
+ return convert.Uint64ToBytes(i.currVal)
+}
+
+func (i *intIterator) Time() uint64 {
+ return i.currTime
+}
+
+func (i *intIterator) Error() error {
+ return i.err
+}
diff --git a/pkg/encoding/stream_chunk.go b/pkg/encoding/plain.go
similarity index 54%
rename from pkg/encoding/stream_chunk.go
rename to pkg/encoding/plain.go
index 7ff5094..0b3fb8c 100644
--- a/pkg/encoding/stream_chunk.go
+++ b/pkg/encoding/plain.go
@@ -22,106 +22,147 @@ import (
"encoding/binary"
"fmt"
"sort"
+ "sync"
"github.com/klauspost/compress/zstd"
"github.com/pkg/errors"
+
+ "github.com/apache/skywalking-banyandb/pkg/buffer"
)
var (
- decoder, _ = zstd.NewReader(nil)
- encoder, _ = zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBetterCompression))
- _ SeriesEncoder = (*streamChunkEncoder)(nil)
- _ SeriesDecoder = (*StreamChunkDecoder)(nil)
+ encoderPool = sync.Pool{
+ New: newPlainEncoder,
+ }
+ decoderPool = sync.Pool{
+ New: func() interface{} {
+ return &plainDecoder{}
+ },
+ }
+)
+
+type plainEncoderPool struct {
+ pool *sync.Pool
+ size int
+}
+
+func NewPlainEncoderPool(size int) SeriesEncoderPool {
+ return &plainEncoderPool{
+ pool: &encoderPool,
+ size: size,
+ }
+}
+
+func (b *plainEncoderPool) Get(metadata []byte) SeriesEncoder {
+ encoder := b.pool.Get().(*plainEncoder)
+ encoder.Reset(metadata)
+ encoder.valueSize = b.size
+ return encoder
+}
+
+func (b *plainEncoderPool) Put(encoder SeriesEncoder) {
+ b.pool.Put(encoder)
+}
+
+type plainDecoderPool struct {
+ pool *sync.Pool
+ size int
+}
+
+func NewPlainDecoderPool(size int) SeriesDecoderPool {
+ return &plainDecoderPool{
+ pool: &decoderPool,
+ size: size,
+ }
+}
+
+func (b *plainDecoderPool) Get(_ []byte) SeriesDecoder {
+ decoder := b.pool.Get().(*plainDecoder)
+ decoder.valueSize = b.size
+ return decoder
+}
+
+func (b *plainDecoderPool) Put(decoder SeriesDecoder) {
+ b.pool.Put(decoder)
+}
+
+var (
+ zstdDecoder, _ = zstd.NewReader(nil)
+ zstdEncoder, _ = zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBetterCompression))
+ _ SeriesEncoder = (*plainEncoder)(nil)
+ _ SeriesDecoder = (*plainDecoder)(nil)
)
-//streamChunkEncoder backport to reduced value
-type streamChunkEncoder struct {
- tsBuff bytes.Buffer
- valBuff bytes.Buffer
- scratch [binary.MaxVarintLen64]byte
+//plainEncoder backport to reduced value
+type plainEncoder struct {
+ tsBuff *buffer.Writer
+ valBuff *buffer.Writer
len uint32
num uint32
startTime uint64
valueSize int
}
-func NewStreamChunkEncoder(size int) SeriesEncoder {
- return &streamChunkEncoder{
- valueSize: size,
+func newPlainEncoder() interface{} {
+ return &plainEncoder{
+ tsBuff: buffer.NewBufferWriter(&bytes.Buffer{}),
+ valBuff: buffer.NewBufferWriter(&bytes.Buffer{}),
}
}
-func (t *streamChunkEncoder) Append(ts uint64, value []byte) {
+func (t *plainEncoder) Append(ts uint64, value []byte) {
if t.startTime == 0 {
t.startTime = ts
} else if t.startTime > ts {
t.startTime = ts
}
vLen := len(value)
- offset := uint32(len(t.valBuff.Bytes()))
- t.valBuff.Write(t.putUint32(uint32(vLen)))
+ offset := uint32(t.valBuff.Len())
+ t.valBuff.PutUint32(uint32(vLen))
t.valBuff.Write(value)
- t.tsBuff.Write(t.putUint64(ts))
- t.tsBuff.Write(t.putUint32(offset))
- t.num = t.num + 1
+ t.tsBuff.PutUint64(ts)
+ t.tsBuff.PutUint32(offset)
+ t.num++
}
-func (t *streamChunkEncoder) IsFull() bool {
+func (t *plainEncoder) IsFull() bool {
return t.valBuff.Len() >= t.valueSize
}
-func (t *streamChunkEncoder) Reset() {
+func (t *plainEncoder) Reset(_ []byte) {
t.tsBuff.Reset()
t.valBuff.Reset()
t.num = 0
t.startTime = 0
}
-func (t *streamChunkEncoder) Encode() ([]byte, error) {
+func (t *plainEncoder) Encode() ([]byte, error) {
if t.tsBuff.Len() < 1 {
return nil, ErrEncodeEmpty
}
val := t.valBuff.Bytes()
t.len = uint32(len(val))
- _, err := t.tsBuff.WriteTo(&t.valBuff)
- if err != nil {
- return nil, err
- }
- t.valBuff.Write(t.putUint32(t.num))
- t.valBuff.Write(t.putUint32(t.len))
+ t.tsBuff.WriteTo(t.valBuff)
+ t.valBuff.PutUint32(t.num)
+ t.valBuff.PutUint32(t.len)
data := t.valBuff.Bytes()
l := len(data)
dst := make([]byte, 0, compressBound(l))
- dst = encoder.EncodeAll(data, dst)
- result := make([]byte, len(dst)+2)
- copy(result, dst)
- copy(result[len(dst):], t.putUint16(uint16(l)))
- return result, nil
+ dst = zstdEncoder.EncodeAll(data, dst)
+ result := buffer.NewBufferWriter(bytes.NewBuffer(make([]byte, len(dst)+2)))
+ result.Write(dst)
+ result.PutUint16(uint16(l))
+ return result.Bytes(), nil
}
func compressBound(srcSize int) int {
return srcSize + (srcSize >> 8)
}
-func (t *streamChunkEncoder) StartTime() uint64 {
+func (t *plainEncoder) StartTime() uint64 {
return t.startTime
}
-func (t *streamChunkEncoder) putUint16(v uint16) []byte {
- binary.LittleEndian.PutUint16(t.scratch[:], v)
- return t.scratch[:2]
-}
-
-func (t *streamChunkEncoder) putUint32(v uint32) []byte {
- binary.LittleEndian.PutUint32(t.scratch[:], v)
- return t.scratch[:4]
-}
-
-func (t *streamChunkEncoder) putUint64(v uint64) []byte {
- binary.LittleEndian.PutUint64(t.scratch[:], v)
- return t.scratch[:8]
-}
-
const (
// TsLen equals ts(uint64) + data_offset(uint32)
TsLen = 8 + 4
@@ -129,8 +170,8 @@ const (
var ErrInvalidValue = errors.New("invalid encoded value")
-//StreamChunkDecoder decodes encoded time index
-type StreamChunkDecoder struct {
+//plainDecoder decodes encoded time index
+type plainDecoder struct {
ts []byte
val []byte
len uint32
@@ -138,20 +179,14 @@ type StreamChunkDecoder struct {
valueSize int
}
-func NewStreamChunkDecoder(size int) SeriesDecoder {
- return &StreamChunkDecoder{
- valueSize: size,
- }
-}
-
-func (t *StreamChunkDecoder) Len() int {
+func (t *plainDecoder) Len() int {
return int(t.num)
}
-func (t *StreamChunkDecoder) Decode(rawData []byte) (err error) {
+func (t *plainDecoder) Decode(_, rawData []byte) (err error) {
var data []byte
size := binary.LittleEndian.Uint16(rawData[len(rawData)-2:])
- if data, err = decoder.DecodeAll(rawData[:len(rawData)-2], make([]byte, 0, size)); err != nil {
+ if data, err = zstdDecoder.DecodeAll(rawData[:len(rawData)-2], make([]byte, 0, size)); err != nil {
return err
}
l := uint32(len(data))
@@ -170,11 +205,11 @@ func (t *StreamChunkDecoder) Decode(rawData []byte) (err error) {
return nil
}
-func (t *StreamChunkDecoder) IsFull() bool {
+func (t *plainDecoder) IsFull() bool {
return int(t.len) >= t.valueSize
}
-func (t *StreamChunkDecoder) Get(ts uint64) ([]byte, error) {
+func (t *plainDecoder) Get(ts uint64) ([]byte, error) {
i := sort.Search(int(t.num), func(i int) bool {
slot := getTSSlot(t.ts, i)
return parseTS(slot) <= ts
@@ -189,7 +224,7 @@ func (t *StreamChunkDecoder) Get(ts uint64) ([]byte, error) {
return getVal(t.val, parseOffset(slot))
}
-func (t *StreamChunkDecoder) Iterator() SeriesIterator {
+func (t *plainDecoder) Iterator() SeriesIterator {
return newBlockItemIterator(t)
}
@@ -213,17 +248,17 @@ func parseOffset(tsSlot []byte) uint32 {
return binary.LittleEndian.Uint32(tsSlot[8:])
}
-var _ SeriesIterator = (*chunkIterator)(nil)
+var _ SeriesIterator = (*plainIterator)(nil)
-type chunkIterator struct {
+type plainIterator struct {
index []byte
data []byte
idx int
num int
}
-func newBlockItemIterator(decoder *StreamChunkDecoder) SeriesIterator {
- return &chunkIterator{
+func newBlockItemIterator(decoder *plainDecoder) SeriesIterator {
+ return &plainIterator{
idx: -1,
index: decoder.ts,
data: decoder.val,
@@ -231,20 +266,20 @@ func newBlockItemIterator(decoder *StreamChunkDecoder) SeriesIterator {
}
}
-func (b *chunkIterator) Next() bool {
+func (b *plainIterator) Next() bool {
b.idx++
return b.idx >= 0 && b.idx < b.num
}
-func (b *chunkIterator) Val() []byte {
+func (b *plainIterator) Val() []byte {
v, _ := getVal(b.data, parseOffset(getTSSlot(b.index, b.idx)))
return v
}
-func (b *chunkIterator) Time() uint64 {
+func (b *plainIterator) Time() uint64 {
return parseTS(getTSSlot(b.index, b.idx))
}
-func (b *chunkIterator) Error() error {
+func (b *plainIterator) Error() error {
return nil
}
diff --git a/pkg/encoding/xor.go b/pkg/encoding/xor.go
new file mode 100644
index 0000000..43203ca
--- /dev/null
+++ b/pkg/encoding/xor.go
@@ -0,0 +1,177 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package encoding
+
+import (
+ "math/bits"
+
+ "github.com/apache/skywalking-banyandb/pkg/bit"
+)
+
+const (
+ ctrlBitsNoContainMeaningful = 0x2 // '10': delta is written inside the previous block's leading/trailing window, no header
+ ctrlBitsContainMeaningful = 0x3 // '11': a 6-bit leading-zero count and a 6-bit (length-1) header precede the delta bits
+)
+
+// XOREncoder compresses a series of uint64 values by XOR-ing each value with its predecessor.
+// The scheme follows https://www.vldb.org/pvldb/vol8/p1816-teller.pdf
+type XOREncoder struct {
+ bw *bit.Writer // destination of the encoded bit stream
+ preVal uint64 // last value passed to Write; the next value is XOR-ed against it
+ leading int // leading-zero count of the last block written with a '11' header
+ trailing int // trailing-zero count of the last block written with a '11' header
+
+ first bool // true until the first value has been written verbatim
+}
+
+// NewXOREncoder creates an XOREncoder that writes compressed uint64 values to bw.
+func NewXOREncoder(bw *bit.Writer) *XOREncoder {
+ return &XOREncoder{
+ bw: bw,
+ first: true,
+ }
+}
+
+// Write appends val to the encoded bit stream.
+//
+// The first value is stored verbatim in 64 bits. Each later value is XOR-ed
+// with its predecessor and the result (delta) is encoded as one of:
+//   - '0': delta is zero, the value is unchanged;
+//   - '10' + bits: the meaningful bits fit inside the leading/trailing window
+//     of the previous block, so only that window is written;
+//   - '11' + 6-bit leading-zero count + 6-bit (length-1) + bits: a new window
+//     is announced and remembered for the following values.
+func (e *XOREncoder) Write(val uint64) {
+	// noWindow is a leading-zero count that no non-zero delta can reach (the
+	// maximum is 63). Storing it before any block exists forces the first
+	// changed value to emit a '11' header. With the zero-value window
+	// (leading 0, trailing 0) the reuse test below is always true, so every
+	// delta would be written as a full 64-bit block and never be compressed.
+	const noWindow = 64
+
+	if e.first {
+		e.first = false
+		e.preVal = val
+		e.leading = noWindow
+		e.bw.WriteBits(val, 64)
+		return
+	}
+
+	delta := val ^ e.preVal
+	e.preVal = val
+	if delta == 0 {
+		e.bw.WriteBool(false)
+		return
+	}
+
+	leading := bits.LeadingZeros64(delta)
+	trailing := bits.TrailingZeros64(delta)
+	if leading >= e.leading && trailing >= e.trailing {
+		// Control '10': the delta fits in the previous window, reuse it.
+		e.bw.WriteBits(ctrlBitsNoContainMeaningful, 2)
+		e.bw.WriteBits(delta>>uint(e.trailing), 64-e.leading-e.trailing)
+		return
+	}
+
+	// Control '11': announce a new window before the meaningful bits.
+	e.bw.WriteBits(ctrlBitsContainMeaningful, 2)
+	meaningfulLen := 64 - leading - trailing
+	e.bw.WriteBits(uint64(leading), 6)
+	// meaningfulLen is at least 1, so meaningfulLen-1 fits in 6 bits.
+	e.bw.WriteBits(uint64(meaningfulLen-1), 6)
+	e.bw.WriteBits(delta>>uint(trailing), meaningfulLen)
+
+	e.leading = leading
+	e.trailing = trailing
+}
+
+// XORDecoder reads uint64 values back from a bit stream written by XOREncoder.
+type XORDecoder struct {
+ val uint64 // current decoded value; each delta read by Next is XOR-ed into it
+
+ br *bit.Reader // source bit stream
+
+ leading uint64 // leading-zero count taken from the last '11' header
+ trailing uint64 // trailing-zero count derived from the last '11' header
+
+ first bool // true until the verbatim first value has been read
+ err error // error recorded by Next
+}
+
+// NewXORDecoder creates an XORDecoder that reads XOR-compressed uint64 values from br.
+func NewXORDecoder(br *bit.Reader) *XORDecoder {
+	return &XORDecoder{
+		first: true,
+		br:    br,
+	}
+}
+
+// Reset returns the decoder to its initial state, so the next call to Next
+// reads a verbatim first value again. It clears the window, the current value
+// and any error recorded by an earlier Next. Reset does not touch the
+// underlying bit.Reader.
+func (d *XORDecoder) Reset() {
+	d.first = true
+	d.leading = 0
+	d.trailing = 0
+	d.val = 0
+	// Drop the stale error so Err() does not keep reporting a failure from
+	// the data that was decoded before the reset.
+	d.err = nil
+}
+
+// Next decodes the next value and reports whether one was read; when it returns false the cause is in Err().
+// Layout: 64-bit first value, then '0' (unchanged), '10'+bits (reuse window) or '11'+6-bit leading+6-bit (length-1)+bits.
+func (d *XORDecoder) Next() bool {
+ if d.first {
+ // the first value is stored verbatim in 64 bits
+ d.first = false
+ d.val, d.err = d.br.ReadBits(64)
+ return d.err == nil
+ }
+
+ var b bool
+ // first control bit: 0 means the value equals the previous one
+ b, d.err = d.br.ReadBool()
+ if d.err != nil {
+ return false
+ }
+ if !b {
+ return true
+ }
+ ctrlBits := ctrlBitsNoContainMeaningful
+ // second control bit: 1 means a new leading/length header follows
+ b, d.err = d.br.ReadBool()
+ if d.err != nil {
+ return false
+ }
+ if b {
+ ctrlBits |= 1
+ }
+ var blockSize uint64
+ if ctrlBits == ctrlBitsNoContainMeaningful {
+ blockSize = 64 - d.leading - d.trailing
+ } else {
+ // new window: read the leading-zero count and the block length, then derive trailing from them
+ d.leading, d.err = d.br.ReadBits(6)
+ if d.err != nil {
+ return false
+ }
+ blockSize, d.err = d.br.ReadBits(6)
+ if d.err != nil {
+ return false
+ }
+ blockSize++
+ d.trailing = 64 - d.leading - blockSize // NOTE(review): uint64 arithmetic wraps if leading+blockSize > 64; the header is not validated
+ }
+ delta, err := d.br.ReadBits(int(blockSize))
+ if err != nil {
+ d.err = err
+ return false
+ }
+ val := delta << d.trailing
+ d.val ^= val
+ return true
+}
+
+// Value returns the current decoded value, which is updated by each call to Next.
+func (d *XORDecoder) Value() uint64 {
+ return d.val
+}
+
+// Err returns the error recorded by Next, or nil if none was recorded.
+func (d *XORDecoder) Err() error {
+ return d.err
+}
diff --git a/pkg/encoding/xor_test.go b/pkg/encoding/xor_test.go
new file mode 100644
index 0000000..282ab57
--- /dev/null
+++ b/pkg/encoding/xor_test.go
@@ -0,0 +1,60 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package encoding
+
+import (
+ "bytes"
+ "io"
+ "testing"
+
+ "github.com/pkg/errors"
+ "github.com/stretchr/testify/assert"
+
+ "github.com/apache/skywalking-banyandb/pkg/bit"
+)
+
+// TestXOR round-trips a short series through XOREncoder and XORDecoder and
+// checks that every value is decoded back in order.
+func TestXOR(t *testing.T) {
+	series := []uint64{76, 50, 50, 999999999, 100}
+
+	var buf bytes.Buffer
+	bitWriter := bit.NewWriter(&buf)
+	encoder := NewXOREncoder(bitWriter)
+	for _, v := range series {
+		encoder.Write(v)
+	}
+	bitWriter.Flush()
+
+	decoder := NewXORDecoder(bit.NewReader(bytes.NewReader(buf.Bytes())))
+	a := assert.New(t)
+	for _, want := range series {
+		verify(decoder, a, want)
+	}
+}
+
+// verify advances d by one value and asserts that the decoded value equals
+// expected. A decoder error other than io.EOF is reported as a test failure.
+func verify(d *XORDecoder, a *assert.Assertions, expected uint64) {
+	a.True(d.Next())
+	if err := d.Err(); err != nil && !errors.Is(err, io.EOF) {
+		// Failf rather than Fail: Fail prints its message verbatim and does
+		// not interpret format verbs such as %v.
+		a.Failf("unexpected decoder error", "error: %v", err)
+	}
+	a.Equal(expected, d.Value())
+}