You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2022/08/19 15:01:23 UTC

[beam] branch master updated: [Go SDK] Fix go lint errors (#22796)

This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8912f4d3584 [Go SDK] Fix go lint errors (#22796)
8912f4d3584 is described below

commit 8912f4d3584c2916de6d1e865f2c055aeac8700b
Author: Ritesh Ghorse <ri...@gmail.com>
AuthorDate: Fri Aug 19 11:01:15 2022 -0400

    [Go SDK] Fix go lint errors (#22796)
    
    * fix go lint errors
    
    * fix return
---
 sdks/go/pkg/beam/core/funcx/fn.go                 |  2 +-
 sdks/go/pkg/beam/core/graph/fn.go                 | 14 +++++++-------
 sdks/go/pkg/beam/core/runtime/exec/fn.go          |  6 +++---
 sdks/go/pkg/beam/core/runtime/exec/translate.go   |  4 ++--
 sdks/go/pkg/beam/core/runtime/graphx/translate.go | 16 ++++++++--------
 sdks/go/pkg/beam/core/state/state.go              | 18 +++++++++++-------
 sdks/go/pkg/beam/core/state/state_test.go         | 18 +++++++++---------
 7 files changed, 41 insertions(+), 37 deletions(-)

diff --git a/sdks/go/pkg/beam/core/funcx/fn.go b/sdks/go/pkg/beam/core/funcx/fn.go
index 788d6d52df6..89d92529f01 100644
--- a/sdks/go/pkg/beam/core/funcx/fn.go
+++ b/sdks/go/pkg/beam/core/funcx/fn.go
@@ -83,7 +83,7 @@ const (
 	FnBundleFinalization FnParamKind = 0x800
 	// FnWatermarkEstimator indicates a function input parameter that implements sdf.WatermarkEstimator
 	FnWatermarkEstimator FnParamKind = 0x1000
-	// FnState indicates a function input parameter that implements state.Provider
+	// FnStateProvider indicates a function input parameter that implements state.Provider
 	FnStateProvider FnParamKind = 0x2000
 )
 
diff --git a/sdks/go/pkg/beam/core/graph/fn.go b/sdks/go/pkg/beam/core/graph/fn.go
index 2034349cdf2..bc54ad3362d 100644
--- a/sdks/go/pkg/beam/core/graph/fn.go
+++ b/sdks/go/pkg/beam/core/graph/fn.go
@@ -1272,15 +1272,15 @@ func validateState(fn *DoFn, numIn mainInputs) error {
 				err := errors.Errorf("Duplicate state key %v", k)
 				return errors.SetTopLevelMsgf(err, "Duplicate state key %v used by %v and %v. Ensure that state keys are"+
 					"unique per DoFn", k, orig, s)
-			} else {
-				stateKeys[k] = s
 			}
+			stateKeys[k] = s
+		}
+		if len(ps) > 0 {
+			// TODO(#22736) - Remove this once state is fully supported
+			err := errors.Errorf("ProcessElement uses a StateProvider, but state is not supported in this release.")
+			return errors.SetTopLevelMsgf(err, "ProcessElement uses a StateProvider, but state is not supported in this release. "+
+				"Please try upgrading to a newer release if one exists or wait for state support to be released.")
 		}
-
-		// TODO(#22736) - Remove this once state is fully supported
-		err := errors.Errorf("ProcessElement uses a StateProvider, but state is not supported in this release.")
-		return errors.SetTopLevelMsgf(err, "ProcessElement uses a StateProvider, but state is not supported in this release. "+
-			"Please try upgrading to a newer release if one exists or wait for state support to be released.")
 	} else {
 		if len(ps) > 0 {
 			err := errors.Errorf("ProcessElement doesn't use a StateProvider, but State structs are attached to "+
diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn.go b/sdks/go/pkg/beam/core/runtime/exec/fn.go
index 51f3296ca44..c07c45745e1 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/fn.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/fn.go
@@ -73,15 +73,15 @@ type stateProvider struct {
 }
 
 // ReadValueState reads a value state from the State API
-func (s *stateProvider) ReadValueState(userStateId string) (interface{}, []state.Transaction, error) {
+func (s *stateProvider) ReadValueState(userStateID string) (interface{}, []state.Transaction, error) {
 	// TODO(#22736) - read from the state api.
-	return nil, nil, errors.New("Stateful DoFns are not supported yet.")
+	return nil, nil, errors.New("stateful DoFns are not supported yet")
 }
 
 // WriteValueState writes a value state to the State API
 func (s *stateProvider) WriteValueState(val state.Transaction) error {
 	// TODO(#22736) - read from the state api.
-	return errors.New("Stateful DoFns are not supported yet.")
+	return errors.New("stateful DoFns are not supported yet")
 }
 
 // Invoke invokes the fn with the given values. The extra values must match the non-main
diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate.go b/sdks/go/pkg/beam/core/runtime/exec/translate.go
index 53ddf7843c2..5c66c609aee 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/translate.go
@@ -466,7 +466,7 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) {
 					input := unmarshalKeyedValues(transform.GetInputs())
 
 					if len(userState) > 0 {
-						stateIdToCoder := make(map[string]*coder.Coder)
+						stateIDToCoder := make(map[string]*coder.Coder)
 						for key, spec := range userState {
 							// TODO(#22736) - this will eventually need to be aware of which type of state its modifying to support non-Value state types.
 							cID := spec.GetReadModifyWriteSpec().CoderId
@@ -474,7 +474,7 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) {
 							if err != nil {
 								return nil, err
 							}
-							stateIdToCoder[key] = c
+							stateIDToCoder[key] = c
 						}
 					}
 
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index 653dacce7a3..a5ef6d75e5d 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -448,11 +448,11 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) ([]string, error) {
 			SideInputs: si,
 		}
 		if edge.Edge.DoFn.IsSplittable() {
-			coderId, err := m.coders.Add(edge.Edge.RestrictionCoder)
+			coderID, err := m.coders.Add(edge.Edge.RestrictionCoder)
 			if err != nil {
 				return handleErr(err)
 			}
-			payload.RestrictionCoderId = coderId
+			payload.RestrictionCoderId = coderID
 			m.requirements[URNRequiresSplittableDoFn] = true
 		}
 		if _, ok := edge.Edge.DoFn.ProcessElementFn().BundleFinalization(); ok {
@@ -462,7 +462,7 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) ([]string, error) {
 			m.requirements[URNRequiresStatefulProcessing] = true
 			stateSpecs := make(map[string]*pipepb.StateSpec)
 			for _, ps := range edge.Edge.DoFn.PipelineState() {
-				coderId, err := m.coders.Add(edge.Edge.StateCoders[ps.StateKey()])
+				coderID, err := m.coders.Add(edge.Edge.StateCoders[ps.StateKey()])
 				if err != nil {
 					return handleErr(err)
 				}
@@ -471,7 +471,7 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) ([]string, error) {
 					// See https://github.com/apache/beam/blob/54b0784da7ccba738deff22bd83fbc374ad21d2e/sdks/go/pkg/beam/model/pipeline_v1/beam_runner_api.pb.go#L2635
 					Spec: &pipepb.StateSpec_ReadModifyWriteSpec{
 						ReadModifyWriteSpec: &pipepb.ReadModifyWriteStateSpec{
-							CoderId: coderId,
+							CoderId: coderID,
 						},
 					},
 					Protocol: &pipepb.FunctionSpec{
@@ -829,11 +829,11 @@ func (m *marshaller) expandReshuffle(edge NamedEdge) (string, error) {
 		if err != nil {
 			return handleErr(err)
 		}
-		coderId, err := makeWindowCoder(wfn)
+		coderID, err := makeWindowCoder(wfn)
 		if err != nil {
 			return handleErr(err)
 		}
-		windowCoderId, err := m.coders.AddWindowCoder(coderId)
+		windowCoderId, err := m.coders.AddWindowCoder(coderID)
 		if err != nil {
 			return handleErr(err)
 		}
@@ -1054,11 +1054,11 @@ func MarshalWindowingStrategy(c *CoderMarshaller, w *window.WindowingStrategy) (
 	if err != nil {
 		return nil, err
 	}
-	coderId, err := makeWindowCoder(w.Fn)
+	coderID, err := makeWindowCoder(w.Fn)
 	if err != nil {
 		return nil, err
 	}
-	windowCoderId, err := c.AddWindowCoder(coderId)
+	windowCoderId, err := c.AddWindowCoder(coderID)
 	if err != nil {
 		return nil, err
 	}
diff --git a/sdks/go/pkg/beam/core/state/state.go b/sdks/go/pkg/beam/core/state/state.go
index 62fd4509936..37adcdd0ca9 100644
--- a/sdks/go/pkg/beam/core/state/state.go
+++ b/sdks/go/pkg/beam/core/state/state.go
@@ -20,14 +20,18 @@ import (
 	"reflect"
 )
 
-type TransactionType_Enum int32
+// TransactionTypeEnum represents the type of state transaction
+type TransactionTypeEnum int32
 
 const (
-	TransactionType_Set   TransactionType_Enum = 0
-	TransactionType_Clear TransactionType_Enum = 1
+	// TransactionTypeSet is the set transaction type
+	TransactionTypeSet TransactionTypeEnum = 0
+	// TransactionTypeClear is the set transaction type
+	TransactionTypeClear TransactionTypeEnum = 1
 )
 
 var (
+	// ProviderType is the state provider type
 	ProviderType = reflect.TypeOf((*Provider)(nil)).Elem()
 )
 
@@ -37,7 +41,7 @@ var (
 // it is primarily used for implementations of the Provider interface to talk to the various State objects.
 type Transaction struct {
 	Key  string
-	Type TransactionType_Enum
+	Type TransactionTypeEnum
 	Val  interface{}
 }
 
@@ -67,7 +71,7 @@ type Value[T any] struct {
 func (s *Value[T]) Write(p Provider, val T) error {
 	return p.WriteValueState(Transaction{
 		Key:  s.Key,
-		Type: TransactionType_Set,
+		Type: TransactionTypeSet,
 		Val:  val,
 	})
 }
@@ -84,9 +88,9 @@ func (s *Value[T]) Read(p Provider) (T, bool, error) {
 	}
 	for _, t := range bufferedTransactions {
 		switch t.Type {
-		case TransactionType_Set:
+		case TransactionTypeSet:
 			cur = t.Val
-		case TransactionType_Clear:
+		case TransactionTypeClear:
 			cur = nil
 		}
 	}
diff --git a/sdks/go/pkg/beam/core/state/state_test.go b/sdks/go/pkg/beam/core/state/state_test.go
index b1297891c1c..536fa00a60e 100644
--- a/sdks/go/pkg/beam/core/state/state_test.go
+++ b/sdks/go/pkg/beam/core/state/state_test.go
@@ -30,12 +30,12 @@ type fakeProvider struct {
 	err          map[string]error
 }
 
-func (s *fakeProvider) ReadValueState(userStateId string) (interface{}, []Transaction, error) {
-	if err, ok := s.err[userStateId]; ok {
+func (s *fakeProvider) ReadValueState(userStateID string) (interface{}, []Transaction, error) {
+	if err, ok := s.err[userStateID]; ok {
 		return nil, nil, err
 	}
-	base := s.initialState[userStateId]
-	trans, ok := s.transactions[userStateId]
+	base := s.initialState[userStateID]
+	trans, ok := s.transactions[userStateID]
 	if !ok {
 		trans = []Transaction{}
 	}
@@ -58,15 +58,15 @@ func TestValueRead(t *testing.T) {
 	is["no_transactions"] = 1
 	ts["no_transactions"] = nil
 	is["basic_set"] = 1
-	ts["basic_set"] = []Transaction{{Key: "basic_set", Type: TransactionType_Set, Val: 3}}
+	ts["basic_set"] = []Transaction{{Key: "basic_set", Type: TransactionTypeSet, Val: 3}}
 	is["basic_clear"] = 1
-	ts["basic_clear"] = []Transaction{{Key: "basic_clear", Type: TransactionType_Clear, Val: nil}}
+	ts["basic_clear"] = []Transaction{{Key: "basic_clear", Type: TransactionTypeClear, Val: nil}}
 	is["set_then_clear"] = 1
-	ts["set_then_clear"] = []Transaction{{Key: "set_then_clear", Type: TransactionType_Set, Val: 3}, {Key: "set_then_clear", Type: TransactionType_Clear, Val: nil}}
+	ts["set_then_clear"] = []Transaction{{Key: "set_then_clear", Type: TransactionTypeSet, Val: 3}, {Key: "set_then_clear", Type: TransactionTypeClear, Val: nil}}
 	is["set_then_clear_then_set"] = 1
-	ts["set_then_clear_then_set"] = []Transaction{{Key: "set_then_clear_then_set", Type: TransactionType_Set, Val: 3}, {Key: "set_then_clear_then_set", Type: TransactionType_Clear, Val: nil}, {Key: "set_then_clear_then_set", Type: TransactionType_Set, Val: 4}}
+	ts["set_then_clear_then_set"] = []Transaction{{Key: "set_then_clear_then_set", Type: TransactionTypeSet, Val: 3}, {Key: "set_then_clear_then_set", Type: TransactionTypeClear, Val: nil}, {Key: "set_then_clear_then_set", Type: TransactionTypeSet, Val: 4}}
 	is["err"] = 1
-	ts["err"] = []Transaction{{Key: "err", Type: TransactionType_Set, Val: 3}}
+	ts["err"] = []Transaction{{Key: "err", Type: TransactionTypeSet, Val: 3}}
 	es["err"] = errFake
 
 	f := fakeProvider{