Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2022/10/11 07:37:28 UTC

[GitHub] [skywalking-banyandb] hanahmily commented on a diff in pull request #164: Add streaming API and topN aggregator

hanahmily commented on code in PR #164:
URL: https://github.com/apache/skywalking-banyandb/pull/164#discussion_r991878854


##########
banyand/measure/measure_write.go:
##########
@@ -55,6 +56,15 @@ func (s *measure) Write(value *measurev1.DataPointValue) error {
 		close(waitCh)
 		return err
 	}
+	// send to stream processor

Review Comment:
   This `Write` method is only for testing. Please add the stream processor to the `Rev` method of `writeCallback` instead.
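
A rough sketch of that wiring, assuming `writeCallback` implements `bus.MessageListener` and holds a reference to the measure's `topNProcessorManager` from this PR (the field names below are illustrative, not the actual struct layout):

    // Illustrative only: Rev persists the data point as before and, in addition,
    // forwards it to the streaming topN processors via onMeasureWrite.
    func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) {
        request, ok := message.Data().(*measurev1.WriteRequest)
        if !ok {
            w.l.Warn().Msg("invalid event data type")
            return
        }
        // ... the existing persistence logic stays here ...
        // feed the same data point into the streaming topN pipeline
        if err := w.processorManager.onMeasureWrite(request); err != nil {
            w.l.Err(err).Msg("fail to forward the data point to the topN processors")
        }
        return
    }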



##########
banyand/query/processor_topn.go:
##########
@@ -0,0 +1,436 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package query
+
+import (
+	"bytes"
+	"container/heap"
+	"context"
+	"math"
+	"time"
+
+	"github.com/pkg/errors"
+	"golang.org/x/exp/slices"
+	"google.golang.org/protobuf/proto"
+	"google.golang.org/protobuf/types/known/timestamppb"
+
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+	measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+	"github.com/apache/skywalking-banyandb/banyand/measure"
+	"github.com/apache/skywalking-banyandb/banyand/tsdb"
+	"github.com/apache/skywalking-banyandb/pkg/bus"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+	"github.com/apache/skywalking-banyandb/pkg/flow"
+	"github.com/apache/skywalking-banyandb/pkg/flow/streaming"
+	"github.com/apache/skywalking-banyandb/pkg/query/aggregation"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+type topNQueryProcessor struct {
+	measureService measure.Service
+	*queryService
+}
+
+func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
+	request, ok := message.Data().(*measurev1.TopNRequest)
+	if !ok {
+		t.log.Warn().Msg("invalid event data type")
+		return
+	}
+	if request.GetFieldValueSort() == modelv1.Sort_SORT_UNSPECIFIED {
+		t.log.Warn().Msg("invalid requested sort direction")
+		return
+	}
+	t.log.Info().Msg("received a topN query event")
+	topNMetadata := request.GetMetadata()
+	topNSchema, err := t.metaService.TopNAggregationRegistry().GetTopNAggregation(context.TODO(), topNMetadata)
+	if err != nil {
+		t.log.Error().Err(err).
+			Str("topN", topNMetadata.GetName()).
+			Msg("fail to get execution context")
+		return
+	}
+	if topNSchema.GetFieldValueSort() != modelv1.Sort_SORT_UNSPECIFIED &&
+		topNSchema.GetFieldValueSort() != request.GetFieldValueSort() {
+		t.log.Warn().Msg("unmatched sort direction")
+		return
+	}
+	sourceMeasure, err := t.measureService.Measure(topNSchema.GetSourceMeasure())
+	if err != nil {
+		t.log.Error().Err(err).
+			Str("topN", topNMetadata.GetName()).
+			Msg("fail to find source measure")
+		return
+	}
+	shards, err := sourceMeasure.CompanionShards(topNMetadata)
+	if err != nil {
+		t.log.Error().Err(err).
+			Str("topN", topNMetadata.GetName()).
+			Msg("fail to list shards")
+		return
+	}
+	aggregator := createTopNPostAggregator(request.GetTopN(),
+		request.GetAgg(), request.GetFieldValueSort())
+	entity, err := locateEntity(topNSchema, request.GetConditions())
+	if err != nil {
+		t.log.Error().Err(err).
+			Str("topN", topNMetadata.GetName()).
+			Msg("fail to parse entity")
+		return
+	}
+	for _, shard := range shards {
+		// TODO: support condition
+		sl, innerErr := shard.Series().List(tsdb.NewPath(entity))
+		if innerErr != nil {
+			t.log.Error().Err(innerErr).
+				Str("topN", topNMetadata.GetName()).
+				Msg("fail to list series")
+			return
+		}
+		for _, series := range sl {
+			iters, scanErr := t.scanSeries(series, request)
+			if scanErr != nil {
+				t.log.Error().Err(scanErr).
+					Str("topN", topNMetadata.GetName()).
+					Msg("fail to scan series")
+				return
+			}
+			for _, iter := range iters {
+				for iter.Next() {
+					tuple, parseErr := parseTopNFamily(iter.Val(), sourceMeasure.GetInterval())
+					if parseErr != nil {
+						t.log.Error().Err(parseErr).
+							Str("topN", topNMetadata.GetName()).
+							Msg("fail to parse topN family")
+						return
+					}
+					_ = aggregator.put(tuple.V1.(string), tuple.V2.(int64), iter.Val().Time())
+				}
+				_ = iter.Close()
+			}
+		}
+	}
+
+	now := time.Now().UnixNano()
+	resp = bus.NewMessage(bus.MessageID(now), aggregator.val())
+
+	return
+}
+
+func locateEntity(topNSchema *databasev1.TopNAggregation, conditions []*modelv1.Condition) (tsdb.Entity, error) {
+	entityMap := make(map[string]int)
+	entity := make([]tsdb.Entry, 1+len(topNSchema.GetGroupByTagNames()))
+	entity[0] = tsdb.AnyEntry
+	for idx, tagName := range topNSchema.GetGroupByTagNames() {
+		entityMap[tagName] = idx + 1
+		// fill AnyEntry by default
+		entity[idx+1] = tsdb.AnyEntry
+	}
+	for _, pairQuery := range conditions {
+		// TODO: check op?

Review Comment:
   Yep. Raise an error if the op is not `EQ`, and likewise when the tag is not in `group_by_tag_names`.
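
A minimal sketch of that check, written as a hypothetical helper next to `locateEntity`; the `BinaryOp` constant follows the generated proto code, and the error wording is illustrative:

    // validateConditions rejects a condition whose operator is not EQ, and
    // likewise a condition on a tag that is not part of group_by_tag_names.
    func validateConditions(conditions []*modelv1.Condition, entityMap map[string]int) error {
        for _, pairQuery := range conditions {
            if pairQuery.GetOp() != modelv1.Condition_BINARY_OP_EQ {
                return errors.Errorf("condition on tag %s uses %s, but only EQ is supported",
                    pairQuery.GetName(), pairQuery.GetOp())
            }
            if _, ok := entityMap[pairQuery.GetName()]; !ok {
                return errors.Errorf("tag %s is not one of the group_by_tag_names", pairQuery.GetName())
            }
        }
        return nil
    }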



##########
banyand/measure/encode.go:
##########
@@ -101,7 +101,7 @@ func (p *decoderPool) Put(decoder encoding.SeriesDecoder) {
 
 const fieldFlagLength = 9
 
-func encoderFieldFlag(fieldSpec *databasev1.FieldSpec, interval time.Duration) []byte {
+func EncoderFieldFlag(fieldSpec *databasev1.FieldSpec, interval time.Duration) []byte {

Review Comment:
   Please move it to `pkg/pb/v1`



##########
banyand/metadata/metadata.go:
##########
@@ -31,6 +31,12 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/run"
 )
 
+// TopNFilter provides methods to find a specific topNAggregation which can be attached to the given resource.
+// Currently, the resource can only be Measure
+type TopNFilter interface {
+	TopNAggregations(ctx context.Context, source *commonv1.Metadata) ([]*databasev1.TopNAggregation, error)

Review Comment:
   Move it to `MeasureRegistry` to align with the structure of the rest of the API.
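
A sketch of what that relocation could look like; only the added method is taken from this PR, the rest of the interface is elided rather than guessed at:

    // MeasureRegistry gains the lookup that TopNFilter used to provide, so all
    // measure-related metadata queries live behind one interface.
    type MeasureRegistry interface {
        // ... existing methods on Measure schemas stay unchanged ...

        // TopNAggregations finds every TopNAggregation whose source_measure
        // matches the given metadata. Currently, only a Measure can be a source.
        TopNAggregations(ctx context.Context, source *commonv1.Metadata) ([]*databasev1.TopNAggregation, error)
    }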



##########
banyand/measure/measure_write.go:
##########
@@ -196,7 +205,7 @@ func encodeFieldValue(fieldValue *modelv1.FieldValue) []byte {
 	return nil
 }
 
-func decodeFieldValue(fieldValue []byte, fieldSpec *databasev1.FieldSpec) *modelv1.FieldValue {
+func DecodeFieldValue(fieldValue []byte, fieldSpec *databasev1.FieldSpec) *modelv1.FieldValue {

Review Comment:
   Could you move it to `pkg/pb/v1`?



##########
banyand/liaison/grpc/measure.go:
##########
@@ -111,4 +111,23 @@ func (ms *measureService) Query(_ context.Context, entityCriteria *measurev1.Que
 	return nil, ErrQueryMsg
 }
 
-// TODO: implement topN
+func (ms *measureService) TopN(_ context.Context, topNRequest *measurev1.TopNRequest) (*measurev1.TopNResponse, error) {
+	if err := timestamp.CheckTimeRange(topNRequest.GetTimeRange()); err != nil {
+		return nil, status.Errorf(codes.InvalidArgument, "%v is invalid :%s", topNRequest.GetTimeRange(), err)
+	}
+
+	message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), topNRequest)
+	feat, errQuery := ms.pipeline.Publish(data.TopicTopNQuery, message)
+	if errQuery != nil {
+		return nil, errQuery
+	}
+	msg, errFeat := feat.Get()
+	if errFeat != nil {
+		return nil, errFeat
+	}
+	queryResp, ok := msg.Data().([]*measurev1.TopNList)

Review Comment:
   Handle `common.Error` as `Query` does; it makes the client's debugging easier.
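
A hedged sketch of that handling in `TopN`, mirroring how `Query` distinguishes a `common.Error` reply from a normal result; the `Msg()` accessor and the `Lists` response field name are assumptions here:

    msg, errFeat := feat.Get()
    if errFeat != nil {
        return nil, errFeat
    }
    // The processor may answer with a common.Error instead of the TopN lists;
    // turning it into a gRPC status gives the client something actionable.
    switch d := msg.Data().(type) {
    case []*measurev1.TopNList:
        return &measurev1.TopNResponse{Lists: d}, nil
    case common.Error:
        return nil, status.Error(codes.InvalidArgument, d.Msg()) // Msg() is an assumption
    default:
        return nil, ErrQueryMsg
    }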



##########
banyand/measure/measure_topn.go:
##########
@@ -0,0 +1,681 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+	"context"
+	"encoding/base64"
+	"io"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/pkg/errors"
+	"go.uber.org/multierr"
+	"golang.org/x/exp/slices"
+	"google.golang.org/protobuf/proto"
+
+	"github.com/apache/skywalking-banyandb/api/common"
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+	measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+	"github.com/apache/skywalking-banyandb/banyand/tsdb"
+	"github.com/apache/skywalking-banyandb/pkg/bus"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+	"github.com/apache/skywalking-banyandb/pkg/flow"
+	"github.com/apache/skywalking-banyandb/pkg/flow/streaming"
+	"github.com/apache/skywalking-banyandb/pkg/flow/streaming/sources"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/partition"
+	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+const (
+	timeBucketFormat = "200601021504"
+	TopNTagFamily    = "__topN__"
+)
+
+var (
+	_ bus.MessageListener = (*topNProcessCallback)(nil)
+	_ io.Closer           = (*topNStreamingProcessor)(nil)
+	_ io.Closer           = (*topNProcessorManager)(nil)
+	_ flow.Sink           = (*topNStreamingProcessor)(nil)
+
+	errUnsupportedConditionValueType = errors.New("unsupported value type in the condition")
+
+	TopNValueFieldSpec = &databasev1.FieldSpec{
+		Name:              "value",
+		FieldType:         databasev1.FieldType_FIELD_TYPE_INT,
+		EncodingMethod:    databasev1.EncodingMethod_ENCODING_METHOD_GORILLA,
+		CompressionMethod: databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD,
+	}
+)
+
+type topNStreamingProcessor struct {
+	flow.ComponentState
+	l                *logger.Logger
+	shardNum         uint32
+	interval         time.Duration
+	topNSchema       *databasev1.TopNAggregation
+	sortDirection    modelv1.Sort
+	databaseSupplier tsdb.Supplier
+	src              chan interface{}
+	in               chan flow.StreamRecord
+	errCh            <-chan error
+	stopCh           chan struct{}
+	streamingFlow    flow.Flow
+}
+
+func (t *topNStreamingProcessor) In() chan<- flow.StreamRecord {
+	return t.in
+}
+
+func (t *topNStreamingProcessor) Setup(ctx context.Context) error {
+	t.Add(1)
+	go t.run(ctx)
+	return nil
+}
+
+func (t *topNStreamingProcessor) run(ctx context.Context) {
+	defer t.Done()
+	for {
+		select {
+		case record, ok := <-t.in:
+			if !ok {
+				return
+			}
+			if err := t.writeStreamRecord(record); err != nil {
+				t.l.Err(err).Msg("fail to write stream record")
+			}
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+// Teardown is called by the Flow as a lifecycle hook.
+// So we should not block on err channel within this method.
+func (t *topNStreamingProcessor) Teardown(ctx context.Context) error {
+	t.Wait()
+	return nil
+}
+
+func (t *topNStreamingProcessor) Close() error {
+	close(t.src)
+	// close streaming flow
+	err := t.streamingFlow.Close()
+	// and wait for error channel close
+	<-t.stopCh
+	t.stopCh = nil
+	return err
+}
+
+func (t *topNStreamingProcessor) writeStreamRecord(record flow.StreamRecord) error {
+	tuples, ok := record.Data().([]*streaming.Tuple2)
+	if !ok {
+		return errors.New("invalid data type")
+	}
+	// down-sample the start of the timeWindow to a time-bucket
+	eventTime := t.downSampleTimeBucket(record.TimestampMillis())
+	timeBucket := eventTime.Format(timeBucketFormat)
+	var err error
+	t.l.Warn().
+		Str("TopN", t.topNSchema.GetMetadata().GetName()).
+		Int("rankNums", len(tuples)).
+		Msg("Write a tuple")
+	for rankNum, tuple := range tuples {
+		fieldValue := tuple.V1.(int64)
+		data := tuple.V2.(flow.StreamRecord).Data().(flow.Data)
+		err = multierr.Append(err, t.writeData(eventTime, timeBucket, fieldValue, data, rankNum))
+	}
+	return err
+}
+
+func (t *topNStreamingProcessor) writeData(eventTime time.Time, timeBucket string, fieldValue int64, data flow.Data, rankNum int) error {
+	var tagValues []*modelv1.TagValue
+	if len(t.topNSchema.GetGroupByTagNames()) > 0 {
+		var ok bool
+		if tagValues, ok = data[2].([]*modelv1.TagValue); !ok {
+			return errors.New("fail to extract tag values from topN result")
+		}
+	}
+	entity, shardID, err := t.locate(tagValues, rankNum)
+	if err != nil {
+		return err
+	}
+	shard, err := t.databaseSupplier.SupplyTSDB().Shard(shardID)
+	if err != nil {
+		return err
+	}
+	series, err := shard.Series().GetByHashKey(tsdb.HashEntity(entity))
+	if err != nil {
+		return err
+	}
+	span, err := series.Span(timestamp.NewInclusiveTimeRangeDuration(eventTime, 0))
+	if err != nil {
+		if span != nil {
+			_ = span.Close()
+		}
+		return err
+	}
+	// measureID consists of three parts:
+	// 1. groupValues
+	// 2. rankNumber
+	// 3. timeBucket
+	measureID := data[0].(string) + "_" + strconv.Itoa(rankNum) + "_" + timeBucket
+	writeFn := func() (tsdb.Writer, error) {
+		builder := span.WriterBuilder().Time(eventTime)
+		virtualTagFamily := &modelv1.TagFamilyForWrite{
+			Tags: []*modelv1.TagValue{
+				// MeasureID
+				{
+					Value: &modelv1.TagValue_Id{
+						Id: &modelv1.ID{
+							Value: measureID,
+						},
+					},
+				},
+				// GroupValues for merge in post processor
+				{
+					Value: &modelv1.TagValue_Str{
+						Str: &modelv1.Str{
+							Value: data[0].(string),
+						},
+					},
+				},
+			},
+		}
+		payload, errMarshal := proto.Marshal(virtualTagFamily)
+		if errMarshal != nil {
+			return nil, errMarshal
+		}
+		builder.Family(familyIdentity(TopNTagFamily, TagFlag), payload)
+		virtualFieldValue := &modelv1.FieldValue{
+			Value: &modelv1.FieldValue_Int{
+				Int: &modelv1.Int{
+					Value: fieldValue,
+				},
+			},
+		}
+		fieldData := encodeFieldValue(virtualFieldValue)
+		builder.Family(familyIdentity(TopNValueFieldSpec.GetName(), EncoderFieldFlag(TopNValueFieldSpec, t.interval)), fieldData)
+		writer, errWrite := builder.Build()
+		if errWrite != nil {
+			return nil, errWrite
+		}
+		_, errWrite = writer.Write()
+		t.l.Debug().
+			Time("ts", eventTime).
+			Int("ts_nano", eventTime.Nanosecond()).
+			Uint64("series_id", uint64(series.ID())).
+			Uint64("item_id", uint64(writer.ItemID().ID)).
+			Int("shard_id", int(shardID)).
+			Msg("write measure")
+		return writer, errWrite
+	}
+	_, err = writeFn()
+	if err != nil {
+		_ = span.Close()
+		return err
+	}
+	return span.Close()
+}
+
+func (t *topNStreamingProcessor) downSampleTimeBucket(eventTimeMillis int64) time.Time {
+	return time.UnixMilli(eventTimeMillis - eventTimeMillis%t.interval.Milliseconds())
+}
+
+func (t *topNStreamingProcessor) locate(tagValues []*modelv1.TagValue, rankNum int) (tsdb.Entity, common.ShardID, error) {
+	if len(t.topNSchema.GetGroupByTagNames()) != len(tagValues) {
+		return nil, 0, errors.New("not enough tag values for the entity")
+	}
+	entity := make(tsdb.Entity, 1+1+len(t.topNSchema.GetGroupByTagNames()))
+	// entity prefix
+	entity[0] = []byte(formatMeasureCompanionPrefix(t.topNSchema.GetSourceMeasure().GetName(),
+		t.topNSchema.GetMetadata().GetName()))
+	entity[1] = convert.Int64ToBytes(int64(rankNum))
+	// measureID as sharding key
+	for idx, tagVal := range tagValues {
+		var innerErr error
+		entity[idx+2], innerErr = pbv1.MarshalIndexFieldValue(tagVal)
+		if innerErr != nil {
+			return nil, 0, innerErr
+		}
+	}
+	id, err := partition.ShardID(entity.Marshal(), t.shardNum)
+	if err != nil {
+		return nil, 0, err
+	}
+	return entity, common.ShardID(id), nil
+}
+
+func (t *topNStreamingProcessor) start() *topNStreamingProcessor {
+	t.errCh = t.streamingFlow.Window(streaming.NewTumblingTimeWindows(t.interval)).
+		AllowedMaxWindows(int(t.topNSchema.GetLruSize())).
+		TopN(int(t.topNSchema.GetCountersNumber()),
+			streaming.WithSortKeyExtractor(func(record flow.StreamRecord) int64 {
+				return record.Data().(flow.Data)[1].(int64)
+			}),
+			OrderBy(t.topNSchema.GetFieldValueSort()),
+		).To(t).Open()
+	go t.handleError()
+	return t
+}
+
+func OrderBy(sort modelv1.Sort) streaming.TopNOption {
+	if sort == modelv1.Sort_SORT_ASC {
+		return streaming.OrderBy(streaming.ASC)
+	}
+	return streaming.OrderBy(streaming.DESC)
+}
+
+func (t *topNStreamingProcessor) handleError() {
+	for err := range t.errCh {
+		t.l.Err(err).Str("topN", t.topNSchema.GetMetadata().GetName()).
+			Msg("error occurred during flow setup or process")
+	}
+	t.stopCh <- struct{}{}
+}
+
+// topNProcessorManager manages multiple topNStreamingProcessor(s) belonging to a single measure
+type topNProcessorManager struct {
+	// RWMutex here is to protect the processorMap from data race, i.e.
+	// the send operation to the underlying channel vs. the close of the channel
+	// TODO: this can be optimized if the bus Listener can be synchronously finished,
+	sync.RWMutex
+	l            *logger.Logger
+	m            *measure
+	topNSchemas  []*databasev1.TopNAggregation
+	processorMap map[*commonv1.Metadata][]*topNStreamingProcessor
+}
+
+func (manager *topNProcessorManager) Close() error {
+	manager.Lock()
+	defer manager.Unlock()
+	var err error
+	for _, processorList := range manager.processorMap {
+		for _, processor := range processorList {
+			err = multierr.Append(err, processor.Close())
+		}
+	}
+	return err
+}
+
+func (manager *topNProcessorManager) onMeasureWrite(request *measurev1.WriteRequest) error {
+	manager.RLock()
+	defer manager.RUnlock()
+	for _, processorList := range manager.processorMap {
+		for _, processor := range processorList {
+			processor.src <- flow.NewStreamRecordWithTimestampPb(request.GetDataPoint(), request.GetDataPoint().GetTimestamp())
+		}
+	}
+
+	return nil
+}
+
+func (manager *topNProcessorManager) start() error {
+	interval := manager.m.interval
+	for _, topNSchema := range manager.topNSchemas {
+		sortDirections := make([]modelv1.Sort, 0, 2)
+		if topNSchema.GetFieldValueSort() == modelv1.Sort_SORT_UNSPECIFIED {
+			sortDirections = append(sortDirections, modelv1.Sort_SORT_ASC, modelv1.Sort_SORT_DESC)
+		} else {
+			sortDirections = append(sortDirections, topNSchema.GetFieldValueSort())
+		}
+
+		processorList := make([]*topNStreamingProcessor, len(sortDirections))
+		for i, sortDirection := range sortDirections {
+			srcCh := make(chan interface{})
+			src, _ := sources.NewChannel(srcCh)
+			streamingFlow := streaming.New(src)
+
+			filters, buildErr := manager.buildFilter(topNSchema.GetCriteria())
+			if buildErr != nil {
+				return buildErr
+			}
+			streamingFlow = streamingFlow.Filter(filters)
+
+			mapper, innerErr := manager.buildMapper(topNSchema.GetFieldName(), topNSchema.GetGroupByTagNames()...)
+			if innerErr != nil {
+				return innerErr
+			}
+			streamingFlow = streamingFlow.Map(mapper)
+
+			processor := &topNStreamingProcessor{
+				l:                manager.l,
+				shardNum:         manager.m.shardNum,
+				interval:         interval,
+				topNSchema:       topNSchema,
+				sortDirection:    sortDirection,
+				databaseSupplier: manager.m.databaseSupplier,
+				src:              srcCh,
+				in:               make(chan flow.StreamRecord),
+				stopCh:           make(chan struct{}),
+				streamingFlow:    streamingFlow,
+			}
+			processorList[i] = processor.start()
+		}
+
+		manager.processorMap[topNSchema.GetSourceMeasure()] = processorList
+	}
+
+	return nil
+}
+
+func (manager *topNProcessorManager) buildFilter(criteria *modelv1.Criteria) (flow.UnaryFunc[bool], error) {
+	// if criteria is nil, we handle all incoming elements
+	if criteria == nil {
+		return func(_ context.Context, dataPoint any) bool {
+			return true
+		}, nil
+	}
+
+	f, err := manager.buildFilterForCriteria(criteria)

Review Comment:
   `pkg/query/logical/tag_filter` provides an enhanced filter. Please use it to simplify the manager.
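
A rough sketch of how `buildFilter` could delegate to that package; the `logical.BuildTagFilter` constructor and the `Match` call below are assumptions about its API, used only to illustrate the shape of the simplification:

    // buildFilter hands criteria evaluation to the tag filter from
    // pkg/query/logical instead of hand-rolling one filter per criteria node.
    func (manager *topNProcessorManager) buildFilter(criteria *modelv1.Criteria) (flow.UnaryFunc[bool], error) {
        // no criteria: accept every incoming data point
        if criteria == nil {
            return func(_ context.Context, _ any) bool { return true }, nil
        }
        tagFilter, err := logical.BuildTagFilter(criteria) // hypothetical constructor
        if err != nil {
            return nil, err
        }
        return func(_ context.Context, request any) bool {
            dataPoint := request.(*measurev1.DataPointValue)
            ok, matchErr := tagFilter.Match(dataPoint.GetTagFamilies()) // hypothetical signature
            return matchErr == nil && ok
        }, nil
    }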



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org