Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/07/12 00:09:18 UTC

[GitHub] [beam] lostluck commented on a diff in pull request #22225: Collect heap dump on OOM on Dataflow

lostluck commented on code in PR #22225:
URL: https://github.com/apache/beam/pull/22225#discussion_r918419721


##########
sdks/go/pkg/beam/util/diagnostics/diagnostics.go:
##########
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package diagnostics
+
+import (
+	"bufio"
+	"context"
+	"os"
+
+	"runtime/debug"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
+	_ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"

Review Comment:
   I think this approach is OK if we file a P3 issue noting that it's currently cloud-vendor-specific.
   
   ---
   
   This is the point where the container starts to assume Google Cloud instead of staying flexible. Supporting alternative blob stores would then require updating and maintaining this code and releasing a new version (or it forces users onto custom containers). It's not very open as a result.
   
   Ideally we would *not* have the `gcs` import here, and would rely on whatever filesystems the user program has imported/enabled, so users could write to S3, HDFS, a network filesystem mount, or something else.
   
   On the plus side, we could make this dependency explicit in the boot container by moving this `_` import into boot.go instead. This code could also be reused in any other approach that "restarts" the binary to do the upload.



##########
sdks/go/pkg/beam/util/diagnostics/diagnostics.go:
##########
@@ -0,0 +1,68 @@
+package diagnostics
+
+import (
+	"bufio"
+	"context"
+	"os"
+
+	"runtime/debug"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
+	_ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"
+)
+
+func UploadHeapDump(ctx context.Context, dest string, additionalDataToWrite string) error {
+	heapDumpLoc := "heapDump"
+
+	f, err := os.Create(heapDumpLoc)
+	if err != nil {
+		return err
+	}
+	debug.WriteHeapDump(f.Fd())
+
+	heapDump, err := os.Open(heapDumpLoc)
+	if err != nil {
+		return err
+	}
+	defer heapDump.Close()
+	heapDumpReader := bufio.NewReader(heapDump)
+
+	fs, err := filesystem.New(ctx, dest)
+	if err != nil {
+		return err
+	}
+	defer fs.Close()
+	fd, err := fs.OpenWrite(ctx, dest)
+	if err != nil {
+		return err
+	}
+	buf := bufio.NewWriterSize(fd, 1<<20) // use 1MB buffer
+
+	buf.WriteString(additionalDataToWrite)

Review Comment:
   Why are we adding additional information to the heap dump file? Wouldn't that corrupt whatever file format the heap dump is in and prevent the usual tools from analysing it?



##########
sdks/go/container/boot.go:
##########
@@ -115,7 +119,19 @@ func main() {
 		os.Setenv("RUNNER_CAPABILITIES", strings.Join(info.GetRunnerCapabilities(), " "))
 	}
 
-	log.Fatalf("User program exited: %v", execx.Execute(prog, args...))
+	err = execx.Execute(prog, args...)
+
+	if err != nil {
+		var opt runtime.RawOptionsWrapper
+		err = json.Unmarshal([]byte(options), &opt)

Review Comment:
   As written, the error from `execx.Execute` is never read, and this assignment overwrites it. Suggest changing it to `err := json...` so the original `err` is shadowed instead and is still available for printing in the final `log.Fatal`.
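
The difference between `=` and `:=` here can be shown in isolation. This is a minimal sketch, with `runProgram` and `parseOptions` as hypothetical stand-ins for `execx.Execute` and `json.Unmarshal`; it is not the PR's code.

```go
package main

import (
	"errors"
	"fmt"
)

// runProgram stands in for execx.Execute and always fails.
func runProgram() error { return errors.New("exit status 2") }

// parseOptions stands in for json.Unmarshal and always succeeds.
func parseOptions() error { return nil }

// overwritten assigns with '=', so the program's error is lost.
func overwritten() error {
	err := runProgram()
	if err != nil {
		err = parseOptions() // replaces the outer err
	}
	return err
}

// shadowed declares a new err with ':=', scoped to the inner if statement.
func shadowed() error {
	err := runProgram()
	if err != nil {
		if err := parseOptions(); err == nil { // new variable, outer err untouched
			fmt.Println("options parsed; the diagnostics upload would go here")
		}
	}
	return err
}

func main() {
	fmt.Println("with '=': ", overwritten())
	fmt.Println("with ':=':", shadowed())
}
```

`overwritten` returns nil even though the program failed, while `shadowed` still returns the original error for the final log line.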



##########
sdks/go/pkg/beam/util/diagnostics/diagnostics.go:
##########
@@ -0,0 +1,68 @@
+package diagnostics
+
+import (
+	"bufio"
+	"context"
+	"os"
+
+	"runtime/debug"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
+	_ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"
+)
+
+func UploadHeapDump(ctx context.Context, dest string, additionalDataToWrite string) error {

Review Comment:
   Always document exported functions. In this case, at least point to instructions on how to read the file and make use of it.



##########
sdks/go/container/boot.go:
##########
@@ -115,7 +119,19 @@ func main() {
 		os.Setenv("RUNNER_CAPABILITIES", strings.Join(info.GetRunnerCapabilities(), " "))
 	}
 
-	log.Fatalf("User program exited: %v", execx.Execute(prog, args...))
+	err = execx.Execute(prog, args...)
+
+	if err != nil {
+		var opt runtime.RawOptionsWrapper
+		err = json.Unmarshal([]byte(options), &opt)
+		if err == nil {
+			if tempLocation, ok := opt.Options.Options["temp_location"]; ok {
+				diagnostics.UploadHeapDump(ctx, fmt.Sprintf("%v/heapDumps/dump-%v-%d", strings.TrimSuffix(tempLocation, "/"), *id, time.Now().Unix()), fmt.Sprintf("Options %v", opt))

Review Comment:
   Please add a `\n` after the additional data so it's easier to extract a valid heap dump from the resulting file. As it stands, I can't take the sample file and get pprof to render the heap in question.



##########
sdks/go/pkg/beam/util/diagnostics/diagnostics.go:
##########
@@ -0,0 +1,68 @@
+package diagnostics
+
+import (
+	"bufio"
+	"context"
+	"os"
+
+	"runtime/debug"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
+	_ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"
+)
+
+func UploadHeapDump(ctx context.Context, dest string, additionalDataToWrite string) error {
+	heapDumpLoc := "heapDump"
+
+	f, err := os.Create(heapDumpLoc)
+	if err != nil {
+		return err
+	}
+	debug.WriteHeapDump(f.Fd())

Review Comment:
   Now that I'm looking into this, I'm not sure this is getting us what we want.
   
   1. It's not clear to me that we're actually getting the heap from the spawned process. So what are we actually getting? The example heap dump is only ~8MB, which feels closer to what the launcher process is probably using than to what the worker would be using at death.
   2. AFAICT we might not want a `debug.WriteHeapDump` dump in general anyway. It's the whole heap, every byte. Attempting to actually read the linked example dump has been difficult. https://pkg.go.dev/runtime/pprof#WriteHeapProfile is likely easier for most folks to deal with and will indicate where any leaks are coming from.
   
   


