You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2023/11/20 09:41:56 UTC
(pulsar-client-go) branch master updated: [Issue 1132] Fix JSONSchema unmarshalling in TableView (#1133)
This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new d4574424 [Issue 1132] Fix JSONSchema unmarshalling in TableView (#1133)
d4574424 is described below
commit d457442434c33f0fef7469c24d25de04bc718aa6
Author: Oliver Muir <ol...@arenko.group>
AuthorDate: Mon Nov 20 09:41:50 2023 +0000
[Issue 1132] Fix JSONSchema unmarshalling in TableView (#1133)
Fixes #1132
### Motivation
Fix issue #1132 - using JSONSchema with TableView
### Modifications
- Set a concrete type in the `payload` variable before JSON-unmarshalling into that variable. This allows the JSON package to identify and use the type rather than seeing it as `interface{}`.
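- Use `reflect.Indirect(payload).Interface()` when storing the payload and when passing it to listeners, so that the pointer returned by `reflect.New` is dereferenced and the value itself (rather than a pointer to it) is stored and delivered.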
- Add test coverage for `TableView.Get` covering all supported schema types.
- Add test coverage for `TableView.ForEachAndListen` for JSONSchema.
Additional minor changes (they didn't seem worth their own MRs, but I'm happy to split them out if that's better):
- Correct typo in comments on `TableView.ForEach` and `TableView.ForEachAndListen` interface methods.
- Correct `TableView.ForEachAndListen` comment to clarify that it continues to call the given action on future messages.
- Correct formatting directive (`%w` -> `%v`) in error log `tv.logger.Errorf("msg.GetSchemaValue() failed with %v; msg is %v", err, msg)`. (This indirectly calls `fmt.Sprintf` in logrus, which doesn't support `%w`; that verb is only recognized by `fmt.Errorf`.)
---
pulsar/table_view.go | 6 +-
pulsar/table_view_impl.go | 11 ++-
pulsar/table_view_test.go | 211 ++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 219 insertions(+), 9 deletions(-)
diff --git a/pulsar/table_view.go b/pulsar/table_view.go
index e566bf0b..58a664ae 100644
--- a/pulsar/table_view.go
+++ b/pulsar/table_view.go
@@ -65,12 +65,12 @@ type TableView interface {
// Keys returns a slice of the keys contained in this TableView.
Keys() []string
- // ForEach performs the give action for each entry in this map until all entries have been processed or the action
+ // ForEach performs the given action for each entry in this map until all entries have been processed or the action
// returns an error.
ForEach(func(string, interface{}) error) error
- // ForEachAndListen performs the give action for each entry in this map until all entries have been processed or
- // the action returns an error.
+ // ForEachAndListen performs the given action for each entry in this map until all entries have been processed or
+ // the action returns an error. The given action will then be performed on each new entry in this map.
ForEachAndListen(func(string, interface{}) error) error
// Close closes the table view and releases resources allocated.
diff --git a/pulsar/table_view_impl.go b/pulsar/table_view_impl.go
index 47f8c6c0..17e0b90f 100644
--- a/pulsar/table_view_impl.go
+++ b/pulsar/table_view_impl.go
@@ -245,19 +245,18 @@ func (tv *TableViewImpl) handleMessage(msg Message) {
tv.dataMu.Lock()
defer tv.dataMu.Unlock()
- var payload interface{}
+ payload := reflect.New(tv.options.SchemaValueType)
if len(msg.Payload()) == 0 {
delete(tv.data, msg.Key())
} else {
- payload = reflect.Indirect(reflect.New(tv.options.SchemaValueType)).Interface()
- if err := msg.GetSchemaValue(&payload); err != nil {
- tv.logger.Errorf("msg.GetSchemaValue() failed with %w; msg is %v", err, msg)
+ if err := msg.GetSchemaValue(payload.Interface()); err != nil {
+ tv.logger.Errorf("msg.GetSchemaValue() failed with %v; msg is %v", err, msg)
}
- tv.data[msg.Key()] = payload
+ tv.data[msg.Key()] = reflect.Indirect(payload).Interface()
}
for _, listener := range tv.listeners {
- if err := listener(msg.Key(), payload); err != nil {
+ if err := listener(msg.Key(), reflect.Indirect(payload).Interface()); err != nil {
tv.logger.Errorf("table view listener failed for %v: %w", msg, err)
}
}
diff --git a/pulsar/table_view_test.go b/pulsar/table_view_test.go
index d29b24d2..45b94411 100644
--- a/pulsar/table_view_test.go
+++ b/pulsar/table_view_test.go
@@ -24,6 +24,7 @@ import (
"testing"
"time"
+ pb "github.com/apache/pulsar-client-go/integration-tests/pb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -80,6 +81,157 @@ func TestTableView(t *testing.T) {
}
}
+func TestTableViewSchemas(t *testing.T) {
+ var tests = []struct {
+ name string
+ schema Schema
+ schemaType interface{}
+ producerValue interface{}
+ expValueOut interface{}
+ valueCheck func(t *testing.T, got interface{}) // Overrides expValueOut for more complex checks
+ }{
+ {
+ name: "StringSchema",
+ schema: NewStringSchema(nil),
+ schemaType: strPointer("hello pulsar"),
+ producerValue: "hello pulsar",
+ expValueOut: strPointer("hello pulsar"),
+ },
+ {
+ name: "JSONSchema",
+ schema: NewJSONSchema(exampleSchemaDef, nil),
+ schemaType: testJSON{},
+ producerValue: testJSON{ID: 1, Name: "Pulsar"},
+ expValueOut: testJSON{ID: 1, Name: "Pulsar"},
+ },
+ {
+ name: "JSONSchema pointer type",
+ schema: NewJSONSchema(exampleSchemaDef, nil),
+ schemaType: &testJSON{ID: 1, Name: "Pulsar"},
+ producerValue: testJSON{ID: 1, Name: "Pulsar"},
+ expValueOut: &testJSON{ID: 1, Name: "Pulsar"},
+ },
+ {
+ name: "AvroSchema",
+ schema: NewAvroSchema(exampleSchemaDef, nil),
+ schemaType: testAvro{ID: 1, Name: "Pulsar"},
+ producerValue: testAvro{ID: 1, Name: "Pulsar"},
+ expValueOut: testAvro{ID: 1, Name: "Pulsar"},
+ },
+ {
+ name: "Int8Schema",
+ schema: NewInt8Schema(nil),
+ schemaType: int8(0),
+ producerValue: int8(1),
+ expValueOut: int8(1),
+ },
+ {
+ name: "Int16Schema",
+ schema: NewInt16Schema(nil),
+ schemaType: int16(0),
+ producerValue: int16(1),
+ expValueOut: int16(1),
+ },
+ {
+ name: "Int32Schema",
+ schema: NewInt32Schema(nil),
+ schemaType: int32(0),
+ producerValue: int32(1),
+ expValueOut: int32(1),
+ },
+ {
+ name: "Int64Schema",
+ schema: NewInt64Schema(nil),
+ schemaType: int64(0),
+ producerValue: int64(1),
+ expValueOut: int64(1),
+ },
+ {
+ name: "FloatSchema",
+ schema: NewFloatSchema(nil),
+ schemaType: float32(0),
+ producerValue: float32(1),
+ expValueOut: float32(1),
+ },
+ {
+ name: "DoubleSchema",
+ schema: NewDoubleSchema(nil),
+ schemaType: float64(0),
+ producerValue: float64(1),
+ expValueOut: float64(1),
+ },
+ {
+ name: "ProtoSchema",
+ schema: NewProtoSchema(protoSchemaDef, nil),
+ schemaType: pb.Test{},
+ producerValue: &pb.Test{Num: 1, Msf: "Pulsar"},
+ valueCheck: func(t *testing.T, got interface{}) {
+ assert.IsType(t, pb.Test{}, got)
+ assert.Equal(t, int32(1), got.(pb.Test).Num)
+ assert.Equal(t, "Pulsar", got.(pb.Test).Msf)
+ },
+ },
+ {
+ name: "ProtoNativeSchema",
+ schema: NewProtoNativeSchemaWithMessage(&pb.Test{}, nil),
+ schemaType: pb.Test{},
+ producerValue: &pb.Test{Num: 1, Msf: "Pulsar"},
+ valueCheck: func(t *testing.T, got interface{}) {
+ assert.IsType(t, pb.Test{}, got)
+ assert.Equal(t, int32(1), got.(pb.Test).Num)
+ assert.Equal(t, "Pulsar", got.(pb.Test).Msf)
+ },
+ },
+ }
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.NoError(t, err)
+ defer client.Close()
+
+ topic := newTopicName()
+
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ Schema: test.schema,
+ })
+ assert.NoError(t, err)
+ defer producer.Close()
+
+ _, err = producer.Send(context.Background(), &ProducerMessage{
+ Key: "testKey",
+ Value: test.producerValue,
+ })
+ assert.NoError(t, err)
+
+ // create table view
+ tv, err := client.CreateTableView(TableViewOptions{
+ Topic: topic,
+ Schema: test.schema,
+ SchemaValueType: reflect.TypeOf(test.schemaType),
+ })
+ assert.NoError(t, err)
+ defer tv.Close()
+
+ value := tv.Get("testKey")
+ if test.valueCheck != nil {
+ test.valueCheck(t, value)
+ } else {
+ assert.IsType(t, test.expValueOut, value)
+ assert.Equal(t, test.expValueOut, value)
+ }
+ })
+ }
+}
+
+func strPointer(s string) *string {
+ return &s
+}
+
func TestPublishNilValue(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
@@ -143,3 +295,62 @@ func TestPublishNilValue(t *testing.T) {
assert.Equal(t, *(tv.Get("key-2").(*string)), "value-2")
}
+
+func TestForEachAndListenJSONSchema(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.NoError(t, err)
+ defer client.Close()
+
+ topic := newTopicName()
+ schema := NewJSONSchema(exampleSchemaDef, nil)
+
+ // create table view
+ tv, err := client.CreateTableView(TableViewOptions{
+ Topic: topic,
+ Schema: schema,
+ SchemaValueType: reflect.TypeOf(testJSON{}),
+ })
+ assert.NoError(t, err)
+ defer tv.Close()
+
+ // create listener
+ valuePrefix := "hello pulsar: "
+ tv.ForEachAndListen(func(key string, value interface{}) error {
+ t.Log("foreach" + key)
+ s, ok := value.(testJSON)
+ assert.Truef(t, ok, "expected value to be testJSON type got %T", value)
+ assert.Equal(t, fmt.Sprintf(valuePrefix+key), s.Name)
+ return nil
+ })
+
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ Schema: schema,
+ })
+ assert.NoError(t, err)
+ defer producer.Close()
+
+ numMsg := 10
+ for i := 0; i < numMsg; i++ {
+ key := fmt.Sprintf("%d", i)
+ t.Log("producing" + key)
+ _, err = producer.Send(context.Background(), &ProducerMessage{
+ Key: key,
+ Value: testJSON{
+ ID: i,
+ Name: fmt.Sprintf(valuePrefix + key),
+ },
+ })
+ assert.NoError(t, err)
+ }
+
+ // Wait until tv receives all messages
+ for tv.Size() < 10 {
+ time.Sleep(time.Second * 1)
+ t.Logf("TableView number of elements: %d", tv.Size())
+ }
+}