You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2023/11/20 09:41:56 UTC

(pulsar-client-go) branch master updated: [Issue 1132] Fix JSONSchema unmarshalling in TableView (#1133)

This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new d4574424 [Issue 1132] Fix JSONSchema unmarshalling in TableView (#1133)
d4574424 is described below

commit d457442434c33f0fef7469c24d25de04bc718aa6
Author: Oliver Muir <ol...@arenko.group>
AuthorDate: Mon Nov 20 09:41:50 2023 +0000

    [Issue 1132] Fix JSONSchema unmarshalling in TableView (#1133)
    
    Fixes #1132
    
    ### Motivation
    Fix issue #1132: values were not unmarshalled correctly when using JSONSchema with TableView.
    
    ### Modifications
    
    - Set a concrete type in the `payload` variable before JSON-unmarshalling into that variable. This allows the JSON package to identify and use the type rather than seeing it as `interface{}`.
    - Use `reflect.Indirect(payload).Interface()` when storing the payload and passing it to listeners, so that they receive the dereferenced value rather than the pointer returned by `reflect.New`.
    - Add test coverage for `TableView.Get` covering all supported schema types.
    - Add test coverage for `TableView.ForEachAndListen` for JSONSchema.
    
    Additional minor changes (they didn't seem worth their own MRs, but I'm happy to split them out if that's better):
    - Correct typo in comments on `TableView.ForEach` and `TableView.ForEachAndListen` interface methods.
    - Correct `TableView.ForEachAndListen` comment to clarify that it continues to call the given action on future messages.
    - Correct formatting directive (`%w` -> `%v`) in error log `tv.logger.Errorf("msg.GetSchemaValue() failed with %v; msg is %v", err, msg)`. (The logger indirectly calls `fmt.Sprintf` via logrus, and `fmt.Sprintf` does not support `%w`; that verb is only valid in `fmt.Errorf`.)
---
 pulsar/table_view.go      |   6 +-
 pulsar/table_view_impl.go |  11 ++-
 pulsar/table_view_test.go | 211 ++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 219 insertions(+), 9 deletions(-)

diff --git a/pulsar/table_view.go b/pulsar/table_view.go
index e566bf0b..58a664ae 100644
--- a/pulsar/table_view.go
+++ b/pulsar/table_view.go
@@ -65,12 +65,12 @@ type TableView interface {
 	// Keys returns a slice of the keys contained in this TableView.
 	Keys() []string
 
-	// ForEach performs the give action for each entry in this map until all entries have been processed or the action
+	// ForEach performs the given action for each entry in this map until all entries have been processed or the action
 	// returns an error.
 	ForEach(func(string, interface{}) error) error
 
-	// ForEachAndListen performs the give action for each entry in this map until all entries have been processed or
-	// the action returns an error.
+	// ForEachAndListen performs the given action for each entry in this map until all entries have been processed or
+	// the action returns an error.  The given action will then be performed on each new entry in this map.
 	ForEachAndListen(func(string, interface{}) error) error
 
 	// Close closes the table view and releases resources allocated.
diff --git a/pulsar/table_view_impl.go b/pulsar/table_view_impl.go
index 47f8c6c0..17e0b90f 100644
--- a/pulsar/table_view_impl.go
+++ b/pulsar/table_view_impl.go
@@ -245,19 +245,18 @@ func (tv *TableViewImpl) handleMessage(msg Message) {
 	tv.dataMu.Lock()
 	defer tv.dataMu.Unlock()
 
-	var payload interface{}
+	payload := reflect.New(tv.options.SchemaValueType)
 	if len(msg.Payload()) == 0 {
 		delete(tv.data, msg.Key())
 	} else {
-		payload = reflect.Indirect(reflect.New(tv.options.SchemaValueType)).Interface()
-		if err := msg.GetSchemaValue(&payload); err != nil {
-			tv.logger.Errorf("msg.GetSchemaValue() failed with %w; msg is %v", err, msg)
+		if err := msg.GetSchemaValue(payload.Interface()); err != nil {
+			tv.logger.Errorf("msg.GetSchemaValue() failed with %v; msg is %v", err, msg)
 		}
-		tv.data[msg.Key()] = payload
+		tv.data[msg.Key()] = reflect.Indirect(payload).Interface()
 	}
 
 	for _, listener := range tv.listeners {
-		if err := listener(msg.Key(), payload); err != nil {
+		if err := listener(msg.Key(), reflect.Indirect(payload).Interface()); err != nil {
 			tv.logger.Errorf("table view listener failed for %v: %w", msg, err)
 		}
 	}
diff --git a/pulsar/table_view_test.go b/pulsar/table_view_test.go
index d29b24d2..45b94411 100644
--- a/pulsar/table_view_test.go
+++ b/pulsar/table_view_test.go
@@ -24,6 +24,7 @@ import (
 	"testing"
 	"time"
 
+	pb "github.com/apache/pulsar-client-go/integration-tests/pb"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
@@ -80,6 +81,157 @@ func TestTableView(t *testing.T) {
 	}
 }
 
+func TestTableViewSchemas(t *testing.T) {
+	var tests = []struct {
+		name          string
+		schema        Schema
+		schemaType    interface{}
+		producerValue interface{}
+		expValueOut   interface{}
+		valueCheck    func(t *testing.T, got interface{}) // Overrides expValueOut for more complex checks
+	}{
+		{
+			name:          "StringSchema",
+			schema:        NewStringSchema(nil),
+			schemaType:    strPointer("hello pulsar"),
+			producerValue: "hello pulsar",
+			expValueOut:   strPointer("hello pulsar"),
+		},
+		{
+			name:          "JSONSchema",
+			schema:        NewJSONSchema(exampleSchemaDef, nil),
+			schemaType:    testJSON{},
+			producerValue: testJSON{ID: 1, Name: "Pulsar"},
+			expValueOut:   testJSON{ID: 1, Name: "Pulsar"},
+		},
+		{
+			name:          "JSONSchema pointer type",
+			schema:        NewJSONSchema(exampleSchemaDef, nil),
+			schemaType:    &testJSON{ID: 1, Name: "Pulsar"},
+			producerValue: testJSON{ID: 1, Name: "Pulsar"},
+			expValueOut:   &testJSON{ID: 1, Name: "Pulsar"},
+		},
+		{
+			name:          "AvroSchema",
+			schema:        NewAvroSchema(exampleSchemaDef, nil),
+			schemaType:    testAvro{ID: 1, Name: "Pulsar"},
+			producerValue: testAvro{ID: 1, Name: "Pulsar"},
+			expValueOut:   testAvro{ID: 1, Name: "Pulsar"},
+		},
+		{
+			name:          "Int8Schema",
+			schema:        NewInt8Schema(nil),
+			schemaType:    int8(0),
+			producerValue: int8(1),
+			expValueOut:   int8(1),
+		},
+		{
+			name:          "Int16Schema",
+			schema:        NewInt16Schema(nil),
+			schemaType:    int16(0),
+			producerValue: int16(1),
+			expValueOut:   int16(1),
+		},
+		{
+			name:          "Int32Schema",
+			schema:        NewInt32Schema(nil),
+			schemaType:    int32(0),
+			producerValue: int32(1),
+			expValueOut:   int32(1),
+		},
+		{
+			name:          "Int64Schema",
+			schema:        NewInt64Schema(nil),
+			schemaType:    int64(0),
+			producerValue: int64(1),
+			expValueOut:   int64(1),
+		},
+		{
+			name:          "FloatSchema",
+			schema:        NewFloatSchema(nil),
+			schemaType:    float32(0),
+			producerValue: float32(1),
+			expValueOut:   float32(1),
+		},
+		{
+			name:          "DoubleSchema",
+			schema:        NewDoubleSchema(nil),
+			schemaType:    float64(0),
+			producerValue: float64(1),
+			expValueOut:   float64(1),
+		},
+		{
+			name:          "ProtoSchema",
+			schema:        NewProtoSchema(protoSchemaDef, nil),
+			schemaType:    pb.Test{},
+			producerValue: &pb.Test{Num: 1, Msf: "Pulsar"},
+			valueCheck: func(t *testing.T, got interface{}) {
+				assert.IsType(t, pb.Test{}, got)
+				assert.Equal(t, int32(1), got.(pb.Test).Num)
+				assert.Equal(t, "Pulsar", got.(pb.Test).Msf)
+			},
+		},
+		{
+			name:          "ProtoNativeSchema",
+			schema:        NewProtoNativeSchemaWithMessage(&pb.Test{}, nil),
+			schemaType:    pb.Test{},
+			producerValue: &pb.Test{Num: 1, Msf: "Pulsar"},
+			valueCheck: func(t *testing.T, got interface{}) {
+				assert.IsType(t, pb.Test{}, got)
+				assert.Equal(t, int32(1), got.(pb.Test).Num)
+				assert.Equal(t, "Pulsar", got.(pb.Test).Msf)
+			},
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			client, err := NewClient(ClientOptions{
+				URL: lookupURL,
+			})
+
+			assert.NoError(t, err)
+			defer client.Close()
+
+			topic := newTopicName()
+
+			// create producer
+			producer, err := client.CreateProducer(ProducerOptions{
+				Topic:  topic,
+				Schema: test.schema,
+			})
+			assert.NoError(t, err)
+			defer producer.Close()
+
+			_, err = producer.Send(context.Background(), &ProducerMessage{
+				Key:   "testKey",
+				Value: test.producerValue,
+			})
+			assert.NoError(t, err)
+
+			// create table view
+			tv, err := client.CreateTableView(TableViewOptions{
+				Topic:           topic,
+				Schema:          test.schema,
+				SchemaValueType: reflect.TypeOf(test.schemaType),
+			})
+			assert.NoError(t, err)
+			defer tv.Close()
+
+			value := tv.Get("testKey")
+			if test.valueCheck != nil {
+				test.valueCheck(t, value)
+			} else {
+				assert.IsType(t, test.expValueOut, value)
+				assert.Equal(t, test.expValueOut, value)
+			}
+		})
+	}
+}
+
+func strPointer(s string) *string {
+	return &s
+}
+
 func TestPublishNilValue(t *testing.T) {
 	client, err := NewClient(ClientOptions{
 		URL: lookupURL,
@@ -143,3 +295,62 @@ func TestPublishNilValue(t *testing.T) {
 
 	assert.Equal(t, *(tv.Get("key-2").(*string)), "value-2")
 }
+
+func TestForEachAndListenJSONSchema(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.NoError(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+	schema := NewJSONSchema(exampleSchemaDef, nil)
+
+	// create table view
+	tv, err := client.CreateTableView(TableViewOptions{
+		Topic:           topic,
+		Schema:          schema,
+		SchemaValueType: reflect.TypeOf(testJSON{}),
+	})
+	assert.NoError(t, err)
+	defer tv.Close()
+
+	// create listener
+	valuePrefix := "hello pulsar: "
+	tv.ForEachAndListen(func(key string, value interface{}) error {
+		t.Log("foreach" + key)
+		s, ok := value.(testJSON)
+		assert.Truef(t, ok, "expected value to be testJSON type got %T", value)
+		assert.Equal(t, fmt.Sprintf(valuePrefix+key), s.Name)
+		return nil
+	})
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:  topic,
+		Schema: schema,
+	})
+	assert.NoError(t, err)
+	defer producer.Close()
+
+	numMsg := 10
+	for i := 0; i < numMsg; i++ {
+		key := fmt.Sprintf("%d", i)
+		t.Log("producing" + key)
+		_, err = producer.Send(context.Background(), &ProducerMessage{
+			Key: key,
+			Value: testJSON{
+				ID:   i,
+				Name: fmt.Sprintf(valuePrefix + key),
+			},
+		})
+		assert.NoError(t, err)
+	}
+
+	// Wait until tv receives all messages
+	for tv.Size() < 10 {
+		time.Sleep(time.Second * 1)
+		t.Logf("TableView number of elements: %d", tv.Size())
+	}
+}