You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sj...@apache.org on 2022/02/24 21:34:05 UTC
[flink-statefun] branch master updated: [FLINK-26340][statefun-golang-sdk] Add ability in Golang SDK to create new statefun.Context from existing one, but with a new underlying context.Context
This is an automated email from the ASF dual-hosted git repository.
sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
The following commit(s) were added to refs/heads/master by this push:
new 725202c [FLINK-26340][statefun-golang-sdk] Add ability in Golang SDK to create new statefun.Context from existing one, but with a new underlying context.Context
725202c is described below
commit 725202cd69b78442d1287deec000ae2d52da4bda
Author: Galen Warren <78...@users.noreply.github.com>
AuthorDate: Tue Feb 22 17:30:26 2022 -0500
[FLINK-26340][statefun-golang-sdk] Add ability in Golang SDK to create new statefun.Context from existing one, but with a new underlying context.Context
Change sync.Mutex in statefunContext to be *sync.Mutex, to make it copyable, and update construction of statefunContext accordingly
Add WithContext to statefun.Context interface and implement on statefunContext
Add unit test
This closes #303
---
docs/content/docs/sdk/golang.md | 21 ++++++++++
statefun-sdk-go/v3/pkg/statefun/context.go | 19 ++++++++-
statefun-sdk-go/v3/pkg/statefun/context_test.go | 51 +++++++++++++++++++++++--
statefun-sdk-go/v3/pkg/statefun/handler.go | 7 +++-
4 files changed, 90 insertions(+), 8 deletions(-)
diff --git a/docs/content/docs/sdk/golang.md b/docs/content/docs/sdk/golang.md
index f2ef316..cfceb5f 100644
--- a/docs/content/docs/sdk/golang.md
+++ b/docs/content/docs/sdk/golang.md
@@ -323,6 +323,27 @@ func (g *Greeter) Invoke(ctx statefun.Context, message: statefun.Message) error
{{< /tab >}}
{{< /tabs >}}
+## Context
+
+The `Context` interface exposed by the Golang SDK -- which is used above to access storage, egresses, and invoke other stateful functions -- embeds the standard Golang `Context` interface from the `context` package. You can further customize the wrapped `Context` using `DeriveContext`, for example:
+
+```
+import (
+ "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun"
+)
+
+func (g *Greeter) Invoke(ctx statefun.Context, message: statefun.Message) error {
+
+ ctx = statefun.DeriveContext(ctx, context.WithValue(ctx, "key", "value"))
+
+ // do something with ctx, which now holds key=value
+
+ return nil
+}
+```
+
+`DeriveContext` accepts a stateful-function `Context` and a standard `Context`; it returns a new stateful-function `Context` that is functionally equivalent to the original stateful-function `Context`, as far as stateful-function operations are concerned, but which wraps the supplied standard `Context` instead.
+
## Serving Functions
The Golang SDK ships with a ``RequestReplyHandler`` that is a standard http `Handler` and automatically dispatches function calls based on RESTful HTTP ``POSTS``.
diff --git a/statefun-sdk-go/v3/pkg/statefun/context.go b/statefun-sdk-go/v3/pkg/statefun/context.go
index 677cfcd..7f99826 100644
--- a/statefun-sdk-go/v3/pkg/statefun/context.go
+++ b/statefun-sdk-go/v3/pkg/statefun/context.go
@@ -18,9 +18,11 @@ package statefun
import (
"context"
- "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
+ "errors"
"sync"
"time"
+
+ "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
)
// A Context contains information about the current function invocation, such as the invoked
@@ -64,7 +66,7 @@ type Context interface {
}
type statefunContext struct {
- sync.Mutex
+ *sync.Mutex
context.Context
self Address
caller *Address
@@ -160,3 +162,16 @@ func (s *statefunContext) SendEgress(egress EgressBuilder) {
s.response.OutgoingEgresses = append(s.response.OutgoingEgresses, msg)
s.Unlock()
}
+
+// DeriveContext derives a new statefun.Context from an existing one, replacing
+// the wrapped context.Context.
+func DeriveContext(statefunCtx Context, ctx context.Context) Context {
+ switch value := (statefunCtx).(type) {
+ case *statefunContext:
+ newStatefunContext := *value
+ newStatefunContext.Context = ctx
+ return &newStatefunContext
+ default:
+ panic(errors.New("stateful function context supplied to DeriveContext is not recognized"))
+ }
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/context_test.go b/statefun-sdk-go/v3/pkg/statefun/context_test.go
index 18b3d1c..f12f1f4 100644
--- a/statefun-sdk-go/v3/pkg/statefun/context_test.go
+++ b/statefun-sdk-go/v3/pkg/statefun/context_test.go
@@ -17,11 +17,23 @@
package statefun
import (
+ "context"
+ "sync"
+ "testing"
+ "time"
+
"github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"
- "testing"
- "time"
+)
+
+type testContextKeyType string
+
+const (
+ testContextKey1 = testContextKeyType("key1")
+ testContextValue1 = "value1"
+ testContextKey2 = testContextKeyType("key2")
+ testContextValue2 = "value2"
)
func TestStatefunContext_Send(t *testing.T) {
@@ -159,10 +171,41 @@ func TestStatefunContext_SendEgress_Kinesis(t *testing.T) {
assert.Equal(t, "key", kinesis.PartitionKey, "incorrect kinesis key")
}
-// creates a context with the minimal state to
-// run tests.
+func TestStatefunContext_WithContext(t *testing.T) {
+
+ originalContext := createContext()
+
+ // create a new statefun context with a value added to context
+ newContext := DeriveContext(originalContext, context.WithValue(originalContext, testContextKey2, testContextValue2))
+
+ // Context interface properties should be the same
+ assert.Equal(t, originalContext.Self(), newContext.Self())
+ assert.Equal(t, originalContext.Caller(), newContext.Caller())
+ assert.Equal(t, originalContext.Storage(), newContext.Storage())
+
+ // validate a couple of internals, to ensure the derived context updates the same
+ // response as the original using the same mutex
+ assert.Equal(t, originalContext.Mutex, newContext.(*statefunContext).Mutex)
+ assert.Equal(t, originalContext.response, newContext.(*statefunContext).response)
+
+ // the testContextKey1 key/value should be in both the new context and the original,
+ // i.e. the new context inherited the kv pairs from the original
+ assert.Equal(t, testContextValue1, newContext.Value(testContextKey1))
+ assert.Equal(t, testContextValue1, originalContext.Value(testContextKey1))
+
+ // the testContextKey2 key/value should be in the new context but not the original
+ assert.Equal(t, testContextValue2, newContext.Value(testContextKey2))
+ assert.Nil(t, originalContext.Value(testContextKey2))
+}
+
+// creates a context with the minimal state to run tests.
func createContext() *statefunContext {
return &statefunContext{
+ Context: context.WithValue(context.Background(), testContextKey1, testContextValue1),
+ Mutex: new(sync.Mutex),
+ caller: &Address{FunctionType: TypeNameFrom("namespace/function1"), Id: "1"},
+ self: Address{FunctionType: TypeNameFrom("namespace/function2"), Id: "2"},
+ storage: new(storage),
response: &protocol.FromFunction_InvocationResponse{},
}
}
diff --git a/statefun-sdk-go/v3/pkg/statefun/handler.go b/statefun-sdk-go/v3/pkg/statefun/handler.go
index b86a609..3f9f4f6 100644
--- a/statefun-sdk-go/v3/pkg/statefun/handler.go
+++ b/statefun-sdk-go/v3/pkg/statefun/handler.go
@@ -20,10 +20,12 @@ import (
"bytes"
"context"
"fmt"
- "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
- "google.golang.org/protobuf/proto"
"log"
"net/http"
+ "sync"
+
+ "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
+ "google.golang.org/protobuf/proto"
)
// StatefulFunctions is a registry for multiple StatefulFunction's. A RequestReplyHandler
@@ -214,6 +216,7 @@ func (h *handler) invoke(ctx context.Context, toFunction *protocol.ToFunction) (
return nil, ctx.Err()
default:
sContext := statefunContext{
+ Mutex: new(sync.Mutex),
self: self,
storage: storage,
response: response,