You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/03/31 17:43:02 UTC

[pulsar-client-go] branch master updated: Add TableView support (#743)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e1b39d  Add TableView support (#743)
3e1b39d is described below

commit 3e1b39dd6f88aa0a12b949b1d1e064bde6355e22
Author: Ziyao Wei <zi...@gmail.com>
AuthorDate: Thu Mar 31 13:42:56 2022 -0400

    Add TableView support (#743)
    
    * Add TableView for Pulsar Golang client
    
    * cckellogg
    
    * Return a copy of the underlying map in tv.Entries()
---
 pulsar/client.go          |   4 +
 pulsar/client_impl.go     |   9 ++
 pulsar/table_view.go      |  78 +++++++++++++
 pulsar/table_view_impl.go | 271 ++++++++++++++++++++++++++++++++++++++++++++++
 pulsar/table_view_test.go |  80 ++++++++++++++
 5 files changed, 442 insertions(+)

diff --git a/pulsar/client.go b/pulsar/client.go
index 8ff152a..f4642c6 100644
--- a/pulsar/client.go
+++ b/pulsar/client.go
@@ -141,6 +141,10 @@ type Client interface {
 	// This method will block until the reader is created successfully.
 	CreateReader(ReaderOptions) (Reader, error)
 
+	// CreateTableView creates a table view instance.
+	// This method will block until the table view is created successfully.
+	CreateTableView(TableViewOptions) (TableView, error)
+
 	// TopicPartitions Fetches the list of partitions for a given topic
 	//
 	// If the topic is partitioned, this will return a list of partition names.
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index 5682927..09b72af 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -176,6 +176,15 @@ func (c *client) CreateReader(options ReaderOptions) (Reader, error) {
 	return reader, nil
 }
 
+// CreateTableView builds a TableView for options.Topic and registers it in the
+// client's handlers (presumably so it is closed along with the client — confirm).
+func (c *client) CreateTableView(options TableViewOptions) (TableView, error) {
+	tv, err := newTableView(c, options)
+	if err == nil {
+		c.handlers.Add(tv)
+		return tv, nil
+	}
+	return nil, err
+}
+
 func (c *client) TopicPartitions(topic string) ([]string, error) {
 	topicName, err := internal.ParseTopicName(topic)
 	if err != nil {
diff --git a/pulsar/table_view.go b/pulsar/table_view.go
new file mode 100644
index 0000000..e566bf0
--- /dev/null
+++ b/pulsar/table_view.go
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+	"reflect"
+	"time"
+
+	"github.com/apache/pulsar-client-go/pulsar/log"
+)
+
+// TableViewOptions contains the options for creating a TableView.
+type TableViewOptions struct {
+	// Topic specifies the topic this table view will subscribe on.
+	// This argument is required when constructing the table view.
+	Topic string
+
+	// AutoUpdatePartitionsInterval sets how often the table view re-checks the topic for
+	// added or removed partitions. Defaults to 1 minute when not set (zero).
+	AutoUpdatePartitionsInterval time.Duration
+
+	// Schema represents the schema implementation. It is passed to the underlying
+	// readers and used to decode message values.
+	Schema Schema
+
+	// SchemaValueType represents the type of values for the given schema. It is
+	// required whenever Schema is set; every message value is decoded into a freshly
+	// allocated value of this type.
+	// NOTE(review): the implementation calls reflect.New on this type for every
+	// message, so it appears to be needed even when Schema is nil — confirm.
+	SchemaValueType reflect.Type
+
+	// Logger configures the logger used by the TableView.
+	// By default, a wrapped logrus.StandardLogger will be used, namely,
+	// log.NewLoggerWithLogrus(logrus.StandardLogger())
+	Logger log.Logger
+}
+
+// TableView provides a key-value map view of a compacted topic. Messages without keys will be ignored.
+// Each key maps to the value of the most recent message received for that key.
+type TableView interface {
+	// Size returns the number of key-value mappings in the TableView.
+	Size() int
+
+	// IsEmpty returns true if this TableView contains no key-value mappings.
+	IsEmpty() bool
+
+	// ContainsKey returns true if this TableView contains a mapping for the specified key.
+	ContainsKey(key string) bool
+
+	// Get returns the value to which the specified key is mapped, or nil if this map contains no mapping for the key.
+	Get(key string) interface{}
+
+	// Entries returns a map of the key-value mappings contained in this TableView.
+	Entries() map[string]interface{}
+
+	// Keys returns a slice of the keys contained in this TableView, in no particular order.
+	Keys() []string
+
+	// ForEach performs the given action for each entry in this TableView until all entries have been
+	// processed or the action returns an error, in which case that error is returned.
+	ForEach(func(string, interface{}) error) error
+
+	// ForEachAndListen performs the given action for each entry in this TableView until all entries
+	// have been processed or the action returns an error, in which case that error is returned.
+	// If every call succeeds, the action is also registered as a listener and is invoked with the
+	// key and value of every message the TableView receives afterwards.
+	ForEachAndListen(func(string, interface{}) error) error
+
+	// Close closes the table view and releases resources allocated.
+	Close()
+}
diff --git a/pulsar/table_view_impl.go b/pulsar/table_view_impl.go
new file mode 100644
index 0000000..56b9fcb
--- /dev/null
+++ b/pulsar/table_view_impl.go
@@ -0,0 +1,271 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"reflect"
+	"sync"
+	"time"
+
+	"github.com/apache/pulsar-client-go/pulsar/log"
+	"github.com/sirupsen/logrus"
+)
+
+// cancelReader pairs a partition Reader with the function that cancels the context
+// its watchReaderForNewMessages goroutine reads with.
+type cancelReader struct {
+	cancelFunc context.CancelFunc
+	reader     Reader
+}
+
+// TableViewImpl is the TableView implementation built by newTableView. It keeps one
+// compacted Reader per partition of options.Topic and applies the messages read from
+// them to an in-memory map.
+type TableViewImpl struct {
+	client  *client
+	options TableViewOptions
+
+	// dataMu guards data, the latest decoded value seen for each message key.
+	dataMu sync.Mutex
+	data   map[string]interface{}
+
+	// readersMu guards cancelRaders (partition name -> reader and its cancel
+	// function) and the closed flag.
+	readersMu    sync.Mutex
+	cancelRaders map[string]cancelReader
+
+	// listenersMu serializes ForEachAndListen registrations.
+	// NOTE(review): handleMessage ranges over listeners while holding only dataMu,
+	// so appends should also happen under dataMu to avoid a data race.
+	listenersMu sync.Mutex
+	listeners   []func(string, interface{}) error
+
+	// logger receives errors from background reads. closed (guarded by readersMu)
+	// and closedCh are both set by Close; closing closedCh stops
+	// periodicPartitionUpdateCheck.
+	logger   log.Logger
+	closed   bool
+	closedCh chan struct{}
+}
+
+func newTableView(client *client, options TableViewOptions) (TableView, error) {
+	if options.Topic == "" {
+		return nil, newError(TopicNotFound, "topic is required")
+	}
+
+	if options.Schema != nil && options.SchemaValueType == nil {
+		return nil, newError(InvalidConfiguration, "SchemaValueType is required when Schema is present")
+	}
+
+	var logger log.Logger
+	if options.Logger != nil {
+		logger = options.Logger
+	} else {
+		logger = log.NewLoggerWithLogrus(logrus.StandardLogger())
+	}
+
+	if options.AutoUpdatePartitionsInterval == 0 {
+		options.AutoUpdatePartitionsInterval = time.Minute
+	}
+
+	tv := TableViewImpl{
+		client:       client,
+		options:      options,
+		data:         make(map[string]interface{}),
+		cancelRaders: make(map[string]cancelReader),
+		logger:       logger,
+		closedCh:     make(chan struct{}),
+	}
+
+	// Do an initial round of partition update check to make sure we can populate the partition readers
+	if err := tv.partitionUpdateCheck(); err != nil {
+		return nil, err
+	}
+	go tv.periodicPartitionUpdateCheck()
+
+	return &tv, nil
+}
+
+// partitionUpdateCheck reconciles the partition readers with the current partitions
+// of options.Topic:
+//   - Readers for partitions that no longer exist are cancelled and closed.
+//   - Every partition without a reader gets a new compacted reader starting at the
+//     earliest message.
+//
+// Each new reader is drained synchronously into the table before a goroutine is
+// started to watch it for new messages. Nothing is done once the table view has been
+// closed. The returned error wraps a TopicPartitions or reader-creation failure.
+func (tv *TableViewImpl) partitionUpdateCheck() error {
+	partitionsArray, err := tv.client.TopicPartitions(tv.options.Topic)
+	if err != nil {
+		return fmt.Errorf("tv.client.TopicPartitions(%s) failed: %w", tv.options.Topic, err)
+	}
+
+	partitions := make(map[string]bool, len(partitionsArray))
+	for _, partition := range partitionsArray {
+		partitions[partition] = true
+	}
+
+	tv.readersMu.Lock()
+	defer tv.readersMu.Unlock()
+
+	// Close may have run while TopicPartitions was in flight; readers created
+	// now would never be closed.
+	if tv.closed {
+		return nil
+	}
+
+	for partition, cr := range tv.cancelRaders {
+		if !partitions[partition] {
+			cr.cancelFunc()
+			cr.reader.Close()
+			delete(tv.cancelRaders, partition)
+		}
+	}
+
+	for partition := range partitions {
+		if _, ok := tv.cancelRaders[partition]; ok {
+			continue
+		}
+		reader, err := newReader(tv.client, ReaderOptions{
+			Topic:          partition,
+			StartMessageID: EarliestMessageID(),
+			ReadCompacted:  true,
+			// TODO: Pooling?
+			Schema: tv.options.Schema,
+		})
+		if err != nil {
+			return fmt.Errorf("create new reader failed for %s: %w", partition, err)
+		}
+		// Load the partition's existing state before returning, so a freshly
+		// created table view is already populated.
+		for reader.HasNext() {
+			msg, err := reader.Next(context.Background())
+			if err != nil {
+				// msg is not usable when err != nil; stop draining and let the
+				// watcher goroutine pick up the remaining messages.
+				tv.logger.Errorf("read next message failed for %s: %v", partition, err)
+				break
+			}
+			tv.handleMessage(msg)
+		}
+		ctx, cancelFunc := context.WithCancel(context.Background())
+		tv.cancelRaders[partition] = cancelReader{
+			reader:     reader,
+			cancelFunc: cancelFunc,
+		}
+		go tv.watchReaderForNewMessages(ctx, reader)
+	}
+
+	return nil
+}
+
+// periodicPartitionUpdateCheck re-runs partitionUpdateCheck once per
+// options.AutoUpdatePartitionsInterval until closedCh is closed. Failures are logged
+// and retried on the next round.
+func (tv *TableViewImpl) periodicPartitionUpdateCheck() {
+	for {
+		if err := tv.partitionUpdateCheck(); err != nil {
+			// %w is only understood by fmt.Errorf; log.Logger is assumed to use
+			// Sprintf-style formatting, where %w would render as %!w(...).
+			tv.logger.Errorf("failed to check for changes in number of partitions: %v", err)
+		}
+		select {
+		case <-tv.closedCh:
+			// If the TableViewImpl has been closed, stop checking for partition updates
+			return
+		case <-time.After(tv.options.AutoUpdatePartitionsInterval):
+		}
+	}
+}
+
+// Size reports how many keys currently have a value in the table view.
+func (tv *TableViewImpl) Size() int {
+	tv.dataMu.Lock()
+	n := len(tv.data)
+	tv.dataMu.Unlock()
+	return n
+}
+
+// IsEmpty reports whether the table view currently holds no entries.
+func (tv *TableViewImpl) IsEmpty() bool {
+	tv.dataMu.Lock()
+	defer tv.dataMu.Unlock()
+	// Read len directly: calling tv.Size() here would try to lock dataMu a second
+	// time, and sync.Mutex is not reentrant, so that deadlocks.
+	return len(tv.data) == 0
+}
+
+// ContainsKey reports whether key currently has an entry in the table view.
+func (tv *TableViewImpl) ContainsKey(key string) bool {
+	tv.dataMu.Lock()
+	defer tv.dataMu.Unlock()
+	if _, found := tv.data[key]; found {
+		return true
+	}
+	return false
+}
+
+// Get returns the value stored for key, or nil when the key is absent.
+func (tv *TableViewImpl) Get(key string) interface{} {
+	tv.dataMu.Lock()
+	value := tv.data[key]
+	tv.dataMu.Unlock()
+	return value
+}
+
+// Entries returns a shallow snapshot of the table's key-value mappings. The returned
+// map is a fresh copy: callers may modify it, and it does not change as new messages
+// arrive. Values are shared with the table, not cloned.
+func (tv *TableViewImpl) Entries() map[string]interface{} {
+	tv.dataMu.Lock()
+	defer tv.dataMu.Unlock()
+	data := make(map[string]interface{}, len(tv.data))
+	for k, v := range tv.data {
+		data[k] = v
+	}
+	// Return the copy, not tv.data: handing out the internal map would let callers
+	// read it concurrently with handleMessage, or mutate it, without holding dataMu.
+	return data
+}
+
+// Keys returns every key currently present in the table view, in map iteration
+// (unspecified) order. The result is never nil.
+func (tv *TableViewImpl) Keys() []string {
+	tv.dataMu.Lock()
+	defer tv.dataMu.Unlock()
+
+	keys := make([]string, 0, len(tv.data))
+	for key := range tv.data {
+		keys = append(keys, key)
+	}
+	return keys
+}
+
+func (tv *TableViewImpl) ForEach(action func(string, interface{}) error) error {
+	tv.dataMu.Lock()
+	defer tv.dataMu.Unlock()
+	for k, v := range tv.data {
+		if err := action(k, v); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// ForEachAndListen replays every current entry to action. If no call returns an
+// error, it then registers action as a listener that handleMessage invokes for each
+// subsequent message. The first error from the replay is returned, and in that case
+// the listener is not registered.
+//
+// action runs while dataMu is held, so it must not call back into this table view.
+func (tv *TableViewImpl) ForEachAndListen(action func(string, interface{}) error) error {
+	tv.listenersMu.Lock()
+	defer tv.listenersMu.Unlock()
+
+	// Hold dataMu across both the replay and the registration. handleMessage ranges
+	// over tv.listeners while holding dataMu, so appending under dataMu avoids a data
+	// race. It also guarantees no message is applied between the replay and the
+	// registration, where the listener would otherwise miss it.
+	tv.dataMu.Lock()
+	defer tv.dataMu.Unlock()
+
+	for k, v := range tv.data {
+		if err := action(k, v); err != nil {
+			return err
+		}
+	}
+
+	tv.listeners = append(tv.listeners, action)
+	return nil
+}
+
+// Close stops the periodic partition check and releases every partition reader.
+// Only the first call has an effect; later calls return immediately.
+func (tv *TableViewImpl) Close() {
+	tv.readersMu.Lock()
+	defer tv.readersMu.Unlock()
+
+	if tv.closed {
+		return
+	}
+	tv.closed = true
+	// Signal closure before tearing readers down, so that watcher goroutines
+	// observing a read error can tell it was caused by Close.
+	close(tv.closedCh)
+	for _, cr := range tv.cancelRaders {
+		// Cancel the watcher's context first so its pending Next returns
+		// context.Canceled instead of an error on a closed reader.
+		cr.cancelFunc()
+		cr.reader.Close()
+	}
+}
+
+// handleMessage decodes msg into a fresh value of options.SchemaValueType, stores it
+// under msg.Key() and passes the key and value to every registered listener.
+//
+// The message is skipped, leaving the table unchanged, when:
+//   - msg is nil;
+//   - msg has no key, matching the TableView contract;
+//   - its value fails to decode, so an existing value is not overwritten with an
+//     undecoded zero value.
+//
+// Listener errors are logged and do not stop the remaining listeners.
+func (tv *TableViewImpl) handleMessage(msg Message) {
+	if msg == nil {
+		return
+	}
+	key := msg.Key()
+	if key == "" {
+		return
+	}
+
+	tv.dataMu.Lock()
+	defer tv.dataMu.Unlock()
+
+	payload := reflect.Indirect(reflect.New(tv.options.SchemaValueType)).Interface()
+	if err := msg.GetSchemaValue(&payload); err != nil {
+		tv.logger.Errorf("msg.GetSchemaValue() failed with %v; msg is %v", err, msg)
+		return
+	}
+	tv.data[key] = payload
+	for _, listener := range tv.listeners {
+		if err := listener(key, payload); err != nil {
+			tv.logger.Errorf("table view listener failed for %v: %v", msg, err)
+		}
+	}
+}
+
+// watchReaderForNewMessages applies each message read from reader to the table. It
+// returns when ctx is cancelled or the table view is closed. Any other read error is
+// logged, and reading is retried.
+func (tv *TableViewImpl) watchReaderForNewMessages(ctx context.Context, reader Reader) {
+	for {
+		msg, err := reader.Next(ctx)
+		if err == nil {
+			tv.handleMessage(msg)
+			continue
+		}
+		// msg is not usable here; never hand it to handleMessage.
+		if errors.Is(err, context.Canceled) || ctx.Err() != nil {
+			return
+		}
+		select {
+		case <-tv.closedCh:
+			// Close shuts down every reader, so further reads cannot succeed.
+			return
+		default:
+		}
+		tv.logger.Errorf("read next message failed for %s: %v", reader.Topic(), err)
+	}
+}
diff --git a/pulsar/table_view_test.go b/pulsar/table_view_test.go
new file mode 100644
index 0000000..829187b
--- /dev/null
+++ b/pulsar/table_view_test.go
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+	"context"
+	"fmt"
+	"reflect"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+)
+
+// TestTableView produces numMsg keyed string messages, then checks that a TableView
+// over the topic ends up with exactly one entry per key and the matching value.
+func TestTableView(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	if err != nil {
+		t.Fatalf("NewClient: %v", err)
+	}
+	defer client.Close()
+
+	topic := newTopicName()
+	schema := NewStringSchema(nil)
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:  topic,
+		Schema: schema,
+	})
+	if err != nil {
+		t.Fatalf("CreateProducer: %v", err)
+	}
+	defer producer.Close()
+
+	numMsg := 10
+	valuePrefix := "hello pulsar: "
+	for i := 0; i < numMsg; i++ {
+		key := fmt.Sprintf("%d", i)
+		t.Log(key)
+		_, err = producer.Send(context.Background(), &ProducerMessage{
+			Key:   key,
+			Value: valuePrefix + key,
+		})
+		assert.NoError(t, err)
+	}
+
+	// create table view
+	v := ""
+	tv, err := client.CreateTableView(TableViewOptions{
+		Topic:           topic,
+		Schema:          schema,
+		SchemaValueType: reflect.TypeOf(&v),
+	})
+	if err != nil {
+		t.Fatalf("CreateTableView: %v", err)
+	}
+	defer tv.Close()
+
+	// Wait until tv receives all messages, but fail instead of hanging forever.
+	deadline := time.Now().Add(30 * time.Second)
+	for tv.Size() < numMsg {
+		if time.Now().After(deadline) {
+			t.Fatalf("TableView has %d elements after waiting, want %d", tv.Size(), numMsg)
+		}
+		time.Sleep(time.Second * 1)
+		t.Logf("TableView number of elements: %d", tv.Size())
+	}
+
+	entries := tv.Entries()
+	assert.Len(t, entries, numMsg)
+	for k, value := range entries {
+		assert.Equal(t, valuePrefix+k, *(value.(*string)))
+	}
+}