You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/09/22 14:15:33 UTC
[skywalking-banyandb] branch main updated: To save and retrieve
element_id (#50)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new c6314fe To save and retrieve element_id (#50)
c6314fe is described below
commit c6314fe15b56143836e8969bd756a1310779b3b0
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Wed Sep 22 22:15:28 2021 +0800
To save and retrieve element_id (#50)
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
banyand/stream/stream_query.go | 10 +++++++++-
banyand/stream/stream_query_test.go | 5 +++++
banyand/stream/stream_write.go | 2 +-
banyand/tsdb/series_seek.go | 12 ++++++++++--
banyand/tsdb/series_write.go | 15 +++++++++++++++
5 files changed, 40 insertions(+), 4 deletions(-)
diff --git a/banyand/stream/stream_query.go b/banyand/stream/stream_query.go
index fd0a05c..f8b7665 100644
--- a/banyand/stream/stream_query.go
+++ b/banyand/stream/stream_query.go
@@ -70,7 +70,7 @@ func (s *stream) Shard(id common.ShardID) (tsdb.Shard, error) {
}
func (s *stream) ParseTagFamily(family string, item tsdb.Item) (*modelv2.TagFamily, error) {
- familyRawBytes, err := item.Val(family)
+ familyRawBytes, err := item.Family(family)
if err != nil {
return nil, err
}
@@ -102,3 +102,11 @@ func (s *stream) ParseTagFamily(family string, item tsdb.Item) (*modelv2.TagFami
Tags: tags,
}, err
}
+
+func (s *stream) ParseElementID(item tsdb.Item) (string, error) {
+ rawBytes, err := item.Val()
+ if err != nil {
+ return "", err
+ }
+ return string(rawBytes), nil
+}
diff --git a/banyand/stream/stream_query_test.go b/banyand/stream/stream_query_test.go
index 21669c6..8319487 100644
--- a/banyand/stream/stream_query_test.go
+++ b/banyand/stream/stream_query_test.go
@@ -681,6 +681,11 @@ func queryData(tester *assert.Assertions, s *stream, opts queryOpts) (shardsForT
elements = append(elements, tag.GetValue().GetStr().GetValue())
}
}
+ eleID, errInner := s.ParseElementID(iterator.Val())
+ if errInner != nil {
+ return nil, errInner
+ }
+ tester.NotEmpty(eleID)
}
_ = iterator.Close()
g = append(g, shardStruct{
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index dc536bf..610ef31 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -36,7 +36,6 @@ import (
var (
ErrMalformedElement = errors.New("element is malformed")
- ErrUnsupportedTagTypeAsEntry = errors.New("the tag type can not be as an entry in an entity")
ErrUnsupportedTagForIndexField = errors.New("the tag type(for example, null) can not be as the index field value")
)
@@ -92,6 +91,7 @@ func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue) (*t
}
builder.Family(sm.GetTagFamilies()[fi].GetName(), bb)
}
+ builder.Val([]byte(value.GetElementId()))
writer, errWrite := builder.Build()
if errWrite != nil {
return nil, errWrite
diff --git a/banyand/tsdb/series_seek.go b/banyand/tsdb/series_seek.go
index 0c8700b..0aac353 100644
--- a/banyand/tsdb/series_seek.go
+++ b/banyand/tsdb/series_seek.go
@@ -32,7 +32,8 @@ type Iterator interface {
}
type Item interface {
- Val(family string) ([]byte, error)
+ Family(family string) ([]byte, error)
+ Val() ([]byte, error)
ID() common.ItemID
SortedField() []byte
Time() uint64
@@ -118,7 +119,7 @@ func (i *item) SortedField() []byte {
return i.sortedField
}
-func (i *item) Val(family string) ([]byte, error) {
+func (i *item) Family(family string) ([]byte, error) {
d := dataBucket{
seriesID: i.seriesID,
family: []byte(family),
@@ -126,6 +127,13 @@ func (i *item) Val(family string) ([]byte, error) {
return i.data.Get(d.marshal(), uint64(i.itemID))
}
+func (i *item) Val() ([]byte, error) {
+ d := dataBucket{
+ seriesID: i.seriesID,
+ }
+ return i.data.Get(d.marshal(), uint64(i.itemID))
+}
+
func (i *item) ID() common.ItemID {
return i.itemID
}
diff --git a/banyand/tsdb/series_write.go b/banyand/tsdb/series_write.go
index cdec388..8232648 100644
--- a/banyand/tsdb/series_write.go
+++ b/banyand/tsdb/series_write.go
@@ -84,6 +84,8 @@ func (w *writerBuilder) Val(val []byte) WriterBuilder {
var ErrNoTime = errors.New("no time specified")
var ErrNoVal = errors.New("no value specified")
+var ErrDuplicatedFamily = errors.New("duplicated family")
+
func (w *writerBuilder) Build() (Writer, error) {
if w.block == nil {
return nil, errors.WithStack(ErrNoTime)
@@ -91,6 +93,16 @@ func (w *writerBuilder) Build() (Writer, error) {
if len(w.values) < 1 {
return nil, errors.WithStack(ErrNoVal)
}
+ for i, value := range w.values {
+ for j := i + 1; j < len(w.values); j = j + 1 {
+ if value.family == nil && w.values[j].family == nil {
+ return nil, errors.Wrap(ErrDuplicatedFamily, "default family")
+ }
+ if bytes.Equal(value.family, w.values[j].family) {
+ return nil, errors.Wrapf(ErrDuplicatedFamily, "family:%s", value.family)
+ }
+ }
+ }
segID, blockID := w.block.identity()
return &writer{
block: w.block,
@@ -145,6 +157,9 @@ type dataBucket struct {
}
func (d dataBucket) marshal() []byte {
+ if d.family == nil {
+ return d.seriesID.Marshal()
+ }
return bytes.Join([][]byte{
d.seriesID.Marshal(),
hash(d.family),