You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/05/25 23:22:47 UTC

[beam] branch master updated: [BEAM-7383] Adding strict flag to runners to validate with vet runner

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 838c030  [BEAM-7383] Adding strict flag to runners to validate with vet runner
     new e0c7eb6  Merge pull request #8644 from youngoli/beam7383
838c030 is described below

commit 838c0304a0d6e95eb8ab1be49e864db6fa3c561f
Author: Daniel Oliveira <da...@gmail.com>
AuthorDate: Tue May 21 16:55:55 2019 -0700

    [BEAM-7383] Adding strict flag to runners to validate with vet runner
    
    Creates a pipeline option called beam_strict that is supported by the
    direct and universal runners. Uses the vet runner to perform the
    verification for strict mode.
---
 sdks/go/pkg/beam/options/jobopts/options.go     |  4 ++++
 sdks/go/pkg/beam/runners/direct/direct.go       | 10 ++++++++++
 sdks/go/pkg/beam/runners/universal/universal.go | 13 +++++++++++++
 sdks/go/pkg/beam/runners/vet/vet.go             | 10 ++++++----
 4 files changed, 33 insertions(+), 4 deletions(-)

diff --git a/sdks/go/pkg/beam/options/jobopts/options.go b/sdks/go/pkg/beam/options/jobopts/options.go
index 86beb94..fb6b64a 100644
--- a/sdks/go/pkg/beam/options/jobopts/options.go
+++ b/sdks/go/pkg/beam/options/jobopts/options.go
@@ -61,6 +61,10 @@ var (
 
 	// Async determines whether to wait for job completion.
 	Async = flag.Bool("async", false, "Do not wait for job completion.")
+
+	// Strict mode applies additional validation to user pipelines before
+	// executing them and fails early if the pipelines don't pass.
+	Strict = flag.Bool("beam_strict", false, "Apply additional validation to pipelines.")
 )
 
 // GetEndpoint returns the endpoint, if non empty and exits otherwise. Runners
diff --git a/sdks/go/pkg/beam/runners/direct/direct.go b/sdks/go/pkg/beam/runners/direct/direct.go
index bd1d324..55e4836 100644
--- a/sdks/go/pkg/beam/runners/direct/direct.go
+++ b/sdks/go/pkg/beam/runners/direct/direct.go
@@ -28,6 +28,8 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"github.com/apache/beam/sdks/go/pkg/beam/log"
+	"github.com/apache/beam/sdks/go/pkg/beam/options/jobopts"
+	"github.com/apache/beam/sdks/go/pkg/beam/runners/vet"
 )
 
 func init() {
@@ -45,6 +47,14 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 	log.Info(ctx, "Pipeline:")
 	log.Info(ctx, p)
 
+	if *jobopts.Strict {
+		log.Info(ctx, "Strict mode enabled, applying additional validation.")
+		if err := vet.Execute(ctx, p); err != nil {
+			return errors.Wrap(err, "strictness check failed")
+		}
+		log.Info(ctx, "Strict mode validation passed.")
+	}
+
 	edges, _, err := p.Build()
 	if err != nil {
 		return errors.Wrap(err, "invalid pipeline")
diff --git a/sdks/go/pkg/beam/runners/universal/universal.go b/sdks/go/pkg/beam/runners/universal/universal.go
index 0e22db7..51f74da 100644
--- a/sdks/go/pkg/beam/runners/universal/universal.go
+++ b/sdks/go/pkg/beam/runners/universal/universal.go
@@ -30,6 +30,7 @@ import (
 	pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 	"github.com/apache/beam/sdks/go/pkg/beam/options/jobopts"
 	"github.com/apache/beam/sdks/go/pkg/beam/runners/universal/runnerlib"
+	"github.com/apache/beam/sdks/go/pkg/beam/runners/vet"
 	"github.com/golang/protobuf/proto"
 )
 
@@ -40,6 +41,18 @@ func init() {
 
 // Execute executes the pipeline on a universal beam runner.
 func Execute(ctx context.Context, p *beam.Pipeline) error {
+	if !beam.Initialized() {
+		panic(fmt.Sprint("Beam has not been initialized. Call beam.Init() before pipeline construction."))
+	}
+
+	if *jobopts.Strict {
+		log.Info(ctx, "Strict mode enabled, applying additional validation.")
+		if err := vet.Execute(ctx, p); err != nil {
+			return errors.Wrap(err, "strictness check failed")
+		}
+		log.Info(ctx, "Strict mode validation passed.")
+	}
+
 	endpoint, err := jobopts.GetEndpoint()
 	if err != nil {
 		return err
diff --git a/sdks/go/pkg/beam/runners/vet/vet.go b/sdks/go/pkg/beam/runners/vet/vet.go
index a700a80..77b8769 100644
--- a/sdks/go/pkg/beam/runners/vet/vet.go
+++ b/sdks/go/pkg/beam/runners/vet/vet.go
@@ -27,7 +27,6 @@ package vet
 import (
 	"bytes"
 	"context"
-	"errors"
 	"fmt"
 	"reflect"
 	"strings"
@@ -43,6 +42,7 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 func init() {
@@ -54,20 +54,22 @@ func init() {
 type disabledResolver bool
 
 func (p disabledResolver) Sym2Addr(name string) (uintptr, error) {
-	return 0, fmt.Errorf("%v not found. Use runtime.RegisterFunction in unit tests", name)
+	return 0, errors.Errorf("%v not found. Use runtime.RegisterFunction in unit tests", name)
 }
 
 // Execute evaluates the pipeline on whether it can run without reflection.
 func Execute(ctx context.Context, p *beam.Pipeline) error {
 	e, err := Evaluate(ctx, p)
 	if err != nil {
-		return err
+		return errors.WithContext(err, "validating pipeline with vet runner")
 	}
 	if !e.Performant() {
 		e.summary()
 		e.Generate("main")
 		e.diag("*/\n")
-		return fmt.Errorf("pipeline is not performant, see diagnostic summary:\n%s\n%s", string(e.d.Bytes()), string(e.Bytes()))
+		err := errors.Errorf("pipeline is not performant, see diagnostic summary:\n%s\n%s", string(e.d.Bytes()), string(e.Bytes()))
+		err = errors.WithContext(err, "validating pipeline with vet runner")
+		return errors.SetTopLevelMsg(err, "pipeline is not performant")
 	}
 	// Pipeline nas no further tasks.
 	return nil