You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2022/08/19 15:01:23 UTC
[beam] branch master updated: [Go SDK] Fix go lint errors (#22796)
This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8912f4d3584 [Go SDK] Fix go lint errors (#22796)
8912f4d3584 is described below
commit 8912f4d3584c2916de6d1e865f2c055aeac8700b
Author: Ritesh Ghorse <ri...@gmail.com>
AuthorDate: Fri Aug 19 11:01:15 2022 -0400
[Go SDK] Fix go lint errors (#22796)
* fix go lint errors
* fix return
---
sdks/go/pkg/beam/core/funcx/fn.go | 2 +-
sdks/go/pkg/beam/core/graph/fn.go | 14 +++++++-------
sdks/go/pkg/beam/core/runtime/exec/fn.go | 6 +++---
sdks/go/pkg/beam/core/runtime/exec/translate.go | 4 ++--
sdks/go/pkg/beam/core/runtime/graphx/translate.go | 16 ++++++++--------
sdks/go/pkg/beam/core/state/state.go | 18 +++++++++++-------
sdks/go/pkg/beam/core/state/state_test.go | 18 +++++++++---------
7 files changed, 41 insertions(+), 37 deletions(-)
diff --git a/sdks/go/pkg/beam/core/funcx/fn.go b/sdks/go/pkg/beam/core/funcx/fn.go
index 788d6d52df6..89d92529f01 100644
--- a/sdks/go/pkg/beam/core/funcx/fn.go
+++ b/sdks/go/pkg/beam/core/funcx/fn.go
@@ -83,7 +83,7 @@ const (
FnBundleFinalization FnParamKind = 0x800
// FnWatermarkEstimator indicates a function input parameter that implements sdf.WatermarkEstimator
FnWatermarkEstimator FnParamKind = 0x1000
- // FnState indicates a function input parameter that implements state.Provider
+ // FnStateProvider indicates a function input parameter that implements state.Provider
FnStateProvider FnParamKind = 0x2000
)
diff --git a/sdks/go/pkg/beam/core/graph/fn.go b/sdks/go/pkg/beam/core/graph/fn.go
index 2034349cdf2..bc54ad3362d 100644
--- a/sdks/go/pkg/beam/core/graph/fn.go
+++ b/sdks/go/pkg/beam/core/graph/fn.go
@@ -1272,15 +1272,15 @@ func validateState(fn *DoFn, numIn mainInputs) error {
err := errors.Errorf("Duplicate state key %v", k)
return errors.SetTopLevelMsgf(err, "Duplicate state key %v used by %v and %v. Ensure that state keys are"+
"unique per DoFn", k, orig, s)
- } else {
- stateKeys[k] = s
}
+ stateKeys[k] = s
+ }
+ if len(ps) > 0 {
+ // TODO(#22736) - Remove this once state is fully supported
+ err := errors.Errorf("ProcessElement uses a StateProvider, but state is not supported in this release.")
+ return errors.SetTopLevelMsgf(err, "ProcessElement uses a StateProvider, but state is not supported in this release. "+
+ "Please try upgrading to a newer release if one exists or wait for state support to be released.")
}
-
- // TODO(#22736) - Remove this once state is fully supported
- err := errors.Errorf("ProcessElement uses a StateProvider, but state is not supported in this release.")
- return errors.SetTopLevelMsgf(err, "ProcessElement uses a StateProvider, but state is not supported in this release. "+
- "Please try upgrading to a newer release if one exists or wait for state support to be released.")
} else {
if len(ps) > 0 {
err := errors.Errorf("ProcessElement doesn't use a StateProvider, but State structs are attached to "+
diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn.go b/sdks/go/pkg/beam/core/runtime/exec/fn.go
index 51f3296ca44..c07c45745e1 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/fn.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/fn.go
@@ -73,15 +73,15 @@ type stateProvider struct {
}
// ReadValueState reads a value state from the State API
-func (s *stateProvider) ReadValueState(userStateId string) (interface{}, []state.Transaction, error) {
+func (s *stateProvider) ReadValueState(userStateID string) (interface{}, []state.Transaction, error) {
// TODO(#22736) - read from the state api.
- return nil, nil, errors.New("Stateful DoFns are not supported yet.")
+ return nil, nil, errors.New("stateful DoFns are not supported yet")
}
// WriteValueState writes a value state to the State API
func (s *stateProvider) WriteValueState(val state.Transaction) error {
// TODO(#22736) - read from the state api.
- return errors.New("Stateful DoFns are not supported yet.")
+ return errors.New("stateful DoFns are not supported yet")
}
// Invoke invokes the fn with the given values. The extra values must match the non-main
diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate.go b/sdks/go/pkg/beam/core/runtime/exec/translate.go
index 53ddf7843c2..5c66c609aee 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/translate.go
@@ -466,7 +466,7 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) {
input := unmarshalKeyedValues(transform.GetInputs())
if len(userState) > 0 {
- stateIdToCoder := make(map[string]*coder.Coder)
+ stateIDToCoder := make(map[string]*coder.Coder)
for key, spec := range userState {
// TODO(#22736) - this will eventually need to be aware of which type of state its modifying to support non-Value state types.
cID := spec.GetReadModifyWriteSpec().CoderId
@@ -474,7 +474,7 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) {
if err != nil {
return nil, err
}
- stateIdToCoder[key] = c
+ stateIDToCoder[key] = c
}
}
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index 653dacce7a3..a5ef6d75e5d 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -448,11 +448,11 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) ([]string, error) {
SideInputs: si,
}
if edge.Edge.DoFn.IsSplittable() {
- coderId, err := m.coders.Add(edge.Edge.RestrictionCoder)
+ coderID, err := m.coders.Add(edge.Edge.RestrictionCoder)
if err != nil {
return handleErr(err)
}
- payload.RestrictionCoderId = coderId
+ payload.RestrictionCoderId = coderID
m.requirements[URNRequiresSplittableDoFn] = true
}
if _, ok := edge.Edge.DoFn.ProcessElementFn().BundleFinalization(); ok {
@@ -462,7 +462,7 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) ([]string, error) {
m.requirements[URNRequiresStatefulProcessing] = true
stateSpecs := make(map[string]*pipepb.StateSpec)
for _, ps := range edge.Edge.DoFn.PipelineState() {
- coderId, err := m.coders.Add(edge.Edge.StateCoders[ps.StateKey()])
+ coderID, err := m.coders.Add(edge.Edge.StateCoders[ps.StateKey()])
if err != nil {
return handleErr(err)
}
@@ -471,7 +471,7 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) ([]string, error) {
// See https://github.com/apache/beam/blob/54b0784da7ccba738deff22bd83fbc374ad21d2e/sdks/go/pkg/beam/model/pipeline_v1/beam_runner_api.pb.go#L2635
Spec: &pipepb.StateSpec_ReadModifyWriteSpec{
ReadModifyWriteSpec: &pipepb.ReadModifyWriteStateSpec{
- CoderId: coderId,
+ CoderId: coderID,
},
},
Protocol: &pipepb.FunctionSpec{
@@ -829,11 +829,11 @@ func (m *marshaller) expandReshuffle(edge NamedEdge) (string, error) {
if err != nil {
return handleErr(err)
}
- coderId, err := makeWindowCoder(wfn)
+ coderID, err := makeWindowCoder(wfn)
if err != nil {
return handleErr(err)
}
- windowCoderId, err := m.coders.AddWindowCoder(coderId)
+ windowCoderId, err := m.coders.AddWindowCoder(coderID)
if err != nil {
return handleErr(err)
}
@@ -1054,11 +1054,11 @@ func MarshalWindowingStrategy(c *CoderMarshaller, w *window.WindowingStrategy) (
if err != nil {
return nil, err
}
- coderId, err := makeWindowCoder(w.Fn)
+ coderID, err := makeWindowCoder(w.Fn)
if err != nil {
return nil, err
}
- windowCoderId, err := c.AddWindowCoder(coderId)
+ windowCoderId, err := c.AddWindowCoder(coderID)
if err != nil {
return nil, err
}
diff --git a/sdks/go/pkg/beam/core/state/state.go b/sdks/go/pkg/beam/core/state/state.go
index 62fd4509936..37adcdd0ca9 100644
--- a/sdks/go/pkg/beam/core/state/state.go
+++ b/sdks/go/pkg/beam/core/state/state.go
@@ -20,14 +20,18 @@ import (
"reflect"
)
-type TransactionType_Enum int32
+// TransactionTypeEnum represents the type of state transaction
+type TransactionTypeEnum int32
const (
- TransactionType_Set TransactionType_Enum = 0
- TransactionType_Clear TransactionType_Enum = 1
+ // TransactionTypeSet is the set transaction type
+ TransactionTypeSet TransactionTypeEnum = 0
+ // TransactionTypeClear is the set transaction type
+ TransactionTypeClear TransactionTypeEnum = 1
)
var (
+ // ProviderType is the state provider type
ProviderType = reflect.TypeOf((*Provider)(nil)).Elem()
)
@@ -37,7 +41,7 @@ var (
// it is primarily used for implementations of the Provider interface to talk to the various State objects.
type Transaction struct {
Key string
- Type TransactionType_Enum
+ Type TransactionTypeEnum
Val interface{}
}
@@ -67,7 +71,7 @@ type Value[T any] struct {
func (s *Value[T]) Write(p Provider, val T) error {
return p.WriteValueState(Transaction{
Key: s.Key,
- Type: TransactionType_Set,
+ Type: TransactionTypeSet,
Val: val,
})
}
@@ -84,9 +88,9 @@ func (s *Value[T]) Read(p Provider) (T, bool, error) {
}
for _, t := range bufferedTransactions {
switch t.Type {
- case TransactionType_Set:
+ case TransactionTypeSet:
cur = t.Val
- case TransactionType_Clear:
+ case TransactionTypeClear:
cur = nil
}
}
diff --git a/sdks/go/pkg/beam/core/state/state_test.go b/sdks/go/pkg/beam/core/state/state_test.go
index b1297891c1c..536fa00a60e 100644
--- a/sdks/go/pkg/beam/core/state/state_test.go
+++ b/sdks/go/pkg/beam/core/state/state_test.go
@@ -30,12 +30,12 @@ type fakeProvider struct {
err map[string]error
}
-func (s *fakeProvider) ReadValueState(userStateId string) (interface{}, []Transaction, error) {
- if err, ok := s.err[userStateId]; ok {
+func (s *fakeProvider) ReadValueState(userStateID string) (interface{}, []Transaction, error) {
+ if err, ok := s.err[userStateID]; ok {
return nil, nil, err
}
- base := s.initialState[userStateId]
- trans, ok := s.transactions[userStateId]
+ base := s.initialState[userStateID]
+ trans, ok := s.transactions[userStateID]
if !ok {
trans = []Transaction{}
}
@@ -58,15 +58,15 @@ func TestValueRead(t *testing.T) {
is["no_transactions"] = 1
ts["no_transactions"] = nil
is["basic_set"] = 1
- ts["basic_set"] = []Transaction{{Key: "basic_set", Type: TransactionType_Set, Val: 3}}
+ ts["basic_set"] = []Transaction{{Key: "basic_set", Type: TransactionTypeSet, Val: 3}}
is["basic_clear"] = 1
- ts["basic_clear"] = []Transaction{{Key: "basic_clear", Type: TransactionType_Clear, Val: nil}}
+ ts["basic_clear"] = []Transaction{{Key: "basic_clear", Type: TransactionTypeClear, Val: nil}}
is["set_then_clear"] = 1
- ts["set_then_clear"] = []Transaction{{Key: "set_then_clear", Type: TransactionType_Set, Val: 3}, {Key: "set_then_clear", Type: TransactionType_Clear, Val: nil}}
+ ts["set_then_clear"] = []Transaction{{Key: "set_then_clear", Type: TransactionTypeSet, Val: 3}, {Key: "set_then_clear", Type: TransactionTypeClear, Val: nil}}
is["set_then_clear_then_set"] = 1
- ts["set_then_clear_then_set"] = []Transaction{{Key: "set_then_clear_then_set", Type: TransactionType_Set, Val: 3}, {Key: "set_then_clear_then_set", Type: TransactionType_Clear, Val: nil}, {Key: "set_then_clear_then_set", Type: TransactionType_Set, Val: 4}}
+ ts["set_then_clear_then_set"] = []Transaction{{Key: "set_then_clear_then_set", Type: TransactionTypeSet, Val: 3}, {Key: "set_then_clear_then_set", Type: TransactionTypeClear, Val: nil}, {Key: "set_then_clear_then_set", Type: TransactionTypeSet, Val: 4}}
is["err"] = 1
- ts["err"] = []Transaction{{Key: "err", Type: TransactionType_Set, Val: 3}}
+ ts["err"] = []Transaction{{Key: "err", Type: TransactionTypeSet, Val: 3}}
es["err"] = errFake
f := fakeProvider{