You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/11/14 00:16:56 UTC

[skywalking-banyandb] branch main updated: Introduce encoding component (#59)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new aa9fb38  Introduce encoding component (#59)
aa9fb38 is described below

commit aa9fb38fb59df1ca4f61c05440089e864bcec27f
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Sun Nov 14 08:16:49 2021 +0800

    Introduce encoding component (#59)
    
    * Encoding component helps tsdb module to customize how to
        encode data in a chunk
      * StreamChunkEncoder/Decoder handles stream module's encoding
      * Update badger to support encoding component
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/kv/badger.go         |  20 ++++
 banyand/kv/kv.go             |  40 ++++---
 banyand/stream/stream.go     |  12 +++
 banyand/tsdb/block.go        |  22 ++--
 banyand/tsdb/tsdb.go         |  27 ++++-
 banyand/tsdb/tsdb_test.go    |   9 ++
 go.mod                       |   6 +-
 go.sum                       |   8 +-
 pkg/encoding/encoding.go     |  66 ++++++++++++
 pkg/encoding/stream_chunk.go | 250 +++++++++++++++++++++++++++++++++++++++++++
 10 files changed, 419 insertions(+), 41 deletions(-)

diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index a0b0a1b..24820f0 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -24,8 +24,10 @@ import (
 	"time"
 
 	"github.com/dgraph-io/badger/v3"
+	"github.com/dgraph-io/badger/v3/bydb"
 	"github.com/dgraph-io/badger/v3/y"
 
+	"github.com/apache/skywalking-banyandb/pkg/encoding"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
@@ -256,3 +258,21 @@ func (l *badgerLog) Infof(f string, v ...interface{}) {
 func (l *badgerLog) Debugf(f string, v ...interface{}) {
 	l.delegated.Debug().Msgf(f, v...)
 }
+
+var _ bydb.TSetDecoder = (*decoderDelegate)(nil)
+
+type decoderDelegate struct {
+	encoding.SeriesDecoder
+}
+
+func (d *decoderDelegate) Iterator() bydb.TSetIterator {
+	return &iterDelegate{
+		SeriesIterator: d.SeriesDecoder.Iterator(),
+	}
+}
+
+var _ bydb.TSetIterator = (*iterDelegate)(nil) // compile-time check for the iterator wrapper, not the decoder again
+
+type iterDelegate struct {
+	encoding.SeriesIterator
+}
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index fc829d3..ea92d1c 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -23,8 +23,10 @@ import (
 	"math"
 
 	"github.com/dgraph-io/badger/v3"
+	"github.com/dgraph-io/badger/v3/bydb"
 	"github.com/pkg/errors"
 
+	"github.com/apache/skywalking-banyandb/pkg/encoding"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
@@ -101,6 +103,20 @@ func TSSWithLogger(l *logger.Logger) TimeSeriesOptions {
 	}
 }
 
+func TSSWithEncoding(encoderFactory encoding.SeriesEncoderFactory, decoderFactory encoding.SeriesDecoderFactory) TimeSeriesOptions {
+	return func(store TimeSeriesStore) {
+		if btss, ok := store.(*badgerTSS); ok {
+			btss.dbOpts = btss.dbOpts.WithExternalCompactor(func() bydb.TSetEncoder {
+				return encoderFactory()
+			}, func() bydb.TSetDecoder {
+				return &decoderDelegate{
+					SeriesDecoder: decoderFactory(),
+				}
+			})
+		}
+	}
+}
+
 type Iterator interface {
 	Next()
 	Rewind()
@@ -125,7 +141,7 @@ type IndexStore interface {
 }
 
 // OpenTimeSeriesStore creates a new TimeSeriesStore
-func OpenTimeSeriesStore(shardID int, path string, compressLevel int, valueSize int, options ...TimeSeriesOptions) (TimeSeriesStore, error) {
+func OpenTimeSeriesStore(shardID int, path string, options ...TimeSeriesOptions) (TimeSeriesStore, error) {
 	btss := new(badgerTSS)
 	btss.shardID = shardID
 	btss.dbOpts = badger.DefaultOptions(path)
@@ -139,7 +155,7 @@ func OpenTimeSeriesStore(shardID int, path string, compressLevel int, valueSize
 	if err != nil {
 		return nil, fmt.Errorf("failed to open time series store: %v", err)
 	}
-	btss.TSet = *badger.NewTSet(btss.db, compressLevel, valueSize)
+	btss.TSet = *badger.NewTSet(btss.db)
 	return btss, nil
 }
 
@@ -161,26 +177,6 @@ func StoreWithNamedLogger(name string, l *logger.Logger) StoreOptions {
 	}
 }
 
-// StoreWithBufferSize sets a external logger into underlying Store
-func StoreWithBufferSize(size int64) StoreOptions {
-	return func(store Store) {
-		if bdb, ok := store.(*badgerDB); ok {
-			bdb.dbOpts = bdb.dbOpts.WithMemTableSize(size)
-		}
-	}
-}
-
-type FlushCallback func()
-
-// StoreWithFlushCallback sets a callback function
-func StoreWithFlushCallback(callback FlushCallback) StoreOptions {
-	return func(store Store) {
-		if bdb, ok := store.(*badgerDB); ok {
-			bdb.dbOpts.FlushCallBack = callback
-		}
-	}
-}
-
 // OpenStore creates a new Store
 func OpenStore(shardID int, path string, options ...StoreOptions) (Store, error) {
 	bdb := new(badgerDB)
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
index 35ae2a5..760687d 100644
--- a/banyand/stream/stream.go
+++ b/banyand/stream/stream.go
@@ -23,10 +23,14 @@ import (
 	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
 	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
 	"github.com/apache/skywalking-banyandb/banyand/tsdb"
+	"github.com/apache/skywalking-banyandb/pkg/encoding"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/partition"
 )
 
+// a chunk is 1MB
+const chunkSize = 1 << 20
+
 type indexRule struct {
 	rule       *databasev1.IndexRule
 	tagIndices []partition.TagLocator
@@ -102,6 +106,14 @@ func openStream(root string, spec streamSpec, l *logger.Logger) (*stream, error)
 			Location:   root,
 			ShardNum:   sm.schema.GetOpts().GetShardNum(),
 			IndexRules: spec.indexRules,
+			EncodingMethod: tsdb.EncodingMethod{
+				EncoderFactory: func() encoding.SeriesEncoder {
+					return encoding.NewStreamChunkEncoder(chunkSize)
+				},
+				DecoderFactory: func() encoding.SeriesDecoder {
+					return encoding.NewStreamChunkDecoder(chunkSize)
+				},
+			},
 		})
 	if err != nil {
 		return nil, err
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 231cef9..2f38308 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -23,6 +23,7 @@ import (
 	"time"
 
 	"github.com/dgraph-io/ristretto/z"
+	"github.com/pkg/errors"
 
 	"github.com/apache/skywalking-banyandb/api/common"
 	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
@@ -50,11 +51,9 @@ type block struct {
 }
 
 type blockOpts struct {
-	segID         uint16
-	blockID       uint16
-	path          string
-	compressLevel int
-	valueSize     int
+	segID   uint16
+	blockID uint16
+	path    string
 }
 
 func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
@@ -71,8 +70,17 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
 			b.l = pl.Named("block")
 		}
 	}
-	if b.store, err = kv.OpenTimeSeriesStore(0, b.path+"/store", opts.compressLevel, opts.valueSize,
-		kv.TSSWithLogger(b.l)); err != nil {
+	encodingMethodObject := ctx.Value(encodingMethodKey)
+	if encodingMethodObject == nil {
+		return nil, errors.Wrap(ErrEncodingMethodAbsent, "failed to create a block")
+	}
+	encodingMethod := encodingMethodObject.(EncodingMethod)
+	if b.store, err = kv.OpenTimeSeriesStore(
+		0,
+		b.path+"/store",
+		kv.TSSWithEncoding(encodingMethod.EncoderFactory, encodingMethod.DecoderFactory),
+		kv.TSSWithLogger(b.l),
+	); err != nil {
 		return nil, err
 	}
 	if b.primaryIndex, err = lsm.NewStore(lsm.StoreOpts{
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index 80a84f5..8df233b 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -31,6 +31,7 @@ import (
 
 	"github.com/apache/skywalking-banyandb/api/common"
 	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+	"github.com/apache/skywalking-banyandb/pkg/encoding"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
@@ -47,10 +48,16 @@ const (
 	dirPerm = 0700
 )
 
-var ErrInvalidShardID = errors.New("invalid shard id")
-var indexRulesKey = contextIndexRulesKey{}
+var (
+	ErrInvalidShardID       = errors.New("invalid shard id")
+	ErrEncodingMethodAbsent = errors.New("encoding method is absent")
+
+	indexRulesKey     = contextIndexRulesKey{}
+	encodingMethodKey = contextEncodingMethodKey{}
+)
 
 type contextIndexRulesKey struct{}
+type contextEncodingMethodKey struct{}
 
 type Database interface {
 	io.Closer
@@ -68,9 +75,15 @@ type Shard interface {
 var _ Database = (*database)(nil)
 
 type DatabaseOpts struct {
-	Location   string
-	ShardNum   uint32
-	IndexRules []*databasev1.IndexRule
+	Location       string
+	ShardNum       uint32
+	IndexRules     []*databasev1.IndexRule
+	EncodingMethod EncodingMethod
+}
+
+type EncodingMethod struct {
+	EncoderFactory encoding.SeriesEncoderFactory
+	DecoderFactory encoding.SeriesDecoderFactory
 }
 
 type database struct {
@@ -111,6 +124,9 @@ func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) {
 			db.logger = pl.Named("tsdb")
 		}
 	}
+	if opts.EncodingMethod.EncoderFactory == nil || opts.EncodingMethod.DecoderFactory == nil {
+		return nil, errors.Wrap(ErrEncodingMethodAbsent, "failed to open database")
+	}
 	if _, err := mkdir(opts.Location); err != nil {
 		return nil, err
 	}
@@ -122,6 +138,7 @@ func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) {
 	}
 	thisContext := context.WithValue(ctx, logger.ContextKey, db.logger)
 	thisContext = context.WithValue(thisContext, indexRulesKey, opts.IndexRules)
+	thisContext = context.WithValue(thisContext, encodingMethodKey, opts.EncodingMethod)
 	if len(entries) > 0 {
 		return loadDatabase(thisContext, db)
 	}
diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go
index 3f91e40..89c56d6 100644
--- a/banyand/tsdb/tsdb_test.go
+++ b/banyand/tsdb/tsdb_test.go
@@ -27,6 +27,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
+	"github.com/apache/skywalking-banyandb/pkg/encoding"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/test"
 )
@@ -56,6 +57,14 @@ func setUp(t *require.Assertions) (tempDir string, deferFunc func(), db Database
 		DatabaseOpts{
 			Location: tempDir,
 			ShardNum: 1,
+			EncodingMethod: EncodingMethod{
+				EncoderFactory: func() encoding.SeriesEncoder {
+					return nil
+				},
+				DecoderFactory: func() encoding.SeriesDecoder {
+					return nil
+				},
+			},
 		})
 	t.NoError(err)
 	t.NotNil(db)
diff --git a/go.mod b/go.mod
index a1ba284..24e4ce0 100644
--- a/go.mod
+++ b/go.mod
@@ -10,7 +10,7 @@ require (
 	github.com/golang/protobuf v1.5.2
 	github.com/google/go-cmp v0.5.6
 	github.com/google/uuid v1.3.0
-	github.com/klauspost/compress v1.13.1 // indirect
+	github.com/klauspost/compress v1.13.1
 	github.com/oklog/run v1.1.0
 	github.com/pkg/errors v0.9.1
 	github.com/rs/zerolog v1.23.0
@@ -43,7 +43,7 @@ require (
 	github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
 	github.com/golang/snappy v0.0.3 // indirect
 	github.com/google/btree v1.0.1 // indirect
-	github.com/google/flatbuffers v1.12.0 // indirect
+	github.com/google/flatbuffers v1.12.1 // indirect
 	github.com/gorilla/websocket v1.4.2 // indirect
 	github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
 	github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
@@ -101,4 +101,4 @@ require (
 	sigs.k8s.io/yaml v1.2.0 // indirect
 )
 
-replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20210809093509-ff1b2dd81165
+replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20211111092400-7f8fa9a51476
diff --git a/go.sum b/go.sum
index 4ed137e..957109b 100644
--- a/go.sum
+++ b/go.sum
@@ -47,8 +47,8 @@ github.com/RoaringBitmap/gocroaring v0.4.0/go.mod h1:NieMwz7ZqwU2DD73/vvYwv7r4eW
 github.com/RoaringBitmap/real-roaring-datasets v0.0.0-20190726190000-eb7c87156f76/go.mod h1:oM0MHmQ3nDsq609SS36p+oYbRi16+oVvU2Bw4Ipv0SE=
 github.com/RoaringBitmap/roaring v0.9.1 h1:5PRizBmoN/PfV17nPNQou4dHQ7NcJi8FO/bihdYyCEM=
 github.com/RoaringBitmap/roaring v0.9.1/go.mod h1:h1B7iIUOmnAeb5ytYMvnHJwxMc6LUrwBnzXWRuqTQUc=
-github.com/SkyAPM/badger/v3 v3.0.0-20210809093509-ff1b2dd81165 h1:csoTNiGUMtp4H1AchgaZWJ4WY4uJQ6s+pz3sXS93jAA=
-github.com/SkyAPM/badger/v3 v3.0.0-20210809093509-ff1b2dd81165/go.mod h1:dULbq6ehJ5K0cGW/1TQ9iSfUk0gbSiToDWmWmTsJ53E=
+github.com/SkyAPM/badger/v3 v3.0.0-20211111092400-7f8fa9a51476 h1:MH/Jy2x3WF3RdD+WD25XepG4fzIz3qMOoIUM4Enn+GA=
+github.com/SkyAPM/badger/v3 v3.0.0-20211111092400-7f8fa9a51476/go.mod h1:RHo4/GmYcKKh5Lxu63wLEMHJ70Pac2JqZRYGhlyAo2M=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
@@ -192,8 +192,8 @@ github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Z
 github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
 github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4=
 github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA=
-github.com/google/flatbuffers v1.12.0 h1:/PtAHvnBY4Kqnx/xCQ3OIV9uYcSFGScBsWI3Oogeh6w=
-github.com/google/flatbuffers v1.12.0/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
+github.com/google/flatbuffers v1.12.1 h1:MVlul7pQNoDzWRLTw5imwYsl+usrS1TXG2H4jg6ImGw=
+github.com/google/flatbuffers v1.12.1/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
 github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
 github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
 github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
diff --git a/pkg/encoding/encoding.go b/pkg/encoding/encoding.go
new file mode 100644
index 0000000..5e04e72
--- /dev/null
+++ b/pkg/encoding/encoding.go
@@ -0,0 +1,66 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package encoding
+
+import "github.com/pkg/errors"
+
+var ErrEncodeEmpty = errors.New("encode an empty value")
+
+type SeriesEncoderFactory func() SeriesEncoder
+
+// SeriesEncoder encodes time series data point
+type SeriesEncoder interface {
+	// Append a data point
+	Append(ts uint64, value []byte)
+	// IsFull returns whether the encoded data reached its capacity
+	IsFull() bool
+	// Reset the underlying buffer
+	Reset()
+	// Encode the time series data point to a binary
+	Encode() ([]byte, error)
+	// StartTime indicates the first entry's time
+	StartTime() uint64
+}
+
+type SeriesDecoderFactory func() SeriesDecoder
+
+// SeriesDecoder decodes encoded time series data
+type SeriesDecoder interface {
+	// Decode the time series data
+	Decode(data []byte) error
+	// Len denotes the size of iterator
+	Len() int
+	// IsFull returns whether the encoded data reached its capacity
+	IsFull() bool
+	// Get the data point by its time
+	Get(ts uint64) ([]byte, error)
+	// Iterator returns a SeriesIterator
+	Iterator() SeriesIterator
+}
+
+// SeriesIterator iterates time series data
+type SeriesIterator interface {
+	// Next scrolls the cursor to the next data point
+	Next() bool
+	// Val returns the value of the current data point
+	Val() []byte
+	// Time returns the time of the current data point
+	Time() uint64
+	// Error might return an error that indicates a decode failure
+	Error() error
+}
diff --git a/pkg/encoding/stream_chunk.go b/pkg/encoding/stream_chunk.go
new file mode 100644
index 0000000..7ff5094
--- /dev/null
+++ b/pkg/encoding/stream_chunk.go
@@ -0,0 +1,250 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package encoding
+
+import (
+	"bytes"
+	"encoding/binary"
+	"fmt"
+	"sort"
+
+	"github.com/klauspost/compress/zstd"
+	"github.com/pkg/errors"
+)
+
+var (
+	decoder, _               = zstd.NewReader(nil)
+	encoder, _               = zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBetterCompression))
+	_          SeriesEncoder = (*streamChunkEncoder)(nil)
+	_          SeriesDecoder = (*StreamChunkDecoder)(nil)
+)
+
+// streamChunkEncoder buffers stream data points and encodes them into a zstd-compressed chunk
+type streamChunkEncoder struct {
+	tsBuff    bytes.Buffer
+	valBuff   bytes.Buffer
+	scratch   [binary.MaxVarintLen64]byte
+	len       uint32
+	num       uint32
+	startTime uint64
+	valueSize int
+}
+
+func NewStreamChunkEncoder(size int) SeriesEncoder {
+	return &streamChunkEncoder{
+		valueSize: size,
+	}
+}
+
+func (t *streamChunkEncoder) Append(ts uint64, value []byte) {
+	if t.startTime == 0 {
+		t.startTime = ts
+	} else if t.startTime > ts {
+		t.startTime = ts
+	}
+	vLen := len(value)
+	offset := uint32(len(t.valBuff.Bytes()))
+	t.valBuff.Write(t.putUint32(uint32(vLen)))
+	t.valBuff.Write(value)
+	t.tsBuff.Write(t.putUint64(ts))
+	t.tsBuff.Write(t.putUint32(offset))
+	t.num = t.num + 1
+}
+
+func (t *streamChunkEncoder) IsFull() bool {
+	return t.valBuff.Len() >= t.valueSize
+}
+
+func (t *streamChunkEncoder) Reset() {
+	t.tsBuff.Reset()
+	t.valBuff.Reset()
+	t.num = 0
+	t.startTime = 0
+}
+
+func (t *streamChunkEncoder) Encode() ([]byte, error) {
+	if t.tsBuff.Len() < 1 {
+		return nil, ErrEncodeEmpty
+	}
+	val := t.valBuff.Bytes()
+	t.len = uint32(len(val))
+	_, err := t.tsBuff.WriteTo(&t.valBuff)
+	if err != nil {
+		return nil, err
+	}
+	t.valBuff.Write(t.putUint32(t.num))
+	t.valBuff.Write(t.putUint32(t.len))
+	data := t.valBuff.Bytes()
+	l := len(data)
+	dst := make([]byte, 0, compressBound(l))
+	dst = encoder.EncodeAll(data, dst)
+	result := make([]byte, len(dst)+2)
+	copy(result, dst)
+	copy(result[len(dst):], t.putUint16(uint16(l)))
+	return result, nil
+}
+
+func compressBound(srcSize int) int {
+	return srcSize + (srcSize >> 8)
+}
+
+func (t *streamChunkEncoder) StartTime() uint64 {
+	return t.startTime
+}
+
+func (t *streamChunkEncoder) putUint16(v uint16) []byte {
+	binary.LittleEndian.PutUint16(t.scratch[:], v)
+	return t.scratch[:2]
+}
+
+func (t *streamChunkEncoder) putUint32(v uint32) []byte {
+	binary.LittleEndian.PutUint32(t.scratch[:], v)
+	return t.scratch[:4]
+}
+
+func (t *streamChunkEncoder) putUint64(v uint64) []byte {
+	binary.LittleEndian.PutUint64(t.scratch[:], v)
+	return t.scratch[:8]
+}
+
+const (
+	// TsLen equals ts(uint64) + data_offset(uint32)
+	TsLen = 8 + 4
+)
+
+var ErrInvalidValue = errors.New("invalid encoded value")
+
+// StreamChunkDecoder decodes a chunk (values plus timestamp index) produced by streamChunkEncoder
+type StreamChunkDecoder struct {
+	ts        []byte
+	val       []byte
+	len       uint32
+	num       uint32
+	valueSize int
+}
+
+func NewStreamChunkDecoder(size int) SeriesDecoder {
+	return &StreamChunkDecoder{
+		valueSize: size,
+	}
+}
+
+func (t *StreamChunkDecoder) Len() int {
+	return int(t.num)
+}
+
+func (t *StreamChunkDecoder) Decode(rawData []byte) (err error) {
+	var data []byte
+	size := binary.LittleEndian.Uint16(rawData[len(rawData)-2:])
+	if data, err = decoder.DecodeAll(rawData[:len(rawData)-2], make([]byte, 0, size)); err != nil {
+		return err
+	}
+	l := uint32(len(data))
+	if l <= 8 {
+		return ErrInvalidValue
+	}
+	lenOffset := len(data) - 4
+	numOffset := lenOffset - 4
+	t.num = binary.LittleEndian.Uint32(data[numOffset:lenOffset])
+	t.len = binary.LittleEndian.Uint32(data[lenOffset:])
+	if l <= t.len+8 {
+		return ErrInvalidValue
+	}
+	t.val = data[:t.len]
+	t.ts = data[t.len:numOffset]
+	return nil
+}
+
+func (t *StreamChunkDecoder) IsFull() bool {
+	return int(t.len) >= t.valueSize
+}
+
+func (t *StreamChunkDecoder) Get(ts uint64) ([]byte, error) {
+	i := sort.Search(int(t.num), func(i int) bool {
+		slot := getTSSlot(t.ts, i)
+		return parseTS(slot) <= ts
+	})
+	if i >= int(t.num) {
+		return nil, fmt.Errorf("%d doesn't exist", ts)
+	}
+	slot := getTSSlot(t.ts, i)
+	if parseTS(slot) != ts {
+		return nil, fmt.Errorf("%d doesn't exist", ts)
+	}
+	return getVal(t.val, parseOffset(slot))
+}
+
+func (t *StreamChunkDecoder) Iterator() SeriesIterator {
+	return newBlockItemIterator(t)
+}
+
+func getVal(buf []byte, offset uint32) ([]byte, error) {
+	if uint64(len(buf)) < uint64(offset)+4 { // need the 4-byte length prefix; uint64 avoids wrap-around, and an empty last value is valid
+		return nil, ErrInvalidValue
+	}
+	dataLen := binary.LittleEndian.Uint32(buf[offset : offset+4])
+	return buf[offset+4 : offset+4+dataLen], nil
+}
+
+func getTSSlot(data []byte, index int) []byte {
+	return data[index*TsLen : (index+1)*TsLen]
+}
+
+func parseTS(tsSlot []byte) uint64 {
+	return binary.LittleEndian.Uint64(tsSlot[:8])
+}
+
+func parseOffset(tsSlot []byte) uint32 {
+	return binary.LittleEndian.Uint32(tsSlot[8:])
+}
+
+var _ SeriesIterator = (*chunkIterator)(nil)
+
+type chunkIterator struct {
+	index []byte
+	data  []byte
+	idx   int
+	num   int
+}
+
+func newBlockItemIterator(decoder *StreamChunkDecoder) SeriesIterator {
+	return &chunkIterator{
+		idx:   -1,
+		index: decoder.ts,
+		data:  decoder.val,
+		num:   int(decoder.num),
+	}
+}
+
+func (b *chunkIterator) Next() bool {
+	b.idx++
+	return b.idx >= 0 && b.idx < b.num
+}
+
+func (b *chunkIterator) Val() []byte {
+	v, _ := getVal(b.data, parseOffset(getTSSlot(b.index, b.idx)))
+	return v
+}
+
+func (b *chunkIterator) Time() uint64 {
+	return parseTS(getTSSlot(b.index, b.idx))
+}
+
+func (b *chunkIterator) Error() error {
+	return nil
+}