You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/04/06 12:10:21 UTC

[skywalking-banyandb] branch main updated: Introduce the bus system to implement streaming data ingestion

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 4e3b44a  Introduce the bus system to implement streaming data ingestion
4e3b44a is described below

commit 4e3b44a16657d507b8ea7aab463e5a51722f3364
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Tue Apr 6 20:06:10 2021 +0800

    Introduce the bus system to implement streaming data ingestion
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 {standalone/cmd/standalone => banyand/cmd}/main.go |   2 +-
 .../main.go => banyand/executor/executor.go        |  34 +++-
 .../standalone/main.go => banyand/index/index.go   |  29 ++-
 banyand/internal/bus/bus.go                        | 133 +++++++++++++
 banyand/internal/bus/bus_test.go                   | 221 +++++++++++++++++++++
 {standalone => banyand}/internal/cmd/root.go       |   5 +-
 banyand/internal/cmd/standalone.go                 |  80 ++++++++
 .../standalone/main.go => banyand/series/series.go |  29 ++-
 .../standalone/main.go => banyand/shard/shard.go   |  33 ++-
 .../main.go => banyand/storage/pipeline.go         |  20 +-
 go.mod                                             |   7 +-
 go.sum                                             |  22 ++
 .../cmd/standalone/main.go => pkg/logger/logger.go |  24 ++-
 13 files changed, 577 insertions(+), 62 deletions(-)

