You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/09/22 14:15:33 UTC

[skywalking-banyandb] branch main updated: To save and retrieve element_id (#50)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new c6314fe  To save and retrieve element_id (#50)
c6314fe is described below

commit c6314fe15b56143836e8969bd756a1310779b3b0
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Wed Sep 22 22:15:28 2021 +0800

    To save and retrieve element_id (#50)
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/stream/stream_query.go      | 10 +++++++++-
 banyand/stream/stream_query_test.go |  5 +++++
 banyand/stream/stream_write.go      |  2 +-
 banyand/tsdb/series_seek.go         | 12 ++++++++++--
 banyand/tsdb/series_write.go        | 15 +++++++++++++++
 5 files changed, 40 insertions(+), 4 deletions(-)

diff --git a/banyand/stream/stream_query.go b/banyand/stream/stream_query.go
index fd0a05c..f8b7665 100644
--- a/banyand/stream/stream_query.go
+++ b/banyand/stream/stream_query.go
@@ -70,7 +70,7 @@ func (s *stream) Shard(id common.ShardID) (tsdb.Shard, error) {
 }
 
 func (s *stream) ParseTagFamily(family string, item tsdb.Item) (*modelv2.TagFamily, error) {
-	familyRawBytes, err := item.Val(family)
+	familyRawBytes, err := item.Family(family)
 	if err != nil {
 		return nil, err
 	}
@@ -102,3 +102,11 @@ func (s *stream) ParseTagFamily(family string, item tsdb.Item) (*modelv2.TagFami
 		Tags: tags,
 	}, err
 }
+
+func (s *stream) ParseElementID(item tsdb.Item) (string, error) {
+	rawBytes, err := item.Val()
+	if err != nil {
+		return "", err
+	}
+	return string(rawBytes), nil
+}
diff --git a/banyand/stream/stream_query_test.go b/banyand/stream/stream_query_test.go
index 21669c6..8319487 100644
--- a/banyand/stream/stream_query_test.go
+++ b/banyand/stream/stream_query_test.go
@@ -681,6 +681,11 @@ func queryData(tester *assert.Assertions, s *stream, opts queryOpts) (shardsForT
 								elements = append(elements, tag.GetValue().GetStr().GetValue())
 							}
 						}
+						eleID, errInner := s.ParseElementID(iterator.Val())
+						if errInner != nil {
+							return nil, errInner
+						}
+						tester.NotEmpty(eleID)
 					}
 					_ = iterator.Close()
 					g = append(g, shardStruct{
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index dc536bf..610ef31 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -36,7 +36,6 @@ import (
 
 var (
 	ErrMalformedElement            = errors.New("element is malformed")
-	ErrUnsupportedTagTypeAsEntry   = errors.New("the tag type can not be as an entry in an entity")
 	ErrUnsupportedTagForIndexField = errors.New("the tag type(for example, null) can not be as the index field value")
 )
 
@@ -92,6 +91,7 @@ func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue) (*t
 			}
 			builder.Family(sm.GetTagFamilies()[fi].GetName(), bb)
 		}
+		builder.Val([]byte(value.GetElementId()))
 		writer, errWrite := builder.Build()
 		if errWrite != nil {
 			return nil, errWrite
diff --git a/banyand/tsdb/series_seek.go b/banyand/tsdb/series_seek.go
index 0c8700b..0aac353 100644
--- a/banyand/tsdb/series_seek.go
+++ b/banyand/tsdb/series_seek.go
@@ -32,7 +32,8 @@ type Iterator interface {
 }
 
 type Item interface {
-	Val(family string) ([]byte, error)
+	Family(family string) ([]byte, error)
+	Val() ([]byte, error)
 	ID() common.ItemID
 	SortedField() []byte
 	Time() uint64
@@ -118,7 +119,7 @@ func (i *item) SortedField() []byte {
 	return i.sortedField
 }
 
-func (i *item) Val(family string) ([]byte, error) {
+func (i *item) Family(family string) ([]byte, error) {
 	d := dataBucket{
 		seriesID: i.seriesID,
 		family:   []byte(family),
@@ -126,6 +127,13 @@ func (i *item) Val(family string) ([]byte, error) {
 	return i.data.Get(d.marshal(), uint64(i.itemID))
 }
 
+func (i *item) Val() ([]byte, error) {
+	d := dataBucket{
+		seriesID: i.seriesID,
+	}
+	return i.data.Get(d.marshal(), uint64(i.itemID))
+}
+
 func (i *item) ID() common.ItemID {
 	return i.itemID
 }
diff --git a/banyand/tsdb/series_write.go b/banyand/tsdb/series_write.go
index cdec388..8232648 100644
--- a/banyand/tsdb/series_write.go
+++ b/banyand/tsdb/series_write.go
@@ -84,6 +84,8 @@ func (w *writerBuilder) Val(val []byte) WriterBuilder {
 var ErrNoTime = errors.New("no time specified")
 var ErrNoVal = errors.New("no value specified")
 
+var ErrDuplicatedFamily = errors.New("duplicated family")
+
 func (w *writerBuilder) Build() (Writer, error) {
 	if w.block == nil {
 		return nil, errors.WithStack(ErrNoTime)
@@ -91,6 +93,16 @@ func (w *writerBuilder) Build() (Writer, error) {
 	if len(w.values) < 1 {
 		return nil, errors.WithStack(ErrNoVal)
 	}
+	for i, value := range w.values {
+		for j := i + 1; j < len(w.values); j = j + 1 {
+			if value.family == nil && w.values[j].family == nil {
+				return nil, errors.Wrap(ErrDuplicatedFamily, "default family")
+			}
+			if bytes.Equal(value.family, w.values[j].family) {
+				return nil, errors.Wrapf(ErrDuplicatedFamily, "family:%s", value.family)
+			}
+		}
+	}
 	segID, blockID := w.block.identity()
 	return &writer{
 		block: w.block,
@@ -145,6 +157,9 @@ type dataBucket struct {
 }
 
 func (d dataBucket) marshal() []byte {
+	if d.family == nil {
+		return d.seriesID.Marshal()
+	}
 	return bytes.Join([][]byte{
 		d.seriesID.Marshal(),
 		hash(d.family),