You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/11/19 12:46:25 UTC
[skywalking-banyandb] branch main updated: Tweak log default settings (#219)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new ed752c4 Tweak log default settings (#219)
ed752c4 is described below
commit ed752c470648d4fcbb507cf38f5060e841a8e621
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Sat Nov 19 20:46:19 2022 +0800
Tweak log default settings (#219)
---
banyand/internal/cmd/standalone.go | 2 +-
banyand/kv/kv.go | 2 -
banyand/liaison/grpc/discovery.go | 4 +-
banyand/liaison/grpc/measure.go | 9 +-
banyand/liaison/grpc/stream.go | 8 +-
banyand/measure/measure_query.go | 7 +-
banyand/query/processor.go | 9 +-
banyand/query/processor_topn.go | 7 +-
banyand/stream/stream_query.go | 2 +-
banyand/tsdb/index/writer.go | 2 +-
dist/LICENSE | 7 +-
dist/licenses/license-github.com-blugelabs-ice.txt | 202 ---------------------
...ense-github.com-zinclabs-bluge-segment-api.txt} | 0
...e.txt => license-github.com-zinclabs-bluge.txt} | 0
...-v2.txt => license-github.com-zinclabs-ice.txt} | 0
go.mod | 4 +-
go.sum | 21 +--
pkg/encoding/int.go | 13 +-
pkg/encoding/int_test.go | 99 +++++++++-
pkg/encoding/plain.go | 2 +-
pkg/grpchelper/client.go | 8 +-
.../helpers/http_health.go => logger/jsonproto.go} | 38 ++--
pkg/logger/setting.go | 4 +-
pkg/pb/v1/write.go | 25 ++-
pkg/query/logical/common.go | 2 +-
.../measure/measure_plan_indexscan_local.go | 6 +-
pkg/schema/metadata.go | 4 +-
pkg/test/helpers/grpc_health.go | 8 +-
pkg/test/helpers/http_health.go | 6 +-
test/cases/measure/data/input/linked_or.yaml | 56 ++++++
test/cases/measure/data/want/linked_or.yaml | 60 ++++++
test/cases/measure/measure.go | 1 +
test/stress/env | 14 +-
test/stress/env.dev | 13 +-
34 files changed, 324 insertions(+), 321 deletions(-)
diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go
index 02f625c..440c8c9 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/standalone.go
@@ -114,7 +114,7 @@ func newStandaloneCmd() *cobra.Command {
},
}
- standaloneCmd.Flags().StringVarP(&logging.Env, "logging.env", "", "dev", "the logging")
+ standaloneCmd.Flags().StringVarP(&logging.Env, "logging.env", "", "prod", "the logging")
standaloneCmd.Flags().StringVarP(&logging.Level, "logging.level", "", "info", "the level of logging")
standaloneCmd.Flags().AddFlagSet(g.RegisterFlags().FlagSet)
return standaloneCmd
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index 1f3cbef..90944a2 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -80,8 +80,6 @@ type TimeSeriesWriter interface {
type TimeSeriesReader interface {
// Get a value by its key and timestamp/version
Get(key []byte, ts uint64) ([]byte, error)
- // GetAll values with an identical key
- GetAll(key []byte) ([][]byte, error)
}
// TimeSeriesStore is time series storage
diff --git a/banyand/liaison/grpc/discovery.go b/banyand/liaison/grpc/discovery.go
index b78cdd5..98cba84 100644
--- a/banyand/liaison/grpc/discovery.go
+++ b/banyand/liaison/grpc/discovery.go
@@ -88,7 +88,7 @@ func (s *shardRepo) Rev(message bus.Message) (resp bus.Message) {
return
}
s.setShardNum(e)
- s.log.Info().
+ s.log.Debug().
Str("action", databasev1.Action_name[int32(e.Action)]).
Uint64("shardID", e.Shard.Id).
Msg("received a shard e")
@@ -136,7 +136,7 @@ func (s *entityRepo) Rev(message bus.Message) (resp bus.Message) {
return
}
id := getID(e.GetSubject())
- s.log.Info().
+ s.log.Debug().
Str("action", databasev1.Action_name[int32(e.Action)]).
Interface("subject", id).
Msg("received an entity event")
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index ac6a476..8d23d87 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -23,6 +23,7 @@ import (
"time"
"github.com/pkg/errors"
+ "github.com/rs/zerolog"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -31,6 +32,7 @@ import (
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/bus"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
@@ -46,6 +48,7 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er
}
return nil
}
+ sampled := ms.log.Sample(&zerolog.BasicSampler{N: 10})
for {
writeRequest, err := measure.Recv()
if err == io.EOF {
@@ -55,7 +58,7 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er
return err
}
if errTime := timestamp.CheckPb(writeRequest.DataPoint.Timestamp); errTime != nil {
- ms.log.Error().Err(errTime).Msg("the data point time is invalid")
+ sampled.Error().Err(errTime).RawJSON("written", logger.Proto(writeRequest)).Msg("the data point time is invalid")
if errResp := reply(); errResp != nil {
return errResp
}
@@ -63,7 +66,7 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er
}
entity, shardID, err := ms.navigate(writeRequest.GetMetadata(), writeRequest.GetDataPoint().GetTagFamilies())
if err != nil {
- ms.log.Error().Err(err).Msg("failed to navigate to the write target")
+ sampled.Error().Err(err).Msg("failed to navigate to the write target")
if errResp := reply(); errResp != nil {
return errResp
}
@@ -76,7 +79,7 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er
})
_, errWritePub := ms.pipeline.Publish(data.TopicMeasureWrite, message)
if errWritePub != nil {
- ms.log.Error().Err(errWritePub).Msg("failed to send a message")
+ sampled.Error().Err(errWritePub).Msg("failed to send a message")
if errResp := reply(); errResp != nil {
return errResp
}
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index 8cf6e1b..5a3ab2d 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -23,6 +23,7 @@ import (
"time"
"github.com/pkg/errors"
+ "github.com/rs/zerolog"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -46,6 +47,7 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error {
}
return nil
}
+ sampled := s.log.Sample(&zerolog.BasicSampler{N: 10})
for {
writeEntity, err := stream.Recv()
if err == io.EOF {
@@ -55,7 +57,7 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error {
return err
}
if errTime := timestamp.CheckPb(writeEntity.GetElement().Timestamp); errTime != nil {
- s.log.Error().Err(errTime).Msg("the element time is invalid")
+ sampled.Error().Stringer("written", writeEntity).Err(errTime).Msg("the element time is invalid")
if errResp := reply(); errResp != nil {
return errResp
}
@@ -63,7 +65,7 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error {
}
entity, shardID, err := s.navigate(writeEntity.GetMetadata(), writeEntity.GetElement().GetTagFamilies())
if err != nil {
- s.log.Error().Err(err).Msg("failed to navigate to the write target")
+ sampled.Error().Err(err).Msg("failed to navigate to the write target")
if errResp := reply(); errResp != nil {
return errResp
}
@@ -76,7 +78,7 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error {
})
_, errWritePub := s.pipeline.Publish(data.TopicStreamWrite, message)
if errWritePub != nil {
- s.log.Error().Err(errWritePub).Msg("failed to send a message")
+ sampled.Error().Err(errWritePub).Msg("failed to send a message")
if errResp := reply(); errResp != nil {
return errResp
}
diff --git a/banyand/measure/measure_query.go b/banyand/measure/measure_query.go
index ceefa42..89f1b13 100644
--- a/banyand/measure/measure_query.go
+++ b/banyand/measure/measure_query.go
@@ -116,7 +116,7 @@ func (s *measure) Shard(id common.ShardID) (tsdb.Shard, error) {
func (s *measure) ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFamily, error) {
familyRawBytes, err := item.Family(familyIdentity(family, pbv1.TagFlag))
if err != nil {
- return nil, err
+ return nil, errors.Wrapf(err, "measure %s.%s parse family %s", s.name, s.group, family)
}
tagFamily := &modelv1.TagFamilyForWrite{}
err = proto.Unmarshal(familyRawBytes, tagFamily)
@@ -159,7 +159,10 @@ func (s *measure) ParseField(name string, item tsdb.Item) (*measurev1.DataPoint_
if err != nil {
return nil, err
}
- fieldValue := pbv1.DecodeFieldValue(bytes, fieldSpec)
+ fieldValue, err := pbv1.DecodeFieldValue(bytes, fieldSpec)
+ if err != nil {
+ return nil, err
+ }
return &measurev1.DataPoint_Field{
Name: name,
Value: fieldValue,
diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index cf9ac2e..d3668c9 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -72,7 +72,8 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
resp = bus.NewMessage(bus.MessageID(now), common.NewError("invalid event data type"))
return
}
- p.log.Debug().Stringer("criteria", queryCriteria).Msg("received a query request")
+ queryJSON := logger.Proto(queryCriteria)
+ p.log.Debug().RawJSON("criteria", queryJSON).Msg("received a query request")
meta := queryCriteria.GetMetadata()
ec, err := p.streamService.Stream(meta)
@@ -103,6 +104,7 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
entities, err := plan.(executor.StreamExecutable).Execute(ec)
if err != nil {
+ p.log.Error().Err(err).RawJSON("req", queryJSON).Msg("fail to execute the query plan")
resp = bus.NewMessage(bus.MessageID(now), common.NewError("execute the query plan for stream %s: %v", meta.GetName(), err))
return
}
@@ -124,7 +126,8 @@ func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
resp = bus.NewMessage(bus.MessageID(now), common.NewError("invalid event data type"))
return
}
- p.log.Debug().Msg("received a query event")
+ queryJSON := logger.Proto(queryCriteria)
+ p.log.Info().RawJSON("req", queryJSON).Msg("received a query event")
meta := queryCriteria.GetMetadata()
ec, err := p.measureService.Measure(meta)
@@ -160,7 +163,7 @@ func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
}
defer func() {
if err = mIterator.Close(); err != nil {
- p.queryService.log.Error().Err(err).Msg("fail to close the query plan")
+ p.queryService.log.Error().Err(err).RawJSON("req", queryJSON).Msg("fail to close the query plan")
}
}()
result := make([]*measurev1.DataPoint, 0)
diff --git a/banyand/query/processor_topn.go b/banyand/query/processor_topn.go
index 021c7eb..805937e 100644
--- a/banyand/query/processor_topn.go
+++ b/banyand/query/processor_topn.go
@@ -57,7 +57,7 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
t.log.Warn().Msg("invalid requested sort direction")
return
}
- t.log.Info().Msg("received a topN query event")
+ t.log.Debug().Msg("received a topN query event")
topNMetadata := request.GetMetadata()
topNSchema, err := t.metaService.TopNAggregationRegistry().GetTopNAggregation(context.TODO(), topNMetadata)
if err != nil {
@@ -182,7 +182,10 @@ func parseTopNFamily(item tsdb.Item, interval time.Duration) (*streaming.Tuple2,
if err != nil {
return nil, err
}
- fieldValue := pbv1.DecodeFieldValue(fieldBytes, measure.TopNValueFieldSpec)
+ fieldValue, err := pbv1.DecodeFieldValue(fieldBytes, measure.TopNValueFieldSpec)
+ if err != nil {
+ return nil, err
+ }
return &streaming.Tuple2{
// GroupValues
V1: tagFamily.GetTags()[1].GetStr().GetValue(),
diff --git a/banyand/stream/stream_query.go b/banyand/stream/stream_query.go
index 0463c2f..218fc6c 100644
--- a/banyand/stream/stream_query.go
+++ b/banyand/stream/stream_query.go
@@ -88,7 +88,7 @@ func (s *stream) Shard(id common.ShardID) (tsdb.Shard, error) {
func (s *stream) ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFamily, error) {
familyRawBytes, err := item.Family(tsdb.Hash([]byte(family)))
if err != nil {
- return nil, errors.Wrapf(err, "parse family %s", family)
+ return nil, errors.Wrapf(err, "stream %s.%s parse family %s", s.name, s.group, family)
}
tagFamily := &modelv1.TagFamilyForWrite{}
err = proto.Unmarshal(familyRawBytes, tagFamily)
diff --git a/banyand/tsdb/index/writer.go b/banyand/tsdb/index/writer.go
index 5e02ebc..1dfd30b 100644
--- a/banyand/tsdb/index/writer.go
+++ b/banyand/tsdb/index/writer.go
@@ -96,7 +96,7 @@ func NewWriter(ctx context.Context, options WriterOptions) *Writer {
key = key | local
case databasev1.IndexRule_LOCATION_GLOBAL:
if !w.enableGlobalIndex {
- w.l.Warn().Stringer("index-rule", ruleIndex.Rule).Msg("global index is disabled")
+ w.l.Warn().RawJSON("index-rule", logger.Proto(ruleIndex.Rule)).Msg("global index is disabled")
continue
}
key = key | global
diff --git a/dist/LICENSE b/dist/LICENSE
index e70ffd8..9d60250 100644
--- a/dist/LICENSE
+++ b/dist/LICENSE
@@ -181,10 +181,6 @@ Apache-2.0 licenses
github.com/SkyAPM/badger/v3 v3.0.0-20220817114744-b3711444d876 Apache-2.0
github.com/blevesearch/segment v0.9.0 Apache-2.0
github.com/blevesearch/vellum v1.0.7 Apache-2.0
- github.com/blugelabs/bluge v0.2.2 Apache-2.0
- github.com/blugelabs/bluge_segment_api v0.2.0 Apache-2.0
- github.com/blugelabs/ice v1.0.0 Apache-2.0
- github.com/blugelabs/ice/v2 v2.0.1 Apache-2.0
github.com/coreos/go-semver v0.3.0 Apache-2.0
github.com/coreos/go-systemd/v22 v22.3.2 Apache-2.0
github.com/dgraph-io/ristretto v0.1.0 Apache-2.0
@@ -210,6 +206,9 @@ Apache-2.0 licenses
github.com/soheilhy/cmux v0.1.5 Apache-2.0
github.com/spf13/afero v1.8.2 Apache-2.0
github.com/spf13/cobra v1.4.0 Apache-2.0
+ github.com/zinclabs/bluge v1.1.5 Apache-2.0
+ github.com/zinclabs/bluge_segment_api v1.0.0 Apache-2.0
+ github.com/zinclabs/ice v1.1.3 Apache-2.0
go.etcd.io/etcd/api/v3 v3.5.4 Apache-2.0
go.etcd.io/etcd/client/pkg/v3 v3.5.4 Apache-2.0
go.etcd.io/etcd/client/v2 v2.305.4 Apache-2.0
diff --git a/dist/licenses/license-github.com-blugelabs-ice.txt b/dist/licenses/license-github.com-blugelabs-ice.txt
deleted file mode 100644
index 7a4a3ea..0000000
--- a/dist/licenses/license-github.com-blugelabs-ice.txt
+++ /dev/null
@@ -1,202 +0,0 @@
-
- Apache License
- Version 2.0, January 2004
- http://www.apache.org/licenses/
-
- TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
-
- 1. Definitions.
-
- "License" shall mean the terms and conditions for use, reproduction,
- and distribution as defined by Sections 1 through 9 of this document.
-
- "Licensor" shall mean the copyright owner or entity authorized by
- the copyright owner that is granting the License.
-
- "Legal Entity" shall mean the union of the acting entity and all
- other entities that control, are controlled by, or are under common
- control with that entity. For the purposes of this definition,
- "control" means (i) the power, direct or indirect, to cause the
- direction or management of such entity, whether by contract or
- otherwise, or (ii) ownership of fifty percent (50%) or more of the
- outstanding shares, or (iii) beneficial ownership of such entity.
-
- "You" (or "Your") shall mean an individual or Legal Entity
- exercising permissions granted by this License.
-
- "Source" form shall mean the preferred form for making modifications,
- including but not limited to software source code, documentation
- source, and configuration files.
-
- "Object" form shall mean any form resulting from mechanical
- transformation or translation of a Source form, including but
- not limited to compiled object code, generated documentation,
- and conversions to other media types.
-
- "Work" shall mean the work of authorship, whether in Source or
- Object form, made available under the License, as indicated by a
- copyright notice that is included in or attached to the work
- (an example is provided in the Appendix below).
-
- "Derivative Works" shall mean any work, whether in Source or Object
- form, that is based on (or derived from) the Work and for which the
- editorial revisions, annotations, elaborations, or other modifications
- represent, as a whole, an original work of authorship. For the purposes
- of this License, Derivative Works shall not include works that remain
- separable from, or merely link (or bind by name) to the interfaces of,
- the Work and Derivative Works thereof.
-
- "Contribution" shall mean any work of authorship, including
- the original version of the Work and any modifications or additions
- to that Work or Derivative Works thereof, that is intentionally
- submitted to Licensor for inclusion in the Work by the copyright owner
- or by an individual or Legal Entity authorized to submit on behalf of
- the copyright owner. For the purposes of this definition, "submitted"
- means any form of electronic, verbal, or written communication sent
- to the Licensor or its representatives, including but not limited to
- communication on electronic mailing lists, source code control systems,
- and issue tracking systems that are managed by, or on behalf of, the
- Licensor for the purpose of discussing and improving the Work, but
- excluding communication that is conspicuously marked or otherwise
- designated in writing by the copyright owner as "Not a Contribution."
-
- "Contributor" shall mean Licensor and any individual or Legal Entity
- on behalf of whom a Contribution has been received by Licensor and
- subsequently incorporated within the Work.
-
- 2. Grant of Copyright License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- copyright license to reproduce, prepare Derivative Works of,
- publicly display, publicly perform, sublicense, and distribute the
- Work and such Derivative Works in Source or Object form.
-
- 3. Grant of Patent License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- (except as stated in this section) patent license to make, have made,
- use, offer to sell, sell, import, and otherwise transfer the Work,
- where such license applies only to those patent claims licensable
- by such Contributor that are necessarily infringed by their
- Contribution(s) alone or by combination of their Contribution(s)
- with the Work to which such Contribution(s) was submitted. If You
- institute patent litigation against any entity (including a
- cross-claim or counterclaim in a lawsuit) alleging that the Work
- or a Contribution incorporated within the Work constitutes direct
- or contributory patent infringement, then any patent licenses
- granted to You under this License for that Work shall terminate
- as of the date such litigation is filed.
-
- 4. Redistribution. You may reproduce and distribute copies of the
- Work or Derivative Works thereof in any medium, with or without
- modifications, and in Source or Object form, provided that You
- meet the following conditions:
-
- (a) You must give any other recipients of the Work or
- Derivative Works a copy of this License; and
-
- (b) You must cause any modified files to carry prominent notices
- stating that You changed the files; and
-
- (c) You must retain, in the Source form of any Derivative Works
- that You distribute, all copyright, patent, trademark, and
- attribution notices from the Source form of the Work,
- excluding those notices that do not pertain to any part of
- the Derivative Works; and
-
- (d) If the Work includes a "NOTICE" text file as part of its
- distribution, then any Derivative Works that You distribute must
- include a readable copy of the attribution notices contained
- within such NOTICE file, excluding those notices that do not
- pertain to any part of the Derivative Works, in at least one
- of the following places: within a NOTICE text file distributed
- as part of the Derivative Works; within the Source form or
- documentation, if provided along with the Derivative Works; or,
- within a display generated by the Derivative Works, if and
- wherever such third-party notices normally appear. The contents
- of the NOTICE file are for informational purposes only and
- do not modify the License. You may add Your own attribution
- notices within Derivative Works that You distribute, alongside
- or as an addendum to the NOTICE text from the Work, provided
- that such additional attribution notices cannot be construed
- as modifying the License.
-
- You may add Your own copyright statement to Your modifications and
- may provide additional or different license terms and conditions
- for use, reproduction, or distribution of Your modifications, or
- for any such Derivative Works as a whole, provided Your use,
- reproduction, and distribution of the Work otherwise complies with
- the conditions stated in this License.
-
- 5. Submission of Contributions. Unless You explicitly state otherwise,
- any Contribution intentionally submitted for inclusion in the Work
- by You to the Licensor shall be under the terms and conditions of
- this License, without any additional terms or conditions.
- Notwithstanding the above, nothing herein shall supersede or modify
- the terms of any separate license agreement you may have executed
- with Licensor regarding such Contributions.
-
- 6. Trademarks. This License does not grant permission to use the trade
- names, trademarks, service marks, or product names of the Licensor,
- except as required for reasonable and customary use in describing the
- origin of the Work and reproducing the content of the NOTICE file.
-
- 7. Disclaimer of Warranty. Unless required by applicable law or
- agreed to in writing, Licensor provides the Work (and each
- Contributor provides its Contributions) on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- implied, including, without limitation, any warranties or conditions
- of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
- PARTICULAR PURPOSE. You are solely responsible for determining the
- appropriateness of using or redistributing the Work and assume any
- risks associated with Your exercise of permissions under this License.
-
- 8. Limitation of Liability. In no event and under no legal theory,
- whether in tort (including negligence), contract, or otherwise,
- unless required by applicable law (such as deliberate and grossly
- negligent acts) or agreed to in writing, shall any Contributor be
- liable to You for damages, including any direct, indirect, special,
- incidental, or consequential damages of any character arising as a
- result of this License or out of the use or inability to use the
- Work (including but not limited to damages for loss of goodwill,
- work stoppage, computer failure or malfunction, or any and all
- other commercial damages or losses), even if such Contributor
- has been advised of the possibility of such damages.
-
- 9. Accepting Warranty or Additional Liability. While redistributing
- the Work or Derivative Works thereof, You may choose to offer,
- and charge a fee for, acceptance of support, warranty, indemnity,
- or other liability obligations and/or rights consistent with this
- License. However, in accepting such obligations, You may act only
- on Your own behalf and on Your sole responsibility, not on behalf
- of any other Contributor, and only if You agree to indemnify,
- defend, and hold each Contributor harmless for any liability
- incurred by, or claims asserted against, such Contributor by reason
- of your accepting any such warranty or additional liability.
-
- END OF TERMS AND CONDITIONS
-
- APPENDIX: How to apply the Apache License to your work.
-
- To apply the Apache License to your work, attach the following
- boilerplate notice, with the fields enclosed by brackets "[]"
- replaced with your own identifying information. (Don't include
- the brackets!) The text should be enclosed in the appropriate
- comment syntax for the file format. We also recommend that a
- file or class name and description of purpose be included on the
- same "printed page" as the copyright notice for easier
- identification within third-party archives.
-
- Copyright [yyyy] [name of copyright owner]
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
\ No newline at end of file
diff --git a/dist/licenses/license-github.com-blugelabs-bluge-segment-api.txt b/dist/licenses/license-github.com-zinclabs-bluge-segment-api.txt
similarity index 100%
rename from dist/licenses/license-github.com-blugelabs-bluge-segment-api.txt
rename to dist/licenses/license-github.com-zinclabs-bluge-segment-api.txt
diff --git a/dist/licenses/license-github.com-blugelabs-bluge.txt b/dist/licenses/license-github.com-zinclabs-bluge.txt
similarity index 100%
rename from dist/licenses/license-github.com-blugelabs-bluge.txt
rename to dist/licenses/license-github.com-zinclabs-bluge.txt
diff --git a/dist/licenses/license-github.com-blugelabs-ice-v2.txt b/dist/licenses/license-github.com-zinclabs-ice.txt
similarity index 100%
rename from dist/licenses/license-github.com-blugelabs-ice-v2.txt
rename to dist/licenses/license-github.com-zinclabs-ice.txt
diff --git a/go.mod b/go.mod
index f86f035..a690871 100644
--- a/go.mod
+++ b/go.mod
@@ -54,7 +54,6 @@ require (
github.com/blevesearch/vellum v1.0.7 // indirect
github.com/blugelabs/bluge_segment_api v0.2.0 // indirect
github.com/blugelabs/ice v1.0.0 // indirect
- github.com/blugelabs/ice/v2 v2.0.1 // indirect
github.com/caio/go-tdigest v3.1.0+incompatible // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/coreos/go-semver v0.3.0 // indirect
@@ -132,5 +131,8 @@ require (
replace (
github.com/benbjohnson/clock v1.3.0 => github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97
+ github.com/blugelabs/bluge => github.com/zinclabs/bluge v1.1.5
+ github.com/blugelabs/bluge_segment_api => github.com/zinclabs/bluge_segment_api v1.0.0
+ github.com/blugelabs/ice => github.com/zinclabs/ice v1.1.3
github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20220817114744-b3711444d876
)
diff --git a/go.sum b/go.sum
index 459669d..41f517d 100644
--- a/go.sum
+++ b/go.sum
@@ -44,9 +44,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
-github.com/RoaringBitmap/gocroaring v0.4.0/go.mod h1:NieMwz7ZqwU2DD73/vvYwv7r4eWBKuPVSXZIpsaMwCI=
-github.com/RoaringBitmap/real-roaring-datasets v0.0.0-20190726190000-eb7c87156f76/go.mod h1:oM0MHmQ3nDsq609SS36p+oYbRi16+oVvU2Bw4Ipv0SE=
-github.com/RoaringBitmap/roaring v0.9.1/go.mod h1:h1B7iIUOmnAeb5ytYMvnHJwxMc6LUrwBnzXWRuqTQUc=
github.com/RoaringBitmap/roaring v0.9.4 h1:ckvZSX5gwCRaJYBNe7syNawCU5oruY9gQmjXlp4riwo=
github.com/RoaringBitmap/roaring v0.9.4/go.mod h1:icnadbWcNyfEHlYdr+tDlOTih1Bf/h+rzPpv4sbomAA=
github.com/SkyAPM/badger/v3 v3.0.0-20220817114744-b3711444d876 h1:zH//2cmDpBla7rL9NzWr+vZ2UskMrvdnUvslz3KYTN0=
@@ -77,7 +74,6 @@ github.com/bits-and-blooms/bitset v1.2.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edY
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
github.com/blevesearch/go-porterstemmer v1.0.3 h1:GtmsqID0aZdCSNiY8SkuPJ12pD4jI+DdXTAn4YRcHCo=
github.com/blevesearch/go-porterstemmer v1.0.3/go.mod h1:angGc5Ht+k2xhJdZi511LtmxuEf0OVpvUUNrwmM1P7M=
-github.com/blevesearch/mmap-go v1.0.2/go.mod h1:ol2qBqYaOUsGdm7aRMRrYGgPvnwLe6Y+7LMvAB5IbSA=
github.com/blevesearch/mmap-go v1.0.3/go.mod h1:pYvKl/grLQrBxuaRYgoTssa4rVujYYeenDp++2E+yvs=
github.com/blevesearch/mmap-go v1.0.4 h1:OVhDhT5B/M1HNPpYPBKIEJaD0F3Si+CrEKULGCDPWmc=
github.com/blevesearch/mmap-go v1.0.4/go.mod h1:EWmEAOmdAS9z/pi/+Toxu99DnsbhG1TIxUoRmJw/pSs=
@@ -85,17 +81,8 @@ github.com/blevesearch/segment v0.9.0 h1:5lG7yBCx98or7gK2cHMKPukPZ/31Kag7nONpoBt
github.com/blevesearch/segment v0.9.0/go.mod h1:9PfHYUdQCgHktBgvtUOF4x+pc4/l8rdH0u5spnW85UQ=
github.com/blevesearch/snowballstem v0.9.0 h1:lMQ189YspGP6sXvZQ4WZ+MLawfV8wOmPoD/iWeNXm8s=
github.com/blevesearch/snowballstem v0.9.0/go.mod h1:PivSj3JMc8WuaFkTSRDW2SlrulNWPl4ABg1tC/hlgLs=
-github.com/blevesearch/vellum v1.0.5/go.mod h1:atE0EH3fvk43zzS7t1YNdNC7DbmcC3uz+eMD5xZ2OyQ=
github.com/blevesearch/vellum v1.0.7 h1:+vn8rfyCRHxKVRgDLeR0FAXej2+6mEb5Q15aQE/XESQ=
github.com/blevesearch/vellum v1.0.7/go.mod h1:doBZpmRhwTsASB4QdUZANlJvqVAUdUyX0ZK7QJCTeBE=
-github.com/blugelabs/bluge v0.2.2 h1:gat8CqE6P6tOgeX30XGLOVNTC26cpM2RWVcreXWtYcM=
-github.com/blugelabs/bluge v0.2.2/go.mod h1:am1LU9jS8dZgWkRzkGLQN3757EgMs3upWrU2fdN9foE=
-github.com/blugelabs/bluge_segment_api v0.2.0 h1:cCX1Y2y8v0LZ7+EEJ6gH7dW6TtVTW4RhG0vp3R+N2Lo=
-github.com/blugelabs/bluge_segment_api v0.2.0/go.mod h1:95XA+ZXfRj/IXADm7gZ+iTcWOJPg5jQTY1EReIzl3LA=
-github.com/blugelabs/ice v1.0.0 h1:um7wf9e6jbkTVCrOyQq3tKK43fBMOvLUYxbj3Qtc4eo=
-github.com/blugelabs/ice v1.0.0/go.mod h1:gNfFPk5zM+yxJROhthxhVQYjpBO9amuxWXJQ2Lo+IbQ=
-github.com/blugelabs/ice/v2 v2.0.1 h1:mzHbntLjk2v7eDRgoXCgzOsPKN1Tenu9Svo6l9cTLS4=
-github.com/blugelabs/ice/v2 v2.0.1/go.mod h1:QxAWSPNwZwsIqS25c3lbIPFQrVvT1sphf5x5DfMLH5M=
github.com/caio/go-tdigest v3.1.0+incompatible h1:uoVMJ3Q5lXmVLCCqaMGHLBWnbGoN6Lpu7OAUPR60cds=
github.com/caio/go-tdigest v3.1.0+incompatible/go.mod h1:sHQM/ubZStBUmF1WbB8FAm8q9GjDajLC5T7ydxE3JHI=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
@@ -232,7 +219,6 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS
github.com/golang/protobuf v1.5.1/go.mod h1:DopwsBzvsk0Fs44TXzsVbJyPhcCPeIwnvohx4u74HPM=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
-github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
@@ -510,6 +496,12 @@ github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1
github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/zenizh/go-capturer v0.0.0-20211219060012-52ea6c8fed04 h1:qXafrlZL1WsJW5OokjraLLRURHiw0OzKHD/RNdspp4w=
github.com/zenizh/go-capturer v0.0.0-20211219060012-52ea6c8fed04/go.mod h1:FiwNQxz6hGoNFBC4nIx+CxZhI3nne5RmIOlT/MXcSD4=
+github.com/zinclabs/bluge v1.1.5 h1:QJhkweeBVRaaEPdaRptkYOJDLCeyo+JBgc2hNyFehAM=
+github.com/zinclabs/bluge v1.1.5/go.mod h1:IG9JlDUzUGRIxylWmLyF7e1QwMdRWu9FkNSivJ4VB+E=
+github.com/zinclabs/bluge_segment_api v1.0.0 h1:GJvPxdzR7KjwdxmcKleQLvtIYi/J7Q7ehRlZqgGayzg=
+github.com/zinclabs/bluge_segment_api v1.0.0/go.mod h1:mYfPVUdXLZ4iXsicXMER+RcI/avwphjMOi8nhN9HDLA=
+github.com/zinclabs/ice v1.1.3 h1:LNfncdxQw2ix6P1T2ISmhO+6BFRa27qyTTfK0PitF2c=
+github.com/zinclabs/ice v1.1.3/go.mod h1:wTwGEe30mQnSLaR1ezxu4E80GcwO6EyOww67KpJtIiw=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU=
go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4=
@@ -702,7 +694,6 @@ golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
-golang.org/x/sys v0.0.0-20181221143128-b4a75ba826a6/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
diff --git a/pkg/encoding/int.go b/pkg/encoding/int.go
index df6ce8e..c9e1991 100644
--- a/pkg/encoding/int.go
+++ b/pkg/encoding/int.go
@@ -74,10 +74,10 @@ func (b *intEncoderPoolDelegator) Put(encoder SeriesEncoder) {
}
type intDecoderPoolDelegator struct {
- name string
pool *sync.Pool
- size int
fn ParseInterval
+ name string
+ size int
}
func NewIntDecoderPool(name string, size int, fn ParseInterval) SeriesDecoderPool {
@@ -216,7 +216,7 @@ func (i intDecoder) Get(ts uint64) ([]byte, error) {
return iter.Val(), nil
}
}
- return nil, nil
+ return zeroBytes, nil
}
func (i intDecoder) Iterator() SeriesIterator {
@@ -231,8 +231,9 @@ func (i intDecoder) Iterator() SeriesIterator {
}
var (
- _ SeriesIterator = (*intIterator)(nil)
- zero = convert.BytesToUint64(convert.Int64ToBytes(0))
+ _ SeriesIterator = (*intIterator)(nil)
+ zeroBytes = convert.Int64ToBytes(0)
+ Zero = convert.BytesToUint64(zeroBytes)
)
type intIterator struct {
@@ -265,7 +266,7 @@ func (i *intIterator) Next() bool {
i.currVal = i.values.Value()
}
} else {
- i.currVal = zero
+ i.currVal = Zero
}
i.currTime = i.startTime + uint64(i.interval*i.index)
i.index++
diff --git a/pkg/encoding/int_test.go b/pkg/encoding/int_test.go
index e5ffd4e..657bbe5 100644
--- a/pkg/encoding/int_test.go
+++ b/pkg/encoding/int_test.go
@@ -86,8 +86,8 @@ func TestNewIntEncoderAndDecoder(t *testing.T) {
assert.Equal(t, key, k)
return 1 * time.Minute
}
- encoderPool := NewIntEncoderPool("minute", 3, fn)
- decoderPool := NewIntDecoderPool("minute", 3, fn)
+ encoderPool := NewIntEncoderPool("minute", 4, fn)
+ decoderPool := NewIntDecoderPool("minute", 4, fn)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -106,17 +106,102 @@ func TestNewIntEncoderAndDecoder(t *testing.T) {
at.NoError(decoder.Decode(key, bb))
at.True(decoder.IsFull())
iter := decoder.Iterator()
- i := 0
- for ; iter.Next(); i++ {
+ for i, t := range tt.want.ts {
+ at.True(iter.Next())
at.NoError(iter.Error())
at.Equal(tt.want.ts[i], iter.Time())
at.Equal(tt.want.data[i], convert.BytesToInt64(iter.Val()))
- v, err := decoder.Get(tt.want.ts[i])
+ v, err := decoder.Get(t)
at.NoError(err)
at.Equal(tt.want.data[i], convert.BytesToInt64(v))
}
- if i == 0 {
- at.Fail("empty data")
+ })
+ }
+}
+
+func TestNewIntDecoderGet(t *testing.T) {
+ type tsData struct {
+ ts []uint64
+ data []int64
+ }
+ tests := []struct {
+ name string
+ args tsData
+ want tsData
+ }{
+ {
+ name: "golden path",
+ args: tsData{
+ ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
+ data: []int64{7, 8, 7, 9},
+ },
+ want: tsData{
+ ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
+ data: []int64{7, 8, 7, 9},
+ },
+ },
+ {
+ name: "more than the size",
+ args: tsData{
+ ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute), uint64(4 * time.Minute)},
+ data: []int64{7, 8, 7, 9, 6},
+ },
+ want: tsData{
+ ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute), uint64(5 * time.Minute)},
+ data: []int64{7, 8, 7, 9, 0},
+ },
+ },
+ {
+ name: "less than the size",
+ args: tsData{
+ ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute)},
+ data: []int64{7, 8, 7},
+ },
+ want: tsData{
+ ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute)},
+ data: []int64{7, 8, 7},
+ },
+ },
+ {
+ name: "empty slot in the middle",
+ args: tsData{
+ ts: []uint64{uint64(time.Minute), uint64(4 * time.Minute)},
+ data: []int64{7, 9},
+ },
+ want: tsData{
+ ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
+ data: []int64{7, 0, 0, 9},
+ },
+ },
+ }
+ key := []byte("foo")
+ fn := func(k []byte) time.Duration {
+ assert.Equal(t, key, k)
+ return 1 * time.Minute
+ }
+ encoderPool := NewIntEncoderPool("minute", 4, fn)
+ decoderPool := NewIntDecoderPool("minute", 4, fn)
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ at := assert.New(t)
+ encoder := encoderPool.Get(key)
+ decoder := decoderPool.Get(key)
+ encoder.Reset(key)
+ for i, v := range tt.args.ts {
+ encoder.Append(v, convert.Int64ToBytes(tt.args.data[i]))
+ if encoder.IsFull() {
+ break
+ }
+ }
+ bb, err := encoder.Encode()
+ at.NoError(err)
+ at.NoError(decoder.Decode(key, bb))
+ at.True(decoder.IsFull())
+ for i, t := range tt.want.ts {
+ v, err := decoder.Get(t)
+ at.NoError(err)
+ at.Equal(tt.want.data[i], convert.BytesToInt64(v))
}
})
}
diff --git a/pkg/encoding/plain.go b/pkg/encoding/plain.go
index 5c0d6f5..2284273 100644
--- a/pkg/encoding/plain.go
+++ b/pkg/encoding/plain.go
@@ -237,7 +237,7 @@ func (t *plainDecoder) Get(ts uint64) ([]byte, error) {
}
slot := getTSSlot(t.ts, i)
if parseTS(slot) != ts {
- return nil, fmt.Errorf("%d doesn't exist", ts)
+ return nil, fmt.Errorf("%d is wrong", ts)
}
return getVal(t.val, parseOffset(slot))
}
diff --git a/pkg/grpchelper/client.go b/pkg/grpchelper/client.go
index 3a9181e..e73e356 100644
--- a/pkg/grpchelper/client.go
+++ b/pkg/grpchelper/client.go
@@ -28,13 +28,12 @@ import (
"github.com/apache/skywalking-banyandb/pkg/logger"
)
-var l = logger.GetLogger()
-
func Conn(addr string, connTimeout time.Duration, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
defaultOpts := []grpc.DialOption{
grpc.WithBlock(),
}
opts = append(opts, defaultOpts...)
+ l := logger.GetLogger("grpc-helper")
connStart := time.Now()
dialCtx, dialCancel := context.WithTimeout(context.Background(), connTimeout)
@@ -49,7 +48,7 @@ func Conn(addr string, connTimeout time.Duration, opts ...grpc.DialOption) (*grp
return nil, err
}
connDuration := time.Since(connStart)
- l.Info().Dur("conn", connDuration).Msg("time elapsed")
+ l.Debug().Dur("conn", connDuration).Msg("time elapsed")
return conn, nil
}
@@ -58,6 +57,7 @@ func Request(ctx context.Context, rpcTimeout time.Duration, fn func(rpcCtx conte
rpcCtx, rpcCancel := context.WithTimeout(ctx, rpcTimeout)
defer rpcCancel()
rpcCtx = metadata.NewOutgoingContext(rpcCtx, make(metadata.MD))
+ l := logger.GetLogger("grpc-helper")
err := fn(rpcCtx)
if err != nil {
@@ -71,6 +71,6 @@ func Request(ctx context.Context, rpcTimeout time.Duration, fn func(rpcCtx conte
return err
}
rpcDuration := time.Since(rpcStart)
- l.Info().Dur("rpc", rpcDuration).Msg("time elapsed")
+ l.Debug().Dur("rpc", rpcDuration).Msg("time elapsed")
return nil
}
diff --git a/pkg/test/helpers/http_health.go b/pkg/logger/jsonproto.go
similarity index 54%
copy from pkg/test/helpers/http_health.go
copy to pkg/logger/jsonproto.go
index abd33d6..9c87037 100644
--- a/pkg/test/helpers/http_health.go
+++ b/pkg/logger/jsonproto.go
@@ -6,7 +6,7 @@
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
-// http://www.apache.org/licenses/LICENSE-2.0
+// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
@@ -14,34 +14,24 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-package helpers
+
+package logger
import (
- "fmt"
- "time"
+ "encoding/json"
- "github.com/go-resty/resty/v2"
+ "google.golang.org/protobuf/encoding/protojson"
+ "google.golang.org/protobuf/proto"
)
-func HTTPHealthCheck(addr string) func() error {
- return func() error {
- client := resty.New()
-
- resp, err := client.R().
- SetHeader("Accept", "application/json").
- Get(fmt.Sprintf("http://%s/api/healthz", addr))
- if err != nil {
- time.Sleep(1 * time.Second)
- return err
- }
+type MarshalError struct {
+ Msg string `json:"msg"`
+}
- if resp.StatusCode() != 200 {
- l.Warn().Str("responded_status", resp.Status()).Msg("service unhealthy")
- time.Sleep(1 * time.Second)
- return ErrServiceUnhealthy
- }
- l.Info().Stringer("response", resp).Msg("connected")
- time.Sleep(500 * time.Millisecond)
- return nil
+func Proto(message proto.Message) []byte {
+ b, err := protojson.Marshal(message)
+ if err != nil {
+ b, _ = json.Marshal(MarshalError{Msg: err.Error()})
}
+ return b
}
diff --git a/pkg/logger/setting.go b/pkg/logger/setting.go
index 9d65e5e..40739b3 100644
--- a/pkg/logger/setting.go
+++ b/pkg/logger/setting.go
@@ -50,7 +50,7 @@ func (rl *rootLogger) setDefault() {
defer atomic.StoreUint32(&rl.done, 1)
var err error
rl.l, err = getLogger(Logging{
- Env: "dev",
+ Env: "prod",
Level: "debug",
})
if err != nil {
@@ -114,7 +114,7 @@ func getLogger(cfg Logging) (*Logger, error) {
}
w = io.Writer(cw)
default:
- w = os.Stderr
+ w = os.Stdout
}
l := zerolog.New(w).Level(lvl).With().Timestamp().Logger()
return &Logger{module: "root", Logger: &l}, nil
diff --git a/pkg/pb/v1/write.go b/pkg/pb/v1/write.go
index c8b6a9f..f56a68b 100644
--- a/pkg/pb/v1/write.go
+++ b/pkg/pb/v1/write.go
@@ -19,6 +19,7 @@ package v1
import (
"bytes"
+ "encoding/hex"
"time"
"github.com/pkg/errors"
@@ -31,12 +32,15 @@ import (
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/encoding"
)
type ID string
const fieldFlagLength = 9
+var zeroFieldValue = &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: int64(encoding.Zero)}}}
+
var (
strDelimiter = []byte("\n")
NullTag = &modelv1.TagValue{Value: &modelv1.TagValue_Null{}}
@@ -45,7 +49,7 @@ var (
ErrUnsupportedTagForIndexField = errors.New("the tag type(for example, null) can not be as the index field value")
ErrNullValue = errors.New("the tag value is null")
ErrMalformedElement = errors.New("element is malformed")
- ErrMalformedFieldFlag = errors.New("field flag is malformed")
+ ErrMalformedField = errors.New("field is malformed")
)
func MarshalIndexFieldValue(tagValue *modelv1.TagValue) ([]byte, error) {
@@ -364,16 +368,23 @@ func EncodeFamily(familySpec *databasev1.TagFamilySpec, family *modelv1.TagFamil
return proto.Marshal(data)
}
-func DecodeFieldValue(fieldValue []byte, fieldSpec *databasev1.FieldSpec) *modelv1.FieldValue {
+func DecodeFieldValue(fieldValue []byte, fieldSpec *databasev1.FieldSpec) (*modelv1.FieldValue, error) {
switch fieldSpec.GetFieldType() {
case databasev1.FieldType_FIELD_TYPE_STRING:
- return &modelv1.FieldValue{Value: &modelv1.FieldValue_Str{Str: &modelv1.Str{Value: string(fieldValue)}}}
+ return &modelv1.FieldValue{Value: &modelv1.FieldValue_Str{Str: &modelv1.Str{Value: string(fieldValue)}}}, nil
case databasev1.FieldType_FIELD_TYPE_INT:
- return &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: convert.BytesToInt64(fieldValue)}}}
+ if len(fieldValue) == 0 {
+ return zeroFieldValue, nil
+ }
+ if len(fieldValue) != 8 {
+ return nil, errors.WithMessagef(ErrMalformedField, "the length of encoded field value(int64) %s is %d, less than 8",
+ hex.EncodeToString(fieldValue), len(fieldValue))
+ }
+ return &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: convert.BytesToInt64(fieldValue)}}}, nil
case databasev1.FieldType_FIELD_TYPE_DATA_BINARY:
- return &modelv1.FieldValue{Value: &modelv1.FieldValue_BinaryData{BinaryData: fieldValue}}
+ return &modelv1.FieldValue{Value: &modelv1.FieldValue_BinaryData{BinaryData: fieldValue}}, nil
}
- return &modelv1.FieldValue{Value: &modelv1.FieldValue_Null{}}
+ return &modelv1.FieldValue{Value: &modelv1.FieldValue_Null{}}, nil
}
func EncoderFieldFlag(fieldSpec *databasev1.FieldSpec, interval time.Duration) []byte {
@@ -387,7 +398,7 @@ func EncoderFieldFlag(fieldSpec *databasev1.FieldSpec, interval time.Duration) [
func DecodeFieldFlag(key []byte) (*databasev1.FieldSpec, time.Duration, error) {
if len(key) < fieldFlagLength {
- return nil, 0, ErrMalformedFieldFlag
+ return nil, 0, errors.WithMessagef(ErrMalformedField, "flag %s is invalid", hex.EncodeToString(key))
}
b := key[len(key)-9:]
return &databasev1.FieldSpec{
diff --git a/pkg/query/logical/common.go b/pkg/query/logical/common.go
index 716be97..2cd6850 100644
--- a/pkg/query/logical/common.go
+++ b/pkg/query/logical/common.go
@@ -74,7 +74,7 @@ func ProjectItem(ec executor.ExecutionContext, item tsdb.Item, projectionFieldRe
familyName := refs[0].Tag.GetFamilyName()
parsedTagFamily, err := ec.ParseTagFamily(familyName, item)
if err != nil {
- return nil, err
+ return nil, errors.WithMessage(err, "parse projection")
}
if len(refs) > len(parsedTagFamily.Tags) {
return nil, errors.Wrapf(ErrInvalidData,
diff --git a/pkg/query/logical/measure/measure_plan_indexscan_local.go b/pkg/query/logical/measure/measure_plan_indexscan_local.go
index 69df5b3..d7722d4 100644
--- a/pkg/query/logical/measure/measure_plan_indexscan_local.go
+++ b/pkg/query/logical/measure/measure_plan_indexscan_local.go
@@ -217,10 +217,8 @@ func (ism *indexScanIterator) Next() bool {
}
nextItem := ism.inner.Next()
var err error
- ism.current, err = transform(nextItem, ism.context)
- if err != nil {
- ism.err = err
- return false
+ if ism.current, err = transform(nextItem, ism.context); err != nil {
+ ism.err = multierr.Append(ism.err, err)
}
return true
}
diff --git a/pkg/schema/metadata.go b/pkg/schema/metadata.go
index f95737b..3ba04be 100644
--- a/pkg/schema/metadata.go
+++ b/pkg/schema/metadata.go
@@ -158,7 +158,7 @@ func (sr *schemaRepo) Watcher() {
if !more {
return
}
- sr.l.Info().Interface("event", evt).Msg("received an event")
+ sr.l.Debug().Interface("event", evt).Msg("received an event")
for i := 0; i < 10; i++ {
var err error
switch evt.Typ {
@@ -365,7 +365,7 @@ func (sr *schemaRepo) Close() {
for _, g := range sr.data {
err := g.close()
if err != nil {
- sr.l.Err(err).Stringer("group", g.GetSchema().Metadata).Msg("closing")
+ sr.l.Err(err).RawJSON("group", logger.Proto(g.GetSchema().Metadata)).Msg("closing")
}
}
}
diff --git a/pkg/test/helpers/grpc_health.go b/pkg/test/helpers/grpc_health.go
index b78c0b1..694dfaf 100644
--- a/pkg/test/helpers/grpc_health.go
+++ b/pkg/test/helpers/grpc_health.go
@@ -28,10 +28,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/logger"
)
-var (
- ErrServiceUnhealthy = errors.New("service is unhealthy")
- l = logger.GetLogger()
-)
+var ErrServiceUnhealthy = errors.New("service is unhealthy")
func HealthCheck(addr string, connTimeout time.Duration, rpcTimeout time.Duration, opts ...grpc.DialOption) func() error {
return func() error {
@@ -50,11 +47,12 @@ func HealthCheck(addr string, connTimeout time.Duration, rpcTimeout time.Duratio
}); err != nil {
return err
}
+ l := logger.GetLogger()
if resp.GetStatus() != grpc_health_v1.HealthCheckResponse_SERVING {
l.Warn().Str("responded_status", resp.GetStatus().String()).Msg("service unhealthy")
return ErrServiceUnhealthy
}
- l.Info().Stringer("status", resp.GetStatus()).Msg("connected")
+ l.Debug().Stringer("status", resp.GetStatus()).Msg("connected")
return nil
}
}
diff --git a/pkg/test/helpers/http_health.go b/pkg/test/helpers/http_health.go
index abd33d6..b2c83a1 100644
--- a/pkg/test/helpers/http_health.go
+++ b/pkg/test/helpers/http_health.go
@@ -21,6 +21,8 @@ import (
"time"
"github.com/go-resty/resty/v2"
+
+ "github.com/apache/skywalking-banyandb/pkg/logger"
)
func HTTPHealthCheck(addr string) func() error {
@@ -34,13 +36,13 @@ func HTTPHealthCheck(addr string) func() error {
time.Sleep(1 * time.Second)
return err
}
-
+ l := logger.GetLogger("http-health")
if resp.StatusCode() != 200 {
l.Warn().Str("responded_status", resp.Status()).Msg("service unhealthy")
time.Sleep(1 * time.Second)
return ErrServiceUnhealthy
}
- l.Info().Stringer("response", resp).Msg("connected")
+ l.Debug().Stringer("response", resp).Msg("connected")
time.Sleep(500 * time.Millisecond)
return nil
}
diff --git a/test/cases/measure/data/input/linked_or.yaml b/test/cases/measure/data/input/linked_or.yaml
new file mode 100644
index 0000000..5481bcf
--- /dev/null
+++ b/test/cases/measure/data/input/linked_or.yaml
@@ -0,0 +1,56 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+metadata:
+ name: "service_cpm_minute"
+ group: "sw_metric"
+tagProjection:
+ tagFamilies:
+ - name: "default"
+ tags: ["id", "entity_id"]
+fieldProjection:
+ names: ["total", "value"]
+criteria:
+ le:
+ op: "LOGICAL_OP_OR"
+ left:
+ condition:
+ name: "id"
+ op: "BINARY_OP_EQ"
+ value:
+ id:
+ value: "4"
+ right:
+ le:
+ op: "LOGICAL_OP_OR"
+ left:
+ condition:
+ name: "id"
+ op: "BINARY_OP_EQ"
+ value:
+ id:
+ value: "5"
+ right:
+ le:
+ op: "LOGICAL_OP_OR"
+ left:
+ condition:
+ name: "id"
+ op: "BINARY_OP_EQ"
+ value:
+ id:
+ value: "unknown"
diff --git a/test/cases/measure/data/want/linked_or.yaml b/test/cases/measure/data/want/linked_or.yaml
new file mode 100644
index 0000000..785dba9
--- /dev/null
+++ b/test/cases/measure/data/want/linked_or.yaml
@@ -0,0 +1,60 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+dataPoints:
+- fields:
+ - name: total
+ value:
+ int:
+ value: "100"
+ - name: value
+ value:
+ int:
+ value: "2"
+ tagFamilies:
+ - name: default
+ tags:
+ - key: id
+ value:
+ id:
+ value: "4"
+ - key: entity_id
+ value:
+ str:
+ value: entity_2
+ timestamp: "2022-10-17T12:50:45.912Z"
+- fields:
+ - name: total
+ value:
+ int:
+ value: "100"
+ - name: value
+ value:
+ int:
+ value: "3"
+ tagFamilies:
+ - name: default
+ tags:
+ - key: id
+ value:
+ id:
+ value: "5"
+ - key: entity_id
+ value:
+ str:
+ value: entity_2
+ timestamp: "2022-10-17T12:51:45.912Z"
diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go
index 0ada8fb..5f3bce7 100644
--- a/test/cases/measure/measure.go
+++ b/test/cases/measure/measure.go
@@ -56,4 +56,5 @@ var _ = g.DescribeTable("Scanning Measures", verify,
g.Entry("filter by entity id and service id", helpers.Args{Input: "entity_service", Want: "entity", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
g.Entry("without field", helpers.Args{Input: "no_field", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
g.Entry("invalid logical expression", helpers.Args{Input: "err_invalid_le", Duration: 25 * time.Minute, Offset: -20 * time.Minute, WantErr: true}),
+ g.Entry("linked or expressions", helpers.Args{Input: "linked_or", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
)
diff --git a/test/stress/env b/test/stress/env
index 78577df..007f541 100644
--- a/test/stress/env
+++ b/test/stress/env
@@ -13,9 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-SW_AGENT_JAVA_COMMIT=5bc1d1d1f1d9ce6a4f7fce20e8ecc330bccf47ec
-SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=34a4553e23530e8255efe6f5a0adff9e69555d64
-SW_AGENT_SATELLITE_COMMIT=1987e1d566ac90f6b58a45fd9bfa27bf8faad635
+
+SW_AGENT_JAVA_COMMIT=3f88d735ba2bfd1196aff946502447d4b14450c8
+SW_AGENT_SATELLITE_COMMIT=ea27a3f4e126a24775fe12e2aa2695bcb23d99c3
SW_AGENT_NGINX_LUA_COMMIT=c3cee4841798a147d83b96a10914d4ac0e11d0aa
SW_AGENT_NODEJS_COMMIT=2e7560518aff846befd4d6bc815fe5e38c704a11
SW_AGENT_GO_COMMIT=4af380c2db6243106b0fc650b6003ce3b3eb82a0
@@ -23,10 +23,10 @@ SW_AGENT_PYTHON_COMMIT=c76a6ec51a478ac91abb20ec8f22a99b8d4d6a58
SW_AGENT_CLIENT_JS_COMMIT=af0565a67d382b683c1dbd94c379b7080db61449
SW_AGENT_CLIENT_JS_TEST_COMMIT=4f1eb1dcdbde3ec4a38534bf01dded4ab5d2f016
SW_KUBERNETES_COMMIT_SHA=0f3ec68e5a7e1608cec8688716b848ed15e971e5
-SW_ROVER_COMMIT=90c93c706743aac1f5853b677730edae8cc32a2c
-SW_CTL_COMMIT=219876daf985fd474955834ef0b65013f0890e96
-
-SW_OAP_COMMIT=dc39ce9bb44ed33d9c2bb0d5a054b1dfd5bbd657
+SW_ROVER_COMMIT=d956eaede57b62108b78bca48045bd09ba88e653
+SW_CTL_COMMIT=e684fae0107045fc23799146d62f04cb68bd5a3b
+SW_OAP_COMMIT=d5388683322ee6a4aed2a3bc29d439aadfca9a04
+SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=828e6e2f2b57a0f06bb0d507e3296d2377943d9a
TARGET=test
VUS=10
diff --git a/test/stress/env.dev b/test/stress/env.dev
index 5234b7a..add472c 100644
--- a/test/stress/env.dev
+++ b/test/stress/env.dev
@@ -13,9 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-SW_AGENT_JAVA_COMMIT=5bc1d1d1f1d9ce6a4f7fce20e8ecc330bccf47ec
-SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=34a4553e23530e8255efe6f5a0adff9e69555d64
-SW_AGENT_SATELLITE_COMMIT=1987e1d566ac90f6b58a45fd9bfa27bf8faad635
+SW_AGENT_JAVA_COMMIT=3f88d735ba2bfd1196aff946502447d4b14450c8
+SW_AGENT_SATELLITE_COMMIT=ea27a3f4e126a24775fe12e2aa2695bcb23d99c3
SW_AGENT_NGINX_LUA_COMMIT=c3cee4841798a147d83b96a10914d4ac0e11d0aa
SW_AGENT_NODEJS_COMMIT=2e7560518aff846befd4d6bc815fe5e38c704a11
SW_AGENT_GO_COMMIT=4af380c2db6243106b0fc650b6003ce3b3eb82a0
@@ -23,10 +22,10 @@ SW_AGENT_PYTHON_COMMIT=c76a6ec51a478ac91abb20ec8f22a99b8d4d6a58
SW_AGENT_CLIENT_JS_COMMIT=af0565a67d382b683c1dbd94c379b7080db61449
SW_AGENT_CLIENT_JS_TEST_COMMIT=4f1eb1dcdbde3ec4a38534bf01dded4ab5d2f016
SW_KUBERNETES_COMMIT_SHA=0f3ec68e5a7e1608cec8688716b848ed15e971e5
-SW_ROVER_COMMIT=90c93c706743aac1f5853b677730edae8cc32a2c
-SW_CTL_COMMIT=219876daf985fd474955834ef0b65013f0890e96
-
-SW_OAP_COMMIT=dc39ce9bb44ed33d9c2bb0d5a054b1dfd5bbd657
+SW_ROVER_COMMIT=d956eaede57b62108b78bca48045bd09ba88e653
+SW_CTL_COMMIT=e684fae0107045fc23799146d62f04cb68bd5a3b
+SW_OAP_COMMIT=d5388683322ee6a4aed2a3bc29d439aadfca9a04
+SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=828e6e2f2b57a0f06bb0d507e3296d2377943d9a
TARGET=dev
VUS=1