You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sj...@apache.org on 2022/02/24 21:34:05 UTC

[flink-statefun] branch master updated: [FLINK-26340][statefun-golang-sdk] Add ability in Golang SDK to create new statefun.Context from existing one, but with a new underlying context.Context

This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git


The following commit(s) were added to refs/heads/master by this push:
     new 725202c  [FLINK-26340][statefun-golang-sdk] Add ability in Golang SDK to create new statefun.Context from existing one, but with a new underlying context.Context
725202c is described below

commit 725202cd69b78442d1287deec000ae2d52da4bda
Author: Galen Warren <78...@users.noreply.github.com>
AuthorDate: Tue Feb 22 17:30:26 2022 -0500

    [FLINK-26340][statefun-golang-sdk] Add ability in Golang SDK to create new statefun.Context from existing one, but with a new underlying context.Context
    
    Change sync.Mutex in statefunContext to be *sync.Mutex, to make it copyable, and update construction of statefunContext accordingly
    Add DeriveContext function to derive a new statefun.Context from an existing statefunContext
    Add unit test
    
    This closes #303
---
 docs/content/docs/sdk/golang.md                 | 21 ++++++++++
 statefun-sdk-go/v3/pkg/statefun/context.go      | 19 ++++++++-
 statefun-sdk-go/v3/pkg/statefun/context_test.go | 51 +++++++++++++++++++++++--
 statefun-sdk-go/v3/pkg/statefun/handler.go      |  7 +++-
 4 files changed, 90 insertions(+), 8 deletions(-)

diff --git a/docs/content/docs/sdk/golang.md b/docs/content/docs/sdk/golang.md
index f2ef316..cfceb5f 100644
--- a/docs/content/docs/sdk/golang.md
+++ b/docs/content/docs/sdk/golang.md
@@ -323,6 +323,27 @@ func (g *Greeter) Invoke(ctx statefun.Context, message: statefun.Message) error
 {{< /tab >}}
 {{< /tabs >}}
 
+## Context
+
+The `Context` interface exposed by the Golang SDK -- which is used above to access storage, egresses, and invoke other stateful functions -- embeds the standard Golang `Context` interface from the `context` package. You can further customize the wrapped `Context` using `DeriveContext`, for example:
+
+```go
+import (
+    "context"
+    "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun"
+)
+
+func (g *Greeter) Invoke(ctx statefun.Context, message statefun.Message) error {
+    ctx = statefun.DeriveContext(ctx, context.WithValue(ctx, "key", "value"))
+
+    // do something with ctx, which now holds key=value
+
+    return nil
+}
+```
+
+`DeriveContext` accepts a stateful-function `Context` and a standard `Context`; it returns a new stateful-function `Context` that is functionally equivalent to the original stateful-function `Context`, as far as stateful-function operations are concerned, but which wraps the supplied standard `Context` instead. 
+
 ## Serving Functions
 
 The Golang SDK ships with a ``RequestReplyHandler`` that is a standard http `Handler` and automatically dispatches function calls based on RESTful HTTP ``POSTS``.
diff --git a/statefun-sdk-go/v3/pkg/statefun/context.go b/statefun-sdk-go/v3/pkg/statefun/context.go
index 677cfcd..7f99826 100644
--- a/statefun-sdk-go/v3/pkg/statefun/context.go
+++ b/statefun-sdk-go/v3/pkg/statefun/context.go
@@ -18,9 +18,11 @@ package statefun
 
 import (
 	"context"
-	"github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
+	"errors"
 	"sync"
 	"time"
+
+	"github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
 )
 
 // A Context contains information about the current function invocation, such as the invoked
