You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/09/22 13:54:47 UTC

[skywalking-banyandb] branch stream-ele created (now 48ded93)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a change to branch stream-ele
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git.


      at 48ded93  To save and retrieve element_id

This branch includes the following new commits:

     new 48ded93  To save and retrieve element_id

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[skywalking-banyandb] 01/01: To save and retrieve element_id

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch stream-ele
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 48ded932da5d198d91736d5fea964d302ea34c73
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Wed Sep 22 21:53:14 2021 +0800

    To save and retrieve element_id
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/stream/stream_query.go      | 10 +++++++++-
 banyand/stream/stream_query_test.go |  5 +++++
 banyand/stream/stream_write.go      |  2 +-
 banyand/tsdb/series_seek.go         | 12 ++++++++++--
 banyand/tsdb/series_write.go        | 15 +++++++++++++++
 5 files changed, 40 insertions(+), 4 deletions(-)

diff --git a/banyand/stream/stream_query.go b/banyand/stream/stream_query.go
index fd0a05c..f8b7665 100644
--- a/banyand/stream/stream_query.go
+++ b/banyand/stream/stream_query.go
@@ -70,7 +70,7 @@ func (s *stream) Shard(id common.ShardID) (tsdb.Shard, error) {
 }
 
 func (s *stream) ParseTagFamily(family string, item tsdb.Item) (*modelv2.TagFamily, error) {
-	familyRawBytes, err := item.Val(family)
+	familyRawBytes, err := item.Family(family)
 	if err != nil {
 		return nil, err
 	}
@@ -102,3 +102,11 @@ func (s *stream) ParseTagFamily(family string, item tsdb.Item) (*modelv2.TagFami
 		Tags: tags,
 	}, err
 }
+
+func (s *stream) ParseElementID(item tsdb.Item) (string, error) {
+	rawBytes, err := item.Val()
+	if err != nil {
+		return "", err
+	}
+	return string(rawBytes), nil
+}
diff --git a/banyand/stream/stream_query_test.go b/banyand/stream/stream_query_test.go
index 21669c6..8319487 100644
--- a/banyand/stream/stream_query_test.go
+++ b/banyand/stream/stream_query_test.go
@@ -681,6 +681,11 @@ func queryData(tester *assert.Assertions, s *stream, opts queryOpts) (shardsForT
 								elements = append(elements, tag.GetValue().GetStr().GetValue())
 							}
 						}
+						eleID, errInner := s.ParseElementID(iterator.Val())
+						if errInner != nil {
+							return nil, errInner
+						}
+						tester.NotEmpty(eleID)
 					}
 					_ = iterator.Close()
 					g = append(g, shardStruct{
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index dc536bf..610ef31 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -36,7 +36,6 @@ import (
 
 var (
 	ErrMalformedElement            = errors.New("element is malformed")
-	ErrUnsupportedTagTypeAsEntry   = errors.New("the tag type can not be as an entry in an entity")
 	ErrUnsupportedTagForIndexField = errors.New("the tag type(for example, null) can not be as the index field value")
 )
 
@@ -92,6 +91,7 @@ func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue) (*t
 			}
 			builder.Family(sm.GetTagFamilies()[fi].GetName(), bb)
 		}
+		builder.Val([]byte(value.GetElementId()))
 		writer, errWrite := builder.Build()
 		if errWrite != nil {
 			return nil, errWrite
diff --git a/banyand/tsdb/series_seek.go b/banyand/tsdb/series_seek.go
index 0c8700b..0aac353 100644
--- a/banyand/tsdb/series_seek.go
+++ b/banyand/tsdb/series_seek.go
@@ -32,7 +32,8 @@ type Iterator interface {
 }
 
 type Item interface {
-	Val(family string) ([]byte, error)
+	Family(family string) ([]byte, error)
+	Val() ([]byte, error)
 	ID() common.ItemID
 	SortedField() []byte
 	Time() uint64
@@ -118,7 +119,7 @@ func (i *item) SortedField() []byte {
 	return i.sortedField
 }
 
-func (i *item) Val(family string) ([]byte, error) {
+func (i *item) Family(family string) ([]byte, error) {
 	d := dataBucket{
 		seriesID: i.seriesID,
 		family:   []byte(family),
@@ -126,6 +127,13 @@ func (i *item) Val(family string) ([]byte, error) {
 	return i.data.Get(d.marshal(), uint64(i.itemID))
 }
 
+func (i *item) Val() ([]byte, error) {
+	d := dataBucket{
+		seriesID: i.seriesID,
+	}
+	return i.data.Get(d.marshal(), uint64(i.itemID))
+}
+
 func (i *item) ID() common.ItemID {
 	return i.itemID
 }
diff --git a/banyand/tsdb/series_write.go b/banyand/tsdb/series_write.go
index cdec388..8232648 100644
--- a/banyand/tsdb/series_write.go
+++ b/banyand/tsdb/series_write.go
@@ -84,6 +84,8 @@ func (w *writerBuilder) Val(val []byte) WriterBuilder {
 var ErrNoTime = errors.New("no time specified")
 var ErrNoVal = errors.New("no value specified")
 
+var ErrDuplicatedFamily = errors.New("duplicated family")
+
 func (w *writerBuilder) Build() (Writer, error) {
 	if w.block == nil {
 		return nil, errors.WithStack(ErrNoTime)
@@ -91,6 +93,16 @@ func (w *writerBuilder) Build() (Writer, error) {
 	if len(w.values) < 1 {
 		return nil, errors.WithStack(ErrNoVal)
 	}
+	for i, value := range w.values {
+		for j := i + 1; j < len(w.values); j = j + 1 {
+			if value.family == nil && w.values[j].family == nil {
+				return nil, errors.Wrap(ErrDuplicatedFamily, "default family")
+			}
+			if bytes.Equal(value.family, w.values[j].family) {
+				return nil, errors.Wrapf(ErrDuplicatedFamily, "family:%s", value.family)
+			}
+		}
+	}
 	segID, blockID := w.block.identity()
 	return &writer{
 		block: w.block,
@@ -145,6 +157,9 @@ type dataBucket struct {
 }
 
 func (d dataBucket) marshal() []byte {
+	if d.family == nil {
+		return d.seriesID.Marshal()
+	}
 	return bytes.Join([][]byte{
 		d.seriesID.Marshal(),
 		hash(d.family),