You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/03/31 17:43:02 UTC
[pulsar-client-go] branch master updated: Add TableView support (#743)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 3e1b39d Add TableView support (#743)
3e1b39d is described below
commit 3e1b39dd6f88aa0a12b949b1d1e064bde6355e22
Author: Ziyao Wei <zi...@gmail.com>
AuthorDate: Thu Mar 31 13:42:56 2022 -0400
Add TableView support (#743)
* Add TableView for Pulsar Golang client
* cckellogg
* Return a copy of the underlying map in tv.Entries()
---
pulsar/client.go | 4 +
pulsar/client_impl.go | 9 ++
pulsar/table_view.go | 78 +++++++++++++
pulsar/table_view_impl.go | 271 ++++++++++++++++++++++++++++++++++++++++++++++
pulsar/table_view_test.go | 80 ++++++++++++++
5 files changed, 442 insertions(+)
diff --git a/pulsar/client.go b/pulsar/client.go
index 8ff152a..f4642c6 100644
--- a/pulsar/client.go
+++ b/pulsar/client.go
@@ -141,6 +141,10 @@ type Client interface {
// This method will block until the reader is created successfully.
CreateReader(ReaderOptions) (Reader, error)
+ // CreateTableView creates a table view instance.
+ // This method will block until the table view is created successfully.
+ CreateTableView(TableViewOptions) (TableView, error)
+
// TopicPartitions Fetches the list of partitions for a given topic
//
// If the topic is partitioned, this will return a list of partition names.
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index 5682927..09b72af 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -176,6 +176,15 @@ func (c *client) CreateReader(options ReaderOptions) (Reader, error) {
return reader, nil
}
+// CreateTableView builds a TableView from the given options and registers it
+// with the client's handlers before handing it back to the caller. An error
+// from construction is returned unchanged and nothing is registered.
+func (c *client) CreateTableView(options TableViewOptions) (TableView, error) {
+	view, createErr := newTableView(c, options)
+	if createErr != nil {
+		return nil, createErr
+	}
+
+	c.handlers.Add(view)
+	return view, nil
+}
+
func (c *client) TopicPartitions(topic string) ([]string, error) {
topicName, err := internal.ParseTopicName(topic)
if err != nil {
diff --git a/pulsar/table_view.go b/pulsar/table_view.go
new file mode 100644
index 0000000..e566bf0
--- /dev/null
+++ b/pulsar/table_view.go
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+ "reflect"
+ "time"
+
+ "github.com/apache/pulsar-client-go/pulsar/log"
+)
+
+// TableViewOptions contains the options for creating a TableView.
+type TableViewOptions struct {
+ // Topic specifies the topic this table view will subscribe on.
+ // This argument is required when constructing the table view; creation
+ // fails when it is empty.
+ Topic string
+
+ // AutoUpdatePartitionsInterval sets the interval between checks for
+ // changes in the topic's partitions. A zero value defaults to 1 minute.
+ AutoUpdatePartitionsInterval time.Duration
+
+ // Schema represents the schema implementation.
+ Schema Schema
+
+ // SchemaValueType represents the type of values for the given schema.
+ // It is required whenever Schema is set.
+ SchemaValueType reflect.Type
+
+ // Logger configures the logger used by the TableView.
+ // By default, a wrapped logrus.StandardLogger will be used, namely,
+ // log.NewLoggerWithLogrus(logrus.StandardLogger())
+ Logger log.Logger
+}
+
+// TableView provides a key-value map view of a compacted topic. Messages without keys will be ignored.
+type TableView interface {
+ // Size returns the number of key-value mappings in the TableView.
+ Size() int
+
+ // IsEmpty returns true if this TableView contains no key-value mappings.
+ IsEmpty() bool
+
+ // ContainsKey returns true if this TableView contains a mapping for the specified key.
+ ContainsKey(key string) bool
+
+ // Get returns the value to which the specified key is mapped, or nil if this map contains no mapping for the key.
+ Get(key string) interface{}
+
+ // Entries returns a map of the mappings contained in this TableView.
+ Entries() map[string]interface{}
+
+ // Keys returns a slice of the keys contained in this TableView. The order of the keys is unspecified.
+ Keys() []string
+
+ // ForEach performs the given action for each entry in this map until all entries have been processed or the action
+ // returns an error. The first error returned by the action is returned to the caller.
+ ForEach(func(string, interface{}) error) error
+
+ // ForEachAndListen performs the given action for each entry in this map until all entries have been processed or
+ // the action returns an error. If every existing entry was processed without error, the action is also
+ // registered as a listener and is invoked again for each update applied to the TableView afterwards.
+ ForEachAndListen(func(string, interface{}) error) error
+
+ // Close closes the table view and releases resources allocated.
+ Close()
+}
diff --git a/pulsar/table_view_impl.go b/pulsar/table_view_impl.go
new file mode 100644
index 0000000..56b9fcb
--- /dev/null
+++ b/pulsar/table_view_impl.go
@@ -0,0 +1,271 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "reflect"
+ "sync"
+ "time"
+
+ "github.com/apache/pulsar-client-go/pulsar/log"
+ "github.com/sirupsen/logrus"
+)
+
+// cancelReader pairs a partition Reader with the cancel function of the
+// context handed to the goroutine that watches that reader for new messages.
+type cancelReader struct {
+ reader Reader
+ cancelFunc context.CancelFunc
+}
+
+// TableViewImpl is the TableView implementation built by newTableView. It
+// keeps one Reader per topic partition and stores the value of each message
+// it reads in an in-memory map keyed by the message key.
+type TableViewImpl struct {
+ client *client
+ options TableViewOptions
+
+ // dataMu guards data, the current key -> value mapping.
+ dataMu sync.Mutex
+ data map[string]interface{}
+
+ // readersMu guards cancelRaders, which is keyed by partition topic name.
+ // Close also sets closed while holding it.
+ // NOTE(review): "cancelRaders" looks like a typo for "cancelReaders".
+ readersMu sync.Mutex
+ cancelRaders map[string]cancelReader
+
+ // listenersMu is held while ForEachAndListen registers a listener.
+ listenersMu sync.Mutex
+ listeners []func(string, interface{}) error
+
+ logger log.Logger
+ // closed records that Close has run; closedCh is closed at that point,
+ // which stops the periodic partition check.
+ closed bool
+ closedCh chan struct{}
+}
+
+func newTableView(client *client, options TableViewOptions) (TableView, error) {
+ if options.Topic == "" {
+ return nil, newError(TopicNotFound, "topic is required")
+ }
+
+ if options.Schema != nil && options.SchemaValueType == nil {
+ return nil, newError(InvalidConfiguration, "SchemaValueType is required when Schema is present")
+ }
+
+ var logger log.Logger
+ if options.Logger != nil {
+ logger = options.Logger
+ } else {
+ logger = log.NewLoggerWithLogrus(logrus.StandardLogger())
+ }
+
+ if options.AutoUpdatePartitionsInterval == 0 {
+ options.AutoUpdatePartitionsInterval = time.Minute
+ }
+
+ tv := TableViewImpl{
+ client: client,
+ options: options,
+ data: make(map[string]interface{}),
+ cancelRaders: make(map[string]cancelReader),
+ logger: logger,
+ closedCh: make(chan struct{}),
+ }
+
+ // Do an initial round of partition update check to make sure we can populate the partition readers
+ if err := tv.partitionUpdateCheck(); err != nil {
+ return nil, err
+ }
+ go tv.periodicPartitionUpdateCheck()
+
+ return &tv, nil
+}
+
+// partitionUpdateCheck reconciles the per-partition readers with the
+// partitions the client currently reports for the configured topic.
+//
+// Readers whose partition is no longer reported are cancelled, closed and
+// removed. For every reported partition without a reader, a compacted reader
+// starting at the earliest message is created, read until HasNext reports no
+// more messages, and then handed to a goroutine that applies later messages.
+//
+// It returns an error when the partition lookup, the creation of a reader, or
+// the initial read of a new reader fails. A partition whose setup failed is
+// not registered, so a later check will try it again. Once the view has been
+// closed the check is a no-op.
+func (tv *TableViewImpl) partitionUpdateCheck() error {
+	partitionsArray, err := tv.client.TopicPartitions(tv.options.Topic)
+	if err != nil {
+		return fmt.Errorf("tv.client.TopicPartitions(%s) failed: %w", tv.options.Topic, err)
+	}
+
+	partitions := make(map[string]bool, len(partitionsArray))
+	for _, partition := range partitionsArray {
+		partitions[partition] = true
+	}
+
+	tv.readersMu.Lock()
+	defer tv.readersMu.Unlock()
+
+	// Close sets closed while holding readersMu; never start readers after it.
+	if tv.closed {
+		return nil
+	}
+
+	// Drop readers for partitions that disappeared.
+	for partition, cr := range tv.cancelRaders {
+		if !partitions[partition] {
+			cr.cancelFunc()
+			cr.reader.Close()
+			delete(tv.cancelRaders, partition)
+		}
+	}
+
+	// Start readers for partitions that are not being followed yet.
+	for partition := range partitions {
+		if _, ok := tv.cancelRaders[partition]; ok {
+			continue
+		}
+
+		reader, err := newReader(tv.client, ReaderOptions{
+			Topic:          partition,
+			StartMessageID: EarliestMessageID(),
+			ReadCompacted:  true,
+			// TODO: Pooling?
+			Schema: tv.options.Schema,
+		})
+		if err != nil {
+			return fmt.Errorf("create new reader failed for %s: %w", partition, err)
+		}
+
+		for reader.HasNext() {
+			msg, readErr := reader.Next(context.Background())
+			if readErr != nil {
+				// There is no message to apply on failure, and retrying here
+				// could spin while readersMu is held. Release the reader and
+				// report the error, as is done when reader creation fails.
+				reader.Close()
+				return fmt.Errorf("read next message failed for %s: %w", partition, readErr)
+			}
+			tv.handleMessage(msg)
+		}
+
+		ctx, cancelFunc := context.WithCancel(context.Background())
+		tv.cancelRaders[partition] = cancelReader{
+			reader:     reader,
+			cancelFunc: cancelFunc,
+		}
+		go tv.watchReaderForNewMessages(ctx, reader)
+	}
+
+	return nil
+}
+
+// periodicPartitionUpdateCheck runs partitionUpdateCheck in a loop, waiting
+// AutoUpdatePartitionsInterval between runs, until closedCh is closed.
+// Failures are logged and do not stop the loop.
+func (tv *TableViewImpl) periodicPartitionUpdateCheck() {
+	for {
+		if err := tv.partitionUpdateCheck(); err != nil {
+			// %v, not %w: the wrapping verb is only understood by fmt.Errorf.
+			tv.logger.Errorf("failed to check for changes in number of partitions: %v", err)
+		}
+		select {
+		case <-tv.closedCh:
+			// If the TableViewImpl has been closed, stop checking for partition updates
+			return
+		case <-time.After(tv.options.AutoUpdatePartitionsInterval):
+			continue
+		}
+	}
+}
+
+// Size reports how many keys are currently stored, read under dataMu.
+func (tv *TableViewImpl) Size() int {
+	tv.dataMu.Lock()
+	count := len(tv.data)
+	tv.dataMu.Unlock()
+	return count
+}
+
+// IsEmpty reports whether the table view currently holds no entries.
+//
+// The length is read directly instead of through Size: Size locks dataMu
+// itself, and sync.Mutex is not reentrant, so calling it while holding the
+// lock would block forever.
+func (tv *TableViewImpl) IsEmpty() bool {
+	tv.dataMu.Lock()
+	defer tv.dataMu.Unlock()
+	return len(tv.data) == 0
+}
+
+// ContainsKey reports whether key is present in the table view, looked up
+// under dataMu.
+func (tv *TableViewImpl) ContainsKey(key string) bool {
+	tv.dataMu.Lock()
+	_, present := tv.data[key]
+	tv.dataMu.Unlock()
+	return present
+}
+
+// Get returns the value stored for key, or nil when the key is absent. The
+// lookup is done under dataMu.
+func (tv *TableViewImpl) Get(key string) interface{} {
+	tv.dataMu.Lock()
+	value := tv.data[key]
+	tv.dataMu.Unlock()
+	return value
+}
+
+// Entries returns a snapshot of the current key/value mappings.
+//
+// The returned map is a shallow copy owned by the caller. Returning the
+// internal map would let callers read and write it without holding dataMu
+// while reader goroutines keep updating it.
+func (tv *TableViewImpl) Entries() map[string]interface{} {
+	tv.dataMu.Lock()
+	defer tv.dataMu.Unlock()
+
+	entries := make(map[string]interface{}, len(tv.data))
+	for k, v := range tv.data {
+		entries[k] = v
+	}
+	return entries
+}
+
+// Keys returns a newly allocated slice holding every key currently in the
+// table view. The order follows map iteration and is therefore unspecified.
+func (tv *TableViewImpl) Keys() []string {
+	tv.dataMu.Lock()
+	defer tv.dataMu.Unlock()
+
+	keys := make([]string, 0, len(tv.data))
+	for key := range tv.data {
+		keys = append(keys, key)
+	}
+	return keys
+}
+
+// ForEach calls action once for every entry and stops at the first error,
+// which it returns. It returns nil when every call succeeded.
+//
+// dataMu is held for the whole iteration, so action must not call back into
+// methods of this table view that lock dataMu.
+func (tv *TableViewImpl) ForEach(action func(string, interface{}) error) error {
+	tv.dataMu.Lock()
+	defer tv.dataMu.Unlock()
+
+	for key, value := range tv.data {
+		err := action(key, value)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// ForEachAndListen first applies action to every existing entry via ForEach.
+// If that pass succeeds, action is appended to the listeners so that it is
+// also called for messages handled afterwards. If the pass fails, its error is
+// returned and action is not registered.
+//
+// listenersMu is held for the whole call, and dataMu is taken inside it by
+// ForEach.
+func (tv *TableViewImpl) ForEachAndListen(action func(string, interface{}) error) error {
+	tv.listenersMu.Lock()
+	defer tv.listenersMu.Unlock()
+
+	err := tv.ForEach(action)
+	if err == nil {
+		tv.listeners = append(tv.listeners, action)
+	}
+	return err
+}
+
+// Close shuts the table view down: it cancels the context of every reader
+// watcher, closes every reader, and closes closedCh so the periodic partition
+// check stops. Calling Close more than once is safe; later calls do nothing.
+func (tv *TableViewImpl) Close() {
+	tv.readersMu.Lock()
+	defer tv.readersMu.Unlock()
+
+	if tv.closed {
+		return
+	}
+	tv.closed = true
+
+	for _, cr := range tv.cancelRaders {
+		// Cancel before closing, as partitionUpdateCheck does when it drops a
+		// reader, so the watcher goroutine sees context.Canceled and exits
+		// and the context's resources are released.
+		cr.cancelFunc()
+		cr.reader.Close()
+	}
+	close(tv.closedCh)
+}
+
+// handleMessage decodes msg with the configured schema value type, stores the
+// result under the message key and then calls every registered listener with
+// that key and value. Decode and listener failures are logged only. A nil
+// message is ignored.
+//
+// NOTE(review): the TableView documentation says messages without keys are
+// ignored, but no such filtering happens here — confirm the intended contract.
+func (tv *TableViewImpl) handleMessage(msg Message) {
+	if msg == nil {
+		return
+	}
+
+	// listeners is appended to under listenersMu in ForEachAndListen, so it
+	// must not be ranged over without that lock. The order listenersMu ->
+	// dataMu matches ForEachAndListen, which avoids a lock-order inversion.
+	tv.listenersMu.Lock()
+	defer tv.listenersMu.Unlock()
+	tv.dataMu.Lock()
+	defer tv.dataMu.Unlock()
+
+	payload := reflect.Indirect(reflect.New(tv.options.SchemaValueType)).Interface()
+	if err := msg.GetSchemaValue(&payload); err != nil {
+		// Arguments follow the verbs: the error first, then the message.
+		tv.logger.Errorf("msg.GetSchemaValue() failed with %v; msg is %v", err, msg)
+	}
+
+	key := msg.Key()
+	tv.data[key] = payload
+	for _, listener := range tv.listeners {
+		if err := listener(key, payload); err != nil {
+			tv.logger.Errorf("table view listener failed for %v: %v", msg, err)
+		}
+	}
+}
+
+// watchReaderForNewMessages applies every message read from reader to the
+// table view. It returns when ctx is cancelled or once the table view has
+// been closed. Any other read error is logged and the failed read is skipped,
+// because there is no message to apply in that case.
+func (tv *TableViewImpl) watchReaderForNewMessages(ctx context.Context, reader Reader) {
+	for {
+		msg, err := reader.Next(ctx)
+		if err != nil {
+			// Cancellation is the normal way to stop this goroutine.
+			if errors.Is(err, context.Canceled) {
+				return
+			}
+			// After Close there is nothing left to read; do not keep retrying.
+			select {
+			case <-tv.closedCh:
+				return
+			default:
+			}
+			tv.logger.Errorf("read next message failed for %s: %v", reader.Topic(), err)
+			continue
+		}
+		tv.handleMessage(msg)
+	}
+}
diff --git a/pulsar/table_view_test.go b/pulsar/table_view_test.go
new file mode 100644
index 0000000..829187b
--- /dev/null
+++ b/pulsar/table_view_test.go
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+ "context"
+ "fmt"
+ "reflect"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+)
+
+// TestTableView publishes numMsg keyed string messages to a fresh topic,
+// creates a TableView on it and checks that the view ends up with exactly one
+// entry per key holding the value that was sent.
+func TestTableView(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.NoError(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+	schema := NewStringSchema(nil)
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:  topic,
+		Schema: schema,
+	})
+	assert.NoError(t, err)
+	defer producer.Close()
+
+	numMsg := 10
+	valuePrefix := "hello pulsar: "
+	for i := 0; i < numMsg; i++ {
+		key := fmt.Sprintf("%d", i)
+		t.Log(key)
+		_, err = producer.Send(context.Background(), &ProducerMessage{
+			Key: key,
+			// Plain concatenation: the value must not be used as a format string.
+			Value: valuePrefix + key,
+		})
+		assert.NoError(t, err)
+	}
+
+	// create table view
+	v := ""
+	tv, err := client.CreateTableView(TableViewOptions{
+		Topic:           topic,
+		Schema:          schema,
+		SchemaValueType: reflect.TypeOf(&v),
+	})
+	assert.NoError(t, err)
+	defer tv.Close()
+
+	// Wait until tv receives all messages, but fail instead of hanging forever.
+	deadline := time.Now().Add(30 * time.Second)
+	for tv.Size() < numMsg {
+		if time.Now().After(deadline) {
+			t.Fatalf("timed out waiting for %d entries, TableView has %d", numMsg, tv.Size())
+		}
+		time.Sleep(time.Second * 1)
+		t.Logf("TableView number of elements: %d", tv.Size())
+	}
+
+	entries := tv.Entries()
+	assert.Equal(t, numMsg, len(entries))
+	for key, value := range entries {
+		assert.Equal(t, valuePrefix+key, *(value.(*string)))
+	}
+}