diff --git a/standalone/cmd/standalone/main.go b/banyand/cmd/main.go
similarity index 93%
copy from standalone/cmd/standalone/main.go
copy to banyand/cmd/main.go
index caf1295..b6167e8 100644
--- a/standalone/cmd/standalone/main.go
+++ b/banyand/cmd/main.go
@@ -23,7 +23,7 @@ import (
 	"fmt"
 	"os"
 
-	"github.com/apache/skywalking-banyandb/standalone/internal/cmd"
+	"github.com/apache/skywalking-banyandb/banyand/internal/cmd"
 )
 
 func main() {
diff --git a/standalone/cmd/standalone/main.go b/banyand/executor/executor.go
similarity index 51%
copy from standalone/cmd/standalone/main.go
copy to banyand/executor/executor.go
index caf1295..c89ab31 100644
--- a/standalone/cmd/standalone/main.go
+++ b/banyand/executor/executor.go
@@ -17,19 +17,37 @@
  *  under the License.
  */
 
-package main
+package executor
 
 import (
-	"fmt"
-	"os"
+	"time"
 
-	"github.com/apache/skywalking-banyandb/standalone/internal/cmd"
+	"github.com/apache/skywalking-banyandb/banyand/internal/bus"
+	"github.com/apache/skywalking-banyandb/banyand/storage"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
-func main() {
-	if err := cmd.NewRoot().Execute(); err != nil {
-		_, _ = fmt.Fprintln(os.Stderr, err)
-		os.Exit(1)
+var _ bus.MessageListener = (*Executor)(nil)
+
+type Executor struct {
+	log *logger.Logger
+	bus *bus.Bus
+}
+
+func NewExecutor(bus *bus.Bus) *Executor {
+	return &Executor{
+		bus: bus,
+		log: logger.Log.Scope("executor"),
 	}
 }
 
+func (s Executor) Rev(message bus.Message) {
+	s.log.Sugar().Infow("rev", "msg", message.Data())
+	_ = s.bus.Publish(storage.TraceIndex, bus.NewMessage(bus.MessageID(time.Now().UnixNano()), "index message"))
+	_ = s.bus.Publish(storage.TraceData, bus.NewMessage(bus.MessageID(time.Now().UnixNano()), "data message"))
+}
+
+func (s Executor) Close() error {
+	s.log.Sugar().Infow("closed")
+	return nil
+}
diff --git a/standalone/cmd/standalone/main.go b/banyand/index/index.go
similarity index 65%
copy from standalone/cmd/standalone/main.go
copy to banyand/index/index.go
index caf1295..c88c445 100644
--- a/standalone/cmd/standalone/main.go
+++ b/banyand/index/index.go
@@ -17,19 +17,30 @@
  *  under the License.
  */
 
-package main
+package index
 
 import (
-	"fmt"
-	"os"
-
-	"github.com/apache/skywalking-banyandb/standalone/internal/cmd"
+	"github.com/apache/skywalking-banyandb/banyand/internal/bus"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
-func main() {
-	if err := cmd.NewRoot().Execute(); err != nil {
-		_, _ = fmt.Fprintln(os.Stderr, err)
-		os.Exit(1)
+var _ bus.MessageListener = (*Index)(nil)
+
+type Index struct {
+	log *logger.Logger
+}
+
+func NewIndex() *Index {
+	return &Index{
+		log: logger.Log.Scope("Index"),
 	}
 }
 
+func (s Index) Rev(message bus.Message) {
+	s.log.Sugar().Infow("rev", "msg", message.Data())
+}
+
+func (s Index) Close() error {
+	s.log.Sugar().Infow("closed")
+	return nil
+}
diff --git a/banyand/internal/bus/bus.go b/banyand/internal/bus/bus.go
new file mode 100644
index 0000000..d181dbd
--- /dev/null
+++ b/banyand/internal/bus/bus.go
@@ -0,0 +1,133 @@
+/*
+ *  Licensed to Apache Software Foundation (ASF) under one or more contributor
+ *  license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright
+ *  ownership. Apache Software Foundation (ASF) licenses this file to you under
+ *  the Apache License, Version 2.0 (the "License"); you may
+ *  not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http:www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package bus
+
+import (
+	"errors"
+	"io"
+	"sync"
+
+	"go.uber.org/atomic"
+)
+
+// Payload represents a simple data
+type Payload interface{}
+type MessageID uint64
+
+// Message is send on the bus to all subscribed listeners
+type Message struct {
+	id      MessageID
+	payload Payload
+}
+
+func (m Message) Data() interface{} {
+	return m.payload
+}
+
+func NewMessage(id MessageID, data interface{}) Message {
+	return Message{id: id, payload: data}
+}
+
+// EventListener is the signature of functions that can handle an EventMessage.
+type MessageListener interface {
+	Rev(message Message)
+	io.Closer
+}
+
+type Channel chan Message
+
+type Topic string
+
+// The Bus allows publish-subscribe-style communication between components
+type Bus struct {
+	topics map[Topic][]Channel
+	closed atomic.Bool
+	mutex  sync.RWMutex
+}
+
+func NewBus() *Bus {
+	b := new(Bus)
+	b.topics = make(map[Topic][]Channel)
+	return b
+}
+
+var (
+	ErrTopicEmpty    = errors.New("the topic is empty")
+	ErrTopicNotExist = errors.New("the topic does not exist")
+	ErrListenerEmpty = errors.New("the message listener is empty")
+	ErrClosed        = errors.New("the bus is closed")
+)
+
+func (b *Bus) Publish(topic Topic, message ...Message) error {
+	if topic == "" {
+		return ErrTopicEmpty
+	}
+	if b.closed.Load() {
+		return ErrClosed
+	}
+	cc, exit := b.topics[topic]
+	if !exit {
+		return ErrTopicNotExist
+	}
+	b.mutex.RLock()
+	defer b.mutex.RUnlock()
+	for _, each := range cc {
+		for _, m := range message {
+			go func(ch Channel, message Message) {
+				ch <- message
+			}(each, m)
+		}
+	}
+	return nil
+}
+
+// Subscribe adds an MessageListener to be called when a message of a Topic is posted.
+func (b *Bus) Subscribe(topic Topic, listener MessageListener) error {
+	if topic == "" {
+		return ErrTopicEmpty
+	}
+	if listener == nil {
+		return ErrListenerEmpty
+	}
+	if b.closed.Load() {
+		return ErrClosed
+	}
+	b.mutex.Lock()
+	defer b.mutex.Unlock()
+	if _, exist := b.topics[topic]; !exist {
+		b.topics[topic] = make([]Channel, 0)
+	}
+	ch := make(Channel)
+	list, _ := b.topics[topic]
+	list = append(list, ch)
+	b.topics[topic] = list
+	go func(listener MessageListener, ch Channel) {
+		for {
+			c, ok := <-ch
+			if ok {
+				listener.Rev(c)
+			} else {
+				_ = listener.Close()
+				break
+			}
+		}
+	}(listener, ch)
+	return nil
+}
diff --git a/banyand/internal/bus/bus_test.go b/banyand/internal/bus/bus_test.go
new file mode 100644
index 0000000..bd08475
--- /dev/null
+++ b/banyand/internal/bus/bus_test.go
@@ -0,0 +1,221 @@
+/*
+ *  Licensed to Apache Software Foundation (ASF) under one or more contributor
+ *  license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright
+ *  ownership. Apache Software Foundation (ASF) licenses this file to you under
+ *  the Apache License, Version 2.0 (the "License"); you may
+ *  not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http:www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package bus
+
+import (
+	"reflect"
+	"sort"
+	"sync"
+	"testing"
+	"time"
+)
+
+func TestBus_PubAndSub(t *testing.T) {
+	type message struct {
+		topic      Topic
+		messageIDS []MessageID
+		wantErr    bool
+	}
+	type listener struct {
+		wantTopic    Topic
+		wantMessages []MessageID
+		wantErr      bool
+	}
+	tests := []struct {
+		name      string
+		messages  []message
+		listeners []listener
+	}{
+		{
+			name: "golden path",
+			messages: []message{
+				{
+					topic:      Topic("default"),
+					messageIDS: []MessageID{12, 33},
+				},
+			},
+			listeners: []listener{
+				{
+					wantTopic:    Topic("default"),
+					wantMessages: []MessageID{12, 33},
+				},
+			},
+		},
+		{
+			name: "two topics",
+			messages: []message{
+				{
+					topic:      Topic("t1"),
+					messageIDS: []MessageID{12, 33},
+				},
+				{
+					topic:      Topic("t2"),
+					messageIDS: []MessageID{101, 102},
+				},
+			},
+			listeners: []listener{
+				{
+					wantTopic:    Topic("t1"),
+					wantMessages: []MessageID{12, 33},
+				},
+				{
+					wantTopic:    Topic("t2"),
+					wantMessages: []MessageID{101, 102},
+				},
+			},
+		},
+		{
+			name: "two topics with two listeners",
+			messages: []message{
+				{
+					topic:      Topic("t1"),
+					messageIDS: []MessageID{12, 33},
+				},
+				{
+					topic:      Topic("t2"),
+					messageIDS: []MessageID{101, 102},
+				},
+			},
+			listeners: []listener{
+				{
+					wantTopic:    Topic("t1"),
+					wantMessages: []MessageID{12, 33},
+				},
+				{
+					wantTopic:    Topic("t1"),
+					wantMessages: []MessageID{12, 33},
+				},
+				{
+					wantTopic:    Topic("t2"),
+					wantMessages: []MessageID{101, 102},
+				},
+				{
+					wantTopic:    Topic("t2"),
+					wantMessages: []MessageID{101, 102},
+				},
+			},
+		},
+		{
+			name: "publish invalid topic",
+			messages: []message{
+				{
+					topic:      Topic(""),
+					messageIDS: []MessageID{12, 33},
+					wantErr:    true,
+				},
+			},
+		},
+		{
+			name: "publish empty message",
+			messages: []message{
+				{
+					topic:      Topic("default"),
+					messageIDS: []MessageID{},
+				},
+			},
+			listeners: []listener{
+				{
+					wantTopic:    Topic("default"),
+					wantMessages: []MessageID{},
+				},
+			},
+		},
+		{
+			name: "subscribe invalid topic",
+			listeners: []listener{
+				{
+					wantTopic: Topic(""),
+					wantErr:   true,
+				},
+			},
+		},
+	}
+	for _, tt := range tests {
+		e := NewBus()
+		t.Run(tt.name, func(t *testing.T) {
+			wg := sync.WaitGroup{}
+			mll := make([]*mockListener, 0)
+			for _, l := range tt.listeners {
+				ml := &mockListener{wg: &wg}
+				mll = append(mll, ml)
+				wg.Add(len(l.wantMessages))
+				if err := e.Subscribe(l.wantTopic, ml); (err != nil) != l.wantErr {
+					t.Errorf("Subscribe() error = %v, wantErr %v", err, l.wantErr)
+				}
+			}
+			for _, m := range tt.messages {
+				mm := make([]Message, 0)
+				for _, id := range m.messageIDS {
+					mm = append(mm, NewMessage(id, nil))
+				}
+				err := e.Publish(m.topic, mm...)
+				if (err != nil) != m.wantErr {
+					t.Errorf("Publish() error = %v, wantErr %v", err, m.wantErr)
+				}
+			}
+			if waitTimeout(&wg, 10*time.Second) {
+				t.Error("message receiving is time out")
+			}
+			wg.Wait()
+			for i, l := range tt.listeners {
+				if len(mll[i].queue) > 0 && len(l.wantMessages) > 0 &&
+					!reflect.DeepEqual(mll[i].queue, l.wantMessages) {
+					t.Errorf("Bus got = %v, wanted %v", mll[i].queue, l.wantMessages)
+				}
+			}
+		})
+	}
+}
+
+func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
+	c := make(chan struct{})
+	go func() {
+		defer close(c)
+		wg.Wait()
+	}()
+	select {
+	case <-c:
+		return false // completed normally
+	case <-time.After(timeout):
+		return true // timed out
+	}
+}
+
+var _ MessageListener = new(mockListener)
+
+type mockListener struct {
+	queue   []MessageID
+	wg      *sync.WaitGroup
+	closeWg *sync.WaitGroup
+}
+
+func (m *mockListener) Rev(message Message) {
+	m.queue = append(m.queue, message.id)
+	sort.SliceStable(m.queue, func(i, j int) bool {
+		return uint64(m.queue[i]) < uint64(m.queue[j])
+	})
+	m.wg.Done()
+}
+
+func (m *mockListener) Close() error {
+	m.queue = nil
+	m.closeWg.Done()
+	return nil
+}
diff --git a/standalone/internal/cmd/root.go b/banyand/internal/cmd/root.go
similarity index 95%
rename from standalone/internal/cmd/root.go
rename to banyand/internal/cmd/root.go
index 212aa90..a34e10b 100644
--- a/standalone/internal/cmd/root.go
+++ b/banyand/internal/cmd/root.go
@@ -33,10 +33,11 @@ const logo = `
 func NewRoot() *cobra.Command {
 	cmd := &cobra.Command{
 		DisableAutoGenTag: true,
-		Short: "BanyanDB is an observability database",
+		Short:             "BanyanDB is an observability database",
 		Long: logo + `
 BanyanDB, as an observability database, aims to ingest, analyze and store Metrics, Tracing and Logging data
 `,
 	}
+	cmd.AddCommand(newStandaloneCmd())
 	return cmd
-}
\ No newline at end of file
+}
diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go
new file mode 100644
index 0000000..386ca73
--- /dev/null
+++ b/banyand/internal/cmd/standalone.go
@@ -0,0 +1,80 @@
+/*
+ *  Licensed to Apache Software Foundation (ASF) under one or more contributor
+ *  license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright
+ *  ownership. Apache Software Foundation (ASF) licenses this file to you under
+ *  the Apache License, Version 2.0 (the "License"); you may
+ *  not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http:www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package cmd
+
+import (
+	"context"
+	"os"
+	"os/signal"
+	"syscall"
+
+	"github.com/spf13/cobra"
+	"go.uber.org/multierr"
+
+	"github.com/apache/skywalking-banyandb/banyand/executor"
+	"github.com/apache/skywalking-banyandb/banyand/index"
+	"github.com/apache/skywalking-banyandb/banyand/internal/bus"
+	"github.com/apache/skywalking-banyandb/banyand/series"
+	"github.com/apache/skywalking-banyandb/banyand/shard"
+	"github.com/apache/skywalking-banyandb/banyand/storage"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+func newStandaloneCmd() *cobra.Command {
+	standaloneCmd := &cobra.Command{
+		Use:   "standalone",
+		Short: "Run as the standalone mode",
+		RunE: func(cmd *cobra.Command, args []string) (err error) {
+			logger.Log.Info("starting as a standalone server")
+			dataBus := bus.NewBus()
+			err = multierr.Append(err, dataBus.Subscribe(storage.TraceRaw, shard.NewShard(dataBus)))
+			err = multierr.Append(err, dataBus.Subscribe(storage.TraceSharded, executor.NewExecutor(dataBus)))
+			err = multierr.Append(err, dataBus.Subscribe(storage.TraceIndex, index.NewIndex()))
+			err = multierr.Append(err, dataBus.Subscribe(storage.TraceData, series.NewSeries()))
+			if err != nil {
+				return err
+			}
+			if err = dataBus.Publish(storage.TraceRaw, bus.NewMessage(0, "initialization")); err != nil {
+				return err
+			}
+			ctx := newContext()
+			<-ctx.Done()
+			return nil
+		},
+	}
+
+	return standaloneCmd
+}
+
+func newContext() context.Context {
+	c := make(chan os.Signal, 1)
+	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
+	ctx, cancel := context.WithCancel(context.Background())
+	go func() {
+		defer cancel()
+		select {
+		case <-ctx.Done():
+			return
+		case <-c:
+			return
+		}
+	}()
+	return ctx
+}
diff --git a/standalone/cmd/standalone/main.go b/banyand/series/series.go
similarity index 64%
copy from standalone/cmd/standalone/main.go
copy to banyand/series/series.go
index caf1295..9829468 100644
--- a/standalone/cmd/standalone/main.go
+++ b/banyand/series/series.go
@@ -17,19 +17,30 @@
  *  under the License.
  */
 
-package main
+package series
 
 import (
-	"fmt"
-	"os"
-
-	"github.com/apache/skywalking-banyandb/standalone/internal/cmd"
+	"github.com/apache/skywalking-banyandb/banyand/internal/bus"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
-func main() {
-	if err := cmd.NewRoot().Execute(); err != nil {
-		_, _ = fmt.Fprintln(os.Stderr, err)
-		os.Exit(1)
+var _ bus.MessageListener = (*Series)(nil)
+
+type Series struct {
+	log *logger.Logger
+}
+
+func NewSeries() *Series {
+	return &Series{
+		log: logger.Log.Scope("series"),
 	}
 }
 
+func (s Series) Rev(message bus.Message) {
+	s.log.Sugar().Infow("rev", "msg", message.Data())
+}
+
+func (s Series) Close() error {
+	s.log.Sugar().Infow("closed")
+	return nil
+}
diff --git a/standalone/cmd/standalone/main.go b/banyand/shard/shard.go
similarity index 55%
copy from standalone/cmd/standalone/main.go
copy to banyand/shard/shard.go
index caf1295..0d95c86 100644
--- a/standalone/cmd/standalone/main.go
+++ b/banyand/shard/shard.go
@@ -17,19 +17,36 @@
  *  under the License.
  */
 
-package main
+package shard
 
 import (
-	"fmt"
-	"os"
+	"time"
 
-	"github.com/apache/skywalking-banyandb/standalone/internal/cmd"
+	"github.com/apache/skywalking-banyandb/banyand/internal/bus"
+	"github.com/apache/skywalking-banyandb/banyand/storage"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
-func main() {
-	if err := cmd.NewRoot().Execute(); err != nil {
-		_, _ = fmt.Fprintln(os.Stderr, err)
-		os.Exit(1)
+var _ bus.MessageListener = (*Shard)(nil)
+
+type Shard struct {
+	log *logger.Logger
+	bus *bus.Bus
+}
+
+func NewShard(bus *bus.Bus) *Shard {
+	return &Shard{
+		bus: bus,
+		log: logger.Log.Scope("shard"),
 	}
 }
 
+func (s Shard) Rev(message bus.Message) {
+	s.log.Sugar().Infow("rev", "msg", message.Data())
+	_ = s.bus.Publish(storage.TraceSharded, bus.NewMessage(bus.MessageID(time.Now().UnixNano()), "sharded message"))
+}
+
+func (s Shard) Close() error {
+	s.log.Sugar().Infow("closed")
+	return nil
+}
diff --git a/standalone/cmd/standalone/main.go b/banyand/storage/pipeline.go
similarity index 79%
copy from standalone/cmd/standalone/main.go
copy to banyand/storage/pipeline.go
index caf1295..bf4c3ce 100644
--- a/standalone/cmd/standalone/main.go
+++ b/banyand/storage/pipeline.go
@@ -17,19 +17,11 @@
  *  under the License.
  */
 
-package main
+package storage
 
-import (
-	"fmt"
-	"os"
-
-	"github.com/apache/skywalking-banyandb/standalone/internal/cmd"
+const (
+	TraceRaw     = "trace-raw"
+	TraceSharded = "trace-sharded"
+	TraceIndex   = "trace-index"
+	TraceData    = "trace-data"
 )
-
-func main() {
-	if err := cmd.NewRoot().Execute(); err != nil {
-		_, _ = fmt.Fprintln(os.Stderr, err)
-		os.Exit(1)
-	}
-}
-
diff --git a/go.mod b/go.mod
index abb526c..dc54b5a 100644
--- a/go.mod
+++ b/go.mod
@@ -2,4 +2,9 @@ module github.com/apache/skywalking-banyandb
 
 go 1.16
 
-require github.com/spf13/cobra v1.1.3
+require (
+	github.com/spf13/cobra v1.1.3
+	go.uber.org/atomic v1.7.0
+	go.uber.org/multierr v1.6.0 // indirect
+	go.uber.org/zap v1.16.0
+)
diff --git a/go.sum b/go.sum
index a759029..2461967 100644
--- a/go.sum
+++ b/go.sum
@@ -11,6 +11,7 @@ cloud.google.com/go/firestore v1.1.0/go.mod h1:ulACoGHTpvq5r8rxGJ4ddJZBZqakUQqCl
 cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I=
 cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
+github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
@@ -32,6 +33,7 @@ github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7
 github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
 github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
 github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
@@ -122,7 +124,9 @@ github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn
 github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
 github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
 github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
 github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
 github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
@@ -158,6 +162,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
 github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
+github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
 github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
 github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
 github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
@@ -165,8 +171,17 @@ go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
 go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
 go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
 go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
+go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
+go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
+go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
 go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
+go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
+go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
+go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
+go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
 go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
+go.uber.org/zap v1.16.0 h1:uFRZXykJGK9lLY4HtgSw44DnIcAM+kRBP7x5m+NpAOM=
+go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ=
 golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
@@ -185,6 +200,7 @@ golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTk
 golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
 golang.org/x/lint v0.0.0-20190409202823-959b441ac422/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
 golang.org/x/lint v0.0.0-20190909230951-414d861bb4ac/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
 golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
 golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
 golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
@@ -246,6 +262,9 @@ golang.org/x/tools v0.0.0-20190628153133-6cdbf07be9d0/go.mod h1:/rFqwRUd4F7ZHNgw
 golang.org/x/tools v0.0.0-20190816200558-6889da9d5479/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
 golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
 golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc h1:NCy3Ohtk6Iny5V/reW2Ktypo4zIpWBdRJ1uFMjBxdg8=
 golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
@@ -277,10 +296,13 @@ gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
 gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
 gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
 gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
 gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
 honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
 honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
 honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
+honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM=
 honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
 rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
diff --git a/standalone/cmd/standalone/main.go b/pkg/logger/logger.go
similarity index 78%
rename from standalone/cmd/standalone/main.go
rename to pkg/logger/logger.go
index caf1295..09e5a5e 100644
--- a/standalone/cmd/standalone/main.go
+++ b/pkg/logger/logger.go
@@ -17,19 +17,23 @@
  *  under the License.
  */
 
-package main
+package logger
 
 import (
-	"fmt"
-	"os"
-
-	"github.com/apache/skywalking-banyandb/standalone/internal/cmd"
+	"go.uber.org/zap"
 )
 
-func main() {
-	if err := cmd.NewRoot().Execute(); err != nil {
-		_, _ = fmt.Fprintln(os.Stderr, err)
-		os.Exit(1)
-	}
+type Logger struct {
+	*zap.Logger
+}
+
+func (l Logger) Scope(scope string) *Logger {
+	return &Logger{l.Logger.Named(scope)}
 }
 
+var Log *Logger
+
+func init() {
+	l, _ := zap.NewDevelopment()
+	Log = &Logger{l}
+}