You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/07/12 03:26:27 UTC

[skywalking-banyandb] 02/02: Add local discovery and shard event

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch series-trace
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit ccb0d430c5222a7dfbc0a90ca694082c631401a8
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Mon Jul 12 09:13:34 2021 +0800

    Add local discovery and shard event
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 api/event/discovery.go                    |  5 +++
 api/fbs/v1/database.fbs                   |  4 +-
 api/fbs/v1/database_generated.go          | 61 +++++++++++++++++++++++--------
 banyand/discovery/discovery.go            | 36 ++++++++++++++----
 banyand/internal/cmd/standalone.go        |  4 +-
 banyand/liaison/grpc/grpc.go              | 32 ++++++++++++++--
 banyand/liaison/liaison.go                |  6 ++-
 banyand/queue/local.go                    | 18 ++++-----
 banyand/queue/queue.go                    |  6 +--
 banyand/series/series.go                  |  1 +
 banyand/series/trace/common_test.go       |  2 +-
 banyand/series/trace/trace.go             | 41 ++++++++++++++++++++-
 {banyand/internal => pkg}/bus/bus.go      |  0
 {banyand/internal => pkg}/bus/bus_test.go |  0
 pkg/convert/hash.go                       |  4 ++
 pkg/fb/database.go                        | 59 ++++++++++++++++++++++++++++++
 16 files changed, 234 insertions(+), 45 deletions(-)

