You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/11/15 01:34:41 UTC

[beam] branch master updated: Implement embedded WebAssembly example (#24081)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 689e70b5131 Implement embedded WebAssembly example (#24081)
689e70b5131 is described below

commit 689e70b5131620540faf52e2f1e2dca7a36f269d
Author: Damon <da...@users.noreply.github.com>
AuthorDate: Mon Nov 14 17:34:29 2022 -0800

    Implement embedded WebAssembly example (#24081)
---
 sdks/go.mod                      |   1 +
 sdks/go.sum                      |   2 +
 sdks/go/examples/wasm/Cargo.toml |  40 ++++++++
 sdks/go/examples/wasm/README.md  | 137 ++++++++++++++++++++++++++
 sdks/go/examples/wasm/greet.rs   | 139 +++++++++++++++++++++++++++
 sdks/go/examples/wasm/greet.wasm | Bin 0 -> 62570 bytes
 sdks/go/examples/wasm/wasm.go    | 201 +++++++++++++++++++++++++++++++++++++++
 7 files changed, 520 insertions(+)

diff --git a/sdks/go.mod b/sdks/go.mod
index de8e2a9b3ff..5db5f403026 100644
--- a/sdks/go.mod
+++ b/sdks/go.mod
@@ -123,6 +123,7 @@ require (
 	github.com/shabbyrobe/gocovmerge v0.0.0-20180507124511-f6ea450bfb63 // indirect
 	github.com/sirupsen/logrus v1.8.1 // indirect
 	github.com/spf13/pflag v1.0.5 // indirect
+	github.com/tetratelabs/wazero v1.0.0-pre.3 // indirect
 	go.opencensus.io v0.24.0 // indirect
 	golang.org/x/tools v0.1.12 // indirect
 	golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
diff --git a/sdks/go.sum b/sdks/go.sum
index 5466c737dd0..0f8a4d0728f 100644
--- a/sdks/go.sum
+++ b/sdks/go.sum
@@ -869,6 +869,8 @@ github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635/go.mod h1:hkRG
 github.com/tchap/go-patricia v2.2.6+incompatible/go.mod h1:bmLyhP68RS6kStMGxByiQ23RP/odRBOTVjwp2cDyi6I=
 github.com/testcontainers/testcontainers-go v0.15.0 h1:3Ex7PUGFv0b2bBsdOv6R42+SK2qoZnWBd21LvZYhUtQ=
 github.com/testcontainers/testcontainers-go v0.15.0/go.mod h1:PkohMRH2X8Hib0IWtifVexDfLPVT+tb5E9hsf7cW12w=
+github.com/tetratelabs/wazero v1.0.0-pre.3 h1:Z5fbogMUGcERzaQb9mQU8+yJSy0bVvv2ce3dfR4wcZg=
+github.com/tetratelabs/wazero v1.0.0-pre.3/go.mod h1:M8UDNECGm/HVjOfq0EOe4QfCY9Les1eq54IChMLETbc=
 github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
 github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
 github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
diff --git a/sdks/go/examples/wasm/Cargo.toml b/sdks/go/examples/wasm/Cargo.toml
new file mode 100644
index 00000000000..50fbebbb099
--- /dev/null
+++ b/sdks/go/examples/wasm/Cargo.toml
@@ -0,0 +1,40 @@
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+
+# File courtesy of
+# https://github.com/tetratelabs/wazero/blob/v1.0.0-pre.3/examples/allocation/rust/testdata/Cargo.toml
+[package]
+name = "greet"
+version = "0.1.0"
+edition = "2021"
+
+[lib]
+# cdylib builds a a %.wasm file with `cargo build --release --target wasm32-unknown-unknown`
+crate-type = ["cdylib"]
+name = "greet"
+path = "greet.rs"
+
+[dependencies]
+# wee_aloc is a WebAssembly optimized allocator, which is needed to use non-numeric types like strings.
+# See https://docs.rs/wee_alloc/latest/wee_alloc/
+wee_alloc = "0.4.5"
+
+# Below settings dramatically reduce wasm output size
+# See https://rustwasm.github.io/book/reference/code-size.html#optimizing-builds-for-code-sizewasm-opt -Oz -o
+# See https://doc.rust-lang.org/cargo/reference/profiles.html#codegen-units
+[profile.release]
+opt-level = "z"
+lto = true
+codegen-units = 1
\ No newline at end of file
diff --git a/sdks/go/examples/wasm/README.md b/sdks/go/examples/wasm/README.md
new file mode 100644
index 00000000000..ad25ce87771
--- /dev/null
+++ b/sdks/go/examples/wasm/README.md
@@ -0,0 +1,137 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+# Overview
+
+wasm is an **_EXPERIMENTAL_** simple example that loads and executes a wasm file function.
+greet.wasm, Cargo.toml and greet.rs were copied from the example provided by the wazero library:
+https://github.com/tetratelabs/wazero/blob/v1.0.0-pre.3/examples/allocation/rust/greet.go
+
+# Usage
+
+To run this example in various runners, the following assumes:
+
+```
+OUTPUT=<path to output i.e. /tmp/example/output>
+git clone https://github.com/apache/beam
+BEAM_HOME=$(pwd)/beam
+cd $BEAM_HOME/sdks
+```
+
+## Direct Runner execution
+
+To execute this example on the direct runner:
+
+```shell
+go run ./go/examples/wasm --output=$OUTPUT
+```
+
+## Flink Portable Runner
+
+The following describes how to execute this example on the flink portable runner.
+
+### REQUIREMENTS:
+
+- [Docker](https://docker.io); MacOS users may consider alternative: https://github.com/abiosoft/colima
+- Google Cloud Storage (https://cloud.google.com/storage) or S3 (https://aws.amazon.com/s3/) bucket;
+NOTE this example was only tested on Google Cloud Storage
+
+#### 0. Set OUTPUT to a Cloud storage bucket path
+
+The example below shows Google Cloud Storage.
+```
+OUTPUT=gs://<my bucket>/greet
+```
+
+#### 1. Find the latest flink runner version
+
+```shell
+cd $BEAM_HOME
+./gradlew :runners:flink:properties --property flink_versions
+```
+
+Expected output should include the following, from which you acquire the latest flink runner version.
+
+```shell
+'flink_versions: 1.12,1.13,1.14,1.15'
+```
+
+#### 2. Set to the latest flink runner version i.e. 1.15
+
+```shell
+FLINK_VERSION=1.15
+```
+
+#### 3. In a separate terminal, start the flink runner (It should take a few minutes on the first execution)
+```shell
+cd $BEAM_HOME
+./gradlew :runners:flink:$FLINK_VERSION:job-server:runShadow
+```
+
+Note the JobService host and port from the output, similar to:
+
+```shell
+INFO: JobService started on localhost:8099
+```
+
+#### 4. Set the JOB_SERVICE variable from the aforementioned output
+
+```shell
+JOB_SERVICE=localhost:8099
+```
+
+#### 5. Execute this example using the portable flink runner
+
+```shell
+cd $BEAM_HOME/sdks
+go run ./go/examples/wasm --runner=universal --endpoint=$JOB_SERVICE --output=$OUTPUT --environment_config=apache/beam_go_sdk:latest
+```
+
+## Dataflow Runner
+
+The following describes how to execute this example on Dataflow.
+
+### REQUIREMENTS:
+
+- Google Cloud Storage (https://cloud.google.com/storage) bucket
+
+#### 1. Set OUTPUT to a Cloud storage bucket path
+
+The example below shows Google Cloud Storage.
+
+```shell
+OUTPUT=gs://<my bucket>/greet
+```
+
+#### 2. Set additional variables
+
+```shell
+PROJECT=<project id>
+REGION=<region>
+STAGING=gs://<my bucket>/staging
+NETWORK=<network>
+SUBNETWORK=regions/$REGION/subnetworks/<subnetwork>
+```
+
+#### 3. Execute this example using Dataflow
+
+```
+cd $BEAM_HOME/sdks
+go run ./go/examples/wasm --runner=dataflow --output=$OUTPUT --environment_config=apache/beam_go_sdk:latest \
+    --project=$PROJECT --region=$REGION --network=$NETWORK --subnetwork=$SUBNETWORK --staging_location=$STAGING
+```
\ No newline at end of file
diff --git a/sdks/go/examples/wasm/greet.rs b/sdks/go/examples/wasm/greet.rs
new file mode 100644
index 00000000000..dfd57a260b9
--- /dev/null
+++ b/sdks/go/examples/wasm/greet.rs
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Copied from:
+// https://github.com/tetratelabs/wazero/blob/v1.0.0-pre.3/examples/allocation/rust/testdata/greet.rs
+extern crate alloc;
+extern crate core;
+extern crate wee_alloc;
+
+use alloc::vec::Vec;
+use std::mem::MaybeUninit;
+use std::slice;
+
+/// Prints a greeting to the console using [`log`].
+fn greet(name: &String) {
+    log(&["wasm >> ", &greeting(name)].concat());
+}
+
+/// Gets a greeting for the name.
+fn greeting(name: &String) -> String {
+    return ["Hello, ", &name, "!"].concat();
+}
+
+/// Logs a message to the console using [`_log`].
+fn log(message: &String) {
+    unsafe {
+        let (ptr, len) = string_to_ptr(message);
+        _log(ptr, len);
+    }
+}
+
+#[link(wasm_import_module = "env")]
+extern "C" {
+    /// WebAssembly import which prints a string (linear memory offset,
+    /// byteCount) to the console.
+    ///
+    /// Note: This is not an ownership transfer: Rust still owns the pointer
+    /// and ensures it isn't deallocated during this call.
+    #[link_name = "log"]
+    fn _log(ptr: u32, size: u32);
+}
+
+/// WebAssembly export that accepts a string (linear memory offset, byteCount)
+/// and calls [`greet`].
+///
+/// Note: The input parameters were returned by [`allocate`]. This is not an
+/// ownership transfer, so the inputs can be reused after this call.
+#[cfg_attr(all(target_arch = "wasm32"), export_name = "greet")]
+#[no_mangle]
+pub unsafe extern "C" fn _greet(ptr: u32, len: u32) {
+    greet(&ptr_to_string(ptr, len));
+}
+
+/// WebAssembly export that accepts a string (linear memory offset, byteCount)
+/// and returns a pointer/size pair packed into a u64.
+///
+/// Note: The return value is leaked to the caller, so it must call
+/// [`deallocate`] when finished.
+/// Note: This uses a u64 instead of two result values for compatibility with
+/// WebAssembly 1.0.
+#[cfg_attr(all(target_arch = "wasm32"), export_name = "greeting")]
+#[no_mangle]
+pub unsafe extern "C" fn _greeting(ptr: u32, len: u32) -> u64 {
+    let name = &ptr_to_string(ptr, len);
+    let g = greeting(name);
+    let (ptr, len) = string_to_ptr(&g);
+    // Note: This changes ownership of the pointer to the external caller. If
+    // we didn't call forget, the caller would read back a corrupt value. Since
+    // we call forget, the caller must deallocate externally to prevent leaks.
+    std::mem::forget(g);
+    return ((ptr as u64) << 32) | len as u64;
+}
+
+/// Returns a string from WebAssembly compatible numeric types representing
+/// its pointer and length.
+unsafe fn ptr_to_string(ptr: u32, len: u32) -> String {
+    let slice = slice::from_raw_parts_mut(ptr as *mut u8, len as usize);
+    let utf8 = std::str::from_utf8_unchecked_mut(slice);
+    return String::from(utf8);
+}
+
+/// Returns a pointer and size pair for the given string in a way compatible
+/// with WebAssembly numeric types.
+///
+/// Note: This doesn't change the ownership of the String. To intentionally
+/// leak it, use [`std::mem::forget`] on the input after calling this.
+unsafe fn string_to_ptr(s: &String) -> (u32, u32) {
+    return (s.as_ptr() as u32, s.len() as u32);
+}
+
+/// Set the global allocator to the WebAssembly optimized one.
+#[global_allocator]
+static ALLOC: wee_alloc::WeeAlloc = wee_alloc::WeeAlloc::INIT;
+
+/// WebAssembly export that allocates a pointer (linear memory offset) that can
+/// be used for a string.
+///
+/// This is an ownership transfer, which means the caller must call
+/// [`deallocate`] when finished.
+#[cfg_attr(all(target_arch = "wasm32"), export_name = "allocate")]
+#[no_mangle]
+pub extern "C" fn _allocate(size: u32) -> *mut u8 {
+    allocate(size as usize)
+}
+
+/// Allocates size bytes and leaks the pointer where they start.
+fn allocate(size: usize) -> *mut u8 {
+    // Allocate the amount of bytes needed.
+    let vec: Vec<MaybeUninit<u8>> = Vec::with_capacity(size);
+
+    // into_raw leaks the memory to the caller.
+    Box::into_raw(vec.into_boxed_slice()) as *mut u8
+}
+
+
+/// WebAssembly export that deallocates a pointer of the given size (linear
+/// memory offset, byteCount) allocated by [`allocate`].
+#[cfg_attr(all(target_arch = "wasm32"), export_name = "deallocate")]
+#[no_mangle]
+pub unsafe extern "C" fn _deallocate(ptr: u32, size: u32) {
+    deallocate(ptr as *mut u8, size as usize);
+}
+
+/// Retakes the pointer which allows its memory to be freed.
+unsafe fn deallocate(ptr: *mut u8, size: usize) {
+    let _ = Vec::from_raw_parts(ptr, 0, size);
+}
\ No newline at end of file
diff --git a/sdks/go/examples/wasm/greet.wasm b/sdks/go/examples/wasm/greet.wasm
new file mode 100644
index 00000000000..03c31769e7d
Binary files /dev/null and b/sdks/go/examples/wasm/greet.wasm differ
diff --git a/sdks/go/examples/wasm/wasm.go b/sdks/go/examples/wasm/wasm.go
new file mode 100644
index 00000000000..b918f41feee
--- /dev/null
+++ b/sdks/go/examples/wasm/wasm.go
@@ -0,0 +1,201 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// wasm is an EXPERIMENTAL simple example that loads and executes a wasm file function.
+// greet.wasm, Cargo.toml and greet.rs were copied from the example provided by the wazero library:
+// https://github.com/tetratelabs/wazero/blob/v1.0.0-pre.3/examples/allocation/rust/greet.go
+//
+// New Concepts:
+// 1. Load a wasm file compiled from: cargo build --release --target wasm32-unknown-unknown
+// 2. Execute a wasm function within a DoFn
+package main
+
+import (
+	"context"
+	_ "embed"
+	"flag"
+	"fmt"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
+	"github.com/tetratelabs/wazero"
+	"github.com/tetratelabs/wazero/api"
+)
+
+const (
+	wasmFunctionName           = "greeting"
+	wasmAllocateFunctionName   = "allocate"
+	wasmDeallocateFunctionName = "deallocate"
+)
+
+//go:embed greet.wasm
+var greetWasm []byte
+
+var (
+	output = flag.String("output", "", "Output file (required).")
+)
+
+func init() {
+	// register.DoFnXxY registers a struct DoFn so that it can be correctly
+	// serialized and does some optimization to avoid runtime reflection. Since
+	// embeddedWasmFn's ProcessElement func has 2 inputs (context.Context) and 2 outputs (string, error),
+	// we use register.DoFn2x2 and provide its input and output types as its constraints.
+	// Struct DoFns must be registered for a pipeline to run.
+	register.DoFn2x2[context.Context, string, string, error](&embeddedWasmFn{})
+}
+
+func preRun() error {
+	if *output == "" {
+		return fmt.Errorf("--output is required")
+	}
+	return nil
+}
+
+func main() {
+	flag.Parse()
+	beam.Init()
+	ctx := context.Background()
+	if err := preRun(); err != nil {
+		log.Fatal(ctx, err)
+	}
+	if err := run(ctx); err != nil {
+		log.Fatal(ctx, err)
+	}
+}
+
+func run(ctx context.Context) error {
+	p, s := beam.NewPipelineWithRoot()
+
+	in := beam.Create(s, "Ada", "Lovelace", "World", "Beam", "Senior López")
+
+	out := beam.ParDo(s, &embeddedWasmFn{}, in)
+
+	textio.Write(s, *output, out)
+
+	if err := beamx.Run(ctx, p); err != nil {
+		return fmt.Errorf("failed to run pipeline: %v", err)
+	}
+	return nil
+}
+
+// Concept #2 wrap wasm function execution within a DoFn.
+// wasmFn wraps a DoFn to execute a Rust compiled wasm function
+type embeddedWasmFn struct {
+	r                              wazero.Runtime
+	mod                            api.Module
+	greeting, allocate, deallocate api.Function
+}
+
+// Setup loads and initializes the embedded wasm functions
+// Concept #1: Load a compiled wasm file []byte content and function.
+// This example is derived from
+// https://github.com/tetratelabs/wazero/blob/v1.0.0-pre.3/examples/allocation/rust/greet.go
+func (fn *embeddedWasmFn) Setup(ctx context.Context) error {
+	// Create a new WebAssembly Runtime.
+	// Typically, a defer r.Close() would be called subsequently after.  Yet, we need to keep this in memory
+	// throughout the DoFn lifecycle after which we invoke r.Close(); see Teardown below.
+	fn.r = wazero.NewRuntime(ctx)
+
+	// Instantiate a Go-defined module named "env" that exports a function to
+	// log to the console.
+	_, err := fn.r.NewHostModuleBuilder("env").
+		NewFunctionBuilder().WithFunc(logString).Export("log").
+		Instantiate(ctx, fn.r)
+	if err != nil {
+		return fmt.Errorf("failed to instantiate host module: %w", err)
+	}
+
+	// Instantiate a WebAssembly module that imports the "log" function defined
+	// in "env" and exports "memory" and functions we'll use in this example.
+	fn.mod, err = fn.r.InstantiateModuleFromBinary(ctx, greetWasm)
+	if err != nil {
+		return fmt.Errorf("failed to instantiate wasm module: %v", err)
+	}
+
+	// Get references to WebAssembly functions we'll use in this example.
+	fn.greeting = fn.mod.ExportedFunction(wasmFunctionName)
+	fn.allocate = fn.mod.ExportedFunction(wasmAllocateFunctionName)
+	fn.deallocate = fn.mod.ExportedFunction(wasmDeallocateFunctionName)
+	return nil
+}
+
+// ProcessElement processes a string calling a wasm function written in Rust
+// This example is derived from
+// https://github.com/tetratelabs/wazero/blob/v1.0.0-pre.3/examples/allocation/rust/greet.go
+func (fn *embeddedWasmFn) ProcessElement(ctx context.Context, s string) (string, error) {
+
+	// We need to compute the size of s to use Rust's memory allocator.
+	size := uint64(len(s))
+
+	// Instead of an arbitrary memory offset, use Rust's allocator. Notice
+	// there is nothing string-specific in this allocation function. The same
+	// function could be used to pass binary serialized data to Wasm.
+	results, err := fn.allocate.Call(ctx, size)
+	if err != nil {
+		return "", fmt.Errorf("error calling allocate: %w", err)
+	}
+	ptr := results[0]
+
+	// This pointer was allocated by Rust, but owned by Go, So, we have to
+	// deallocate it when finished; defer means that this statement will be called when the function exits
+	defer fn.deallocate.Call(ctx, ptr, size)
+
+	// The pointer is a linear memory offset, which is where we write the value of the DoFn's input element s.
+	if !fn.mod.Memory().Write(ctx, uint32(ptr), []byte(s)) {
+		return "", fmt.Errorf("Memory.Write(%d, %d) out of range of memory size %d",
+			ptr, size, fn.mod.Memory().Size(ctx))
+	}
+
+	// Finally, we get the greeting message "Hello" concatenated to the DoFn's input element s.
+	// This shows how to read-back something allocated by Rust.
+	ptrSize, err := fn.greeting.Call(ctx, ptr, size)
+	resultPtr := uint32(ptrSize[0] >> 32)
+	resultSize := uint32(ptrSize[0])
+
+	// This pointer was allocated by Rust, but owned by Go, So, we have to
+	// deallocate it when finished; again defer flags Go to execute this statement when the function exits
+	defer fn.deallocate.Call(ctx, uint64(resultPtr), uint64(resultSize))
+
+	// The pointer is a linear memory offset, which is where we wrote the results of the string concatenation.
+	bytes, ok := fn.mod.Memory().Read(ctx, resultPtr, resultSize)
+	if !ok {
+		return "", fmt.Errorf("Memory.Read(%d, %d) out of range of memory size %d",
+			resultPtr, resultSize, fn.mod.Memory().Size(ctx))
+	}
+
+	// bytes contains our final result that we emit into the output PCollection
+	return string(bytes), nil
+}
+
+// Teardown the wazero.Runtime during the DoFn teardown lifecycle
+func (fn *embeddedWasmFn) Teardown(ctx context.Context) error {
+	// Typically we would proceed wazero.Runtime's Close method with Go's defer keyword, just after instantiation.
+	// However, we need to keep the property in memory until the end of the DoFn lifecycle
+	if err := fn.r.Close(ctx); err != nil {
+		return fmt.Errorf("failed to close runtime: %w", err)
+	}
+	return nil
+}
+
+// logString is an exported function to the wasm module that logs to console output.
+func logString(ctx context.Context, m api.Module, offset, byteCount uint32) {
+	buf, ok := m.Memory().Read(ctx, offset, byteCount)
+	if !ok {
+		log.Fatalf(ctx, "Memory.Read(%d, %d) out of range", offset, byteCount)
+	}
+	log.Info(ctx, string(buf))
+}