@@ -64,7 +66,7 @@ type Context interface {
 }
 
 type statefunContext struct {
-	sync.Mutex
+	*sync.Mutex
 	context.Context
 	self     Address
 	caller   *Address
@@ -160,3 +162,16 @@ func (s *statefunContext) SendEgress(egress EgressBuilder) {
 	s.response.OutgoingEgresses = append(s.response.OutgoingEgresses, msg)
 	s.Unlock()
 }
+
+// DeriveContext returns a new statefun.Context that is a shallow copy of statefunCtx wrapping ctx
+// as its context.Context. It panics if statefunCtx is not a *statefunContext.
+func DeriveContext(statefunCtx Context, ctx context.Context) Context {
+	switch value := (statefunCtx).(type) {
+	case *statefunContext:
+		newStatefunContext := *value // struct copy; pointer fields (e.g. the *sync.Mutex) stay shared with the original
+		newStatefunContext.Context = ctx // only the wrapped context.Context is replaced
+		return &newStatefunContext
+	default:
+		panic(errors.New("stateful function context supplied to DeriveContext is not recognized"))
+	}
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/context_test.go b/statefun-sdk-go/v3/pkg/statefun/context_test.go
index 18b3d1c..f12f1f4 100644
--- a/statefun-sdk-go/v3/pkg/statefun/context_test.go
+++ b/statefun-sdk-go/v3/pkg/statefun/context_test.go
@@ -17,11 +17,23 @@
 package statefun
 
 import (
+	"context"
+	"sync"
+	"testing"
+	"time"
+
 	"github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
 	"github.com/stretchr/testify/assert"
 	"google.golang.org/protobuf/proto"
-	"testing"
-	"time"
+)
+
+type testContextKeyType string
+
+const (
+	testContextKey1   = testContextKeyType("key1")
+	testContextValue1 = "value1"
+	testContextKey2   = testContextKeyType("key2")
+	testContextValue2 = "value2"
 )
 
 func TestStatefunContext_Send(t *testing.T) {
@@ -159,10 +171,41 @@ func TestStatefunContext_SendEgress_Kinesis(t *testing.T) {
 	assert.Equal(t, "key", kinesis.PartitionKey, "incorrect kinesis key")
 }
 
-// creates a context with the minimal state to
-// run tests.
+func TestStatefunContext_WithContext(t *testing.T) {
+	// Exercises DeriveContext: the derived context keeps the statefun state and wraps the new context.Context.
+	originalContext := createContext()
+
+	// create a new statefun context with a value added to context
+	newContext := DeriveContext(originalContext, context.WithValue(originalContext, testContextKey2, testContextValue2))
+
+	// Context interface properties should be the same
+	assert.Equal(t, originalContext.Self(), newContext.Self())
+	assert.Equal(t, originalContext.Caller(), newContext.Caller())
+	assert.Equal(t, originalContext.Storage(), newContext.Storage())
+
+	// validate a couple of internals, to ensure the derived context updates the same
+	// response as the original using the same mutex
+	assert.Equal(t, originalContext.Mutex, newContext.(*statefunContext).Mutex)
+	assert.Equal(t, originalContext.response, newContext.(*statefunContext).response)
+
+	// the testContextKey1 key/value should be in both the new context and the original,
+	// i.e. the new context inherited the kv pairs from the original
+	assert.Equal(t, testContextValue1, newContext.Value(testContextKey1))
+	assert.Equal(t, testContextValue1, originalContext.Value(testContextKey1))
+
+	// the testContextKey2 key/value should be in the new context but not the original
+	assert.Equal(t, testContextValue2, newContext.Value(testContextKey2))
+	assert.Nil(t, originalContext.Value(testContextKey2))
+}
+
+// createContext builds a statefunContext populated with the minimal state needed to run tests.
 func createContext() *statefunContext {
 	return &statefunContext{
+		Context:  context.WithValue(context.Background(), testContextKey1, testContextValue1),
+		Mutex:    new(sync.Mutex),
+		caller:   &Address{FunctionType: TypeNameFrom("namespace/function1"), Id: "1"},
+		self:     Address{FunctionType: TypeNameFrom("namespace/function2"), Id: "2"},
+		storage:  new(storage),
 		response: &protocol.FromFunction_InvocationResponse{},
 	}
 }
diff --git a/statefun-sdk-go/v3/pkg/statefun/handler.go b/statefun-sdk-go/v3/pkg/statefun/handler.go
index b86a609..3f9f4f6 100644
--- a/statefun-sdk-go/v3/pkg/statefun/handler.go
+++ b/statefun-sdk-go/v3/pkg/statefun/handler.go
@@ -20,10 +20,12 @@ import (
 	"bytes"
 	"context"
 	"fmt"
-	"github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
-	"google.golang.org/protobuf/proto"
 	"log"
 	"net/http"
+	"sync"
+
+	"github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
+	"google.golang.org/protobuf/proto"
 )
 
 // StatefulFunctions is a registry for multiple StatefulFunction's. A RequestReplyHandler
@@ -214,6 +216,7 @@ func (h *handler) invoke(ctx context.Context, toFunction *protocol.ToFunction) (
 			return nil, ctx.Err()
 		default:
 			sContext := statefunContext{
+				Mutex:    new(sync.Mutex),
 				self:     self,
 				storage:  storage,
 				response: response,