You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/11/13 01:07:12 UTC
[skywalking-banyandb] 01/01: Introduce encoding component
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch encoding
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 5b6406a1d57111366fe0c06714208cbf8d952986
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Sat Nov 13 09:00:03 2021 +0800
Introduce encoding component
* The encoding component lets the tsdb module customize how
data is encoded in a chunk
* StreamChunkEncoder/Decoder handle the stream module's encoding
* Update badger to support encoding component
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
banyand/kv/badger.go | 20 ++++
banyand/kv/kv.go | 40 ++++---
banyand/stream/stream.go | 12 +++
banyand/tsdb/block.go | 22 ++--
banyand/tsdb/tsdb.go | 27 ++++-
banyand/tsdb/tsdb_test.go | 9 ++
go.mod | 6 +-
go.sum | 8 +-
pkg/encoding/encoding.go | 66 ++++++++++++
pkg/encoding/stream_chunk.go | 250 +++++++++++++++++++++++++++++++++++++++++++
10 files changed, 419 insertions(+), 41 deletions(-)
diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index a0b0a1b..24820f0 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -24,8 +24,10 @@ import (
"time"
"github.com/dgraph-io/badger/v3"
+ "github.com/dgraph-io/badger/v3/bydb"
"github.com/dgraph-io/badger/v3/y"
+ "github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/logger"
)
@@ -256,3 +258,21 @@ func (l *badgerLog) Infof(f string, v ...interface{}) {
func (l *badgerLog) Debugf(f string, v ...interface{}) {
l.delegated.Debug().Msgf(f, v...)
}
+
+// Compile-time check that decoderDelegate satisfies bydb.TSetDecoder.
+var _ bydb.TSetDecoder = (*decoderDelegate)(nil)
+
+// decoderDelegate embeds an encoding.SeriesDecoder and overrides Iterator so
+// that the iterator it hands out is typed as bydb.TSetIterator.
+type decoderDelegate struct {
+ encoding.SeriesDecoder
+}
+
+// Iterator asks the embedded decoder for its iterator and wraps the result in
+// an iterDelegate so it can be returned as a bydb.TSetIterator.
+func (d *decoderDelegate) Iterator() bydb.TSetIterator {
+	inner := d.SeriesDecoder.Iterator()
+	return &iterDelegate{SeriesIterator: inner}
+}
+
+// Compile-time check that iterDelegate satisfies bydb.TSetIterator. (The
+// previous line repeated the decoderDelegate/TSetDecoder assertion, leaving
+// this type unchecked.)
+var _ bydb.TSetIterator = (*iterDelegate)(nil)
+
+// iterDelegate embeds an encoding.SeriesIterator so that it can be returned
+// wherever a bydb.TSetIterator is expected.
+type iterDelegate struct {
+	encoding.SeriesIterator
+}
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index fc829d3..ea92d1c 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -23,8 +23,10 @@ import (
"math"
"github.com/dgraph-io/badger/v3"
+ "github.com/dgraph-io/badger/v3/bydb"
"github.com/pkg/errors"
+ "github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/logger"
)
@@ -101,6 +103,20 @@ func TSSWithLogger(l *logger.Logger) TimeSeriesOptions {
}
}
+// TSSWithEncoding returns an option that plugs the given encoder and decoder
+// factories into the store through badger's WithExternalCompactor option.
+// The option only acts on a *badgerTSS; any other store is left untouched.
+// Every decoder produced by decoderFactory is wrapped in a decoderDelegate.
+func TSSWithEncoding(encoderFactory encoding.SeriesEncoderFactory, decoderFactory encoding.SeriesDecoderFactory) TimeSeriesOptions {
+	newEncoder := func() bydb.TSetEncoder {
+		return encoderFactory()
+	}
+	newDecoder := func() bydb.TSetDecoder {
+		return &decoderDelegate{SeriesDecoder: decoderFactory()}
+	}
+	return func(store TimeSeriesStore) {
+		btss, ok := store.(*badgerTSS)
+		if !ok {
+			return
+		}
+		btss.dbOpts = btss.dbOpts.WithExternalCompactor(newEncoder, newDecoder)
+	}
+}
+
+
type Iterator interface {
Next()
Rewind()
@@ -125,7 +141,7 @@ type IndexStore interface {
}
// OpenTimeSeriesStore creates a new TimeSeriesStore
-func OpenTimeSeriesStore(shardID int, path string, compressLevel int, valueSize int, options ...TimeSeriesOptions) (TimeSeriesStore, error) {
+func OpenTimeSeriesStore(shardID int, path string, options ...TimeSeriesOptions) (TimeSeriesStore, error) {
btss := new(badgerTSS)
btss.shardID = shardID
btss.dbOpts = badger.DefaultOptions(path)
@@ -139,7 +155,7 @@ func OpenTimeSeriesStore(shardID int, path string, compressLevel int, valueSize
if err != nil {
return nil, fmt.Errorf("failed to open time series store: %v", err)
}
- btss.TSet = *badger.NewTSet(btss.db, compressLevel, valueSize)
+ btss.TSet = *badger.NewTSet(btss.db)
return btss, nil
}
@@ -161,26 +177,6 @@ func StoreWithNamedLogger(name string, l *logger.Logger) StoreOptions {
}
}
-// StoreWithBufferSize sets a external logger into underlying Store
-func StoreWithBufferSize(size int64) StoreOptions {
- return func(store Store) {
- if bdb, ok := store.(*badgerDB); ok {
- bdb.dbOpts = bdb.dbOpts.WithMemTableSize(size)
- }
- }
-}
-
-type FlushCallback func()
-
-// StoreWithFlushCallback sets a callback function
-func StoreWithFlushCallback(callback FlushCallback) StoreOptions {
- return func(store Store) {
- if bdb, ok := store.(*badgerDB); ok {
- bdb.dbOpts.FlushCallBack = callback
- }
- }
-}
-
// OpenStore creates a new Store
func OpenStore(shardID int, path string, options ...StoreOptions) (Store, error) {
bdb := new(badgerDB)
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
index 35ae2a5..760687d 100644
--- a/banyand/stream/stream.go
+++ b/banyand/stream/stream.go
@@ -23,10 +23,14 @@ import (
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
+ "github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
)
+// chunkSize is 1 << 20 bytes (1 MiB). It is the size handed to the stream
+// chunk encoder and decoder created for a stream's database.
+const chunkSize = 1 << 20
+
type indexRule struct {
rule *databasev1.IndexRule
tagIndices []partition.TagLocator
@@ -102,6 +106,14 @@ func openStream(root string, spec streamSpec, l *logger.Logger) (*stream, error)
Location: root,
ShardNum: sm.schema.GetOpts().GetShardNum(),
IndexRules: spec.indexRules,
+ EncodingMethod: tsdb.EncodingMethod{
+ EncoderFactory: func() encoding.SeriesEncoder {
+ return encoding.NewStreamChunkEncoder(chunkSize)
+ },
+ DecoderFactory: func() encoding.SeriesDecoder {
+ return encoding.NewStreamChunkDecoder(chunkSize)
+ },
+ },
})
if err != nil {
return nil, err
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 231cef9..2f38308 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -23,6 +23,7 @@ import (
"time"
"github.com/dgraph-io/ristretto/z"
+ "github.com/pkg/errors"
"github.com/apache/skywalking-banyandb/api/common"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
@@ -50,11 +51,9 @@ type block struct {
}
type blockOpts struct {
- segID uint16
- blockID uint16
- path string
- compressLevel int
- valueSize int
+ segID uint16
+ blockID uint16
+ path string
}
func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
@@ -71,8 +70,17 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
b.l = pl.Named("block")
}
}
- if b.store, err = kv.OpenTimeSeriesStore(0, b.path+"/store", opts.compressLevel, opts.valueSize,
- kv.TSSWithLogger(b.l)); err != nil {
+ encodingMethodObject := ctx.Value(encodingMethodKey)
+ if encodingMethodObject == nil {
+ return nil, errors.Wrap(ErrEncodingMethodAbsent, "failed to create a block")
+ }
+ encodingMethod := encodingMethodObject.(EncodingMethod)
+ if b.store, err = kv.OpenTimeSeriesStore(
+ 0,
+ b.path+"/store",
+ kv.TSSWithEncoding(encodingMethod.EncoderFactory, encodingMethod.DecoderFactory),
+ kv.TSSWithLogger(b.l),
+ ); err != nil {
return nil, err
}
if b.primaryIndex, err = lsm.NewStore(lsm.StoreOpts{
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index 80a84f5..8df233b 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -31,6 +31,7 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/logger"
)
@@ -47,10 +48,16 @@ const (
dirPerm = 0700
)
-var ErrInvalidShardID = errors.New("invalid shard id")
-var indexRulesKey = contextIndexRulesKey{}
+var (
+ // ErrInvalidShardID is the sentinel error for an invalid shard id.
+ ErrInvalidShardID = errors.New("invalid shard id")
+ // ErrEncodingMethodAbsent is returned (wrapped) when a database is opened
+ // without both encoding factories, or when a block is created from a
+ // context that carries no encoding method.
+ ErrEncodingMethodAbsent = errors.New("encoding method is absent")
+
+ // indexRulesKey is the context key under which the index rules are stored.
+ indexRulesKey = contextIndexRulesKey{}
+ // encodingMethodKey is the context key under which the EncodingMethod is stored.
+ encodingMethodKey = contextEncodingMethodKey{}
+)
type contextIndexRulesKey struct{}
+type contextEncodingMethodKey struct{}
type Database interface {
io.Closer
@@ -68,9 +75,15 @@ type Shard interface {
var _ Database = (*database)(nil)
type DatabaseOpts struct {
- Location string
- ShardNum uint32
- IndexRules []*databasev1.IndexRule
+ Location string
+ ShardNum uint32
+ IndexRules []*databasev1.IndexRule
+ EncodingMethod EncodingMethod
+}
+
+// EncodingMethod bundles the factories that create the series encoder and
+// decoder for a database. OpenDatabase rejects options in which either
+// factory is nil.
+type EncodingMethod struct {
+ EncoderFactory encoding.SeriesEncoderFactory
+ DecoderFactory encoding.SeriesDecoderFactory
type database struct {
@@ -111,6 +124,9 @@ func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) {
db.logger = pl.Named("tsdb")
}
}
+ if opts.EncodingMethod.EncoderFactory == nil || opts.EncodingMethod.DecoderFactory == nil {
+ return nil, errors.Wrap(ErrEncodingMethodAbsent, "failed to open database")
+ }
if _, err := mkdir(opts.Location); err != nil {
return nil, err
}
@@ -122,6 +138,7 @@ func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) {
}
thisContext := context.WithValue(ctx, logger.ContextKey, db.logger)
thisContext = context.WithValue(thisContext, indexRulesKey, opts.IndexRules)
+ thisContext = context.WithValue(thisContext, encodingMethodKey, opts.EncodingMethod)
if len(entries) > 0 {
return loadDatabase(thisContext, db)
}
diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go
index 3f91e40..89c56d6 100644
--- a/banyand/tsdb/tsdb_test.go
+++ b/banyand/tsdb/tsdb_test.go
@@ -27,6 +27,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/test"
)
@@ -56,6 +57,14 @@ func setUp(t *require.Assertions) (tempDir string, deferFunc func(), db Database
DatabaseOpts{
Location: tempDir,
ShardNum: 1,
+ EncodingMethod: EncodingMethod{
+ EncoderFactory: func() encoding.SeriesEncoder {
+ return nil
+ },
+ DecoderFactory: func() encoding.SeriesDecoder {
+ return nil
+ },
+ },
})
t.NoError(err)
t.NotNil(db)
diff --git a/go.mod b/go.mod
index a1ba284..24e4ce0 100644
--- a/go.mod
+++ b/go.mod
@@ -10,7 +10,7 @@ require (
github.com/golang/protobuf v1.5.2
github.com/google/go-cmp v0.5.6
github.com/google/uuid v1.3.0
- github.com/klauspost/compress v1.13.1 // indirect
+ github.com/klauspost/compress v1.13.1
github.com/oklog/run v1.1.0
github.com/pkg/errors v0.9.1
github.com/rs/zerolog v1.23.0
@@ -43,7 +43,7 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/snappy v0.0.3 // indirect
github.com/google/btree v1.0.1 // indirect
- github.com/google/flatbuffers v1.12.0 // indirect
+ github.com/google/flatbuffers v1.12.1 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
@@ -101,4 +101,4 @@ require (
sigs.k8s.io/yaml v1.2.0 // indirect
)
-replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20210809093509-ff1b2dd81165
+replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20211111092400-7f8fa9a51476
diff --git a/go.sum b/go.sum
index 4ed137e..957109b 100644
--- a/go.sum
+++ b/go.sum
@@ -47,8 +47,8 @@ github.com/RoaringBitmap/gocroaring v0.4.0/go.mod h1:NieMwz7ZqwU2DD73/vvYwv7r4eW
github.com/RoaringBitmap/real-roaring-datasets v0.0.0-20190726190000-eb7c87156f76/go.mod h1:oM0MHmQ3nDsq609SS36p+oYbRi16+oVvU2Bw4Ipv0SE=
github.com/RoaringBitmap/roaring v0.9.1 h1:5PRizBmoN/PfV17nPNQou4dHQ7NcJi8FO/bihdYyCEM=
github.com/RoaringBitmap/roaring v0.9.1/go.mod h1:h1B7iIUOmnAeb5ytYMvnHJwxMc6LUrwBnzXWRuqTQUc=
-github.com/SkyAPM/badger/v3 v3.0.0-20210809093509-ff1b2dd81165 h1:csoTNiGUMtp4H1AchgaZWJ4WY4uJQ6s+pz3sXS93jAA=
-github.com/SkyAPM/badger/v3 v3.0.0-20210809093509-ff1b2dd81165/go.mod h1:dULbq6ehJ5K0cGW/1TQ9iSfUk0gbSiToDWmWmTsJ53E=
+github.com/SkyAPM/badger/v3 v3.0.0-20211111092400-7f8fa9a51476 h1:MH/Jy2x3WF3RdD+WD25XepG4fzIz3qMOoIUM4Enn+GA=
+github.com/SkyAPM/badger/v3 v3.0.0-20211111092400-7f8fa9a51476/go.mod h1:RHo4/GmYcKKh5Lxu63wLEMHJ70Pac2JqZRYGhlyAo2M=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
@@ -192,8 +192,8 @@ github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Z
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4=
github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA=
-github.com/google/flatbuffers v1.12.0 h1:/PtAHvnBY4Kqnx/xCQ3OIV9uYcSFGScBsWI3Oogeh6w=
-github.com/google/flatbuffers v1.12.0/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
+github.com/google/flatbuffers v1.12.1 h1:MVlul7pQNoDzWRLTw5imwYsl+usrS1TXG2H4jg6ImGw=
+github.com/google/flatbuffers v1.12.1/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
diff --git a/pkg/encoding/encoding.go b/pkg/encoding/encoding.go
new file mode 100644
index 0000000..5e04e72
--- /dev/null
+++ b/pkg/encoding/encoding.go
@@ -0,0 +1,66 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package encoding
+
+import "github.com/pkg/errors"
+
+// ErrEncodeEmpty is returned by an encoder asked to encode before any data
+// point has been appended.
+var ErrEncodeEmpty = errors.New("encode an empty value")
+
+// SeriesEncoderFactory creates a new SeriesEncoder.
+type SeriesEncoderFactory func() SeriesEncoder
+
+// SeriesEncoder encodes time series data points.
+type SeriesEncoder interface {
+ // Append adds a data point with the given timestamp and value.
+ Append(ts uint64, value []byte)
+ // IsFull reports whether the buffered data has reached the encoder's capacity.
+ IsFull() bool
+ // Reset clears the underlying buffers so the encoder can be reused.
+ Reset()
+ // Encode serializes the appended data points into a single binary blob.
+ Encode() ([]byte, error)
+ // StartTime returns the time of the first entry.
+ StartTime() uint64
+}
+
+// SeriesDecoderFactory creates a new SeriesDecoder.
+type SeriesDecoderFactory func() SeriesDecoder
+
+// SeriesDecoder decodes encoded time series data.
+type SeriesDecoder interface {
+ // Decode parses the encoded time series data.
+ Decode(data []byte) error
+ // Len reports the number of data points, i.e. the length of the iterator.
+ Len() int
+ // IsFull reports whether the encoded data has reached its capacity.
+ IsFull() bool
+ // Get returns the value of the data point recorded at the given time.
+ Get(ts uint64) ([]byte, error)
+ // Iterator returns a SeriesIterator over the decoded data points.
+ Iterator() SeriesIterator
+}
+
+// SeriesIterator iterates over time series data points.
+type SeriesIterator interface {
+ // Next moves the cursor to the next data point and reports whether one exists.
+ Next() bool
+ // Val returns the value of the current data point.
+ Val() []byte
+ // Time returns the time of the current data point.
+ Time() uint64
+ // Error returns the error, if any, that indicates a decode failure.
+ Error() error
+}
diff --git a/pkg/encoding/stream_chunk.go b/pkg/encoding/stream_chunk.go
new file mode 100644
index 0000000..7ff5094
--- /dev/null
+++ b/pkg/encoding/stream_chunk.go
@@ -0,0 +1,250 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package encoding
+
+import (
+ "bytes"
+ "encoding/binary"
+ "fmt"
+ "sort"
+
+ "github.com/klauspost/compress/zstd"
+ "github.com/pkg/errors"
+)
+
+var (
+ // decoder and encoder are the package-level zstd codecs used by every
+ // stream chunk encoder/decoder through DecodeAll and EncodeAll.
+ // NOTE(review): the constructor errors are discarded; presumably they can
+ // only fail on invalid options, but a failure would go unnoticed here.
+ decoder, _ = zstd.NewReader(nil)
+ encoder, _ = zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBetterCompression))
+ // Compile-time interface checks.
+ _ SeriesEncoder = (*streamChunkEncoder)(nil)
+ _ SeriesDecoder = (*StreamChunkDecoder)(nil)
+)
+
+// streamChunkEncoder buffers appended data points and encodes them into a
+// single zstd-compressed chunk.
+type streamChunkEncoder struct {
+ // tsBuff holds one (timestamp uint64, value offset uint32) entry per data point.
+ tsBuff bytes.Buffer
+ // valBuff holds the values, each preceded by its uint32 length.
+ valBuff bytes.Buffer
+ // scratch is the reusable buffer behind putUint16/putUint32/putUint64.
+ scratch [binary.MaxVarintLen64]byte
+ // len is the byte size of the value section; it is set by Encode.
+ len uint32
+ // num counts the appended data points.
+ num uint32
+ // startTime is the smallest timestamp appended so far; 0 means unset.
+ startTime uint64
+ // valueSize is the value-buffer size at which IsFull starts returning true.
+ valueSize int
+}
+
+// NewStreamChunkEncoder creates a SeriesEncoder that reports itself full once
+// its buffered values reach size bytes.
+func NewStreamChunkEncoder(size int) SeriesEncoder {
+	enc := new(streamChunkEncoder)
+	enc.valueSize = size
+	return enc
+}
+
+// Append buffers one data point. The value goes to the value buffer prefixed
+// with its uint32 length; the timestamp and the value's offset inside the
+// value buffer go to the index buffer. startTime tracks the smallest
+// timestamp seen, with 0 meaning "not set yet".
+func (t *streamChunkEncoder) Append(ts uint64, value []byte) {
+	if t.startTime == 0 || ts < t.startTime {
+		t.startTime = ts
+	}
+	// The offset has to be captured before the value is written.
+	valueOffset := uint32(t.valBuff.Len())
+	t.valBuff.Write(t.putUint32(uint32(len(value))))
+	t.valBuff.Write(value)
+	t.tsBuff.Write(t.putUint64(ts))
+	t.tsBuff.Write(t.putUint32(valueOffset))
+	t.num++
+}
+
+// IsFull reports whether the value buffer has grown to at least valueSize bytes.
+func (t *streamChunkEncoder) IsFull() bool {
+	buffered := t.valBuff.Len()
+	return t.valueSize <= buffered
+}
+
+// Reset empties both buffers and clears the point counter and start time so
+// the encoder can build a new chunk.
+func (t *streamChunkEncoder) Reset() {
+	t.num, t.startTime = 0, 0
+	t.valBuff.Reset()
+	t.tsBuff.Reset()
+}
+
+// Encode serializes the buffered data points into one chunk.
+//
+// The uncompressed layout is
+//
+//	values | index entries | num (uint32) | value-section length (uint32)
+//
+// which is compressed with zstd and followed by a 2-byte little-endian size
+// hint. It returns ErrEncodeEmpty when nothing has been appended.
+//
+// Encode drains the index buffer into the value buffer, so the encoder must
+// be Reset before it is used for another chunk.
+func (t *streamChunkEncoder) Encode() ([]byte, error) {
+	if t.tsBuff.Len() < 1 {
+		return nil, ErrEncodeEmpty
+	}
+	t.len = uint32(t.valBuff.Len())
+	if _, err := t.tsBuff.WriteTo(&t.valBuff); err != nil {
+		return nil, err
+	}
+	t.valBuff.Write(t.putUint32(t.num))
+	t.valBuff.Write(t.putUint32(t.len))
+	data := t.valBuff.Bytes()
+	l := len(data)
+	// Reserve room for the trailer up front so it can be appended in place
+	// instead of allocating a second buffer and copying the whole payload.
+	dst := make([]byte, 0, compressBound(l)+2)
+	dst = encoder.EncodeAll(data, dst)
+	// NOTE(review): the hint is truncated to 16 bits, so it wraps for payloads
+	// above 65535 bytes. Decode only uses it as an initial capacity, so the
+	// data still round-trips; widening it would change the chunk format.
+	return append(dst, t.putUint16(uint16(l))...), nil
+}
+
+// compressBound estimates the buffer capacity to reserve for compressing
+// srcSize bytes: the input size plus 1/256 of it.
+func compressBound(srcSize int) int {
+	overhead := srcSize >> 8
+	return srcSize + overhead
+}
+
+// StartTime returns the smallest timestamp appended since the last Reset, or
+// 0 when nothing has been appended.
+func (t *streamChunkEncoder) StartTime() uint64 {
+ return t.startTime
+}
+
+// putUint16 writes v in little-endian order into the scratch buffer and
+// returns the 2 bytes written. The slice aliases scratch and is only valid
+// until the next putUint* call.
+func (t *streamChunkEncoder) putUint16(v uint16) []byte {
+	out := t.scratch[:2]
+	binary.LittleEndian.PutUint16(out, v)
+	return out
+}
+
+// putUint32 writes v in little-endian order into the scratch buffer and
+// returns the 4 bytes written. The slice aliases scratch and is only valid
+// until the next putUint* call.
+func (t *streamChunkEncoder) putUint32(v uint32) []byte {
+	out := t.scratch[:4]
+	binary.LittleEndian.PutUint32(out, v)
+	return out
+}
+
+// putUint64 writes v in little-endian order into the scratch buffer and
+// returns the 8 bytes written. The slice aliases scratch and is only valid
+// until the next putUint* call.
+func (t *streamChunkEncoder) putUint64(v uint64) []byte {
+	out := t.scratch[:8]
+	binary.LittleEndian.PutUint64(out, v)
+	return out
+}
+
+const (
+ // TsLen is the byte size of one index entry: an 8-byte timestamp (uint64)
+ // followed by a 4-byte data offset (uint32).
+ TsLen = 8 + 4
+)
+
+// ErrInvalidValue is returned when encoded data is too short to hold what its
+// own length fields describe.
+var ErrInvalidValue = errors.New("invalid encoded value")
+
+// StreamChunkDecoder decodes a chunk produced by the stream chunk encoder.
+type StreamChunkDecoder struct {
+ // ts is the index section: consecutive TsLen-byte (timestamp, offset) entries.
+ ts []byte
+ // val is the value section: length-prefixed values.
+ val []byte
+ // len is the byte size of the value section as read from the chunk trailer.
+ len uint32
+ // num is the number of data points as read from the chunk trailer.
+ num uint32
+ // valueSize is the value-section size at which IsFull starts returning true.
+ valueSize int
+}
+
+// NewStreamChunkDecoder creates a SeriesDecoder that reports itself full once
+// the decoded value section reaches size bytes.
+func NewStreamChunkDecoder(size int) SeriesDecoder {
+	dec := new(StreamChunkDecoder)
+	dec.valueSize = size
+	return dec
+}
+
+// Len returns the number of data points recorded in the most recently decoded chunk.
+func (t *StreamChunkDecoder) Len() int {
+ return int(t.num)
+}
+
+// Decode decompresses rawData and splits it into the value section and the
+// index section.
+//
+// rawData is a zstd payload followed by a 2-byte little-endian size hint. The
+// decompressed payload ends with two uint32 fields: the number of data points
+// and the byte length of the value section.
+//
+// It returns ErrInvalidValue when the input is too short or when its length
+// fields describe more data than is present, and the zstd error when
+// decompression fails. The decoder's state is only replaced on success.
+func (t *StreamChunkDecoder) Decode(rawData []byte) (err error) {
+	// The size hint occupies the last two bytes; anything shorter is not a chunk.
+	if len(rawData) < 2 {
+		return ErrInvalidValue
+	}
+	payloadEnd := len(rawData) - 2
+	// The hint is only used as the initial capacity of the output buffer.
+	size := binary.LittleEndian.Uint16(rawData[payloadEnd:])
+	var data []byte
+	if data, err = decoder.DecodeAll(rawData[:payloadEnd], make([]byte, 0, size)); err != nil {
+		return err
+	}
+	if len(data) <= 8 {
+		return ErrInvalidValue
+	}
+	lenOffset := len(data) - 4
+	numOffset := lenOffset - 4
+	num := binary.LittleEndian.Uint32(data[numOffset:lenOffset])
+	valLen := binary.LittleEndian.Uint32(data[lenOffset:])
+	// Do the arithmetic in uint64 so a corrupt length cannot wrap around and
+	// slip past the checks.
+	total := uint64(len(data))
+	if total <= uint64(valLen)+8 {
+		return ErrInvalidValue
+	}
+	// The index section must hold num complete slots, otherwise Get and the
+	// iterator would slice out of range.
+	if uint64(valLen)+8+uint64(num)*TsLen > total {
+		return ErrInvalidValue
+	}
+	t.num, t.len = num, valLen
+	t.val = data[:valLen]
+	t.ts = data[valLen:numOffset]
+	return nil
+}
+
+// IsFull reports whether the decoded value section is at least valueSize bytes.
+func (t *StreamChunkDecoder) IsFull() bool {
+	valueBytes := int(t.len)
+	return t.valueSize <= valueBytes
+}
+
+// Get returns the value stored for timestamp ts, or an error when the chunk
+// holds no entry with exactly that timestamp.
+//
+// The lookup is a binary search for the first index entry whose timestamp is
+// <= ts.
+// NOTE(review): this assumes the index is ordered by descending timestamp;
+// the encoder does not enforce that order - confirm against the writer.
+func (t *StreamChunkDecoder) Get(ts uint64) ([]byte, error) {
+	total := int(t.num)
+	pos := sort.Search(total, func(i int) bool {
+		return parseTS(getTSSlot(t.ts, i)) <= ts
+	})
+	if pos < total {
+		if entry := getTSSlot(t.ts, pos); parseTS(entry) == ts {
+			return getVal(t.val, parseOffset(entry))
+		}
+	}
+	return nil, fmt.Errorf("%d doesn't exist", ts)
+}
+
+// Iterator returns a SeriesIterator over the decoder's current index and
+// value sections. The iterator starts before the first entry.
+func (t *StreamChunkDecoder) Iterator() SeriesIterator {
+ return newBlockItemIterator(t)
+}
+
+// getVal reads the length-prefixed value that starts at offset in buf: a
+// uint32 little-endian length followed by that many bytes. The returned slice
+// aliases buf.
+//
+// It returns ErrInvalidValue when the length prefix or the value itself would
+// extend past the end of buf.
+func getVal(buf []byte, offset uint32) ([]byte, error) {
+	// uint64 arithmetic keeps offset+4 from wrapping around.
+	size := uint64(len(buf))
+	start := uint64(offset) + 4
+	// start == size is valid: a zero-length value stored last ends exactly at
+	// the end of the buffer.
+	if start > size {
+		return nil, ErrInvalidValue
+	}
+	dataLen := binary.LittleEndian.Uint32(buf[offset:start])
+	end := start + uint64(dataLen)
+	// A corrupt length must produce an error, not a slice-bounds panic.
+	if end > size {
+		return nil, ErrInvalidValue
+	}
+	return buf[start:end], nil
+}
+
+// getTSSlot returns the index-th TsLen-byte entry of the index section data.
+func getTSSlot(data []byte, index int) []byte {
+	begin := index * TsLen
+	return data[begin : begin+TsLen]
+}
+
+// parseTS reads the little-endian uint64 timestamp held in the first 8 bytes
+// of an index entry.
+func parseTS(tsSlot []byte) uint64 {
+	tsBytes := tsSlot[:8]
+	return binary.LittleEndian.Uint64(tsBytes)
+}
+
+// parseOffset reads the little-endian uint32 value offset stored after the
+// 8-byte timestamp of an index entry.
+func parseOffset(tsSlot []byte) uint32 {
+	offsetBytes := tsSlot[8:]
+	return binary.LittleEndian.Uint32(offsetBytes)
+}
+
+// Compile-time check that chunkIterator satisfies SeriesIterator.
+var _ SeriesIterator = (*chunkIterator)(nil)
+
+// chunkIterator walks the entries of a decoded chunk in index order.
+type chunkIterator struct {
+	// index is the chunk's index section (TsLen-byte entries).
+	index []byte
+	// data is the chunk's value section.
+	data []byte
+	// idx is the current position; -1 until Next is called for the first time.
+	idx int
+	// num is the number of entries.
+	num int
+	// err is the first error hit while reading a value.
+	err error
+}
+
+// newBlockItemIterator creates an iterator over d's index and value sections,
+// positioned before the first entry.
+func newBlockItemIterator(d *StreamChunkDecoder) SeriesIterator {
+	return &chunkIterator{
+		idx:   -1,
+		index: d.ts,
+		data:  d.val,
+		num:   int(d.num),
+	}
+}
+
+// Next advances to the next entry and reports whether it exists.
+func (b *chunkIterator) Next() bool {
+	b.idx++
+	return b.idx >= 0 && b.idx < b.num
+}
+
+// Val returns the value of the current entry. When the value cannot be read
+// it returns nil and keeps the failure for Error instead of discarding it.
+func (b *chunkIterator) Val() []byte {
+	v, err := getVal(b.data, parseOffset(getTSSlot(b.index, b.idx)))
+	if err != nil {
+		// Only the first failure is kept.
+		if b.err == nil {
+			b.err = err
+		}
+		return nil
+	}
+	return v
+}
+
+// Time returns the timestamp of the current entry.
+func (b *chunkIterator) Time() uint64 {
+	return parseTS(getTSSlot(b.index, b.idx))
+}
+
+// Error returns the first error hit by Val, or nil when every value read so
+// far was valid.
+func (b *chunkIterator) Error() error {
+	return b.err
+}