You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/07/15 19:42:08 UTC
[beam] branch master updated: Collect heap profile on OOM on Dataflow (#22225)
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 23a85351f82 Collect heap profile on OOM on Dataflow (#22225)
23a85351f82 is described below
commit 23a85351f82079c1853daff464c1b69bedbc4f88
Author: Danny McCormick <da...@google.com>
AuthorDate: Fri Jul 15 15:42:01 2022 -0400
Collect heap profile on OOM on Dataflow (#22225)
---
sdks/go/container/boot.go | 20 ++-
.../beam/core/runtime/harness/diagnostics_hook.go | 58 +++++++++
sdks/go/pkg/beam/core/runtime/harness/harness.go | 6 +
sdks/go/pkg/beam/core/runtime/harness/init/init.go | 16 +++
sdks/go/pkg/beam/runners/dataflow/dataflow.go | 2 +-
sdks/go/pkg/beam/util/diagnostics/diagnostics.go | 134 +++++++++++++++++++++
sdks/go/pkg/beam/util/harnessopts/heap_dump.go | 39 ++++++
.../go/pkg/beam/util/harnessopts/heap_dump_test.go | 50 ++++++++
sdks/go/pkg/beam/util/syscallx/syscall_default.go | 4 +
sdks/go/pkg/beam/util/syscallx/syscall_linux.go | 25 +++-
sdks/go/test/integration/integration.go | 10 ++
.../integration/primitives/heap_dump.go} | 46 ++++---
.../test/integration/primitives/heap_dump_test.go | 66 ++++++++++
sdks/go/test/run_validatesrunner_tests.sh | 5 +-
14 files changed, 455 insertions(+), 26 deletions(-)
diff --git a/sdks/go/container/boot.go b/sdks/go/container/boot.go
index 71f437a2a67..add38bb4775 100644
--- a/sdks/go/container/boot.go
+++ b/sdks/go/container/boot.go
@@ -17,6 +17,7 @@ package main
import (
"context"
+ "encoding/json"
"errors"
"flag"
"fmt"
@@ -25,11 +26,16 @@ import (
"os"
"path/filepath"
"strings"
+ "time"
"github.com/apache/beam/sdks/v2/go/pkg/beam/artifact"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
+ // Import gcs filesystem so that it can be used to upload heap dumps
+ _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/provision"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/util/diagnostics"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
)
@@ -115,7 +121,19 @@ func main() {
os.Setenv("RUNNER_CAPABILITIES", strings.Join(info.GetRunnerCapabilities(), " "))
}
- log.Fatalf("User program exited: %v", execx.Execute(prog, args...))
+ err = execx.Execute(prog, args...)
+
+ if err != nil {
+ var opt runtime.RawOptionsWrapper
+ err := json.Unmarshal([]byte(options), &opt)
+ if err == nil {
+ if tempLocation, ok := opt.Options.Options["temp_location"]; ok {
+ diagnostics.UploadHeapProfile(ctx, fmt.Sprintf("%v/heapProfiles/profile-%v-%d", strings.TrimSuffix(tempLocation, "/"), *id, time.Now().Unix()))
+ }
+ }
+ }
+
+ log.Fatalf("User program exited: %v", err)
}
func getGoWorkerArtifactName(artifacts []*pipepb.ArtifactInformation) (string, error) {
diff --git a/sdks/go/pkg/beam/core/runtime/harness/diagnostics_hook.go b/sdks/go/pkg/beam/core/runtime/harness/diagnostics_hook.go
new file mode 100644
index 00000000000..afab350c39a
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/harness/diagnostics_hook.go
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package harness
+
+import (
+ "context"
+ "fmt"
+ "strconv"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/hooks"
+)
+
+// Heap profile sampling configuration. These defaults can be overridden by the
+// "beam:go:hook:diagnostics:heapDump" hook registered in this file's init.
+var (
+	// samplingFrequencySeconds is how often, in seconds, the heap is checked to
+	// decide whether a profile should be taken. Values <= 0 disable sampling.
+	samplingFrequencySeconds = 1
+	// maxTimeBetweenDumpsSeconds bounds, in seconds, how long sampling may go
+	// without writing a new profile.
+	maxTimeBetweenDumpsSeconds = 60
+)
+
+// init registers the "beam:go:hook:diagnostics:heapDump" hook. When the hook is
+// enabled (e.g. via harnessopts.HeapDumpFrequency) with two integer options, it
+// overrides samplingFrequencySeconds and maxTimeBetweenDumpsSeconds, in that order.
+func init() {
+	hf := func(opts []string) hooks.Hook {
+		return hooks.Hook{
+			Init: func(ctx context.Context) (context.Context, error) {
+				// No options: keep the package defaults.
+				if len(opts) == 0 {
+					return ctx, nil
+				}
+				if len(opts) != 2 {
+					return ctx, fmt.Errorf("expected 2 options, got %v: %v", len(opts), opts)
+				}
+
+				heapProfileSamplingFrequencySeconds, err := strconv.Atoi(opts[0])
+				if err != nil {
+					// Always hand back the incoming ctx; returning a nil context could
+					// break callers that keep using the returned value.
+					return ctx, fmt.Errorf("invalid heap profile sampling frequency %q: %w", opts[0], err)
+				}
+				heapProfileMaxTimeBetweenDumpsSeconds, err := strconv.Atoi(opts[1])
+				if err != nil {
+					return ctx, fmt.Errorf("invalid max time between heap profiles %q: %w", opts[1], err)
+				}
+				samplingFrequencySeconds = heapProfileSamplingFrequencySeconds
+				maxTimeBetweenDumpsSeconds = heapProfileMaxTimeBetweenDumpsSeconds
+
+				return ctx, nil
+			},
+		}
+	}
+	hooks.RegisterHook("beam:go:hook:diagnostics:heapDump", hf)
+}
diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index 4e4fe468ec6..7d4ac51537f 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -26,6 +26,7 @@ import (
"sync/atomic"
"time"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness/statecache"
@@ -33,6 +34,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/util/diagnostics"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
@@ -70,6 +72,10 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
return err
}
+ if tempLocation := beam.PipelineOptions.Get("temp_location"); tempLocation != "" && samplingFrequencySeconds > 0 {
+ go diagnostics.SampleForHeapProfile(ctx, samplingFrequencySeconds, maxTimeBetweenDumpsSeconds)
+ }
+
recordHeader()
// Connect to FnAPI control server. Receive and execute work.
diff --git a/sdks/go/pkg/beam/core/runtime/harness/init/init.go b/sdks/go/pkg/beam/core/runtime/harness/init/init.go
index 0c52a007b5c..6dd2f8157c7 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/init/init.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/init/init.go
@@ -31,7 +31,9 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness"
+ _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/util/syscallx"
)
var (
@@ -110,6 +112,10 @@ func hook() {
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
ctx = grpcx.WriteWorkerID(ctx, *id)
+ memLimit := memoryLimit()
+ if err := syscallx.SetProcessMemoryCeiling(memLimit, memLimit); err != nil && err != syscallx.ErrUnsupported {
+ fmt.Println("Error Setting Rlimit ", err)
+ }
if err := harness.Main(ctx, *loggingEndpoint, *controlEndpoint); err != nil {
fmt.Fprintf(os.Stderr, "Worker failed: %v\n", err)
switch ShutdownMode {
@@ -127,3 +133,13 @@ func hook() {
time.Sleep(time.Hour)
}
}
+
+// memoryLimits returns 90% of the physical memory on the machine. If it cannot determine
+// that value, it returns 2GB. This is an imperfect heuristic. It aims to
+// ensure there is enough memory for the process without causing an OOM.
+func memoryLimit() uint64 {
+ if size, err := syscallx.PhysicalMemorySize(); err == nil {
+ return (size * 90) / 100
+ }
+ return 2 << 30
+}
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
index 9d4a1b02bdc..bd233750495 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
@@ -101,6 +101,7 @@ func init() {
//
// New flags that are already put into pipeline options
// should be added to this map.
+// Don't filter temp_location since we need this included in PipelineOptions to correctly upload heap dumps.
var flagFilter = map[string]bool{
"dataflow_endpoint": true,
"staging_location": true,
@@ -117,7 +118,6 @@ var flagFilter = map[string]bool{
"network": true,
"subnetwork": true,
"no_use_public_ips": true,
- "temp_location": true,
"template_location": true,
"worker_machine_type": true,
"min_cpu_platform": true,
diff --git a/sdks/go/pkg/beam/util/diagnostics/diagnostics.go b/sdks/go/pkg/beam/util/diagnostics/diagnostics.go
new file mode 100644
index 00000000000..2748072e73f
--- /dev/null
+++ b/sdks/go/pkg/beam/util/diagnostics/diagnostics.go
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package diagnostics is a beam internal package that contains code for writing and uploading
+// diagnostic info (e.g. heap profiles) about the worker process.
+// This package is not intended for end user use and can change at any time.
+package diagnostics
+
+import (
+ "bufio"
+ "context"
+ "errors"
+ "os"
+ "runtime"
+ "runtime/pprof"
+ "time"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+)
+
+// hProfLoc holds the most recent completely written heap profile. New profiles
+// are written to tempHProfLoc first and only then renamed to this path.
+const hProfLoc = "/tmp/hProf"
+
+// tempHProfLoc is the staging file used while a new heap profile is written.
+const tempHProfLoc = "/tmp/hProf.tmp"
+
+// SampleForHeapProfile checks every samplingFrequencySeconds seconds whether it
+// should take a heap profile and, if so, writes one to hProfLoc. A profile is
+// taken if either:
+// (1) the allocated heap (MemStats.Alloc) exceeds the value recorded at the last
+// profile, or
+// (2) at least maxTimeBetweenDumpsSeconds seconds have passed since the last profile.
+//
+// It blocks until ctx is cancelled, so callers should run it in its own goroutine.
+// A non-positive samplingFrequencySeconds disables sampling and returns immediately.
+// Failures to write a profile are logged and do not stop sampling.
+func SampleForHeapProfile(ctx context.Context, samplingFrequencySeconds, maxTimeBetweenDumpsSeconds int) {
+	if samplingFrequencySeconds <= 0 {
+		// Guard against a zero interval turning the loop into a busy spin.
+		return
+	}
+	ticker := time.NewTicker(time.Duration(samplingFrequencySeconds) * time.Second)
+	defer ticker.Stop()
+
+	var maxAllocatedSoFar uint64
+	samplesSkipped := 0
+	for {
+		var m runtime.MemStats
+		runtime.ReadMemStats(&m)
+		// samplesSkipped+1 sampling intervals have elapsed since the last profile.
+		// Use >= so the forced profile happens exactly at the configured maximum.
+		sinceLastDump := (samplesSkipped + 1) * samplingFrequencySeconds
+		if m.Alloc > maxAllocatedSoFar || sinceLastDump >= maxTimeBetweenDumpsSeconds {
+			samplesSkipped = 0
+			maxAllocatedSoFar = m.Alloc
+			if err := saveHeapProfile(); err != nil {
+				log.Warnf(ctx, "Failed to save heap profile. This will not affect pipeline execution, but may make it harder to diagnose memory issues: %v", err)
+			}
+		} else {
+			samplesSkipped++
+		}
+
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+		}
+	}
+}
+
+// saveHeapProfile writes the current heap profile to hProfLoc.
+//
+// The profile is first written to tempHProfLoc and then renamed over hProfLoc.
+// An OOM or crash while writing therefore never leaves a truncated file at
+// hProfLoc; it keeps the previous complete profile until the rename.
+func saveHeapProfile() error {
+	// Remove a temp file left behind by an earlier failed attempt.
+	if err := os.Remove(tempHProfLoc); err != nil && !errors.Is(err, os.ErrNotExist) {
+		return err
+	}
+	fd, err := os.Create(tempHProfLoc)
+	if err != nil {
+		return err
+	}
+	buf := bufio.NewWriterSize(fd, 1<<20) // use 1MB buffer
+
+	if err := pprof.WriteHeapProfile(buf); err != nil {
+		fd.Close() // best effort; the write error is the one worth reporting
+		return err
+	}
+	if err := buf.Flush(); err != nil {
+		fd.Close() // best effort; the flush error is the one worth reporting
+		return err
+	}
+	// Close before renaming so write errors surface here instead of being dropped,
+	// and so the file is complete before it becomes visible at hProfLoc.
+	if err := fd.Close(); err != nil {
+		return err
+	}
+
+	// os.Rename replaces an existing hProfLoc. Deleting it first is unnecessary and
+	// would briefly leave no profile at hProfLoc.
+	return os.Rename(tempHProfLoc, hProfLoc)
+}
+
+// UploadHeapProfile uploads the most recent heap profile, if one exists, to dest.
+// It reads hProfLoc and falls back to tempHProfLoc only when no file exists at
+// hProfLoc. If neither file exists, it returns nil without writing anything.
+// Any other error opening the profile is returned before dest is touched.
+//
+// dest is resolved with filesystem.New, so the binary must import a filesystem
+// implementation for dest's scheme (e.g. io/filesystem/gcs for gs:// paths).
+//
+// To examine, download the file and run: `go tool pprof -http=:8082 path/to/profile`
+func UploadHeapProfile(ctx context.Context, dest string) error {
+	hProf, err := os.Open(hProfLoc)
+	if errors.Is(err, os.ErrNotExist) {
+		// The temp file may hold a profile that was not yet renamed into place
+		// (possibly incomplete if the process died mid-write).
+		hProf, err = os.Open(tempHProfLoc)
+		if errors.Is(err, os.ErrNotExist) {
+			return nil
+		}
+	}
+	if err != nil {
+		return err
+	}
+	defer hProf.Close()
+
+	fs, err := filesystem.New(ctx, dest)
+	if err != nil {
+		return err
+	}
+	defer fs.Close()
+	fd, err := fs.OpenWrite(ctx, dest)
+	if err != nil {
+		return err
+	}
+	buf := bufio.NewWriterSize(fd, 1<<20) // use 1MB buffer
+
+	if _, err := bufio.NewReader(hProf).WriteTo(buf); err != nil {
+		fd.Close() // best effort; the copy error is the one worth reporting
+		return err
+	}
+	if err := buf.Flush(); err != nil {
+		fd.Close() // best effort; the flush error is the one worth reporting
+		return err
+	}
+
+	// The Close error is returned because, for remote filesystems, write
+	// failures may only surface when the writer is closed.
+	return fd.Close()
+}
diff --git a/sdks/go/pkg/beam/util/harnessopts/heap_dump.go b/sdks/go/pkg/beam/util/harnessopts/heap_dump.go
new file mode 100644
index 00000000000..06536a977d5
--- /dev/null
+++ b/sdks/go/pkg/beam/util/harnessopts/heap_dump.go
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package harnessopts
+
+import (
+ "fmt"
+ "strconv"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/hooks"
+)
+
+const (
+ diagnosticsHook = "beam:go:hook:diagnostics:heapDump"
+)
+
+// HeapDumpFrequency sets how often, in seconds, the diagnostics service checks
+// whether it should take a heap dump. It also sets the maximum allowed time, in
+// seconds, between heap dumps.
+// Setting the sampling frequency to <=0 disables the heap dump checks.
+// The default sampling frequency is 1 second; the default max time is 60 seconds.
+// It returns an error if maxTimeBetweenDumpsSeconds is less than
+// samplingFrequencySeconds, or if enabling the hook fails.
+func HeapDumpFrequency(samplingFrequencySeconds, maxTimeBetweenDumpsSeconds int) error {
+	if maxTimeBetweenDumpsSeconds < samplingFrequencySeconds {
+		return fmt.Errorf("max time between dumps %v should be greater than or equal to sampling frequency %v", maxTimeBetweenDumpsSeconds, samplingFrequencySeconds)
+	}
+	// The hook itself is defined in beam/core/runtime/harness/diagnostics_hook.go.
+	return hooks.EnableHook(diagnosticsHook, strconv.Itoa(samplingFrequencySeconds), strconv.Itoa(maxTimeBetweenDumpsSeconds))
+}
diff --git a/sdks/go/pkg/beam/util/harnessopts/heap_dump_test.go b/sdks/go/pkg/beam/util/harnessopts/heap_dump_test.go
new file mode 100644
index 00000000000..ab083e0bc5b
--- /dev/null
+++ b/sdks/go/pkg/beam/util/harnessopts/heap_dump_test.go
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package harnessopts
+
+import (
+ "testing"
+
+ _ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness" // Imports the cache hook
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/hooks"
+)
+
+// TestHeapDumpFrequency checks that valid settings enable the diagnostics hook
+// with the sampling frequency and max time passed through as string options.
+func TestHeapDumpFrequency(t *testing.T) {
+	if err := HeapDumpFrequency(10, 30); err != nil {
+		t.Fatalf("HeapDumpFrequency failed when it should have succeeded, got %v", err)
+	}
+	ok, opts := hooks.IsEnabled(diagnosticsHook)
+	if !ok {
+		t.Fatalf("HeapDumpFrequency hook is not enabled")
+	}
+	// Fatal rather than Error: the checks below index into opts.
+	if len(opts) != 2 {
+		t.Fatalf("num opts mismatch, got %v, want 2", len(opts))
+	}
+	if opts[0] != "10" {
+		t.Errorf("sampling frequency option mismatch, got %v, want %v", opts[0], 10)
+	}
+	if opts[1] != "30" {
+		t.Errorf("max time between dumps option mismatch, got %v, want %v", opts[1], 30)
+	}
+}
+
+// TestHeapDumpFrequency_Bad checks that a max time shorter than the sampling
+// frequency is rejected.
+func TestHeapDumpFrequency_Bad(t *testing.T) {
+	if err := HeapDumpFrequency(5, 4); err == nil {
+		t.Errorf("HeapDumpFrequency succeeded when it should have failed")
+	}
+}
diff --git a/sdks/go/pkg/beam/util/syscallx/syscall_default.go b/sdks/go/pkg/beam/util/syscallx/syscall_default.go
index 55756d0dbd4..67d188d8590 100644
--- a/sdks/go/pkg/beam/util/syscallx/syscall_default.go
+++ b/sdks/go/pkg/beam/util/syscallx/syscall_default.go
@@ -27,3 +27,7 @@ func PhysicalMemorySize() (uint64, error) {
func FreeDiskSpace(path string) (uint64, error) {
return 0, ErrUnsupported
}
+
+// SetProcessMemoryCeiling is not supported on this platform and always returns
+// ErrUnsupported; callers are expected to treat that error as non-fatal.
+func SetProcessMemoryCeiling(softCeiling, hardCeiling uint64) error {
+ return ErrUnsupported
+}
diff --git a/sdks/go/pkg/beam/util/syscallx/syscall_linux.go b/sdks/go/pkg/beam/util/syscallx/syscall_linux.go
index 379437ad0a1..f18ee87ec20 100644
--- a/sdks/go/pkg/beam/util/syscallx/syscall_linux.go
+++ b/sdks/go/pkg/beam/util/syscallx/syscall_linux.go
@@ -18,12 +18,12 @@
package syscallx
-import "syscall"
+import "golang.org/x/sys/unix"
// PhysicalMemorySize returns the total physical memory size.
func PhysicalMemorySize() (uint64, error) {
- var info syscall.Sysinfo_t
- if err := syscall.Sysinfo(&info); err != nil {
+ var info unix.Sysinfo_t
+ if err := unix.Sysinfo(&info); err != nil {
return 0, err
}
return info.Totalram, nil
@@ -31,9 +31,24 @@ func PhysicalMemorySize() (uint64, error) {
// FreeDiskSpace returns the free disk space for a given path.
func FreeDiskSpace(path string) (uint64, error) {
- var stat syscall.Statfs_t
- if err := syscall.Statfs(path, &stat); err != nil {
+ var stat unix.Statfs_t
+ if err := unix.Statfs(path, &stat); err != nil {
return 0, err
}
+ // Free space is the Bavail block count multiplied by the Bsize block size.
return stat.Bavail * uint64(stat.Bsize), nil
}
+
+func SetProcessMemoryCeiling(softCeiling, hardCeiling uint64) error {
+ var rLimit unix.Rlimit
+
+ err := unix.Getrlimit(unix.RLIMIT_NOFILE, &rLimit)
+
+ if err != nil {
+ return err
+ }
+
+ rLimit.Max = hardCeiling
+ rLimit.Cur = softCeiling
+
+ return unix.Setrlimit(unix.RLIMIT_NOFILE, &rLimit)
+}
diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go
index e81993ae460..1c35247ac32 100644
--- a/sdks/go/test/integration/integration.go
+++ b/sdks/go/test/integration/integration.go
@@ -86,6 +86,8 @@ var directFilters = []string{
"TestDrain",
// FhirIO currently only supports Dataflow runner
"TestFhirIO.*",
+ // OOMs currently only lead to heap dumps on Dataflow runner
+ "TestOomParDo",
}
var portableFilters = []string{
@@ -104,6 +106,8 @@ var portableFilters = []string{
"TestDrain",
// FhirIO currently only supports Dataflow runner
"TestFhirIO.*",
+ // OOMs currently only lead to heap dumps on Dataflow runner
+ "TestOomParDo",
}
var flinkFilters = []string{
@@ -120,6 +124,8 @@ var flinkFilters = []string{
"TestDrain",
// FhirIO currently only supports Dataflow runner
"TestFhirIO.*",
+ // OOMs currently only lead to heap dumps on Dataflow runner
+ "TestOomParDo",
}
var samzaFilters = []string{
@@ -141,6 +147,8 @@ var samzaFilters = []string{
"TestDrain",
// FhirIO currently only supports Dataflow runner
"TestFhirIO.*",
+ // OOMs currently only lead to heap dumps on Dataflow runner
+ "TestOomParDo",
}
var sparkFilters = []string{
@@ -163,6 +171,8 @@ var sparkFilters = []string{
"TestDrain",
// FhirIO currently only supports Dataflow runner
"TestFhirIO.*",
+ // OOMs currently only lead to heap dumps on Dataflow runner
+ "TestOomParDo",
}
var dataflowFilters = []string{
diff --git a/sdks/go/pkg/beam/util/syscallx/syscall_linux.go b/sdks/go/test/integration/primitives/heap_dump.go
similarity index 56%
copy from sdks/go/pkg/beam/util/syscallx/syscall_linux.go
copy to sdks/go/test/integration/primitives/heap_dump.go
index 379437ad0a1..9d71223629b 100644
--- a/sdks/go/pkg/beam/util/syscallx/syscall_linux.go
+++ b/sdks/go/test/integration/primitives/heap_dump.go
@@ -13,27 +13,39 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-//go:build linux
-// +build linux
+package primitives
-package syscallx
+import (
+ "context"
+ "time"
-import "syscall"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+)
-// PhysicalMemorySize returns the total physical memory size.
-func PhysicalMemorySize() (uint64, error) {
- var info syscall.Sysinfo_t
- if err := syscall.Sysinfo(&info); err != nil {
- return 0, err
+// oomFn allocates an ever-growing []int64 to drive the worker out of memory.
+// The slice starts at 1<<25 elements and grows by 1.2x every 5 seconds. elm is
+// only emitted if the size exceeds 1<<40 elements without the process dying.
+// NOTE(review): 1<<40 does not fit in a 32-bit int, so this only builds on
+// 64-bit platforms.
+func oomFn(ctx context.Context, elm int, emit func(int)) {
+ size := 1 << 25
+ // Simulate a slow memory leak
+ for {
+ abc := make([]int64, size)
+ // Referencing abc on both sides of the sleep keeps the allocation live
+ // for the whole interval.
+ log.Debugf(ctx, "abc %v", abc)
+ time.Sleep(5 * time.Second)
+ log.Debugf(ctx, "abc %v", abc)
+ if size > 1<<40 {
+ break
+ }
+
+ size = int(float64(size) * 1.2)
}
- return info.Totalram, nil
+ emit(elm)
}
-// FreeDiskSpace returns the free disk space for a given path.
-func FreeDiskSpace(path string) (uint64, error) {
- var stat syscall.Statfs_t
- if err := syscall.Statfs(path, &stat); err != nil {
- return 0, err
- }
- return stat.Bavail * uint64(stat.Bsize), nil
+// OomParDo tests a DoFn that OOMs: it builds a pipeline that runs oomFn on a
+// single element, which is expected to exhaust the worker's memory.
+func OomParDo() *beam.Pipeline {
+ p, s := beam.NewPipelineWithRoot()
+
+ in := beam.Create(s, 1)
+ // The output is discarded; the pipeline exists only to trigger the OOM.
+ beam.ParDo(s, oomFn, in)
+
+ return p
}
diff --git a/sdks/go/test/integration/primitives/heap_dump_test.go b/sdks/go/test/integration/primitives/heap_dump_test.go
new file mode 100644
index 00000000000..27ccf4dbf2b
--- /dev/null
+++ b/sdks/go/test/integration/primitives/heap_dump_test.go
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package primitives
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "strings"
+ "testing"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
+ _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+ "github.com/apache/beam/sdks/v2/go/test/integration"
+)
+
+// TestOomParDo runs a pipeline whose DoFn exhausts worker memory. It checks that
+// at least one new heap profile appears under <temp_location>/heapProfiles.
+func TestOomParDo(t *testing.T) {
+	integration.CheckFilters(t)
+	tempFlag := flag.Lookup("temp_location")
+	if tempFlag == nil {
+		t.Fatalf("A temp_location must be provided to correctly run TestOomParDo")
+	}
+	tempLocation := tempFlag.Value.(flag.Getter).Get().(string)
+	if tempLocation == "" {
+		t.Fatalf("A temp_location must be provided to correctly run TestOomParDo")
+	}
+	dumpLocation := fmt.Sprintf("%v/heapProfiles/*", strings.TrimSuffix(tempLocation, "/"))
+	ctx := context.Background()
+
+	fs, err := filesystem.New(ctx, dumpLocation)
+	if err != nil {
+		t.Fatalf("Failed to connect to filesystem: %v", err)
+	}
+	defer fs.Close()
+
+	files, err := fs.List(ctx, dumpLocation)
+	if err != nil {
+		t.Fatalf("Failed to list heap dumps at %v: %v", dumpLocation, err)
+	}
+	startFiles := len(files)
+
+	// The pipeline is expected to fail because the worker runs out of memory, so
+	// its result is deliberately ignored; only the uploaded dumps are checked.
+	ptest.Run(OomParDo())
+
+	files, err = fs.List(ctx, dumpLocation)
+	if err != nil {
+		t.Fatalf("Failed to list heap dumps at %v: %v", dumpLocation, err)
+	}
+	endFiles := len(files)
+
+	if startFiles >= endFiles {
+		t.Fatalf("No new heap dumps generated on OOM. There were %v dumps before running and %v after", startFiles, endFiles)
+	}
+}
diff --git a/sdks/go/test/run_validatesrunner_tests.sh b/sdks/go/test/run_validatesrunner_tests.sh
index cdc41e50b8e..579809ea0f4 100755
--- a/sdks/go/test/run_validatesrunner_tests.sh
+++ b/sdks/go/test/run_validatesrunner_tests.sh
@@ -91,6 +91,7 @@ SIMULTANEOUS=3
# Where to store integration test outputs.
GCS_LOCATION=gs://temp-storage-for-end-to-end-tests
+GCS_SUBFOLDER="test$RANDOM"
# Project for the container and integration test
PROJECT=apache-beam-testing
@@ -409,8 +410,8 @@ ARGS="$ARGS --project=$DATAFLOW_PROJECT"
ARGS="$ARGS --region=$REGION"
ARGS="$ARGS --environment_type=DOCKER"
ARGS="$ARGS --environment_config=$CONTAINER:$TAG"
-ARGS="$ARGS --staging_location=$GCS_LOCATION/staging-validatesrunner-test"
-ARGS="$ARGS --temp_location=$GCS_LOCATION/temp-validatesrunner-test"
+ARGS="$ARGS --staging_location=$GCS_LOCATION/staging-validatesrunner-test/$GCS_SUBFOLDER"
+ARGS="$ARGS --temp_location=$GCS_LOCATION/temp-validatesrunner-test/$GCS_SUBFOLDER"
ARGS="$ARGS --dataflow_worker_jar=$DATAFLOW_WORKER_JAR"
ARGS="$ARGS --endpoint=$ENDPOINT"
if [[ -n "$TEST_EXPANSION_ADDR" ]]; then