diff --git a/api/event/discovery.go b/api/event/discovery.go
index 6916c65..40360c4 100644
--- a/api/event/discovery.go
+++ b/api/event/discovery.go
@@ -20,6 +20,7 @@ package event
 import (
 	"github.com/apache/skywalking-banyandb/api/common"
 	v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
+	"github.com/apache/skywalking-banyandb/pkg/bus"
 )
 
 var (
@@ -27,6 +28,10 @@ var (
 		Version: "v1",
 		Kind:    "event-shard",
 	}
+	TopicShardEvent = bus.Topic{
+		ID:   ShardEventKindVersion.String(),
+		Type: bus.ChTypeUnidirectional,
+	}
 )
 
 type Shard struct {
diff --git a/api/fbs/v1/database.fbs b/api/fbs/v1/database.fbs
index 836553c..d71c1f1 100644
--- a/api/fbs/v1/database.fbs
+++ b/api/fbs/v1/database.fbs
@@ -25,8 +25,10 @@ table Node {
 }
 
 table Shard {
-    id:int64;
+    id:uint64;
+    series:Metadata;
     node:Node;
+    total:uint8;
     update_time:int64;
     create_time:int64;
 }
diff --git a/api/fbs/v1/database_generated.go b/api/fbs/v1/database_generated.go
index 0021dc6..4236dff 100644
--- a/api/fbs/v1/database_generated.go
+++ b/api/fbs/v1/database_generated.go
@@ -145,23 +145,36 @@ func (rcv *Shard) Table() flatbuffers.Table {
 	return rcv._tab
 }
 
-func (rcv *Shard) Id() int64 {
+func (rcv *Shard) Id() uint64 {
 	o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
 	if o != 0 {
-		return rcv._tab.GetInt64(o + rcv._tab.Pos)
+		return rcv._tab.GetUint64(o + rcv._tab.Pos)
 	}
 	return 0
 }
 
-func (rcv *Shard) MutateId(n int64) bool {
-	return rcv._tab.MutateInt64Slot(4, n)
+func (rcv *Shard) MutateId(n uint64) bool {
+	return rcv._tab.MutateUint64Slot(4, n)
 }
 
-func (rcv *Shard) Node(obj *Node) *Node {
+func (rcv *Shard) Series(obj *Metadata) *Metadata {
 	o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
 	if o != 0 {
 		x := rcv._tab.Indirect(o + rcv._tab.Pos)
 		if obj == nil {
+			obj = new(Metadata)
+		}
+		obj.Init(rcv._tab.Bytes, x)
+		return obj
+	}
+	return nil
+}
+
+func (rcv *Shard) Node(obj *Node) *Node {
+	o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
+	if o != 0 {
+		x := rcv._tab.Indirect(o + rcv._tab.Pos)
+		if obj == nil {
 			obj = new(Node)
 		}
 		obj.Init(rcv._tab.Bytes, x)
@@ -170,8 +183,20 @@ func (rcv *Shard) Node(obj *Node) *Node {
 	return nil
 }
 
+func (rcv *Shard) Total() byte {
+	o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
+	if o != 0 {
+		return rcv._tab.GetByte(o + rcv._tab.Pos)
+	}
+	return 0
+}
+
+func (rcv *Shard) MutateTotal(n byte) bool {
+	return rcv._tab.MutateByteSlot(10, n)
+}
+
 func (rcv *Shard) UpdateTime() int64 {
-	o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
+	o := flatbuffers.UOffsetT(rcv._tab.Offset(12))
 	if o != 0 {
 		return rcv._tab.GetInt64(o + rcv._tab.Pos)
 	}
@@ -179,11 +204,11 @@ func (rcv *Shard) UpdateTime() int64 {
 }
 
 func (rcv *Shard) MutateUpdateTime(n int64) bool {
-	return rcv._tab.MutateInt64Slot(8, n)
+	return rcv._tab.MutateInt64Slot(12, n)
 }
 
 func (rcv *Shard) CreateTime() int64 {
-	o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
+	o := flatbuffers.UOffsetT(rcv._tab.Offset(14))
 	if o != 0 {
 		return rcv._tab.GetInt64(o + rcv._tab.Pos)
 	}
@@ -191,23 +216,29 @@ func (rcv *Shard) CreateTime() int64 {
 }
 
 func (rcv *Shard) MutateCreateTime(n int64) bool {
-	return rcv._tab.MutateInt64Slot(10, n)
+	return rcv._tab.MutateInt64Slot(14, n)
 }
 
 func ShardStart(builder *flatbuffers.Builder) {
-	builder.StartObject(4)
+	builder.StartObject(6)
+}
+func ShardAddId(builder *flatbuffers.Builder, id uint64) {
+	builder.PrependUint64Slot(0, id, 0)
 }
-func ShardAddId(builder *flatbuffers.Builder, id int64) {
-	builder.PrependInt64Slot(0, id, 0)
+func ShardAddSeries(builder *flatbuffers.Builder, series flatbuffers.UOffsetT) {
+	builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(series), 0)
 }
 func ShardAddNode(builder *flatbuffers.Builder, node flatbuffers.UOffsetT) {
-	builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(node), 0)
+	builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(node), 0)
+}
+func ShardAddTotal(builder *flatbuffers.Builder, total byte) {
+	builder.PrependByteSlot(3, total, 0)
 }
 func ShardAddUpdateTime(builder *flatbuffers.Builder, updateTime int64) {
-	builder.PrependInt64Slot(2, updateTime, 0)
+	builder.PrependInt64Slot(4, updateTime, 0)
 }
 func ShardAddCreateTime(builder *flatbuffers.Builder, createTime int64) {
-	builder.PrependInt64Slot(3, createTime, 0)
+	builder.PrependInt64Slot(5, createTime, 0)
 }
 func ShardEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
 	return builder.EndObject()
diff --git a/banyand/discovery/discovery.go b/banyand/discovery/discovery.go
index 9e7197c..8856166 100644
--- a/banyand/discovery/discovery.go
+++ b/banyand/discovery/discovery.go
@@ -20,17 +20,39 @@ package discovery
 import (
 	"context"
 
-	"github.com/apache/skywalking-banyandb/banyand/internal/bus"
+	bus2 "github.com/apache/skywalking-banyandb/pkg/bus"
 	"github.com/apache/skywalking-banyandb/pkg/run"
 )
 
 type ServiceRepo interface {
-	run.Config
-	run.Service
-	bus.Subscriber
-	bus.Publisher
+	NodeID() string
+	run.Unit
+	bus2.Subscriber
+	bus2.Publisher
 }
 
-func NewServiceRepo(ctx context.Context) (ServiceRepo, error) {
-	return nil, nil
+type repo struct {
+	local *bus2.Bus
+}
+
+func (r *repo) NodeID() string {
+	return "local"
+}
+
+func (r *repo) Name() string {
+	return "service-discovery"
+}
+
+func (r *repo) Subscribe(topic bus2.Topic, listener bus2.MessageListener) error {
+	return r.local.Subscribe(topic, listener)
+}
+
+func (r *repo) Publish(topic bus2.Topic, message ...bus2.Message) (bus2.Future, error) {
+	return r.local.Publish(topic, message...)
+}
+
+func NewServiceRepo(_ context.Context) (ServiceRepo, error) {
+	return &repo{
+		local: bus2.NewBus(),
+	}, nil
 }
diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go
index 69e069e..2d50601 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/standalone.go
@@ -61,7 +61,7 @@ func newStandaloneCmd() *cobra.Command {
 	if err != nil {
 		l.Fatal().Err(err).Msg("failed to initiate index builder")
 	}
-	traceSeries, err := trace.NewService(ctx, db)
+	traceSeries, err := trace.NewService(ctx, db, repo)
 	if err != nil {
 		l.Fatal().Err(err).Msg("failed to initiate trace series")
 	}
@@ -69,7 +69,7 @@ func newStandaloneCmd() *cobra.Command {
 	if err != nil {
 		l.Fatal().Err(err).Msg("failed to initiate query executor")
 	}
-	tcp, err := liaison.NewEndpoint(ctx, pipeline)
+	tcp, err := liaison.NewEndpoint(ctx, pipeline, repo)
 	if err != nil {
 		l.Fatal().Err(err).Msg("failed to initiate Endpoint transport layer")
 	}
diff --git a/banyand/liaison/grpc/grpc.go b/banyand/liaison/grpc/grpc.go
index 36f658c..deade08 100644
--- a/banyand/liaison/grpc/grpc.go
+++ b/banyand/liaison/grpc/grpc.go
@@ -25,7 +25,11 @@ import (
 	grpclib "google.golang.org/grpc"
 	"google.golang.org/grpc/encoding"
 
+	"github.com/apache/skywalking-banyandb/api/event"
+	v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
+	"github.com/apache/skywalking-banyandb/banyand/discovery"
 	"github.com/apache/skywalking-banyandb/banyand/queue"
+	"github.com/apache/skywalking-banyandb/pkg/bus"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/run"
 )
@@ -35,10 +39,33 @@ type Server struct {
 	log      *logger.Logger
 	ser      *grpclib.Server
 	pipeline queue.Queue
+	repo     discovery.ServiceRepo
 }
 
-func NewServer(ctx context.Context, pipeline queue.Queue) *Server {
-	return &Server{pipeline: pipeline}
+func (s *Server) Rev(message bus.Message) (resp bus.Message) {
+	data, ok := message.Data().([]byte)
+	if !ok {
+		s.log.Warn().Msg("invalid event data type")
+		return
+	}
+	shardEvent := v1.GetRootAsShardEvent(data, 0)
+	s.log.Info().
+		Str("action", shardEvent.Action().String()).
+		Uint64("shardID", shardEvent.Shard(nil).Id()).
+		Msg("received a shard event")
+	return
+}
+
+func (s *Server) PreRun() error {
+	s.log = logger.GetLogger("liaison-grpc")
+	return s.repo.Subscribe(event.TopicShardEvent, s)
+}
+
+func NewServer(ctx context.Context, pipeline queue.Queue, repo discovery.ServiceRepo) *Server {
+	return &Server{
+		pipeline: pipeline,
+		repo:     repo,
+	}
 }
 
 func (s *Server) Name() string {
@@ -56,7 +83,6 @@ func (s *Server) Validate() error {
 }
 
 func (s *Server) Serve() error {
-	s.log = logger.GetLogger("grpc")
 	lis, err := net.Listen("tcp", s.addr)
 	if err != nil {
 		s.log.Fatal().Err(err).Msg("Failed to listen")
diff --git a/banyand/liaison/liaison.go b/banyand/liaison/liaison.go
index 117b361..8eb9274 100644
--- a/banyand/liaison/liaison.go
+++ b/banyand/liaison/liaison.go
@@ -20,6 +20,7 @@ package liaison
 import (
 	"context"
 
+	"github.com/apache/skywalking-banyandb/banyand/discovery"
 	"github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
 	"github.com/apache/skywalking-banyandb/banyand/queue"
 	"github.com/apache/skywalking-banyandb/pkg/run"
@@ -27,9 +28,10 @@ import (
 
 type Endpoint interface {
 	run.Config
+	run.PreRunner
 	run.Service
 }
 
-func NewEndpoint(ctx context.Context, pipeline queue.Queue) (Endpoint, error) {
-	return grpc.NewServer(ctx, pipeline), nil
+func NewEndpoint(ctx context.Context, pipeline queue.Queue, repo discovery.ServiceRepo) (Endpoint, error) {
+	return grpc.NewServer(ctx, pipeline, repo), nil
 }
diff --git a/banyand/queue/local.go b/banyand/queue/local.go
index 2a4fe7a..45bad1d 100644
--- a/banyand/queue/local.go
+++ b/banyand/queue/local.go
@@ -21,7 +21,7 @@ import (
 	"go.uber.org/multierr"
 
 	"github.com/apache/skywalking-banyandb/banyand/discovery"
-	"github.com/apache/skywalking-banyandb/banyand/internal/bus"
+	bus2 "github.com/apache/skywalking-banyandb/pkg/bus"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/run"
 )
@@ -34,33 +34,33 @@ type Component interface {
 
 type DataSubscriber interface {
 	Component
-	Sub(subscriber bus.Subscriber) error
+	Sub(subscriber bus2.Subscriber) error
 }
 
 type DataPublisher interface {
 	Component
-	Pub(publisher bus.Publisher) error
+	Pub(publisher bus2.Publisher) error
 }
 
 var _ run.PreRunner = (*Local)(nil)
 var _ run.Config = (*Local)(nil)
-var _ bus.Publisher = (*Local)(nil)
-var _ bus.Subscriber = (*Local)(nil)
+var _ bus2.Publisher = (*Local)(nil)
+var _ bus2.Subscriber = (*Local)(nil)
 
 type Local struct {
 	logger  *logger.Logger
 	test    string
-	dataBus *bus.Bus
+	dataBus *bus2.Bus
 	dps     []DataPublisher
 	dss     []DataSubscriber
 	repo    discovery.ServiceRepo
 }
 
-func (e *Local) Subscribe(topic bus.Topic, listener bus.MessageListener) error {
+func (e *Local) Subscribe(topic bus2.Topic, listener bus2.MessageListener) error {
 	return nil
 }
 
-func (e *Local) Publish(topic bus.Topic, message ...bus.Message) (bus.Future, error) {
+func (e *Local) Publish(topic bus2.Topic, message ...bus2.Message) (bus2.Future, error) {
 	return nil, nil
 }
 
@@ -82,7 +82,7 @@ func (e Local) Name() string {
 
 func (e *Local) PreRun() error {
 	var err error
-	e.dataBus = bus.NewBus()
+	e.dataBus = bus2.NewBus()
 	for _, dp := range e.dps {
 		err = multierr.Append(err, dp.Pub(e.dataBus))
 	}
diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go
index ff59cee..e2cd6af 100644
--- a/banyand/queue/queue.go
+++ b/banyand/queue/queue.go
@@ -21,15 +21,15 @@ import (
 	"context"
 
 	"github.com/apache/skywalking-banyandb/banyand/discovery"
-	"github.com/apache/skywalking-banyandb/banyand/internal/bus"
+	bus2 "github.com/apache/skywalking-banyandb/pkg/bus"
 	"github.com/apache/skywalking-banyandb/pkg/run"
 )
 
 type Queue interface {
 	run.Config
 	run.PreRunner
-	bus.Subscriber
-	bus.Publisher
+	bus2.Subscriber
+	bus2.Publisher
 }
 
 func NewQueue(ctx context.Context, repo discovery.ServiceRepo) (Queue, error) {
diff --git a/banyand/series/series.go b/banyand/series/series.go
index e75db05..db280f3 100644
--- a/banyand/series/series.go
+++ b/banyand/series/series.go
@@ -81,4 +81,5 @@ type Service interface {
 	SchemaRepo
 	IndexFilter
 	run.PreRunner
+	run.Service
 }
diff --git a/banyand/series/trace/common_test.go b/banyand/series/trace/common_test.go
index 205b15e..7616ebd 100644
--- a/banyand/series/trace/common_test.go
+++ b/banyand/series/trace/common_test.go
@@ -58,7 +58,7 @@ func setup(t *testing.T) (*traceSeries, func()) {
 	db, err := storage.NewDB(context.TODO(), nil)
 	assert.NoError(t, err)
 	assert.NoError(t, db.FlagSet().Parse(nil))
-	svc, err := NewService(context.TODO(), db)
+	svc, err := NewService(context.TODO(), db, nil)
 	assert.NoError(t, err)
 	assert.NoError(t, svc.PreRun())
 	assert.NoError(t, db.PreRun())
diff --git a/banyand/series/trace/trace.go b/banyand/series/trace/trace.go
index 3bd6c89..a785c7b 100644
--- a/banyand/series/trace/trace.go
+++ b/banyand/series/trace/trace.go
@@ -22,17 +22,22 @@ import (
 	"context"
 	"fmt"
 	"strconv"
+	"time"
 
 	flatbuffers "github.com/google/flatbuffers/go"
 	"github.com/pkg/errors"
 
 	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/api/data"
+	"github.com/apache/skywalking-banyandb/api/event"
 	v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
 	apischema "github.com/apache/skywalking-banyandb/api/schema"
+	"github.com/apache/skywalking-banyandb/banyand/discovery"
 	"github.com/apache/skywalking-banyandb/banyand/series"
 	"github.com/apache/skywalking-banyandb/banyand/series/schema"
 	"github.com/apache/skywalking-banyandb/banyand/storage"
+	"github.com/apache/skywalking-banyandb/pkg/bus"
+	"github.com/apache/skywalking-banyandb/pkg/fb"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
@@ -78,11 +83,16 @@ type service struct {
 	db        storage.Database
 	schemaMap map[string]*traceSeries
 	l         *logger.Logger
+	repo      discovery.ServiceRepo
+	stopCh    chan struct{}
 }
 
 //NewService returns a new service
-func NewService(_ context.Context, db storage.Database) (series.Service, error) {
-	return &service{db: db}, nil
+func NewService(_ context.Context, db storage.Database, repo discovery.ServiceRepo) (series.Service, error) {
+	return &service{
+		db:   db,
+		repo: repo,
+	}, nil
 }
 
 func (s *service) Name() string {
@@ -109,6 +119,33 @@ func (s *service) PreRun() error {
 	return err
 }
 
+func (s *service) Serve() error {
+	for _, sMeta := range s.schemaMap {
+		for i := 0; i < int(sMeta.shardNum); i++ {
+			now := time.Now().UnixNano()
+			_, err := s.repo.Publish(event.TopicShardEvent, bus.NewMessage(bus.MessageID(now), fb.BuildShardEvent(
+				s.repo.NodeID(),
+				sMeta.name,
+				sMeta.group,
+				uint(i),
+				sMeta.shardNum,
+			)))
+			if err != nil {
+				return err
+			}
+		}
+	}
+	s.stopCh = make(chan struct{})
+	<-s.stopCh
+	return nil
+}
+
+func (s *service) GracefulStop() {
+	if s.stopCh != nil {
+		close(s.stopCh)
+	}
+}
+
 func (s *service) FetchTrace(traceSeries common.Metadata, traceID string, opt series.ScanOptions) (data.Trace, error) {
 	ts, err := s.getSeries(traceSeries)
 	if err != nil {
diff --git a/banyand/internal/bus/bus.go b/pkg/bus/bus.go
similarity index 100%
rename from banyand/internal/bus/bus.go
rename to pkg/bus/bus.go
diff --git a/banyand/internal/bus/bus_test.go b/pkg/bus/bus_test.go
similarity index 100%
rename from banyand/internal/bus/bus_test.go
rename to pkg/bus/bus_test.go
diff --git a/pkg/convert/hash.go b/pkg/convert/hash.go
index e7014fe..b08c2a5 100644
--- a/pkg/convert/hash.go
+++ b/pkg/convert/hash.go
@@ -22,3 +22,7 @@ import "github.com/cespare/xxhash"
 func Hash(key []byte) uint64 {
 	return xxhash.Sum64(key)
 }
+
+func HashStr(key string) uint64 {
+	return xxhash.Sum64String(key)
+}
diff --git a/pkg/fb/database.go b/pkg/fb/database.go
new file mode 100644
index 0000000..0d18fc0
--- /dev/null
+++ b/pkg/fb/database.go
@@ -0,0 +1,59 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package fb
+
+import (
+	"time"
+
+	flatbuffers "github.com/google/flatbuffers/go"
+
+	v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
+)
+
+func BuildShardEvent(nodeID, seriesName, seriesGroup string, shardID, totalShards uint) []byte {
+	now := time.Now().UnixNano()
+	b := flatbuffers.NewBuilder(0)
+	nID := b.CreateString(nodeID)
+	addr := b.CreateString("localhost")
+	v1.NodeStart(b)
+	v1.NodeAddId(b, nID)
+	v1.NodeAddAddr(b, addr)
+	v1.NodeAddCreateTime(b, now)
+	v1.NodeAddUpdateTime(b, now)
+	node := v1.NodeEnd(b)
+	name := b.CreateString(seriesName)
+	group := b.CreateString(seriesGroup)
+	v1.MetadataStart(b)
+	v1.MetadataAddName(b, name)
+	v1.MetadataAddGroup(b, group)
+	md := v1.MetadataEnd(b)
+	v1.ShardStart(b)
+	v1.ShardAddId(b, uint64(shardID))
+	v1.ShardAddNode(b, node)
+	v1.ShardAddSeries(b, md)
+	v1.ShardAddTotal(b, uint8(totalShards))
+	v1.ShardAddCreateTime(b, now)
+	v1.ShardAddUpdateTime(b, now)
+	shard := v1.ShardEnd(b)
+	v1.ShardEventStart(b)
+	v1.ShardEventAddShard(b, shard)
+	v1.ShardEventAddTime(b, time.Now().UnixNano())
+	v1.ShardEventAddAction(b, v1.ActionPut)
+	b.Finish(v1.ShardEventEnd(b))
+	return b.FinishedBytes()
+}