You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/05/25 23:22:47 UTC
[beam] branch master updated: [BEAM-7383] Adding strict flag to
runners to validate with vet runner
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 838c030 [BEAM-7383] Adding strict flag to runners to validate with vet runner
new e0c7eb6 Merge pull request #8644 from youngoli/beam7383
838c030 is described below
commit 838c0304a0d6e95eb8ab1be49e864db6fa3c561f
Author: Daniel Oliveira <da...@gmail.com>
AuthorDate: Tue May 21 16:55:55 2019 -0700
[BEAM-7383] Adding strict flag to runners to validate with vet runner
Creates a pipeline option called beam_strict that's supported by the
direct and universal runners. Uses the vet runner to perform the
verification for strict mode.
---
sdks/go/pkg/beam/options/jobopts/options.go | 4 ++++
sdks/go/pkg/beam/runners/direct/direct.go | 10 ++++++++++
sdks/go/pkg/beam/runners/universal/universal.go | 13 +++++++++++++
sdks/go/pkg/beam/runners/vet/vet.go | 10 ++++++----
4 files changed, 33 insertions(+), 4 deletions(-)
diff --git a/sdks/go/pkg/beam/options/jobopts/options.go b/sdks/go/pkg/beam/options/jobopts/options.go
index 86beb94..fb6b64a 100644
--- a/sdks/go/pkg/beam/options/jobopts/options.go
+++ b/sdks/go/pkg/beam/options/jobopts/options.go
@@ -61,6 +61,10 @@ var (
// Async determines whether to wait for job completion.
Async = flag.Bool("async", false, "Do not wait for job completion.")
+
+ // Strict mode applies additional validation to user pipelines before
+ // executing them and fails early if the pipelines don't pass.
+ Strict = flag.Bool("beam_strict", false, "Apply additional validation to pipelines.")
)
// GetEndpoint returns the endpoint, if non empty and exits otherwise. Runners
diff --git a/sdks/go/pkg/beam/runners/direct/direct.go b/sdks/go/pkg/beam/runners/direct/direct.go
index bd1d324..55e4836 100644
--- a/sdks/go/pkg/beam/runners/direct/direct.go
+++ b/sdks/go/pkg/beam/runners/direct/direct.go
@@ -28,6 +28,8 @@ import (
"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
"github.com/apache/beam/sdks/go/pkg/beam/log"
+ "github.com/apache/beam/sdks/go/pkg/beam/options/jobopts"
+ "github.com/apache/beam/sdks/go/pkg/beam/runners/vet"
)
func init() {
@@ -45,6 +47,14 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
log.Info(ctx, "Pipeline:")
log.Info(ctx, p)
+ if *jobopts.Strict {
+ log.Info(ctx, "Strict mode enabled, applying additional validation.")
+ if err := vet.Execute(ctx, p); err != nil {
+ return errors.Wrap(err, "strictness check failed")
+ }
+ log.Info(ctx, "Strict mode validation passed.")
+ }
+
edges, _, err := p.Build()
if err != nil {
return errors.Wrap(err, "invalid pipeline")
diff --git a/sdks/go/pkg/beam/runners/universal/universal.go b/sdks/go/pkg/beam/runners/universal/universal.go
index 0e22db7..51f74da 100644
--- a/sdks/go/pkg/beam/runners/universal/universal.go
+++ b/sdks/go/pkg/beam/runners/universal/universal.go
@@ -30,6 +30,7 @@ import (
pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/go/pkg/beam/options/jobopts"
"github.com/apache/beam/sdks/go/pkg/beam/runners/universal/runnerlib"
+ "github.com/apache/beam/sdks/go/pkg/beam/runners/vet"
"github.com/golang/protobuf/proto"
)
@@ -40,6 +41,18 @@ func init() {
// Execute executes the pipeline on a universal beam runner.
func Execute(ctx context.Context, p *beam.Pipeline) error {
+ if !beam.Initialized() {
+ panic(fmt.Sprint("Beam has not been initialized. Call beam.Init() before pipeline construction."))
+ }
+
+ if *jobopts.Strict {
+ log.Info(ctx, "Strict mode enabled, applying additional validation.")
+ if err := vet.Execute(ctx, p); err != nil {
+ return errors.Wrap(err, "strictness check failed")
+ }
+ log.Info(ctx, "Strict mode validation passed.")
+ }
+
endpoint, err := jobopts.GetEndpoint()
if err != nil {
return err
diff --git a/sdks/go/pkg/beam/runners/vet/vet.go b/sdks/go/pkg/beam/runners/vet/vet.go
index a700a80..77b8769 100644
--- a/sdks/go/pkg/beam/runners/vet/vet.go
+++ b/sdks/go/pkg/beam/runners/vet/vet.go
@@ -27,7 +27,6 @@ package vet
import (
"bytes"
"context"
- "errors"
"fmt"
"reflect"
"strings"
@@ -43,6 +42,7 @@ import (
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+ "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
)
func init() {
@@ -54,20 +54,22 @@ func init() {
type disabledResolver bool
func (p disabledResolver) Sym2Addr(name string) (uintptr, error) {
- return 0, fmt.Errorf("%v not found. Use runtime.RegisterFunction in unit tests", name)
+ return 0, errors.Errorf("%v not found. Use runtime.RegisterFunction in unit tests", name)
}
// Execute evaluates the pipeline on whether it can run without reflection.
func Execute(ctx context.Context, p *beam.Pipeline) error {
e, err := Evaluate(ctx, p)
if err != nil {
- return err
+ return errors.WithContext(err, "validating pipeline with vet runner")
}
if !e.Performant() {
e.summary()
e.Generate("main")
e.diag("*/\n")
- return fmt.Errorf("pipeline is not performant, see diagnostic summary:\n%s\n%s", string(e.d.Bytes()), string(e.Bytes()))
+ err := errors.Errorf("pipeline is not performant, see diagnostic summary:\n%s\n%s", string(e.d.Bytes()), string(e.Bytes()))
+ err = errors.WithContext(err, "validating pipeline with vet runner")
+ return errors.SetTopLevelMsg(err, "pipeline is not performant")
}
// Pipeline has no further tasks.
return nil