You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/06/07 02:23:27 UTC
[skywalking-banyandb] 01/01: Add API for trace series,
index rule and index rule binding
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch api-schema
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit ae660ea7655d5b23a43750f8b61e398ac2ee0238
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Mon Jun 7 10:18:01 2021 +0800
Add API for trace series, index rule and index rule binding
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
.licenserc.yaml | 2 +
Makefile | 3 +-
banyand/series/series.go => api/common/metadata.go | 35 +-
api/fbs/v1/schema.fbs | 74 ++--
api/fbs/v1/schema_generated.go | 390 ++++++++++++++++++---
banyand/series/series.go => api/schema/index.go | 36 +-
{banyand/series => api/schema}/series.go | 34 +-
banyand/series/{series.go => schema/schema.go} | 34 +-
banyand/series/schema/sw/index_rule.bin | Bin 0 -> 520 bytes
banyand/series/schema/sw/index_rule.json | 75 ++++
banyand/series/schema/sw/index_rule_binding.bin | Bin 0 -> 224 bytes
banyand/series/schema/sw/index_rule_binding.json | 22 ++
banyand/series/schema/sw/sw.go | 111 ++++++
banyand/series/schema/sw/trace_series.bin | Bin 0 -> 712 bytes
banyand/series/schema/sw/trace_series.json | 85 +++++
banyand/series/series.go | 55 ++-
banyand/series/service.go | 134 +++++++
banyand/series/service_test.go | 91 +++++
18 files changed, 998 insertions(+), 183 deletions(-)
diff --git a/.licenserc.yaml b/.licenserc.yaml
index 49f0ed4..f149469 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -70,6 +70,8 @@ header: # `header` section is configurations for source codes license header.
- 'LICENSE'
- 'NOTICE'
- '**/build/**'
+ - '**/*.bin'
+ - '**/*.json'
- '**/*_mock.go'
- '**/*_mock_test.go'
- '**/*_generated.go'
diff --git a/Makefile b/Makefile
index f85313c..d2774c4 100644
--- a/Makefile
+++ b/Makefile
@@ -28,9 +28,10 @@ clean: TARGET=clean test-clean
clean: default ## Clean artifacts in all projects
generate: ## Generate API codes
+ $(MAKE) -C api/fbs generate
+ $(MAKE) format
go install github.com/golang/mock/mockgen@v1.5.0
go generate ./...
- $(MAKE) -C api/fbs generate
$(MAKE) format
build: TARGET=all
diff --git a/banyand/series/series.go b/api/common/metadata.go
similarity index 51%
copy from banyand/series/series.go
copy to api/common/metadata.go
index 3e6de47..87020fa 100644
--- a/banyand/series/series.go
+++ b/api/common/metadata.go
@@ -15,36 +15,13 @@
// specific language governing permissions and limitations
// under the License.
-//go:generate mockgen -destination=./series_mock.go -package=series . UniModel
-package series
+package common
-import (
- "context"
+import v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
- "github.com/apache/skywalking-banyandb/api/data"
- "github.com/apache/skywalking-banyandb/banyand/storage"
- "github.com/apache/skywalking-banyandb/pkg/run"
-)
+var MetadataKindVersion = KindVersion{Version: "v1", Kind: "metadata"}
-type Trace interface {
- FetchTrace(traceID string) (data.Trace, error)
- FetchEntity(chunkIDs []string, fields []string) ([]data.Entity, error)
- ScanEntity(startTime, endTime uint64, fields []string) ([]data.Entity, error)
-}
-
-type UniModel interface {
- Trace
-}
-
-type Service interface {
- UniModel
- run.PreRunner
-}
-
-func NewService(ctx context.Context, db storage.Database) (Service, error) {
- return nil, nil
-}
-
-// TODO: this interface should contains methods to access schema objects
-type SchemaRepo interface {
+type Metadata struct {
+ KindVersion
+ Spec v1.Metadata
}
diff --git a/api/fbs/v1/schema.fbs b/api/fbs/v1/schema.fbs
index 9358af1..cfd5d15 100644
--- a/api/fbs/v1/schema.fbs
+++ b/api/fbs/v1/schema.fbs
@@ -27,21 +27,50 @@ struct Duration {
unit:DurationUint;
}
+enum FieldType:byte { String = 0, Int = 1, StringArray = 2, IntArray = 3 }
+
+table FieldSpec {
+ name:string;
+ type:FieldType;
+}
+
+table TraceStateMap {
+ field:string;
+ val_success:string;
+ val_error:string;
+}
+
+// The key in TraceFieldMap are reserved by trace series engine. Their corresponding value is the Fields or
+// the combination of Fields
+table TraceFieldMap {
+ // trace_id the unique identity of a single trace
+ trace_id:string;
+ // series_id groups entites into a storage bucket
+ series_id:[string];
+ // state indicates the trace is "success" or "error"
+ state:TraceStateMap;
+}
+
// TraceSeries represents a trace storage object
table TraceSeries {
// metadata is the identity of a trace series
metadata:Metadata;
+ // fields defines accetped fields
+ fields:[FieldSpec];
+ // reserved_fields_map indicates how to index reserved fields to ingested fields
+ reserved_fields_map:TraceFieldMap;
// duration determines how long a TraceSeries keeps its data
duration:Duration;
- // updated_at indicates when the TraceSeries is updated
- updated_at:uint64;
+ // updated_nanoseconds_at indicates when the TraceSeries is updated
+ updated_nanoseconds_at:uint64;
}
// Catalog refers to a placement contains objects belonged to a particular data type
enum Catalog:byte { Trace = 0, Log = 1, Metric = 2 }
// IndexType determine the index structure under the hood
-enum IndexType:byte { Text = 0, Numerical = 1, ID = 2}
+// Fields with SeriesInternal type is reserved by Series module, would not be indexed by Index module.
+enum IndexType:byte { Text = 0, Numerical = 1, ID = 2, MultiText = 3, MultiNumberical = 4, SeriesInternal = 5 }
// IndexObject defines who should be indexed.
table IndexObject {
@@ -54,7 +83,7 @@ table IndexObject {
}
// IndexRule defines how to generate indices based on IndexObject
-// IndexRule should bind to an IndexSubject through an IndexRuleBinding to generate proper indices.
+// IndexRule should bind to an Series through an IndexRuleBinding to generate proper indices.
// Example: A trace entity wants to index fields: trace_id, service_name, endpoint_name, and latency.
// and service_name and endpoint_name would combine a multi-field index.
// The index rule could be:
@@ -77,49 +106,48 @@ table IndexObject {
// type: Numerical
// },
// ]
-// update_at: .......
+// update_nanoseconds_at: .......
// }
table IndexRule {
// metadata define the rule's identity
metadata:Metadata;
// objects refer to which fields should be indexed
objects:[IndexObject];
- // updated_at indicates when the IndexRule is updated
- updated_at:uint64;
+ // updated_nanoseconds_at indicates when the IndexRule is updated
+ updated_nanoseconds_at:uint64;
}
-// IndexSubject defines which subject(series) would generate indices
+// Series defines which subject(series) would generate indices
// For example, if a TraceSeries's metadata is {name: sw_trace, group: production},
-// in consequence, the IndexSubject is
-// Index Subject {
+// in consequence, the Series is
+// series {
// catalog: Trace
// series: {name: sw_trace, group: production}
// }
-table IndexSubject {
+table Series {
// catalog is where the subject/series belongs to
catalog:Catalog;
// series refers to a series in a particular catalog
series:Metadata;
}
-
-// IndexRuleBinding is a bridge to connect an IndexRule to several IndexSubjects
-// This binding is valid between begin_at and expire_at, that provides flexible strategies
+// IndexRuleBinding is a bridge to connect an IndexRule to several Seriess
+// This binding is valid between begin_nanoseconds_at and expire_nanoseconds_at, that provides flexible strategies
// to control how to generate time series indices.
table IndexRuleBinding {
// metadata is the identity of this binding
metadata:Metadata;
// rule_ref refers to the IndexRule
rule_ref:Metadata;
- // subjects indicate which subjects will use such IndexRule to generate indices
- subjects:[IndexSubject];
- // begin_at is the timestamp, after which the binding will be active
- begin_at:uint64;
- // expire_at it the timestamp, after which the binding will be inactive
- // expire_at must be larger than begin_at
- expire_at:uint64;
- // updated_at indicates when the IndexRuleBinding is updated
- updated_at:uint64;
+ // subjects indicate the subjects of binding action
+ subjects:[Series];
+ // begin_nanoseconds_at is the timestamp, after which the binding will be active
+ begin_nanoseconds_at:uint64;
+ // expire_nanoseconds_at it the timestamp, after which the binding will be inactive
+ // expire_nanoseconds_at must be larger than begin_nanoseconds_at
+ expire_nanoseconds_at:uint64;
+ // updated_nanoseconds_at indicates when the IndexRuleBinding is updated
+ updated_nanoseconds_at:uint64;
}
root_type TraceSeries;
diff --git a/api/fbs/v1/schema_generated.go b/api/fbs/v1/schema_generated.go
index fa870fd..c8dfb18 100644
--- a/api/fbs/v1/schema_generated.go
+++ b/api/fbs/v1/schema_generated.go
@@ -38,6 +38,36 @@ func (v DurationUint) String() string {
return "DurationUint(" + strconv.FormatInt(int64(v), 10) + ")"
}
+type FieldType int8
+
+const (
+ FieldTypeString FieldType = 0
+ FieldTypeInt FieldType = 1
+ FieldTypeStringArray FieldType = 2
+ FieldTypeIntArray FieldType = 3
+)
+
+var EnumNamesFieldType = map[FieldType]string{
+ FieldTypeString: "String",
+ FieldTypeInt: "Int",
+ FieldTypeStringArray: "StringArray",
+ FieldTypeIntArray: "IntArray",
+}
+
+var EnumValuesFieldType = map[string]FieldType{
+ "String": FieldTypeString,
+ "Int": FieldTypeInt,
+ "StringArray": FieldTypeStringArray,
+ "IntArray": FieldTypeIntArray,
+}
+
+func (v FieldType) String() string {
+ if s, ok := EnumNamesFieldType[v]; ok {
+ return s
+ }
+ return "FieldType(" + strconv.FormatInt(int64(v), 10) + ")"
+}
+
type Catalog int8
const (
@@ -68,21 +98,30 @@ func (v Catalog) String() string {
type IndexType int8
const (
- IndexTypeText IndexType = 0
- IndexTypeNumerical IndexType = 1
- IndexTypeID IndexType = 2
+ IndexTypeText IndexType = 0
+ IndexTypeNumerical IndexType = 1
+ IndexTypeID IndexType = 2
+ IndexTypeMultiText IndexType = 3
+ IndexTypeMultiNumberical IndexType = 4
+ IndexTypeSeriesInternal IndexType = 5
)
var EnumNamesIndexType = map[IndexType]string{
- IndexTypeText: "Text",
- IndexTypeNumerical: "Numerical",
- IndexTypeID: "ID",
+ IndexTypeText: "Text",
+ IndexTypeNumerical: "Numerical",
+ IndexTypeID: "ID",
+ IndexTypeMultiText: "MultiText",
+ IndexTypeMultiNumberical: "MultiNumberical",
+ IndexTypeSeriesInternal: "SeriesInternal",
}
var EnumValuesIndexType = map[string]IndexType{
- "Text": IndexTypeText,
- "Numerical": IndexTypeNumerical,
- "ID": IndexTypeID,
+ "Text": IndexTypeText,
+ "Numerical": IndexTypeNumerical,
+ "ID": IndexTypeID,
+ "MultiText": IndexTypeMultiText,
+ "MultiNumberical": IndexTypeMultiNumberical,
+ "SeriesInternal": IndexTypeSeriesInternal,
}
func (v IndexType) String() string {
@@ -127,6 +166,217 @@ func CreateDuration(builder *flatbuffers.Builder, val uint32, unit DurationUint)
return builder.Offset()
}
+type FieldSpec struct {
+ _tab flatbuffers.Table
+}
+
+func GetRootAsFieldSpec(buf []byte, offset flatbuffers.UOffsetT) *FieldSpec {
+ n := flatbuffers.GetUOffsetT(buf[offset:])
+ x := &FieldSpec{}
+ x.Init(buf, n+offset)
+ return x
+}
+
+func GetSizePrefixedRootAsFieldSpec(buf []byte, offset flatbuffers.UOffsetT) *FieldSpec {
+ n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:])
+ x := &FieldSpec{}
+ x.Init(buf, n+offset+flatbuffers.SizeUint32)
+ return x
+}
+
+func (rcv *FieldSpec) Init(buf []byte, i flatbuffers.UOffsetT) {
+ rcv._tab.Bytes = buf
+ rcv._tab.Pos = i
+}
+
+func (rcv *FieldSpec) Table() flatbuffers.Table {
+ return rcv._tab
+}
+
+func (rcv *FieldSpec) Name() []byte {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
+ if o != 0 {
+ return rcv._tab.ByteVector(o + rcv._tab.Pos)
+ }
+ return nil
+}
+
+func (rcv *FieldSpec) Type() FieldType {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
+ if o != 0 {
+ return FieldType(rcv._tab.GetInt8(o + rcv._tab.Pos))
+ }
+ return 0
+}
+
+func (rcv *FieldSpec) MutateType(n FieldType) bool {
+ return rcv._tab.MutateInt8Slot(6, int8(n))
+}
+
+func FieldSpecStart(builder *flatbuffers.Builder) {
+ builder.StartObject(2)
+}
+func FieldSpecAddName(builder *flatbuffers.Builder, name flatbuffers.UOffsetT) {
+ builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(name), 0)
+}
+func FieldSpecAddType(builder *flatbuffers.Builder, type_ FieldType) {
+ builder.PrependInt8Slot(1, int8(type_), 0)
+}
+func FieldSpecEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
+ return builder.EndObject()
+}
+
+type TraceStateMap struct {
+ _tab flatbuffers.Table
+}
+
+func GetRootAsTraceStateMap(buf []byte, offset flatbuffers.UOffsetT) *TraceStateMap {
+ n := flatbuffers.GetUOffsetT(buf[offset:])
+ x := &TraceStateMap{}
+ x.Init(buf, n+offset)
+ return x
+}
+
+func GetSizePrefixedRootAsTraceStateMap(buf []byte, offset flatbuffers.UOffsetT) *TraceStateMap {
+ n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:])
+ x := &TraceStateMap{}
+ x.Init(buf, n+offset+flatbuffers.SizeUint32)
+ return x
+}
+
+func (rcv *TraceStateMap) Init(buf []byte, i flatbuffers.UOffsetT) {
+ rcv._tab.Bytes = buf
+ rcv._tab.Pos = i
+}
+
+func (rcv *TraceStateMap) Table() flatbuffers.Table {
+ return rcv._tab
+}
+
+func (rcv *TraceStateMap) Field() []byte {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
+ if o != 0 {
+ return rcv._tab.ByteVector(o + rcv._tab.Pos)
+ }
+ return nil
+}
+
+func (rcv *TraceStateMap) ValSuccess() []byte {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
+ if o != 0 {
+ return rcv._tab.ByteVector(o + rcv._tab.Pos)
+ }
+ return nil
+}
+
+func (rcv *TraceStateMap) ValError() []byte {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
+ if o != 0 {
+ return rcv._tab.ByteVector(o + rcv._tab.Pos)
+ }
+ return nil
+}
+
+func TraceStateMapStart(builder *flatbuffers.Builder) {
+ builder.StartObject(3)
+}
+func TraceStateMapAddField(builder *flatbuffers.Builder, field flatbuffers.UOffsetT) {
+ builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(field), 0)
+}
+func TraceStateMapAddValSuccess(builder *flatbuffers.Builder, valSuccess flatbuffers.UOffsetT) {
+ builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(valSuccess), 0)
+}
+func TraceStateMapAddValError(builder *flatbuffers.Builder, valError flatbuffers.UOffsetT) {
+ builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(valError), 0)
+}
+func TraceStateMapEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
+ return builder.EndObject()
+}
+
+type TraceFieldMap struct {
+ _tab flatbuffers.Table
+}
+
+func GetRootAsTraceFieldMap(buf []byte, offset flatbuffers.UOffsetT) *TraceFieldMap {
+ n := flatbuffers.GetUOffsetT(buf[offset:])
+ x := &TraceFieldMap{}
+ x.Init(buf, n+offset)
+ return x
+}
+
+func GetSizePrefixedRootAsTraceFieldMap(buf []byte, offset flatbuffers.UOffsetT) *TraceFieldMap {
+ n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:])
+ x := &TraceFieldMap{}
+ x.Init(buf, n+offset+flatbuffers.SizeUint32)
+ return x
+}
+
+func (rcv *TraceFieldMap) Init(buf []byte, i flatbuffers.UOffsetT) {
+ rcv._tab.Bytes = buf
+ rcv._tab.Pos = i
+}
+
+func (rcv *TraceFieldMap) Table() flatbuffers.Table {
+ return rcv._tab
+}
+
+func (rcv *TraceFieldMap) TraceId() []byte {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
+ if o != 0 {
+ return rcv._tab.ByteVector(o + rcv._tab.Pos)
+ }
+ return nil
+}
+
+func (rcv *TraceFieldMap) SeriesId(j int) []byte {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
+ if o != 0 {
+ a := rcv._tab.Vector(o)
+ return rcv._tab.ByteVector(a + flatbuffers.UOffsetT(j*4))
+ }
+ return nil
+}
+
+func (rcv *TraceFieldMap) SeriesIdLength() int {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
+ if o != 0 {
+ return rcv._tab.VectorLen(o)
+ }
+ return 0
+}
+
+func (rcv *TraceFieldMap) State(obj *TraceStateMap) *TraceStateMap {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
+ if o != 0 {
+ x := rcv._tab.Indirect(o + rcv._tab.Pos)
+ if obj == nil {
+ obj = new(TraceStateMap)
+ }
+ obj.Init(rcv._tab.Bytes, x)
+ return obj
+ }
+ return nil
+}
+
+func TraceFieldMapStart(builder *flatbuffers.Builder) {
+ builder.StartObject(3)
+}
+func TraceFieldMapAddTraceId(builder *flatbuffers.Builder, traceId flatbuffers.UOffsetT) {
+ builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(traceId), 0)
+}
+func TraceFieldMapAddSeriesId(builder *flatbuffers.Builder, seriesId flatbuffers.UOffsetT) {
+ builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(seriesId), 0)
+}
+func TraceFieldMapStartSeriesIdVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
+ return builder.StartVector(4, numElems, 4)
+}
+func TraceFieldMapAddState(builder *flatbuffers.Builder, state flatbuffers.UOffsetT) {
+ builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(state), 0)
+}
+func TraceFieldMapEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
+ return builder.EndObject()
+}
+
type TraceSeries struct {
_tab flatbuffers.Table
}
@@ -167,9 +417,42 @@ func (rcv *TraceSeries) Metadata(obj *Metadata) *Metadata {
return nil
}
-func (rcv *TraceSeries) Duration(obj *Duration) *Duration {
+func (rcv *TraceSeries) Fields(obj *FieldSpec, j int) bool {
o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
if o != 0 {
+ x := rcv._tab.Vector(o)
+ x += flatbuffers.UOffsetT(j) * 4
+ x = rcv._tab.Indirect(x)
+ obj.Init(rcv._tab.Bytes, x)
+ return true
+ }
+ return false
+}
+
+func (rcv *TraceSeries) FieldsLength() int {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
+ if o != 0 {
+ return rcv._tab.VectorLen(o)
+ }
+ return 0
+}
+
+func (rcv *TraceSeries) ReservedFieldsMap(obj *TraceFieldMap) *TraceFieldMap {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
+ if o != 0 {
+ x := rcv._tab.Indirect(o + rcv._tab.Pos)
+ if obj == nil {
+ obj = new(TraceFieldMap)
+ }
+ obj.Init(rcv._tab.Bytes, x)
+ return obj
+ }
+ return nil
+}
+
+func (rcv *TraceSeries) Duration(obj *Duration) *Duration {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
+ if o != 0 {
x := o + rcv._tab.Pos
if obj == nil {
obj = new(Duration)
@@ -180,29 +463,38 @@ func (rcv *TraceSeries) Duration(obj *Duration) *Duration {
return nil
}
-func (rcv *TraceSeries) UpdatedAt() uint64 {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
+func (rcv *TraceSeries) UpdatedNanosecondsAt() uint64 {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(12))
if o != 0 {
return rcv._tab.GetUint64(o + rcv._tab.Pos)
}
return 0
}
-func (rcv *TraceSeries) MutateUpdatedAt(n uint64) bool {
- return rcv._tab.MutateUint64Slot(8, n)
+func (rcv *TraceSeries) MutateUpdatedNanosecondsAt(n uint64) bool {
+ return rcv._tab.MutateUint64Slot(12, n)
}
func TraceSeriesStart(builder *flatbuffers.Builder) {
- builder.StartObject(3)
+ builder.StartObject(5)
}
func TraceSeriesAddMetadata(builder *flatbuffers.Builder, metadata flatbuffers.UOffsetT) {
builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(metadata), 0)
}
+func TraceSeriesAddFields(builder *flatbuffers.Builder, fields flatbuffers.UOffsetT) {
+ builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(fields), 0)
+}
+func TraceSeriesStartFieldsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
+ return builder.StartVector(4, numElems, 4)
+}
+func TraceSeriesAddReservedFieldsMap(builder *flatbuffers.Builder, reservedFieldsMap flatbuffers.UOffsetT) {
+ builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(reservedFieldsMap), 0)
+}
func TraceSeriesAddDuration(builder *flatbuffers.Builder, duration flatbuffers.UOffsetT) {
- builder.PrependStructSlot(1, flatbuffers.UOffsetT(duration), 0)
+ builder.PrependStructSlot(3, flatbuffers.UOffsetT(duration), 0)
}
-func TraceSeriesAddUpdatedAt(builder *flatbuffers.Builder, updatedAt uint64) {
- builder.PrependUint64Slot(2, updatedAt, 0)
+func TraceSeriesAddUpdatedNanosecondsAt(builder *flatbuffers.Builder, updatedNanosecondsAt uint64) {
+ builder.PrependUint64Slot(4, updatedNanosecondsAt, 0)
}
func TraceSeriesEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
return builder.EndObject()
@@ -340,7 +632,7 @@ func (rcv *IndexRule) ObjectsLength() int {
return 0
}
-func (rcv *IndexRule) UpdatedAt() uint64 {
+func (rcv *IndexRule) UpdatedNanosecondsAt() uint64 {
o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
if o != 0 {
return rcv._tab.GetUint64(o + rcv._tab.Pos)
@@ -348,7 +640,7 @@ func (rcv *IndexRule) UpdatedAt() uint64 {
return 0
}
-func (rcv *IndexRule) MutateUpdatedAt(n uint64) bool {
+func (rcv *IndexRule) MutateUpdatedNanosecondsAt(n uint64) bool {
return rcv._tab.MutateUint64Slot(8, n)
}
@@ -364,41 +656,41 @@ func IndexRuleAddObjects(builder *flatbuffers.Builder, objects flatbuffers.UOffs
func IndexRuleStartObjectsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
return builder.StartVector(4, numElems, 4)
}
-func IndexRuleAddUpdatedAt(builder *flatbuffers.Builder, updatedAt uint64) {
- builder.PrependUint64Slot(2, updatedAt, 0)
+func IndexRuleAddUpdatedNanosecondsAt(builder *flatbuffers.Builder, updatedNanosecondsAt uint64) {
+ builder.PrependUint64Slot(2, updatedNanosecondsAt, 0)
}
func IndexRuleEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
return builder.EndObject()
}
-type IndexSubject struct {
+type Series struct {
_tab flatbuffers.Table
}
-func GetRootAsIndexSubject(buf []byte, offset flatbuffers.UOffsetT) *IndexSubject {
+func GetRootAsSeries(buf []byte, offset flatbuffers.UOffsetT) *Series {
n := flatbuffers.GetUOffsetT(buf[offset:])
- x := &IndexSubject{}
+ x := &Series{}
x.Init(buf, n+offset)
return x
}
-func GetSizePrefixedRootAsIndexSubject(buf []byte, offset flatbuffers.UOffsetT) *IndexSubject {
+func GetSizePrefixedRootAsSeries(buf []byte, offset flatbuffers.UOffsetT) *Series {
n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:])
- x := &IndexSubject{}
+ x := &Series{}
x.Init(buf, n+offset+flatbuffers.SizeUint32)
return x
}
-func (rcv *IndexSubject) Init(buf []byte, i flatbuffers.UOffsetT) {
+func (rcv *Series) Init(buf []byte, i flatbuffers.UOffsetT) {
rcv._tab.Bytes = buf
rcv._tab.Pos = i
}
-func (rcv *IndexSubject) Table() flatbuffers.Table {
+func (rcv *Series) Table() flatbuffers.Table {
return rcv._tab
}
-func (rcv *IndexSubject) Catalog() Catalog {
+func (rcv *Series) Catalog() Catalog {
o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
if o != 0 {
return Catalog(rcv._tab.GetInt8(o + rcv._tab.Pos))
@@ -406,11 +698,11 @@ func (rcv *IndexSubject) Catalog() Catalog {
return 0
}
-func (rcv *IndexSubject) MutateCatalog(n Catalog) bool {
+func (rcv *Series) MutateCatalog(n Catalog) bool {
return rcv._tab.MutateInt8Slot(4, int8(n))
}
-func (rcv *IndexSubject) Series(obj *Metadata) *Metadata {
+func (rcv *Series) Series(obj *Metadata) *Metadata {
o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
if o != 0 {
x := rcv._tab.Indirect(o + rcv._tab.Pos)
@@ -423,16 +715,16 @@ func (rcv *IndexSubject) Series(obj *Metadata) *Metadata {
return nil
}
-func IndexSubjectStart(builder *flatbuffers.Builder) {
+func SeriesStart(builder *flatbuffers.Builder) {
builder.StartObject(2)
}
-func IndexSubjectAddCatalog(builder *flatbuffers.Builder, catalog Catalog) {
+func SeriesAddCatalog(builder *flatbuffers.Builder, catalog Catalog) {
builder.PrependInt8Slot(0, int8(catalog), 0)
}
-func IndexSubjectAddSeries(builder *flatbuffers.Builder, series flatbuffers.UOffsetT) {
+func SeriesAddSeries(builder *flatbuffers.Builder, series flatbuffers.UOffsetT) {
builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(series), 0)
}
-func IndexSubjectEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
+func SeriesEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
return builder.EndObject()
}
@@ -489,7 +781,7 @@ func (rcv *IndexRuleBinding) RuleRef(obj *Metadata) *Metadata {
return nil
}
-func (rcv *IndexRuleBinding) Subjects(obj *IndexSubject, j int) bool {
+func (rcv *IndexRuleBinding) Subjects(obj *Series, j int) bool {
o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
if o != 0 {
x := rcv._tab.Vector(o)
@@ -509,7 +801,7 @@ func (rcv *IndexRuleBinding) SubjectsLength() int {
return 0
}
-func (rcv *IndexRuleBinding) BeginAt() uint64 {
+func (rcv *IndexRuleBinding) BeginNanosecondsAt() uint64 {
o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
if o != 0 {
return rcv._tab.GetUint64(o + rcv._tab.Pos)
@@ -517,11 +809,11 @@ func (rcv *IndexRuleBinding) BeginAt() uint64 {
return 0
}
-func (rcv *IndexRuleBinding) MutateBeginAt(n uint64) bool {
+func (rcv *IndexRuleBinding) MutateBeginNanosecondsAt(n uint64) bool {
return rcv._tab.MutateUint64Slot(10, n)
}
-func (rcv *IndexRuleBinding) ExpireAt() uint64 {
+func (rcv *IndexRuleBinding) ExpireNanosecondsAt() uint64 {
o := flatbuffers.UOffsetT(rcv._tab.Offset(12))
if o != 0 {
return rcv._tab.GetUint64(o + rcv._tab.Pos)
@@ -529,11 +821,11 @@ func (rcv *IndexRuleBinding) ExpireAt() uint64 {
return 0
}
-func (rcv *IndexRuleBinding) MutateExpireAt(n uint64) bool {
+func (rcv *IndexRuleBinding) MutateExpireNanosecondsAt(n uint64) bool {
return rcv._tab.MutateUint64Slot(12, n)
}
-func (rcv *IndexRuleBinding) UpdatedAt() uint64 {
+func (rcv *IndexRuleBinding) UpdatedNanosecondsAt() uint64 {
o := flatbuffers.UOffsetT(rcv._tab.Offset(14))
if o != 0 {
return rcv._tab.GetUint64(o + rcv._tab.Pos)
@@ -541,7 +833,7 @@ func (rcv *IndexRuleBinding) UpdatedAt() uint64 {
return 0
}
-func (rcv *IndexRuleBinding) MutateUpdatedAt(n uint64) bool {
+func (rcv *IndexRuleBinding) MutateUpdatedNanosecondsAt(n uint64) bool {
return rcv._tab.MutateUint64Slot(14, n)
}
@@ -560,14 +852,14 @@ func IndexRuleBindingAddSubjects(builder *flatbuffers.Builder, subjects flatbuff
func IndexRuleBindingStartSubjectsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
return builder.StartVector(4, numElems, 4)
}
-func IndexRuleBindingAddBeginAt(builder *flatbuffers.Builder, beginAt uint64) {
- builder.PrependUint64Slot(3, beginAt, 0)
+func IndexRuleBindingAddBeginNanosecondsAt(builder *flatbuffers.Builder, beginNanosecondsAt uint64) {
+ builder.PrependUint64Slot(3, beginNanosecondsAt, 0)
}
-func IndexRuleBindingAddExpireAt(builder *flatbuffers.Builder, expireAt uint64) {
- builder.PrependUint64Slot(4, expireAt, 0)
+func IndexRuleBindingAddExpireNanosecondsAt(builder *flatbuffers.Builder, expireNanosecondsAt uint64) {
+ builder.PrependUint64Slot(4, expireNanosecondsAt, 0)
}
-func IndexRuleBindingAddUpdatedAt(builder *flatbuffers.Builder, updatedAt uint64) {
- builder.PrependUint64Slot(5, updatedAt, 0)
+func IndexRuleBindingAddUpdatedNanosecondsAt(builder *flatbuffers.Builder, updatedNanosecondsAt uint64) {
+ builder.PrependUint64Slot(5, updatedNanosecondsAt, 0)
}
func IndexRuleBindingEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
return builder.EndObject()
diff --git a/banyand/series/series.go b/api/schema/index.go
similarity index 52%
copy from banyand/series/series.go
copy to api/schema/index.go
index 3e6de47..99f43d1 100644
--- a/banyand/series/series.go
+++ b/api/schema/index.go
@@ -15,36 +15,22 @@
// specific language governing permissions and limitations
// under the License.
-//go:generate mockgen -destination=./series_mock.go -package=series . UniModel
-package series
+package schema
import (
- "context"
-
- "github.com/apache/skywalking-banyandb/api/data"
- "github.com/apache/skywalking-banyandb/banyand/storage"
- "github.com/apache/skywalking-banyandb/pkg/run"
+ "github.com/apache/skywalking-banyandb/api/common"
+ v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
)
-type Trace interface {
- FetchTrace(traceID string) (data.Trace, error)
- FetchEntity(chunkIDs []string, fields []string) ([]data.Entity, error)
- ScanEntity(startTime, endTime uint64, fields []string) ([]data.Entity, error)
-}
-
-type UniModel interface {
- Trace
-}
-
-type Service interface {
- UniModel
- run.PreRunner
-}
+var IndexRuleKindVersion = common.KindVersion{Version: "v1", Kind: "schema-index-rule"}
+var IndexRuleBindingKindVersion = common.KindVersion{Version: "v1", Kind: "schema-index-rule-binding"}
-func NewService(ctx context.Context, db storage.Database) (Service, error) {
- return nil, nil
+type IndexRule struct {
+ common.KindVersion
+ Spec v1.IndexRule
}
-// TODO: this interface should contains methods to access schema objects
-type SchemaRepo interface {
+type IndexRuleBinding struct {
+ common.KindVersion
+ Spec v1.IndexRuleBinding
}
diff --git a/banyand/series/series.go b/api/schema/series.go
similarity index 52%
copy from banyand/series/series.go
copy to api/schema/series.go
index 3e6de47..da1aab1 100644
--- a/banyand/series/series.go
+++ b/api/schema/series.go
@@ -15,36 +15,16 @@
// specific language governing permissions and limitations
// under the License.
-//go:generate mockgen -destination=./series_mock.go -package=series . UniModel
-package series
+package schema
import (
- "context"
-
- "github.com/apache/skywalking-banyandb/api/data"
- "github.com/apache/skywalking-banyandb/banyand/storage"
- "github.com/apache/skywalking-banyandb/pkg/run"
+ "github.com/apache/skywalking-banyandb/api/common"
+ v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
)
-type Trace interface {
- FetchTrace(traceID string) (data.Trace, error)
- FetchEntity(chunkIDs []string, fields []string) ([]data.Entity, error)
- ScanEntity(startTime, endTime uint64, fields []string) ([]data.Entity, error)
-}
-
-type UniModel interface {
- Trace
-}
-
-type Service interface {
- UniModel
- run.PreRunner
-}
-
-func NewService(ctx context.Context, db storage.Database) (Service, error) {
- return nil, nil
-}
+var SeriesKindVersion = common.KindVersion{Version: "v1", Kind: "schema-series"}
-// TODO: this interface should contains methods to access schema objects
-type SchemaRepo interface {
+type TraceSeries struct {
+ common.KindVersion
+ Spec v1.TraceSeries
}
diff --git a/banyand/series/series.go b/banyand/series/schema/schema.go
similarity index 53%
copy from banyand/series/series.go
copy to banyand/series/schema/schema.go
index 3e6de47..d56c63e 100644
--- a/banyand/series/series.go
+++ b/banyand/series/schema/schema.go
@@ -15,36 +15,30 @@
// specific language governing permissions and limitations
// under the License.
-//go:generate mockgen -destination=./series_mock.go -package=series . UniModel
-package series
+package schema
import (
"context"
- "github.com/apache/skywalking-banyandb/api/data"
- "github.com/apache/skywalking-banyandb/banyand/storage"
- "github.com/apache/skywalking-banyandb/pkg/run"
+ "github.com/apache/skywalking-banyandb/api/common"
+ apischema "github.com/apache/skywalking-banyandb/api/schema"
)
-type Trace interface {
- FetchTrace(traceID string) (data.Trace, error)
- FetchEntity(chunkIDs []string, fields []string) ([]data.Entity, error)
- ScanEntity(startTime, endTime uint64, fields []string) ([]data.Entity, error)
+type ListOpt struct {
+ Group string
}
-type UniModel interface {
- Trace
+type TraceSeries interface {
+ Get(ctx context.Context, metadata common.Metadata) (apischema.TraceSeries, error)
+ List(ctx context.Context, opt ListOpt) ([]apischema.TraceSeries, error)
}
-type Service interface {
- UniModel
- run.PreRunner
+type IndexRule interface {
+ Get(ctx context.Context, metadata common.Metadata) (apischema.IndexRule, error)
+ List(ctx context.Context, opt ListOpt) ([]apischema.IndexRule, error)
}
-func NewService(ctx context.Context, db storage.Database) (Service, error) {
- return nil, nil
-}
-
-// TODO: this interface should contains methods to access schema objects
-type SchemaRepo interface {
+type IndexRuleBinding interface {
+ Get(ctx context.Context, metadata common.Metadata) (apischema.IndexRuleBinding, error)
+ List(ctx context.Context, opt ListOpt) ([]apischema.IndexRuleBinding, error)
}
diff --git a/banyand/series/schema/sw/index_rule.bin b/banyand/series/schema/sw/index_rule.bin
new file mode 100644
index 0000000..4d1204d
Binary files /dev/null and b/banyand/series/schema/sw/index_rule.bin differ
diff --git a/banyand/series/schema/sw/index_rule.json b/banyand/series/schema/sw/index_rule.json
new file mode 100644
index 0000000..da1c71b
--- /dev/null
+++ b/banyand/series/schema/sw/index_rule.json
@@ -0,0 +1,75 @@
+{
+ "metadata": {
+ "name": "sw-index-rule",
+ "group": "default"
+ },
+ "objects": [
+ {
+ "fields": [
+ "trace_id"
+ ],
+ "type": "ID"
+ },
+ {
+ "fields": [
+ "service_id"
+ ],
+ "type": "Text"
+ },
+ {
+ "fields": [
+ "service_instance_id"
+ ],
+ "type": "Text"
+ },
+ {
+ "fields": [
+ "endpoint_id"
+ ],
+ "type": "Text"
+ },
+ {
+ "fields": [
+ "http.method"
+ ],
+ "type": "Text"
+ },
+ {
+ "fields": [
+ "status_code"
+ ],
+ "type": "Text"
+ },
+ {
+ "fields": [
+ "db.type"
+ ],
+ "type": "Text"
+ },
+ {
+ "fields": [
+ "db.instance"
+ ],
+ "type": "Text"
+ },
+ {
+ "fields": [
+ "mq.queue"
+ ],
+ "type": "Text"
+ },
+ {
+ "fields": [
+ "mq.topic"
+ ],
+ "type": "Text"
+ },
+ {
+ "fields": [
+ "mq.broker"
+ ],
+ "type": "Text"
+ }
+ ],
+ "updated_nanoseconds_at": 1622933202000000000
+}
\ No newline at end of file
diff --git a/banyand/series/schema/sw/index_rule_binding.bin b/banyand/series/schema/sw/index_rule_binding.bin
new file mode 100644
index 0000000..f791732
Binary files /dev/null and b/banyand/series/schema/sw/index_rule_binding.bin differ
diff --git a/banyand/series/schema/sw/index_rule_binding.json b/banyand/series/schema/sw/index_rule_binding.json
new file mode 100644
index 0000000..4edbed4
--- /dev/null
+++ b/banyand/series/schema/sw/index_rule_binding.json
@@ -0,0 +1,22 @@
+{
+ "metadata": {
+ "name": "sw-index-rule-binding",
+ "group": "default"
+ },
+ "rule_ref": {
+ "name": "sw-index-rule",
+ "group": "default"
+ },
+ "subjects": [
+ {
+ "catalog": "Trace",
+ "series": {
+ "name": "sw",
+ "group": "default"
+ }
+ }
+ ],
+ "begin_nanoseconds_at": 1622933202000000000,
+ "expire_nanoseconds_at": 4778577979000000000,
+ "updated_nanoseconds_at": 1622933202000000000
+}
\ No newline at end of file
diff --git a/banyand/series/schema/sw/sw.go b/banyand/series/schema/sw/sw.go
new file mode 100644
index 0000000..472b73d
--- /dev/null
+++ b/banyand/series/schema/sw/sw.go
@@ -0,0 +1,111 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//go:generate flatc --root-type "banyandb.v1.TraceSeries" --binary ../../../../api/fbs/v1/schema.fbs trace_series.json
+//go:generate flatc --root-type "banyandb.v1.IndexRule" --binary ../../../../api/fbs/v1/schema.fbs index_rule.json
+//go:generate flatc --root-type "banyandb.v1.IndexRuleBinding" --binary ../../../../api/fbs/v1/schema.fbs index_rule_binding.json
+package sw
+
+import (
+ "context"
+ //nolint:golint
+ _ "embed"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
+ apischema "github.com/apache/skywalking-banyandb/api/schema"
+ "github.com/apache/skywalking-banyandb/banyand/series/schema"
+)
+
+var (
+ _ schema.TraceSeries = (*traceSeriesRepo)(nil)
+ _ schema.IndexRule = (*indexRuleRepo)(nil)
+
+ //go:embed trace_series.bin
+ traceSeriesBin []byte
+ //go:embed index_rule.bin
+ indexRuleBin []byte
+ //go:embed index_rule_binding.bin
+ indexRuleBindingBin []byte
+)
+
+type traceSeriesRepo struct {
+}
+
+func NewTraceSeries() schema.TraceSeries {
+ return &traceSeriesRepo{}
+}
+
+func (l *traceSeriesRepo) Get(_ context.Context, _ common.Metadata) (apischema.TraceSeries, error) {
+ return apischema.TraceSeries{
+ KindVersion: apischema.SeriesKindVersion,
+ Spec: *v1.GetRootAsTraceSeries(traceSeriesBin, 0),
+ }, nil
+
+}
+
+func (l *traceSeriesRepo) List(ctx context.Context, _ schema.ListOpt) ([]apischema.TraceSeries, error) {
+ t, err := l.Get(ctx, common.Metadata{})
+ if err != nil {
+ return nil, err
+ }
+ return []apischema.TraceSeries{t}, nil
+}
+
+type indexRuleRepo struct {
+}
+
+func NewIndexRule() schema.IndexRule {
+ return &indexRuleRepo{}
+}
+
+func (i *indexRuleRepo) Get(ctx context.Context, metadata common.Metadata) (apischema.IndexRule, error) {
+ return apischema.IndexRule{
+ KindVersion: apischema.IndexRuleKindVersion,
+ Spec: *v1.GetRootAsIndexRule(indexRuleBin, 0),
+ }, nil
+}
+
+func (i *indexRuleRepo) List(ctx context.Context, opt schema.ListOpt) ([]apischema.IndexRule, error) {
+ t, err := i.Get(ctx, common.Metadata{})
+ if err != nil {
+ return nil, err
+ }
+ return []apischema.IndexRule{t}, nil
+}
+
+type indexRuleBindingRepo struct {
+}
+
+func NewIndexRuleBinding() schema.IndexRuleBinding {
+ return &indexRuleBindingRepo{}
+}
+
+func (i *indexRuleBindingRepo) Get(_ context.Context, _ common.Metadata) (apischema.IndexRuleBinding, error) {
+ return apischema.IndexRuleBinding{
+ KindVersion: apischema.IndexRuleBindingKindVersion,
+ Spec: *v1.GetRootAsIndexRuleBinding(indexRuleBindingBin, 0),
+ }, nil
+}
+
+func (i *indexRuleBindingRepo) List(ctx context.Context, _ schema.ListOpt) ([]apischema.IndexRuleBinding, error) {
+ t, err := i.Get(ctx, common.Metadata{})
+ if err != nil {
+ return nil, err
+ }
+ return []apischema.IndexRuleBinding{t}, nil
+}
diff --git a/banyand/series/schema/sw/trace_series.bin b/banyand/series/schema/sw/trace_series.bin
new file mode 100644
index 0000000..a089d72
Binary files /dev/null and b/banyand/series/schema/sw/trace_series.bin differ
diff --git a/banyand/series/schema/sw/trace_series.json b/banyand/series/schema/sw/trace_series.json
new file mode 100644
index 0000000..89f9959
--- /dev/null
+++ b/banyand/series/schema/sw/trace_series.json
@@ -0,0 +1,85 @@
+{
+ "metadata": {
+ "name": "sw",
+ "group": "default"
+ },
+ "fields": [
+ {
+ "name": "trace_id",
+ "type": "String"
+ },
+ {
+ "name": "service_id",
+ "type": "String"
+ },
+ {
+ "name": "service_instance_id",
+ "type": "String"
+ },
+ {
+ "name": "endpoint_id",
+ "type": "String"
+ },
+ {
+ "name": "service_name",
+ "type": "String"
+ },
+ {
+ "name": "service_instance_name",
+ "type": "String"
+ },
+ {
+ "name": "endpoint_name",
+ "type": "String"
+ },
+ {
+ "name": "state",
+ "type": "Int"
+ },
+ {
+ "name": "http.method",
+ "type": "String"
+ },
+ {
+ "name": "status_code",
+ "type": "String"
+ },
+ {
+ "name": "db.type",
+ "type": "String"
+ },
+ {
+ "name": "db.instance",
+ "type": "String"
+ },
+ {
+ "name": "mq.queue",
+ "type": "String"
+ },
+ {
+ "name": "mq.topic",
+ "type": "String"
+ },
+ {
+ "name": "mq.broker",
+ "type": "String"
+ }
+ ],
+ "reserved_fields_map": {
+ "trace_id": "trace_id",
+ "series_id": [
+ "service_id",
+ "service_instance_id"
+ ],
+ "state": {
+ "field": "state",
+ "val_success": "0",
+ "val_error": "1"
+ }
+ },
+ "duration": {
+ "val": 7,
+ "unit": "Day"
+ },
+ "updated_nanoseconds_at": 1622933202000000000
+}
\ No newline at end of file
diff --git a/banyand/series/series.go b/banyand/series/series.go
index 3e6de47..2b3a657 100644
--- a/banyand/series/series.go
+++ b/banyand/series/series.go
@@ -22,29 +22,66 @@ import (
"context"
"github.com/apache/skywalking-banyandb/api/data"
+ v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
+ "github.com/apache/skywalking-banyandb/banyand/series/schema"
"github.com/apache/skywalking-banyandb/banyand/storage"
"github.com/apache/skywalking-banyandb/pkg/run"
)
-type Trace interface {
+// TraceState represents the state of a trace link
+type TraceState int
+
+var (
+ TraceStateSuccess = 0
+ TraceStateError = 1
+)
+
+//ScanOptions contain options
+//nolint
+type ScanOptions struct {
+ projection []string
+ state TraceState
+ limit uint32
+}
+
+//TraceRepo contains trace and entity data
+type TraceRepo interface {
+ //FetchTrace returns data.Trace by traceID
FetchTrace(traceID string) (data.Trace, error)
- FetchEntity(chunkIDs []string, fields []string) ([]data.Entity, error)
- ScanEntity(startTime, endTime uint64, fields []string) ([]data.Entity, error)
+ //FetchEntity returns data.Entity by ChunkID
+ FetchEntity(chunkIDs []string, opt ScanOptions) ([]data.Entity, error)
+ //ScanEntity returns data.Entity between a duration by ScanOptions
+ ScanEntity(startTime, endTime uint64, opt ScanOptions) ([]data.Entity, error)
}
+//UniModel combines Trace, Metric and Log repositories into a union interface
type UniModel interface {
- Trace
+ TraceRepo
}
+//SchemaRepo contains schema definition
+type SchemaRepo interface {
+ TraceSeries() schema.TraceSeries
+ IndexRule() schema.IndexRule
+ IndexRuleBinding() schema.IndexRuleBinding
+}
+
+//IndexFilter provides methods to find a specific index related objects
+type IndexFilter interface {
+ //RulesBySubject fetches IndexRule by Series defined in IndexRuleBinding
+ RulesBySubject(ctx context.Context, subject v1.Series) ([]v1.IndexRule, error)
+}
+
+//Service provides operations how to access series module
type Service interface {
UniModel
+ SchemaRepo
+ IndexFilter
+ run.Config
run.PreRunner
}
+//NewService returns a new service
func NewService(ctx context.Context, db storage.Database) (Service, error) {
- return nil, nil
-}
-
-// TODO: this interface should contains methods to access schema objects
-type SchemaRepo interface {
+ return &service{db: db}, nil
}
diff --git a/banyand/series/service.go b/banyand/series/service.go
new file mode 100644
index 0000000..77f64c3
--- /dev/null
+++ b/banyand/series/service.go
@@ -0,0 +1,134 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package series
+
+import (
+ "bytes"
+ "context"
+ "time"
+
+ "go.uber.org/multierr"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/api/data"
+ v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
+ "github.com/apache/skywalking-banyandb/banyand/series/schema"
+ "github.com/apache/skywalking-banyandb/banyand/series/schema/sw"
+ "github.com/apache/skywalking-banyandb/banyand/storage"
+ "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+var _ Service = (*service)(nil)
+
+type service struct {
+ db storage.Database
+}
+
+//Methods for query objects in the schema
+
+func (s *service) TraceSeries() schema.TraceSeries {
+ return sw.NewTraceSeries()
+}
+
+func (s *service) IndexRule() schema.IndexRule {
+ return sw.NewIndexRule()
+}
+
+func (s *service) IndexRuleBinding() schema.IndexRuleBinding {
+ return sw.NewIndexRuleBinding()
+}
+
+func (s *service) RulesBySubject(ctx context.Context, subject v1.Series) ([]v1.IndexRule, error) {
+ group := subject.Series(nil).Group()
+ var groupStr string
+ if group != nil {
+ groupStr = string(group)
+ }
+ bindings, err := s.IndexRuleBinding().List(ctx, schema.ListOpt{Group: groupStr})
+ if err != nil {
+ return nil, err
+ }
+ subjectSeries := subject.Series(nil)
+ if subjectSeries == nil {
+ return nil, nil
+ }
+ now := uint64(time.Now().UnixNano())
+ foundRules := make([]v1.Metadata, 0)
+ for _, binding := range bindings {
+ spec := binding.Spec
+ if spec.BeginNanosecondsAt() > now ||
+ spec.ExpireNanosecondsAt() < now {
+ continue
+ }
+ for i := 0; i < spec.SubjectsLength(); i++ {
+ sub := &v1.Series{}
+ if spec.Subjects(sub, i) &&
+ sub.Catalog() == subject.Catalog() {
+ s1 := sub.Series(nil)
+ if s1 != nil &&
+ bytes.Equal(s1.Name(), subjectSeries.Name()) &&
+ bytes.Equal(s1.Group(), subjectSeries.Group()) {
+ ruleRef := spec.RuleRef(nil)
+ if ruleRef != nil {
+ foundRules = append(foundRules, *ruleRef)
+ }
+ }
+ break
+ }
+ }
+ }
+ result := make([]v1.IndexRule, 0, len(foundRules))
+ var indexRuleErr error
+ for _, rule := range foundRules {
+ object, getErr := s.IndexRule().Get(ctx, common.Metadata{KindVersion: common.MetadataKindVersion, Spec: rule})
+ if getErr != nil {
+ indexRuleErr = multierr.Append(indexRuleErr, err)
+ continue
+ }
+ result = append(result, object.Spec)
+ }
+ return result, indexRuleErr
+}
+
+func (s *service) FetchTrace(traceID string) (data.Trace, error) {
+ panic("implement me")
+}
+
+func (s *service) FetchEntity(chunkIDs []string, opt ScanOptions) ([]data.Entity, error) {
+ panic("implement me")
+}
+
+func (s *service) ScanEntity(startTime, endTime uint64, opt ScanOptions) ([]data.Entity, error) {
+ panic("implement me")
+}
+
+func (s *service) Name() string {
+ panic("implement me")
+}
+
+func (s *service) FlagSet() *run.FlagSet {
+ panic("implement me")
+}
+
+func (s *service) Validate() error {
+ panic("implement me")
+}
+
+func (s *service) PreRun() error {
+ panic("implement me")
+}
diff --git a/banyand/series/service_test.go b/banyand/series/service_test.go
new file mode 100644
index 0000000..e95a6eb
--- /dev/null
+++ b/banyand/series/service_test.go
@@ -0,0 +1,91 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package series
+
+import (
+ "context"
+ "reflect"
+ "testing"
+
+ flatbuffers "github.com/google/flatbuffers/go"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
+ "github.com/apache/skywalking-banyandb/banyand/series/schema/sw"
+)
+
+func Test_service_RulesBySubject(t *testing.T) {
+ tests := []struct {
+ name string
+ arg v1.Series
+ want []v1.IndexRule
+ wantErr bool
+ }{
+ {
+ name: "golden path",
+ arg: createSubject("sw", "default"),
+ want: getIndexRule("sw-index-rule", "default"),
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ s := &service{}
+ ctx := context.Background()
+ got, err := s.RulesBySubject(ctx, tt.arg)
+ if (err != nil) != tt.wantErr {
+ t.Errorf("RulesBySubject() error = %v, wantErr %v", err, tt.wantErr)
+ return
+ }
+ if !reflect.DeepEqual(got, tt.want) {
+ t.Errorf("RulesBySubject() got = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func getIndexRule(name, group string) []v1.IndexRule {
+ b := flatbuffers.NewBuilder(0)
+ b.Finish(createMetadata(b, name, group))
+ md := v1.GetRootAsMetadata(b.FinishedBytes(), 0)
+ indexRule, _ := sw.NewIndexRule().Get(context.Background(), common.Metadata{KindVersion: common.MetadataKindVersion, Spec: *md})
+ return []v1.IndexRule{indexRule.Spec}
+}
+
+func createMetadata(b *flatbuffers.Builder, name, group string) flatbuffers.UOffsetT {
+ namePos := b.CreateString(name)
+ groupPos := b.CreateString(group)
+ v1.MetadataStart(b)
+ v1.MetadataAddName(b, namePos)
+ v1.MetadataAddGroup(b, groupPos)
+ return v1.MetadataEnd(b)
+}
+
+func createSubject(name, group string) v1.Series {
+ b := flatbuffers.NewBuilder(0)
+ namePos := b.CreateString(name)
+ groupPos := b.CreateString(group)
+ v1.MetadataStart(b)
+ v1.MetadataAddName(b, namePos)
+ v1.MetadataAddGroup(b, groupPos)
+ s := v1.MetadataEnd(b)
+ v1.IndexRuleStart(b)
+ v1.SeriesAddCatalog(b, v1.CatalogTrace)
+ v1.SeriesAddSeries(b, s)
+ b.Finish(v1.IndexRuleEnd(b))
+ return *v1.GetRootAsSeries(b.FinishedBytes(), 0)
+}