You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/07/15 19:42:08 UTC

[beam] branch master updated: Collect heap profile on OOM on Dataflow (#22225)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 23a85351f82 Collect heap profile on OOM on Dataflow (#22225)
23a85351f82 is described below

commit 23a85351f82079c1853daff464c1b69bedbc4f88
Author: Danny McCormick <da...@google.com>
AuthorDate: Fri Jul 15 15:42:01 2022 -0400

    Collect heap profile on OOM on Dataflow (#22225)
---
 sdks/go/container/boot.go                          |  20 ++-
 .../beam/core/runtime/harness/diagnostics_hook.go  |  58 +++++++++
 sdks/go/pkg/beam/core/runtime/harness/harness.go   |   6 +
 sdks/go/pkg/beam/core/runtime/harness/init/init.go |  16 +++
 sdks/go/pkg/beam/runners/dataflow/dataflow.go      |   2 +-
 sdks/go/pkg/beam/util/diagnostics/diagnostics.go   | 134 +++++++++++++++++++++
 sdks/go/pkg/beam/util/harnessopts/heap_dump.go     |  39 ++++++
 .../go/pkg/beam/util/harnessopts/heap_dump_test.go |  50 ++++++++
 sdks/go/pkg/beam/util/syscallx/syscall_default.go  |   4 +
 sdks/go/pkg/beam/util/syscallx/syscall_linux.go    |  25 +++-
 sdks/go/test/integration/integration.go            |  10 ++
 .../integration/primitives/heap_dump.go}           |  46 ++++---
 .../test/integration/primitives/heap_dump_test.go  |  66 ++++++++++
 sdks/go/test/run_validatesrunner_tests.sh          |   5 +-
 14 files changed, 455 insertions(+), 26 deletions(-)

diff --git a/sdks/go/container/boot.go b/sdks/go/container/boot.go
index 71f437a2a67..add38bb4775 100644
--- a/sdks/go/container/boot.go
+++ b/sdks/go/container/boot.go
@@ -17,6 +17,7 @@ package main
 
 import (
 	"context"
+	"encoding/json"
 	"errors"
 	"flag"
 	"fmt"
@@ -25,11 +26,16 @@ import (
 	"os"
 	"path/filepath"
 	"strings"
+	"time"
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/artifact"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
+	// Import gcs filesystem so that it can be used to upload heap dumps
+	_ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"
 	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
 	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/provision"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/diagnostics"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
 )
@@ -115,7 +121,19 @@ func main() {
 		os.Setenv("RUNNER_CAPABILITIES", strings.Join(info.GetRunnerCapabilities(), " "))
 	}
 
-	log.Fatalf("User program exited: %v", execx.Execute(prog, args...))
+	err = execx.Execute(prog, args...)
+
+	if err != nil {
+		var opt runtime.RawOptionsWrapper
+		err := json.Unmarshal([]byte(options), &opt)
+		if err == nil {
+			if tempLocation, ok := opt.Options.Options["temp_location"]; ok {
+				diagnostics.UploadHeapProfile(ctx, fmt.Sprintf("%v/heapProfiles/profile-%v-%d", strings.TrimSuffix(tempLocation, "/"), *id, time.Now().Unix()))
+			}
+		}
+	}
+
+	log.Fatalf("User program exited: %v", err)
 }
 
 func getGoWorkerArtifactName(artifacts []*pipepb.ArtifactInformation) (string, error) {
diff --git a/sdks/go/pkg/beam/core/runtime/harness/diagnostics_hook.go b/sdks/go/pkg/beam/core/runtime/harness/diagnostics_hook.go
new file mode 100644
index 00000000000..afab350c39a
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/harness/diagnostics_hook.go
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package harness
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/hooks"
+)
+
+var (
+	samplingFrequencySeconds   int = 1
+	maxTimeBetweenDumpsSeconds int = 60
+)
+
+// init registers the heap dump diagnostics hook. When the hook is enabled with
+// options, its Init function parses them and overrides the package-level
+// samplingFrequencySeconds and maxTimeBetweenDumpsSeconds values.
+func init() {
+	hf := func(opts []string) hooks.Hook {
+		return hooks.Hook{
+			Init: func(ctx context.Context) (context.Context, error) {
+				// No options supplied: keep the package defaults.
+				if len(opts) == 0 {
+					return ctx, nil
+				}
+				if len(opts) != 2 {
+					return ctx, fmt.Errorf("expected 2 options, got %v: %v", len(opts), opts)
+				}
+
+				// Both values are parsed before either is assigned, so a
+				// malformed option leaves the current settings untouched.
+				heapProfileSamplingFrequencySeconds, err := strconv.Atoi(opts[0])
+				if err != nil {
+					return ctx, fmt.Errorf("parsing heap dump sampling frequency %q: %w", opts[0], err)
+				}
+				heapProfileMaxTimeBetweenDumpsSeconds, err := strconv.Atoi(opts[1])
+				if err != nil {
+					return ctx, fmt.Errorf("parsing heap dump max time between dumps %q: %w", opts[1], err)
+				}
+				samplingFrequencySeconds = heapProfileSamplingFrequencySeconds
+				maxTimeBetweenDumpsSeconds = heapProfileMaxTimeBetweenDumpsSeconds
+
+				return ctx, nil
+			},
+		}
+	}
+	hooks.RegisterHook("beam:go:hook:diagnostics:heapDump", hf)
+}
diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index 4e4fe468ec6..7d4ac51537f 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -26,6 +26,7 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness/statecache"
@@ -33,6 +34,7 @@ import (
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
 	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/diagnostics"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
 	"github.com/golang/protobuf/proto"
 	"google.golang.org/grpc"
@@ -70,6 +72,10 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
 		return err
 	}
 
+	if tempLocation := beam.PipelineOptions.Get("temp_location"); tempLocation != "" && samplingFrequencySeconds > 0 {
+		go diagnostics.SampleForHeapProfile(ctx, samplingFrequencySeconds, maxTimeBetweenDumpsSeconds)
+	}
+
 	recordHeader()
 
 	// Connect to FnAPI control server. Receive and execute work.
diff --git a/sdks/go/pkg/beam/core/runtime/harness/init/init.go b/sdks/go/pkg/beam/core/runtime/harness/init/init.go
index 0c52a007b5c..6dd2f8157c7 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/init/init.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/init/init.go
@@ -31,7 +31,9 @@ import (
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness"
+	_ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/syscallx"
 )
 
 var (
@@ -110,6 +112,10 @@ func hook() {
 	ctx, cancelFn := context.WithCancel(context.Background())
 	defer cancelFn()
 	ctx = grpcx.WriteWorkerID(ctx, *id)
+	memLimit := memoryLimit()
+	if err := syscallx.SetProcessMemoryCeiling(memLimit, memLimit); err != nil && err != syscallx.ErrUnsupported {
+		fmt.Println("Error Setting Rlimit ", err)
+	}
 	if err := harness.Main(ctx, *loggingEndpoint, *controlEndpoint); err != nil {
 		fmt.Fprintf(os.Stderr, "Worker failed: %v\n", err)
 		switch ShutdownMode {
@@ -127,3 +133,13 @@ func hook() {
 		time.Sleep(time.Hour)
 	}
 }
+
+// memoryLimit returns 90% of the physical memory on the machine. If it cannot determine
+// that value, it returns 2GB. This is an imperfect heuristic. It aims to
+// ensure there is enough memory for the process without causing an OOM.
+func memoryLimit() uint64 {
+	if size, err := syscallx.PhysicalMemorySize(); err == nil {
+		return (size * 90) / 100
+	}
+	return 2 << 30
+}
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
index 9d4a1b02bdc..bd233750495 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
@@ -101,6 +101,7 @@ func init() {
 //
 // New flags that are already put into pipeline options
 // should be added to this map.
+// Don't filter temp_location since we need this included in PipelineOptions to correctly upload heap dumps.
 var flagFilter = map[string]bool{
 	"dataflow_endpoint":              true,
 	"staging_location":               true,
@@ -117,7 +118,6 @@ var flagFilter = map[string]bool{
 	"network":                        true,
 	"subnetwork":                     true,
 	"no_use_public_ips":              true,
-	"temp_location":                  true,
 	"template_location":              true,
 	"worker_machine_type":            true,
 	"min_cpu_platform":               true,
diff --git a/sdks/go/pkg/beam/util/diagnostics/diagnostics.go b/sdks/go/pkg/beam/util/diagnostics/diagnostics.go
new file mode 100644
index 00000000000..2748072e73f
--- /dev/null
+++ b/sdks/go/pkg/beam/util/diagnostics/diagnostics.go
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package diagnostics is a beam internal package that contains code for writing and uploading
+// diagnostic info (e.g. heap profiles) about the worker process.
+// This package is not intended for end user use and can change at any time.
+package diagnostics
+
+import (
+	"bufio"
+	"context"
+	"errors"
+	"os"
+	"runtime"
+	"runtime/pprof"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+)
+
+const (
+	hProfLoc     = "/tmp/hProf"
+	tempHProfLoc = "/tmp/hProf.tmp"
+)
+
+// SampleForHeapProfile checks every samplingFrequencySeconds seconds whether it should take
+// a heap profile, and if so it takes one and saves it to hProfLoc. A profile will be taken
+// if either:
+// (1) the amount of memory allocated exceeds the amount recorded when the last heap profile
+// was taken or
+// (2) waiting for another sample would exceed maxTimeBetweenDumpsSeconds since the last
+// heap profile was taken
+//
+// It blocks until ctx is cancelled, so it is intended to be run in its own goroutine.
+// It returns immediately if samplingFrequencySeconds is not positive.
+func SampleForHeapProfile(ctx context.Context, samplingFrequencySeconds, maxTimeBetweenDumpsSeconds int) {
+	// A non-positive interval would turn the loop below into a busy loop.
+	if samplingFrequencySeconds <= 0 {
+		return
+	}
+	interval := time.Duration(samplingFrequencySeconds) * time.Second
+
+	var maxAllocatedSoFar uint64
+	samplesSkipped := 0
+	for {
+		var m runtime.MemStats
+		runtime.ReadMemStats(&m)
+		if m.Alloc > maxAllocatedSoFar || (samplesSkipped+1)*samplingFrequencySeconds > maxTimeBetweenDumpsSeconds {
+			samplesSkipped = 0
+			maxAllocatedSoFar = m.Alloc
+			err := saveHeapProfile()
+			if err != nil {
+				log.Warnf(ctx, "Failed to save heap profile. This will not affect pipeline execution, but may make it harder to diagnose memory issues: %v", err)
+			}
+		} else {
+			samplesSkipped++
+		}
+
+		// Wait for the next sample, but stop as soon as the context is done so
+		// that the sampling goroutine does not outlive its caller.
+		select {
+		case <-ctx.Done():
+			return
+		case <-time.After(interval):
+		}
+	}
+}
+
+// saveHeapProfile writes the current heap profile to tempHProfLoc through a
+// buffered writer and then moves it to hProfLoc, replacing any profile that
+// was saved there before. It returns the first error encountered.
+func saveHeapProfile() error {
+	// Write to a .tmp file before moving to final location to ensure
+	// that OOM doesn't disrupt this flow.
+	// Try removing temp file in case we ran into an error previously
+	if err := os.Remove(tempHProfLoc); err != nil && !errors.Is(err, os.ErrNotExist) {
+		return err
+	}
+	fd, err := os.Create(tempHProfLoc)
+	if err != nil {
+		return err
+	}
+	defer fd.Close()
+	buf := bufio.NewWriterSize(fd, 1<<20) // use 1MB buffer
+
+	if err := pprof.WriteHeapProfile(buf); err != nil {
+		return err
+	}
+
+	// Flush explicitly so buffered profile data reaches the file before it is renamed.
+	if err := buf.Flush(); err != nil {
+		return err
+	}
+
+	// Remove the previously saved profile, if any, before moving the new one into place.
+	if err := os.Remove(hProfLoc); err != nil && !errors.Is(err, os.ErrNotExist) {
+		return err
+	}
+
+	return os.Rename(tempHProfLoc, hProfLoc)
+}
+
+// UploadHeapProfile checks if a heap profile is available and, if so, uploads it to dest.
+// It will first check hProfLoc for the heap profile and then it will
+// check tempHProfLoc if no file exists at hProfLoc. If neither file exists it
+// returns nil without uploading anything; any other error opening a profile is returned.
+//
+// To examine, download the file and run: `go tool pprof -http=:8082 path/to/profile`
+func UploadHeapProfile(ctx context.Context, dest string) error {
+	hProf, err := os.Open(hProfLoc)
+	if errors.Is(err, os.ErrNotExist) {
+		// No finished profile; fall back to the temp file that may have been
+		// left behind while a profile was being written.
+		hProf, err = os.Open(tempHProfLoc)
+		if errors.Is(err, os.ErrNotExist) {
+			return nil
+		}
+	}
+	// Any remaining error (e.g. a permission problem) must stop the upload here
+	// rather than continuing with a nil file.
+	if err != nil {
+		return err
+	}
+	defer hProf.Close()
+	hProfReader := bufio.NewReader(hProf)
+
+	fs, err := filesystem.New(ctx, dest)
+	if err != nil {
+		return err
+	}
+	defer fs.Close()
+	fd, err := fs.OpenWrite(ctx, dest)
+	if err != nil {
+		return err
+	}
+	buf := bufio.NewWriterSize(fd, 1<<20) // use 1MB buffer
+
+	_, err = hProfReader.WriteTo(buf)
+	if err != nil {
+		return err
+	}
+
+	if err := buf.Flush(); err != nil {
+		return err
+	}
+
+	// Return the Close error so that a failure to finish writing dest is reported.
+	return fd.Close()
+}
diff --git a/sdks/go/pkg/beam/util/harnessopts/heap_dump.go b/sdks/go/pkg/beam/util/harnessopts/heap_dump.go
new file mode 100644
index 00000000000..06536a977d5
--- /dev/null
+++ b/sdks/go/pkg/beam/util/harnessopts/heap_dump.go
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package harnessopts
+
+import (
+	"fmt"
+	"strconv"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/hooks"
+)
+
+const (
+	diagnosticsHook = "beam:go:hook:diagnostics:heapDump"
+)
+
+// HeapDumpFrequency sets the sampling frequency for how often the diagnostics service checks if it
+// should take a heap dump and the maximum allowable time between heap dumps.
+// Setting the sampling frequency to <=0 disables the heap dump checks.
+// The default value for the sampling frequency is 1 second, the default max time is 60 seconds.
+// It returns an error if maxTimeBetweenDumpsSeconds is less than samplingFrequencySeconds.
+func HeapDumpFrequency(samplingFrequencySeconds, maxTimeBetweenDumpsSeconds int) error {
+	if maxTimeBetweenDumpsSeconds < samplingFrequencySeconds {
+		return fmt.Errorf("max time between dumps %v should be greater than or equal to sampling frequency %v", maxTimeBetweenDumpsSeconds, samplingFrequencySeconds)
+	}
+	// The hook itself is defined in beam/core/runtime/harness/diagnostics_hook.go
+	return hooks.EnableHook(diagnosticsHook, strconv.Itoa(samplingFrequencySeconds), strconv.Itoa(maxTimeBetweenDumpsSeconds))
+}
diff --git a/sdks/go/pkg/beam/util/harnessopts/heap_dump_test.go b/sdks/go/pkg/beam/util/harnessopts/heap_dump_test.go
new file mode 100644
index 00000000000..ab083e0bc5b
--- /dev/null
+++ b/sdks/go/pkg/beam/util/harnessopts/heap_dump_test.go
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package harnessopts
+
+import (
+	"testing"
+
+	_ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness" // Imports the heap dump diagnostics hook
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/hooks"
+)
+
+// TestHeapDumpFrequency checks that HeapDumpFrequency enables the diagnostics hook
+// and passes both values through as its string options.
+func TestHeapDumpFrequency(t *testing.T) {
+	err := HeapDumpFrequency(10, 30)
+	if err != nil {
+		t.Errorf("HeapDumpFrequency failed when it should have succeeded, got %v", err)
+	}
+	ok, opts := hooks.IsEnabled(diagnosticsHook)
+	if !ok {
+		t.Fatalf("HeapDumpFrequency hook is not enabled")
+	}
+	// Stop here on a length mismatch; indexing opts below would otherwise panic.
+	if len(opts) != 2 {
+		t.Fatalf("num opts mismatch, got %v, want 2", len(opts))
+	}
+	if opts[0] != "10" {
+		t.Errorf("sampling frequency option mismatch, got %v, want %v", opts[0], 10)
+	}
+	if opts[1] != "30" {
+		t.Errorf("max time between dumps option mismatch, got %v, want %v", opts[1], 30)
+	}
+}
+
+// TestHeapDumpFrequency_Bad checks that HeapDumpFrequency returns an error when the
+// max time between dumps (4) is smaller than the sampling frequency (5).
+func TestHeapDumpFrequency_Bad(t *testing.T) {
+	err := HeapDumpFrequency(5, 4)
+	if err == nil {
+		t.Errorf("HeapDumpFrequency succeeded when it should have failed")
+	}
+}
diff --git a/sdks/go/pkg/beam/util/syscallx/syscall_default.go b/sdks/go/pkg/beam/util/syscallx/syscall_default.go
index 55756d0dbd4..67d188d8590 100644
--- a/sdks/go/pkg/beam/util/syscallx/syscall_default.go
+++ b/sdks/go/pkg/beam/util/syscallx/syscall_default.go
@@ -27,3 +27,7 @@ func PhysicalMemorySize() (uint64, error) {
 func FreeDiskSpace(path string) (uint64, error) {
 	return 0, ErrUnsupported
 }
+
+// SetProcessMemoryCeiling is the fallback implementation; it ignores both
+// ceilings and always returns ErrUnsupported.
+func SetProcessMemoryCeiling(softCeiling, hardCeiling uint64) error {
+	return ErrUnsupported
+}
diff --git a/sdks/go/pkg/beam/util/syscallx/syscall_linux.go b/sdks/go/pkg/beam/util/syscallx/syscall_linux.go
index 379437ad0a1..f18ee87ec20 100644
--- a/sdks/go/pkg/beam/util/syscallx/syscall_linux.go
+++ b/sdks/go/pkg/beam/util/syscallx/syscall_linux.go
@@ -18,12 +18,12 @@
 
 package syscallx
 
-import "syscall"
+import "golang.org/x/sys/unix"
 
 // PhysicalMemorySize returns the total physical memory size.
 func PhysicalMemorySize() (uint64, error) {
-	var info syscall.Sysinfo_t
-	if err := syscall.Sysinfo(&info); err != nil {
+	var info unix.Sysinfo_t
+	if err := unix.Sysinfo(&info); err != nil {
 		return 0, err
 	}
 	return info.Totalram, nil
@@ -31,9 +31,24 @@ func PhysicalMemorySize() (uint64, error) {
 
 // FreeDiskSpace returns the free disk space for a given path.
 func FreeDiskSpace(path string) (uint64, error) {
-	var stat syscall.Statfs_t
-	if err := syscall.Statfs(path, &stat); err != nil {
+	var stat unix.Statfs_t
+	if err := unix.Statfs(path, &stat); err != nil {
 		return 0, err
 	}
 	return stat.Bavail * uint64(stat.Bsize), nil
 }
+
+// SetProcessMemoryCeiling sets the soft and hard limits, in bytes, on the
+// address space of the current process (RLIMIT_AS).
+func SetProcessMemoryCeiling(softCeiling, hardCeiling uint64) error {
+	var rLimit unix.Rlimit
+
+	// RLIMIT_AS is the resource that bounds process memory; RLIMIT_NOFILE
+	// would instead change the number of file descriptors the process may open.
+	err := unix.Getrlimit(unix.RLIMIT_AS, &rLimit)
+
+	if err != nil {
+		return err
+	}
+
+	rLimit.Max = hardCeiling
+	rLimit.Cur = softCeiling
+
+	return unix.Setrlimit(unix.RLIMIT_AS, &rLimit)
+}
diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go
index e81993ae460..1c35247ac32 100644
--- a/sdks/go/test/integration/integration.go
+++ b/sdks/go/test/integration/integration.go
@@ -86,6 +86,8 @@ var directFilters = []string{
 	"TestDrain",
 	// FhirIO currently only supports Dataflow runner
 	"TestFhirIO.*",
+	// OOMs currently only lead to heap dumps on Dataflow runner
+	"TestOomParDo",
 }
 
 var portableFilters = []string{
@@ -104,6 +106,8 @@ var portableFilters = []string{
 	"TestDrain",
 	// FhirIO currently only supports Dataflow runner
 	"TestFhirIO.*",
+	// OOMs currently only lead to heap dumps on Dataflow runner
+	"TestOomParDo",
 }
 
 var flinkFilters = []string{
@@ -120,6 +124,8 @@ var flinkFilters = []string{
 	"TestDrain",
 	// FhirIO currently only supports Dataflow runner
 	"TestFhirIO.*",
+	// OOMs currently only lead to heap dumps on Dataflow runner
+	"TestOomParDo",
 }
 
 var samzaFilters = []string{
@@ -141,6 +147,8 @@ var samzaFilters = []string{
 	"TestDrain",
 	// FhirIO currently only supports Dataflow runner
 	"TestFhirIO.*",
+	// OOMs currently only lead to heap dumps on Dataflow runner
+	"TestOomParDo",
 }
 
 var sparkFilters = []string{
@@ -163,6 +171,8 @@ var sparkFilters = []string{
 	"TestDrain",
 	// FhirIO currently only supports Dataflow runner
 	"TestFhirIO.*",
+	// OOMs currently only lead to heap dumps on Dataflow runner
+	"TestOomParDo",
 }
 
 var dataflowFilters = []string{
diff --git a/sdks/go/pkg/beam/util/syscallx/syscall_linux.go b/sdks/go/test/integration/primitives/heap_dump.go
similarity index 56%
copy from sdks/go/pkg/beam/util/syscallx/syscall_linux.go
copy to sdks/go/test/integration/primitives/heap_dump.go
index 379437ad0a1..9d71223629b 100644
--- a/sdks/go/pkg/beam/util/syscallx/syscall_linux.go
+++ b/sdks/go/test/integration/primitives/heap_dump.go
@@ -13,27 +13,39 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-//go:build linux
-// +build linux
+package primitives
 
-package syscallx
+import (
+	"context"
+	"time"
 
-import "syscall"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+)
 
-// PhysicalMemorySize returns the total physical memory size.
-func PhysicalMemorySize() (uint64, error) {
-	var info syscall.Sysinfo_t
-	if err := syscall.Sysinfo(&info); err != nil {
-		return 0, err
+func oomFn(ctx context.Context, elm int, emit func(int)) {
+	size := 1 << 25
+	// Simulate a slow memory leak
+	for {
+		abc := make([]int64, size)
+		log.Debugf(ctx, "abc %v", abc)
+		time.Sleep(5 * time.Second)
+		log.Debugf(ctx, "abc %v", abc)
+		if size > 1<<40 {
+			break
+		}
+
+		size = int(float64(size) * 1.2)
 	}
-	return info.Totalram, nil
+	emit(elm)
 }
 
-// FreeDiskSpace returns the free disk space for a given path.
-func FreeDiskSpace(path string) (uint64, error) {
-	var stat syscall.Statfs_t
-	if err := syscall.Statfs(path, &stat); err != nil {
-		return 0, err
-	}
-	return stat.Bavail * uint64(stat.Bsize), nil
+// OomParDo tests a DoFn that OOMs.
+func OomParDo() *beam.Pipeline {
+	p, s := beam.NewPipelineWithRoot()
+
+	in := beam.Create(s, 1)
+	beam.ParDo(s, oomFn, in)
+
+	return p
 }
diff --git a/sdks/go/test/integration/primitives/heap_dump_test.go b/sdks/go/test/integration/primitives/heap_dump_test.go
new file mode 100644
index 00000000000..27ccf4dbf2b
--- /dev/null
+++ b/sdks/go/test/integration/primitives/heap_dump_test.go
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package primitives
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"strings"
+	"testing"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
+	_ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+	"github.com/apache/beam/sdks/v2/go/test/integration"
+)
+
+// TestOomParDo runs the OomParDo pipeline and verifies that more files exist under
+// <temp_location>/heapProfiles afterwards than before, i.e. that at least one new
+// heap dump was uploaded.
+func TestOomParDo(t *testing.T) {
+	integration.CheckFilters(t)
+	tempLocationFlag := flag.Lookup("temp_location")
+	if tempLocationFlag == nil {
+		t.Fatalf("A temp_location must be provided to correctly run TestOomParDo")
+	}
+	tempLocation := tempLocationFlag.Value.(flag.Getter).Get().(string)
+	if tempLocation == "" {
+		t.Fatalf("A temp_location must be provided to correctly run TestOomParDo")
+	}
+	dumpLocation := fmt.Sprintf("%v/heapProfiles/*", strings.TrimSuffix(tempLocation, "/"))
+	ctx := context.Background()
+
+	fs, err := filesystem.New(ctx, dumpLocation)
+	if err != nil {
+		t.Fatalf("Failed to connect to filesystem: %v", err)
+	}
+	defer fs.Close()
+
+	files, err := fs.List(ctx, dumpLocation)
+	if err != nil {
+		t.Fatalf("Failed to list heap dumps at %v before running the pipeline: %v", dumpLocation, err)
+	}
+	startFiles := len(files)
+
+	// The pipeline's result is not checked here; the test only looks at
+	// whether a heap dump was produced.
+	ptest.Run(OomParDo())
+
+	files, err = fs.List(ctx, dumpLocation)
+	if err != nil {
+		t.Fatalf("Failed to list heap dumps at %v after running the pipeline: %v", dumpLocation, err)
+	}
+	endFiles := len(files)
+
+	if startFiles >= endFiles {
+		t.Fatalf("No new heap dumps generated on OOM. There were %v dumps before running and %v after", startFiles, endFiles)
+	}
+}
diff --git a/sdks/go/test/run_validatesrunner_tests.sh b/sdks/go/test/run_validatesrunner_tests.sh
index cdc41e50b8e..579809ea0f4 100755
--- a/sdks/go/test/run_validatesrunner_tests.sh
+++ b/sdks/go/test/run_validatesrunner_tests.sh
@@ -91,6 +91,7 @@ SIMULTANEOUS=3
 
 # Where to store integration test outputs.
 GCS_LOCATION=gs://temp-storage-for-end-to-end-tests
+GCS_SUBFOLDER="test$RANDOM"
 
 # Project for the container and integration test
 PROJECT=apache-beam-testing
@@ -409,8 +410,8 @@ ARGS="$ARGS --project=$DATAFLOW_PROJECT"
 ARGS="$ARGS --region=$REGION"
 ARGS="$ARGS --environment_type=DOCKER"
 ARGS="$ARGS --environment_config=$CONTAINER:$TAG"
-ARGS="$ARGS --staging_location=$GCS_LOCATION/staging-validatesrunner-test"
-ARGS="$ARGS --temp_location=$GCS_LOCATION/temp-validatesrunner-test"
+ARGS="$ARGS --staging_location=$GCS_LOCATION/staging-validatesrunner-test/$GCS_SUBFOLDER"
+ARGS="$ARGS --temp_location=$GCS_LOCATION/temp-validatesrunner-test/$GCS_SUBFOLDER"
 ARGS="$ARGS --dataflow_worker_jar=$DATAFLOW_WORKER_JAR"
 ARGS="$ARGS --endpoint=$ENDPOINT"
 if [[ -n "$TEST_EXPANSION_ADDR" ]]; then