Posted to commits@beam.apache.org by lo...@apache.org on 2020/07/31 03:42:54 UTC

[beam] 01/01: [BEAM-10610][BEAM-9982] Support Loopback mode with universal runners in the Go SDK.

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 63705b05babb85ce5058013cae5a73daead733dc
Merge: 752bdfd 8798913
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Thu Jul 30 20:42:31 2020 -0700

    [BEAM-10610][BEAM-9982] Support Loopback mode with universal runners in the Go SDK.
    
    This allows loopback mode to be supported in the Go SDK. This is useful for validating pipelines against real portable runners such as Flink, Spark, and the Python Portable runner.
    
    This PR also fixes some smaller bugs around logging, discovered through testing.
    
    - Fixes missing newlines when printing unsent log messages to the local machine.
    - Fixes a small race condition when the control channel terminates, which prevented orderly shutdown of workers.
    - Cleans up [BEAM-9982] and removes the redundant MustMarshal function from graphx.
    - Increases the default gRPC receive limit to its maximum. Testing indicates that the higher limit only leads to extra memory use if the inputs are actually large, whereas the lower limit unnecessarily restricted pipeline data.
    
    Loopback mode allows for convenient testing of pipelines: there is no need to dig into running Docker VMs to access logs, and print output appears as expected in the normal console. There are also some testing benefits, such as access to the local file system for reading and writing files.
    
    The associated risk for this convenience is access to the local state of the program. Package-level (global) state is accessible in the process, and thus to the local workers. As a best practice, move DoFn state into structural DoFns; otherwise you risk writing pipelines that depend on state that doesn't exist on distributed machines.
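    
    The risk can be illustrated with a minimal, standard-library-only sketch. It does not use the Beam API: prefixFn, globalPrefixFn, and shipToWorker are hypothetical names, and the JSON round trip is only an assumed stand-in for shipping a structural DoFn's exported fields to a worker on another machine.

    ```go
    package main

    import (
    	"encoding/json"
    	"fmt"
    )

    // prefix is package-level state. In this sketch it is assigned while the
    // pipeline is being constructed in the launching process.
    var prefix string

    // globalPrefixFn depends on package-level state, so it only behaves as
    // intended in a process where that state has been set.
    func globalPrefixFn(word string) string {
    	return prefix + word
    }

    // prefixFn mimics a structural DoFn: its configuration lives in an
    // exported field that can be serialized along with the function.
    type prefixFn struct {
    	Prefix string `json:"prefix"`
    }

    // ProcessElement uses only the state carried by the struct itself.
    func (f *prefixFn) ProcessElement(word string) string {
    	return f.Prefix + word
    }

    // shipToWorker is a hypothetical stand-in for sending a DoFn to another
    // machine: only what survives serialization arrives on the other side.
    func shipToWorker(fn *prefixFn) (*prefixFn, error) {
    	data, err := json.Marshal(fn)
    	if err != nil {
    		return nil, err
    	}
    	remote := &prefixFn{}
    	if err := json.Unmarshal(data, remote); err != nil {
    		return nil, err
    	}
    	return remote, nil
    }

    func main() {
    	// Loopback: the worker shares the process, so the global is visible.
    	prefix = "hello-"
    	fmt.Println(globalPrefixFn("beam"))

    	// Simulate a distributed worker where the assignment never happened.
    	prefix = ""
    	fmt.Println(globalPrefixFn("beam"))

    	// State held in the struct travels with it.
    	remote, err := shipToWorker(&prefixFn{Prefix: "hello-"})
    	if err != nil {
    		panic(err)
    	}
    	fmt.Println(remote.ProcessElement("beam"))
    }
    ```

    The function that reads the global silently loses its prefix once the global is unset, while the struct-based version keeps working because its configuration is part of the value being shipped.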
    
    To use loopback mode, ensure that the runner package you're using is based on the universal runner (such as spark or flink), or use the universal runner directly. You can register the universal runner for use by beamx.Run with a blank (underscore) import:
    
    	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
    
    	_ "github.com/apache/beam/sdks/go/pkg/beam/runners/universal"
    
    Then add the following command-line flags to your job:
    
    	--runner=universal --endpoint=<local Job management server instance> --environment_type=LOOPBACK
    
    This will have the framework start a BeamFnExternalWorkerPoolServer, which spins up new workers in the main program process, with the runner orchestrating those workers as necessary.
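    
    The general shape of an in-process worker pool can be sketched as follows. This is a simplified illustration using only the standard library, not the code in extworker.go: workerPool, StartWorker, StopWorker, Running, and Wait are hypothetical names chosen here, and the gRPC service plumbing is omitted entirely. The assumption is that each start request launches a worker goroutine tied to a cancellable context, and each stop request cancels it.

    ```go
    package main

    import (
    	"context"
    	"fmt"
    	"sync"
    )

    // workerPool is a hypothetical stand-in for a worker pool server. It keeps
    // one cancel function per running in-process worker.
    type workerPool struct {
    	mu      sync.Mutex
    	workers map[string]context.CancelFunc
    	wg      sync.WaitGroup
    	run     func(ctx context.Context, id string)
    }

    func newWorkerPool(run func(ctx context.Context, id string)) *workerPool {
    	return &workerPool{workers: make(map[string]context.CancelFunc), run: run}
    }

    // StartWorker launches a worker goroutine, unless a worker with the same
    // ID is already running.
    func (p *workerPool) StartWorker(ctx context.Context, id string) error {
    	p.mu.Lock()
    	defer p.mu.Unlock()
    	if _, ok := p.workers[id]; ok {
    		return fmt.Errorf("worker %q already running", id)
    	}
    	wctx, cancel := context.WithCancel(ctx)
    	p.workers[id] = cancel
    	p.wg.Add(1)
    	go func() {
    		defer p.wg.Done()
    		p.run(wctx, id)
    	}()
    	return nil
    }

    // StopWorker cancels the worker's context. Stopping an unknown ID is a no-op.
    func (p *workerPool) StopWorker(id string) {
    	p.mu.Lock()
    	defer p.mu.Unlock()
    	if cancel, ok := p.workers[id]; ok {
    		cancel()
    		delete(p.workers, id)
    	}
    }

    // Running reports how many workers are currently tracked by the pool.
    func (p *workerPool) Running() int {
    	p.mu.Lock()
    	defer p.mu.Unlock()
    	return len(p.workers)
    }

    // Wait blocks until every worker goroutine started so far has returned.
    func (p *workerPool) Wait() { p.wg.Wait() }

    func main() {
    	pool := newWorkerPool(func(ctx context.Context, id string) {
    		// A real worker would serve the runner until cancelled.
    		<-ctx.Done()
    		fmt.Println("worker stopped:", id)
    	})
    	if err := pool.StartWorker(context.Background(), "worker-1"); err != nil {
    		panic(err)
    	}
    	fmt.Println("running:", pool.Running())
    	pool.StopWorker("worker-1")
    	pool.Wait()
    	fmt.Println("running:", pool.Running())
    }
    ```

    Keeping the cancel functions in a mutex-guarded map lets start and stop requests arrive concurrently, and the wait group gives the main program a way to shut down only after every worker has exited.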

 sdks/go/pkg/beam/core/runtime/graphx/translate.go  |  47 +++----
 sdks/go/pkg/beam/core/runtime/harness/harness.go   |   7 +-
 sdks/go/pkg/beam/core/runtime/harness/logging.go   |   2 +-
 sdks/go/pkg/beam/options/jobopts/options.go        |   9 +-
 .../beam/runners/universal/extworker/extworker.go  | 120 ++++++++++++++++++
 .../runners/universal/extworker/extworker_test.go  | 141 +++++++++++++++++++++
 sdks/go/pkg/beam/runners/universal/universal.go    |  17 ++-
 sdks/go/pkg/beam/util/grpcx/dial.go                |   3 +-
 8 files changed, 314 insertions(+), 32 deletions(-)