You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/11/15 01:34:41 UTC
[beam] branch master updated: Implement embedded WebAssembly example (#24081)
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 689e70b5131 Implement embedded WebAssembly example (#24081)
689e70b5131 is described below
commit 689e70b5131620540faf52e2f1e2dca7a36f269d
Author: Damon <da...@users.noreply.github.com>
AuthorDate: Mon Nov 14 17:34:29 2022 -0800
Implement embedded WebAssembly example (#24081)
---
sdks/go.mod | 1 +
sdks/go.sum | 2 +
sdks/go/examples/wasm/Cargo.toml | 40 ++++++++
sdks/go/examples/wasm/README.md | 137 ++++++++++++++++++++++++++
sdks/go/examples/wasm/greet.rs | 139 +++++++++++++++++++++++++++
sdks/go/examples/wasm/greet.wasm | Bin 0 -> 62570 bytes
sdks/go/examples/wasm/wasm.go | 201 +++++++++++++++++++++++++++++++++++++++
7 files changed, 520 insertions(+)
diff --git a/sdks/go.mod b/sdks/go.mod
index de8e2a9b3ff..5db5f403026 100644
--- a/sdks/go.mod
+++ b/sdks/go.mod
@@ -123,6 +123,7 @@ require (
github.com/shabbyrobe/gocovmerge v0.0.0-20180507124511-f6ea450bfb63 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
+ github.com/tetratelabs/wazero v1.0.0-pre.3 // indirect
go.opencensus.io v0.24.0 // indirect
golang.org/x/tools v0.1.12 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
diff --git a/sdks/go.sum b/sdks/go.sum
index 5466c737dd0..0f8a4d0728f 100644
--- a/sdks/go.sum
+++ b/sdks/go.sum
@@ -869,6 +869,8 @@ github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635/go.mod h1:hkRG
github.com/tchap/go-patricia v2.2.6+incompatible/go.mod h1:bmLyhP68RS6kStMGxByiQ23RP/odRBOTVjwp2cDyi6I=
github.com/testcontainers/testcontainers-go v0.15.0 h1:3Ex7PUGFv0b2bBsdOv6R42+SK2qoZnWBd21LvZYhUtQ=
github.com/testcontainers/testcontainers-go v0.15.0/go.mod h1:PkohMRH2X8Hib0IWtifVexDfLPVT+tb5E9hsf7cW12w=
+github.com/tetratelabs/wazero v1.0.0-pre.3 h1:Z5fbogMUGcERzaQb9mQU8+yJSy0bVvv2ce3dfR4wcZg=
+github.com/tetratelabs/wazero v1.0.0-pre.3/go.mod h1:M8UDNECGm/HVjOfq0EOe4QfCY9Les1eq54IChMLETbc=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
diff --git a/sdks/go/examples/wasm/Cargo.toml b/sdks/go/examples/wasm/Cargo.toml
new file mode 100644
index 00000000000..50fbebbb099
--- /dev/null
+++ b/sdks/go/examples/wasm/Cargo.toml
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# File courtesy of
+# https://github.com/tetratelabs/wazero/blob/v1.0.0-pre.3/examples/allocation/rust/testdata/Cargo.toml
+[package]
+name = "greet"
+version = "0.1.0"
+edition = "2021"
+
+[lib]
+# cdylib builds a %.wasm file with `cargo build --release --target wasm32-unknown-unknown`
+crate-type = ["cdylib"]
+name = "greet"
+path = "greet.rs"
+
+[dependencies]
+# wee_alloc is a WebAssembly-optimized allocator, which is needed to use non-numeric types like strings.
+# See https://docs.rs/wee_alloc/latest/wee_alloc/
+wee_alloc = "0.4.5"
+
+# Below settings dramatically reduce wasm output size
+# See https://rustwasm.github.io/book/reference/code-size.html#optimizing-builds-for-code-size (which also covers `wasm-opt -Oz`)
+# See https://doc.rust-lang.org/cargo/reference/profiles.html#codegen-units
+[profile.release]
+opt-level = "z"
+lto = true
+codegen-units = 1
\ No newline at end of file
diff --git a/sdks/go/examples/wasm/README.md b/sdks/go/examples/wasm/README.md
new file mode 100644
index 00000000000..ad25ce87771
--- /dev/null
+++ b/sdks/go/examples/wasm/README.md
@@ -0,0 +1,137 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+# Overview
+
+wasm is an **_EXPERIMENTAL_** simple example that loads and executes a wasm file function.
+greet.wasm, Cargo.toml and greet.rs were copied from the example provided by the wazero library:
+https://github.com/tetratelabs/wazero/blob/v1.0.0-pre.3/examples/allocation/rust/greet.go
+
+# Usage
+
+The instructions below for running this example on various runners assume the following setup:
+
+```
+OUTPUT=<path to output i.e. /tmp/example/output>
+git clone https://github.com/apache/beam
+BEAM_HOME=$(pwd)/beam
+cd $BEAM_HOME/sdks
+```
+
+## Direct Runner execution
+
+To execute this example on the direct runner:
+
+```shell
+go run ./go/examples/wasm --output=$OUTPUT
+```
+
+## Flink Portable Runner
+
+The following describes how to execute this example on the flink portable runner.
+
+### REQUIREMENTS:
+
+- [Docker](https://docker.io); MacOS users may consider alternative: https://github.com/abiosoft/colima
+- Google Cloud Storage (https://cloud.google.com/storage) or S3 (https://aws.amazon.com/s3/) bucket;
+NOTE this example was only tested on Google Cloud Storage
+
+#### 0. Set OUTPUT to a Cloud storage bucket path
+
+The example below shows Google Cloud Storage.
+```
+OUTPUT=gs://<my bucket>/greet
+```
+
+#### 1. Find the latest flink runner version
+
+```shell
+cd $BEAM_HOME
+./gradlew :runners:flink:properties --property flink_versions
+```
+
+Expected output should include the following, from which you acquire the latest flink runner version.
+
+```shell
+'flink_versions: 1.12,1.13,1.14,1.15'
+```
+
+#### 2. Set FLINK_VERSION to the latest flink runner version, e.g. 1.15
+
+```shell
+FLINK_VERSION=1.15
+```
+
+#### 3. In a separate terminal, start the flink runner (It should take a few minutes on the first execution)
+```shell
+cd $BEAM_HOME
+./gradlew :runners:flink:$FLINK_VERSION:job-server:runShadow
+```
+
+Note the JobService host and port from the output, similar to:
+
+```shell
+INFO: JobService started on localhost:8099
+```
+
+#### 4. Set the JOB_SERVICE variable from the aforementioned output
+
+```shell
+JOB_SERVICE=localhost:8099
+```
+
+#### 5. Execute this example using the portable flink runner
+
+```shell
+cd $BEAM_HOME/sdks
+go run ./go/examples/wasm --runner=universal --endpoint=$JOB_SERVICE --output=$OUTPUT --environment_config=apache/beam_go_sdk:latest
+```
+
+## Dataflow Runner
+
+The following describes how to execute this example on Dataflow.
+
+### REQUIREMENTS:
+
+- Google Cloud Storage (https://cloud.google.com/storage) bucket
+
+#### 1. Set OUTPUT to a Cloud storage bucket path
+
+The example below shows Google Cloud Storage.
+
+```shell
+OUTPUT=gs://<my bucket>/greet
+```
+
+#### 2. Set additional variables
+
+```shell
+PROJECT=<project id>
+REGION=<region>
+STAGING=gs://<my bucket>/staging
+NETWORK=<network>
+SUBNETWORK=regions/$REGION/subnetworks/<subnetwork>
+```
+
+#### 3. Execute this example using Dataflow
+
+```
+cd $BEAM_HOME/sdks
+go run ./go/examples/wasm --runner=dataflow --output=$OUTPUT --environment_config=apache/beam_go_sdk:latest \
+ --project=$PROJECT --region=$REGION --network=$NETWORK --subnetwork=$SUBNETWORK --staging_location=$STAGING
+```
\ No newline at end of file
diff --git a/sdks/go/examples/wasm/greet.rs b/sdks/go/examples/wasm/greet.rs
new file mode 100644
index 00000000000..dfd57a260b9
--- /dev/null
+++ b/sdks/go/examples/wasm/greet.rs
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Copied from:
+// https://github.com/tetratelabs/wazero/blob/v1.0.0-pre.3/examples/allocation/rust/testdata/greet.rs
+extern crate alloc;
+extern crate core;
+extern crate wee_alloc;
+
+use alloc::vec::Vec;
+use std::mem::MaybeUninit;
+use std::slice;
+
+/// Builds the greeting for `name` and sends it to the console through
+/// [`log`], prefixed with "wasm >> ".
+fn greet(name: &String) {
+    let message = greeting(name);
+    log(&format!("wasm >> {}", message));
+}
+
+/// Returns the greeting text for `name`, i.e. "Hello, " + name + "!".
+fn greeting(name: &String) -> String {
+    format!("Hello, {}!", name)
+}
+
+/// Hands `message` to the imported host function [`_log`] so it is printed
+/// on the console.
+fn log(message: &String) {
+    // Only the offset and byte count cross the boundary; `message` stays
+    // owned by the caller for the whole call.
+    let (offset, byte_count) = unsafe { string_to_ptr(message) };
+    unsafe { _log(offset, byte_count) };
+}
+
+// Functions this module imports from the WebAssembly import module "env".
+#[link(wasm_import_module = "env")]
+extern "C" {
+ /// WebAssembly import which prints a string (linear memory offset,
+ /// byteCount) to the console. It is imported under the name "log" and
+ /// bound to the Rust identifier `_log`.
+ ///
+ /// Note: This is not an ownership transfer: Rust still owns the pointer
+ /// and ensures it isn't deallocated during this call.
+ #[link_name = "log"]
+ fn _log(ptr: u32, size: u32);
+}
+
+/// WebAssembly export "greet": copies the string found at (linear memory
+/// offset, byteCount) and passes it to [`greet`].
+///
+/// Note: The input parameters were returned by [`allocate`]. No ownership is
+/// transferred here, so the caller may keep using the inputs afterwards.
+#[cfg_attr(all(target_arch = "wasm32"), export_name = "greet")]
+#[no_mangle]
+pub unsafe extern "C" fn _greet(ptr: u32, len: u32) {
+    let name = ptr_to_string(ptr, len);
+    greet(&name);
+}
+
+/// WebAssembly export "greeting": reads the name at (linear memory offset,
+/// byteCount) and returns the greeting's pointer/size pair packed in a u64
+/// (pointer in the upper 32 bits, size in the lower 32 bits).
+///
+/// Note: The returned buffer is leaked to the caller, which must call
+/// [`deallocate`] once it is done with it.
+/// Note: A single u64 is used rather than two result values to stay
+/// compatible with WebAssembly 1.0.
+#[cfg_attr(all(target_arch = "wasm32"), export_name = "greeting")]
+#[no_mangle]
+pub unsafe extern "C" fn _greeting(ptr: u32, len: u32) -> u64 {
+    let reply = greeting(&ptr_to_string(ptr, len));
+    let (reply_ptr, reply_len) = string_to_ptr(&reply);
+    // Ownership of the buffer moves to the external caller. Without forget,
+    // the String would be dropped here and the caller would read back a
+    // corrupt value; with it, the caller must deallocate to prevent a leak.
+    std::mem::forget(reply);
+    let high = (reply_ptr as u64) << 32;
+    let low = reply_len as u64;
+    high | low
+}
+
+/// Returns an owned copy of the string stored at the given linear memory
+/// offset (`ptr`) with the given byte length (`len`).
+///
+/// The bytes come from the host and are not guaranteed to be valid UTF-8,
+/// so invalid sequences are replaced with U+FFFD instead of being trusted
+/// unchecked. A zero `len` yields an empty string without touching `ptr`,
+/// which avoids forming a slice from a null offset.
+///
+/// # Safety
+/// When `len` is non-zero, `ptr..ptr+len` must be readable memory.
+unsafe fn ptr_to_string(ptr: u32, len: u32) -> String {
+    if len == 0 {
+        return String::new();
+    }
+    // A shared slice is sufficient: the bytes are only read and copied.
+    let bytes = slice::from_raw_parts(ptr as *const u8, len as usize);
+    String::from_utf8_lossy(bytes).into_owned()
+}
+
+/// Converts a string into the (pointer, size) pair of WebAssembly compatible
+/// numeric types that describes where its bytes live.
+///
+/// Note: Ownership of the String is untouched. To leak it on purpose, call
+/// [`std::mem::forget`] on the input after this function returns.
+unsafe fn string_to_ptr(s: &String) -> (u32, u32) {
+    let offset = s.as_ptr() as u32;
+    let byte_count = s.len() as u32;
+    (offset, byte_count)
+}
+
+/// Set the global allocator to the WebAssembly optimized one (wee_alloc).
+/// Being the `#[global_allocator]`, it serves the heap allocations made by
+/// this crate's `String` and `Vec` values.
+#[global_allocator]
+static ALLOC: wee_alloc::WeeAlloc = wee_alloc::WeeAlloc::INIT;
+
+/// WebAssembly export "allocate": reserves `size` bytes and returns the
+/// pointer (linear memory offset) where a string can be written.
+///
+/// Ownership moves to the caller, which must call [`deallocate`] when it is
+/// finished with the buffer.
+#[cfg_attr(all(target_arch = "wasm32"), export_name = "allocate")]
+#[no_mangle]
+pub extern "C" fn _allocate(size: u32) -> *mut u8 {
+    let byte_count = size as usize;
+    allocate(byte_count)
+}
+
+/// Allocates `size` bytes and leaks the pointer where they start.
+///
+/// The buffer is created with a *length* of `size`, not merely a capacity:
+/// `Vec::into_boxed_slice` discards any capacity beyond the length, so a
+/// vector built with `with_capacity` (length 0) would give up its storage
+/// and the returned pointer would not refer to `size` usable bytes.
+///
+/// The bytes are uninitialized; the caller must write them before reading
+/// and must release them with [`deallocate`] using the same `size`.
+fn allocate(size: usize) -> *mut u8 {
+    // MaybeUninit<u8> is Copy, so this fills the vector without initializing
+    // the underlying bytes to any particular value.
+    let buf: Vec<MaybeUninit<u8>> = vec![MaybeUninit::uninit(); size];
+
+    // into_raw leaks the memory to the caller.
+    Box::into_raw(buf.into_boxed_slice()) as *mut u8
+}
+
+
+/// WebAssembly export "deallocate": frees the buffer described by (linear
+/// memory offset, byteCount) that was previously handed out by [`allocate`].
+#[cfg_attr(all(target_arch = "wasm32"), export_name = "deallocate")]
+#[no_mangle]
+pub unsafe extern "C" fn _deallocate(ptr: u32, size: u32) {
+    let start = ptr as *mut u8;
+    deallocate(start, size as usize);
+}
+
+/// Takes the pointer back so that the memory behind it is freed.
+unsafe fn deallocate(ptr: *mut u8, size: usize) {
+    // Rebuild an empty Vec with capacity `size` over the pointer; dropping
+    // it right away releases the allocation.
+    drop(Vec::from_raw_parts(ptr, 0, size));
+}
\ No newline at end of file
diff --git a/sdks/go/examples/wasm/greet.wasm b/sdks/go/examples/wasm/greet.wasm
new file mode 100644
index 00000000000..03c31769e7d
Binary files /dev/null and b/sdks/go/examples/wasm/greet.wasm differ
diff --git a/sdks/go/examples/wasm/wasm.go b/sdks/go/examples/wasm/wasm.go
new file mode 100644
index 00000000000..b918f41feee
--- /dev/null
+++ b/sdks/go/examples/wasm/wasm.go
@@ -0,0 +1,201 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// wasm is an EXPERIMENTAL simple example that loads and executes a wasm file function.
+// greet.wasm, Cargo.toml and greet.rs were copied from the example provided by the wazero library:
+// https://github.com/tetratelabs/wazero/blob/v1.0.0-pre.3/examples/allocation/rust/greet.go
+//
+// New Concepts:
+// 1. Load a wasm file compiled from: cargo build --release --target wasm32-unknown-unknown
+// 2. Execute a wasm function within a DoFn
+package main
+
+import (
+ "context"
+ _ "embed"
+ "flag"
+ "fmt"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
+ "github.com/tetratelabs/wazero"
+ "github.com/tetratelabs/wazero/api"
+)
+
+// Names of the functions that greet.wasm exports and that Setup looks up.
+const (
+	// wasmFunctionName is the export that builds the greeting string.
+	wasmFunctionName = "greeting"
+	// wasmAllocateFunctionName is the export that allocates guest memory.
+	wasmAllocateFunctionName = "allocate"
+	// wasmDeallocateFunctionName is the export that frees guest memory.
+	wasmDeallocateFunctionName = "deallocate"
+)
+
+// greetWasm holds the contents of greet.wasm, embedded into the binary by
+// the go:embed directive below; Setup instantiates the wasm module from it.
+//
+//go:embed greet.wasm
+var greetWasm []byte
+
+var (
+ output = flag.String("output", "", "Output file (required).")
+)
+
+func init() {
+	// Struct DoFns must be registered for a pipeline to run. Registration
+	// lets the DoFn be serialized correctly and avoids runtime reflection.
+	//
+	// embeddedWasmFn's ProcessElement takes 2 inputs (context.Context, string)
+	// and returns 2 outputs (string, error), hence register.DoFn2x2 with
+	// those four types as its type arguments.
+	register.DoFn2x2[context.Context, string, string, error](new(embeddedWasmFn))
+}
+
+// preRun validates the command line flags before the pipeline is built.
+// It returns an error when --output was left empty.
+func preRun() error {
+	if *output != "" {
+		return nil
+	}
+	return fmt.Errorf("--output is required")
+}
+
+// main parses flags, initializes Beam, validates the flags and runs the
+// pipeline, terminating through log.Fatal on the first error.
+func main() {
+	flag.Parse()
+	beam.Init()
+	ctx := context.Background()
+
+	// run is only attempted once the flag validation has passed.
+	err := preRun()
+	if err == nil {
+		err = run(ctx)
+	}
+	if err != nil {
+		log.Fatal(ctx, err)
+	}
+}
+
+// run builds the pipeline (create names -> greet each one through the wasm
+// DoFn -> write the greetings to --output) and executes it.
+func run(ctx context.Context) error {
+	pipeline, scope := beam.NewPipelineWithRoot()
+
+	names := beam.Create(scope, "Ada", "Lovelace", "World", "Beam", "Senior López")
+	greetings := beam.ParDo(scope, &embeddedWasmFn{}, names)
+	textio.Write(scope, *output, greetings)
+
+	err := beamx.Run(ctx, pipeline)
+	if err == nil {
+		return nil
+	}
+	return fmt.Errorf("failed to run pipeline: %v", err)
+}
+
+// Concept #2: wrap wasm function execution within a DoFn.
+// embeddedWasmFn is a DoFn that executes a Rust compiled wasm function.
+// All fields are populated by Setup.
+type embeddedWasmFn struct {
+	// r is the wazero runtime; it is closed in Teardown.
+	r wazero.Runtime
+	// mod is the module instantiated from greetWasm.
+	mod api.Module
+	// greeting, allocate and deallocate are the functions exported by mod.
+	greeting   api.Function
+	allocate   api.Function
+	deallocate api.Function
+}
+
+// Setup loads and initializes the embedded wasm functions.
+// Concept #1: Load a compiled wasm file []byte content and function.
+// This example is derived from
+// https://github.com/tetratelabs/wazero/blob/v1.0.0-pre.3/examples/allocation/rust/greet.go
+//
+// It returns an error if the host module or the wasm module cannot be
+// instantiated, or if the wasm module lacks one of the required exports.
+func (fn *embeddedWasmFn) Setup(ctx context.Context) error {
+	// Create a new WebAssembly Runtime.
+	// Typically, a defer r.Close() would follow right away. Yet the runtime has
+	// to stay alive throughout the DoFn lifecycle, so it is closed in Teardown.
+	fn.r = wazero.NewRuntime(ctx)
+
+	// Instantiate a Go-defined module named "env" that exports a function to
+	// log to the console.
+	_, err := fn.r.NewHostModuleBuilder("env").
+		NewFunctionBuilder().WithFunc(logString).Export("log").
+		Instantiate(ctx, fn.r)
+	if err != nil {
+		return fmt.Errorf("failed to instantiate host module: %w", err)
+	}
+
+	// Instantiate a WebAssembly module that imports the "log" function defined
+	// in "env" and exports "memory" and functions we'll use in this example.
+	fn.mod, err = fn.r.InstantiateModuleFromBinary(ctx, greetWasm)
+	if err != nil {
+		return fmt.Errorf("failed to instantiate wasm module: %w", err)
+	}
+
+	// Get references to the WebAssembly functions used by ProcessElement.
+	// ExportedFunction yields nil for a missing export, so fail here with a
+	// clear message rather than with a nil dereference on the first element.
+	exports := []struct {
+		name   string
+		target *api.Function
+	}{
+		{wasmFunctionName, &fn.greeting},
+		{wasmAllocateFunctionName, &fn.allocate},
+		{wasmDeallocateFunctionName, &fn.deallocate},
+	}
+	for _, e := range exports {
+		f := fn.mod.ExportedFunction(e.name)
+		if f == nil {
+			return fmt.Errorf("wasm module does not export function %q", e.name)
+		}
+		*e.target = f
+	}
+	return nil
+}
+
+// ProcessElement processes a string calling a wasm function written in Rust.
+// This example is derived from
+// https://github.com/tetratelabs/wazero/blob/v1.0.0-pre.3/examples/allocation/rust/greet.go
+//
+// It copies s into guest memory, calls the wasm "greeting" function on it and
+// returns the resulting string. An error is returned when a wasm call fails,
+// returns no value, or refers to memory outside the module's range.
+func (fn *embeddedWasmFn) ProcessElement(ctx context.Context, s string) (string, error) {
+	// We need to compute the size of s to use Rust's memory allocator.
+	size := uint64(len(s))
+
+	// free returns guest memory to Rust's allocator. It runs deferred, so a
+	// failure cannot change the element's result; it is logged instead of
+	// being silently dropped.
+	free := func(ptr, n uint64) {
+		if _, err := fn.deallocate.Call(ctx, ptr, n); err != nil {
+			log.Warnf(ctx, "error calling deallocate(%d, %d): %v", ptr, n, err)
+		}
+	}
+
+	// Instead of an arbitrary memory offset, use Rust's allocator. Notice
+	// there is nothing string-specific in this allocation function. The same
+	// function could be used to pass binary serialized data to Wasm.
+	results, err := fn.allocate.Call(ctx, size)
+	if err != nil {
+		return "", fmt.Errorf("error calling allocate: %w", err)
+	}
+	if len(results) == 0 {
+		return "", fmt.Errorf("allocate returned no results")
+	}
+	ptr := results[0]
+
+	// This pointer was allocated by Rust, but is owned by Go, so we have to
+	// deallocate it when finished.
+	defer free(ptr, size)
+
+	// The pointer is a linear memory offset, which is where we write the value of the DoFn's input element s.
+	if !fn.mod.Memory().Write(ctx, uint32(ptr), []byte(s)) {
+		return "", fmt.Errorf("Memory.Write(%d, %d) out of range of memory size %d",
+			ptr, size, fn.mod.Memory().Size(ctx))
+	}
+
+	// Finally, we get the greeting message "Hello" concatenated to the DoFn's input element s.
+	// This shows how to read-back something allocated by Rust.
+	ptrSize, err := fn.greeting.Call(ctx, ptr, size)
+	if err != nil {
+		return "", fmt.Errorf("error calling greeting: %w", err)
+	}
+	if len(ptrSize) == 0 {
+		return "", fmt.Errorf("greeting returned no results")
+	}
+	// The result packs the pointer in the upper 32 bits and the size in the lower 32 bits.
+	resultPtr := uint32(ptrSize[0] >> 32)
+	resultSize := uint32(ptrSize[0])
+
+	// This pointer was allocated by Rust, but is owned by Go, so we have to
+	// deallocate it when finished.
+	defer free(uint64(resultPtr), uint64(resultSize))
+
+	// The pointer is a linear memory offset, which is where Rust wrote the results of the string concatenation.
+	bytes, ok := fn.mod.Memory().Read(ctx, resultPtr, resultSize)
+	if !ok {
+		return "", fmt.Errorf("Memory.Read(%d, %d) out of range of memory size %d",
+			resultPtr, resultSize, fn.mod.Memory().Size(ctx))
+	}
+
+	// bytes contains our final result that we emit into the output PCollection.
+	// string(bytes) copies the data before the deferred deallocation runs.
+	return string(bytes), nil
+}
+
+// Teardown closes the wazero.Runtime during the DoFn teardown lifecycle.
+func (fn *embeddedWasmFn) Teardown(ctx context.Context) error {
+	// Typically wazero.Runtime's Close method would be deferred right after
+	// instantiation. Here the runtime has to stay in memory until the end of
+	// the DoFn lifecycle, so it is closed at this point instead.
+	err := fn.r.Close(ctx)
+	if err == nil {
+		return nil
+	}
+	return fmt.Errorf("failed to close runtime: %w", err)
+}
+
+// logString is the host function exported to the wasm module as "log". It
+// reads byteCount bytes at offset from the module's memory and logs them.
+func logString(ctx context.Context, m api.Module, offset, byteCount uint32) {
+	message, inRange := m.Memory().Read(ctx, offset, byteCount)
+	if !inRange {
+		log.Fatalf(ctx, "Memory.Read(%d, %d) out of range", offset, byteCount)
+	}
+	log.Info(ctx, string(message))
+}