You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/09 21:33:42 UTC

[GitHub] [beam] lostluck commented on a diff in pull request #21776: [Go SDK] Add more info to Worker Status API

lostluck commented on code in PR #21776:
URL: https://github.com/apache/beam/pull/21776#discussion_r893952091


##########
sdks/go/pkg/beam/core/runtime/harness/worker_status.go:
##########
@@ -65,20 +70,50 @@ func (w *workerStatusHandler) start(ctx context.Context) error {
 	return nil
 }
 
+func memoryUsage() string {
+	m := runtime.MemStats{}
+	runtime.ReadMemStats(&m)
+	return fmt.Sprintf("\n Total Alloc: %v bytes \n Sys: %v bytes \n Mallocs: %v\n Frees: %v\n HeapAlloc: %v bytes", m.TotalAlloc, m.Sys, m.Mallocs, m.Frees, m.HeapAlloc)
+}
+
+func (w *workerStatusHandler) activeProcessBundleStates() string {
+	var states string
+	for bundleID, store := range w.metStore {
+		execStates := ""
+		for bundleID, state := range store.StateRegistry() {
+			execStates += fmt.Sprintf("ID: %v Execution States: %#v,", bundleID, *state)
+
+		}
+		states += fmt.Sprintf("\nBundle ID: %v\nBundle State: %#v\nBundle Execution States: %v\n", bundleID, *store.BundleState(), execStates)
+	}
+	return states
+}
+
+func (w *workerStatusHandler) cacheStats() string {
+	return fmt.Sprintf("Cache:\n%v", w.cache.CacheMetrics())

Review Comment:
   ```suggestion
   	return fmt.Sprintf("State Cache:\n%+v", w.cache.CacheMetrics())
   ```
   
   It should be clear which cache this is talking about, and `{0 0 0 0 0}` isn't useful to anyone, so we use `%+v` to have the field names printed out as well, which makes the numbers useful.
   
   https://go.dev/play/p/q9db2d3a5Lv



##########
sdks/go/pkg/beam/core/runtime/harness/worker_status.go:
##########
@@ -65,20 +70,50 @@ func (w *workerStatusHandler) start(ctx context.Context) error {
 	return nil
 }
 
+func memoryUsage() string {
+	m := runtime.MemStats{}
+	runtime.ReadMemStats(&m)
+	return fmt.Sprintf("\n Total Alloc: %v bytes \n Sys: %v bytes \n Mallocs: %v\n Frees: %v\n HeapAlloc: %v bytes", m.TotalAlloc, m.Sys, m.Mallocs, m.Frees, m.HeapAlloc)
+}
+
+func (w *workerStatusHandler) activeProcessBundleStates() string {
+	var states string
+	for bundleID, store := range w.metStore {
+		execStates := ""
+		for bundleID, state := range store.StateRegistry() {
+			execStates += fmt.Sprintf("ID: %v Execution States: %#v,", bundleID, *state)
+
+		}
+		states += fmt.Sprintf("\nBundle ID: %v\nBundle State: %#v\nBundle Execution States: %v\n", bundleID, *store.BundleState(), execStates)
+	}
+	return states
+}
+
+func (w *workerStatusHandler) cacheStats() string {
+	return fmt.Sprintf("Cache:\n%v", w.cache.CacheMetrics())
+}
+
+func goroutineDump() string {
+	buf := make([]byte, 1<<16)
+	runtime.Stack(buf, true)

Review Comment:
   Instead of the raw runtime version, use the pprof version:
   
   `import "runtime/pprof"`
   ```
   profile := pprof.Lookup("goroutine")
   if profile != nil {
     // Use debug=1 to get the human readable consolidated goroutine output.
     profile.WriteTo(b, 1)
   }
   ```
   
   This makes the output easier to read, since repeated goroutines with identical stacks are consolidated and the duplication is indicated with a count.
   
   (Note that `*strings.Builder` implements `io.Writer` and can be passed right in; the `Write` method is on the pointer receiver.)



##########
sdks/go/pkg/beam/core/runtime/harness/harness.go:
##########
@@ -37,26 +38,15 @@ import (
 	"google.golang.org/protobuf/types/known/durationpb"
 )
 
-// StatusAddress is a type of status endpoint address as an optional argument to harness.Main().
-type StatusAddress string
-
 // TODO(herohde) 2/8/2017: for now, assume we stage a full binary (not a plugin).
 
 // Main is the main entrypoint for the Go harness. It runs at "runtime" -- not
 // "pipeline-construction time" -- on each worker. It is a FnAPI client and
 // ultimately responsible for correctly executing user code.

Review Comment:
   Please add documentation about the "expected" environment variables. TBH, I don't mind the options approach for passing values into Main, but the fetching from the env vars would then happen in init.go.



##########
sdks/go/pkg/beam/core/runtime/harness/worker_status.go:
##########
@@ -65,20 +70,50 @@ func (w *workerStatusHandler) start(ctx context.Context) error {
 	return nil
 }
 
+func memoryUsage() string {
+	m := runtime.MemStats{}
+	runtime.ReadMemStats(&m)
+	return fmt.Sprintf("\n Total Alloc: %v bytes \n Sys: %v bytes \n Mallocs: %v\n Frees: %v\n HeapAlloc: %v bytes", m.TotalAlloc, m.Sys, m.Mallocs, m.Frees, m.HeapAlloc)
+}
+
+func (w *workerStatusHandler) activeProcessBundleStates() string {
+	var states string
+	for bundleID, store := range w.metStore {
+		execStates := ""
+		for bundleID, state := range store.StateRegistry() {
+			execStates += fmt.Sprintf("ID: %v Execution States: %#v,", bundleID, *state)
+
+		}
+		states += fmt.Sprintf("\nBundle ID: %v\nBundle State: %#v\nBundle Execution States: %v\n", bundleID, *store.BundleState(), execStates)
+	}
+	return states
+}
+
+func (w *workerStatusHandler) cacheStats() string {
+	return fmt.Sprintf("Cache:\n%v", w.cache.CacheMetrics())
+}
+
+func goroutineDump() string {
+	buf := make([]byte, 1<<16)
+	runtime.Stack(buf, true)
+	return string(buf)
+}
+
 // reader reads the WorkerStatusRequest from the stream and sends a processed WorkerStatusResponse to
 // a response channel.
 func (w *workerStatusHandler) reader(ctx context.Context, stub fnpb.BeamFnWorkerStatus_WorkerStatusClient) {
 	defer w.wg.Done()
-	buf := make([]byte, 1<<16)
+
 	for w.isAlive() {
 		req, err := stub.Recv()
 		if err != nil && err != io.EOF {
 			log.Debugf(ctx, "exiting workerStatusHandler.Reader(): %v", err)
 			return
 		}
 		log.Debugf(ctx, "RECV-status: %v", req.GetId())
-		runtime.Stack(buf, true)
-		response := &fnpb.WorkerStatusResponse{Id: req.GetId(), StatusInfo: string(buf)}
+
+		statusInfo := fmt.Sprintf("\n============Memory Usage============\n%s\n============Active Process Bundle States============\n%s\n============Cache Stats============\n%s\n============Goroutine Dump============\n%s\n", memoryUsage(), w.activeProcessBundleStates(), w.cacheStats(), goroutineDump())

Review Comment:
   Use a [strings.Builder](https://pkg.go.dev/strings#Builder) instead of trying to get everything into a single print output. It will be easier to read. I'd recommend simply passing the builder to each of the helper methods and having each helper method contribute its own well-formed output. Then the helpers are still easy to unit test.



##########
sdks/go/test/integration/wordcount/wordcount.go:
##########
@@ -18,15 +18,13 @@ package wordcount
 
 import (
 	"context"
-	"regexp"

Review Comment:
   We can remove this change. At most, we want to remove the blank line between "strings" and "fmt".



##########
sdks/go/pkg/beam/core/runtime/harness/worker_status.go:
##########
@@ -65,20 +70,50 @@ func (w *workerStatusHandler) start(ctx context.Context) error {
 	return nil
 }
 
+func memoryUsage() string {
+	m := runtime.MemStats{}
+	runtime.ReadMemStats(&m)
+	return fmt.Sprintf("\n Total Alloc: %v bytes \n Sys: %v bytes \n Mallocs: %v\n Frees: %v\n HeapAlloc: %v bytes", m.TotalAlloc, m.Sys, m.Mallocs, m.Frees, m.HeapAlloc)

Review Comment:
   Aside from the earlier strings.Builder comment: we get *so much* information in MemStats that we should at least try to match what Java is putting out.
   
   https://pkg.go.dev/runtime#MemStats  -> Everything we get.
   Java's print out: https://github.com/apache/beam/blob/ffef8de04a93435e69faf3bf65efe11852cbd8dc/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/MemoryMonitor.java#L622 
   
   We can do the same level of readability improvement that Java is doing, though I don't think we can do "thrashing". We can show the GC target size, and `GCCPUFraction` gives the fraction of available CPU time spent in GC since the program started, which we can report as a percentage. Not sure about "thrashing" and "pushback", but this gets us most of the way there.
   
   Java's for reference:
   https://github.com/apache/beam/blob/ffef8de04a93435e69faf3bf65efe11852cbd8dc/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/MemoryMonitor.java#L411
   
   `NextGC` is useful since it sets the target of the next GC.
   
   `Mallocs` & `Frees` aren't super useful outside of a full heap dump (we care about memory, not counts). `HeapInuse` is more useful, as are `StackInuse` and `StackSys`. We'll need both of those for a complete picture: Go tries to do as much as possible on the stack instead of the heap, unlike Java, which is all heap.



##########
sdks/go/pkg/beam/core/runtime/harness/worker_status.go:
##########
@@ -65,20 +70,50 @@ func (w *workerStatusHandler) start(ctx context.Context) error {
 	return nil
 }
 
+func memoryUsage() string {
+	m := runtime.MemStats{}
+	runtime.ReadMemStats(&m)
+	return fmt.Sprintf("\n Total Alloc: %v bytes \n Sys: %v bytes \n Mallocs: %v\n Frees: %v\n HeapAlloc: %v bytes", m.TotalAlloc, m.Sys, m.Mallocs, m.Frees, m.HeapAlloc)
+}
+
+func (w *workerStatusHandler) activeProcessBundleStates() string {
+	var states string
+	for bundleID, store := range w.metStore {
+		execStates := ""
+		for bundleID, state := range store.StateRegistry() {
+			execStates += fmt.Sprintf("ID: %v Execution States: %#v,", bundleID, *state)
+
+		}
+		states += fmt.Sprintf("\nBundle ID: %v\nBundle State: %#v\nBundle Execution States: %v\n", bundleID, *store.BundleState(), execStates)

Review Comment:
   Aside from making it use a strings.Builder, the only concern is to validate that we are being thread safe in doing this. We can likely make the printout much clearer, though.



##########
sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go:
##########
@@ -272,3 +272,8 @@ func (c *SideInputCache) evictElement(ctx context.Context) {
 		}
 	}
 }
+
+// CacheMetrics returns the cache metrics for current side input cache.
+func (c *SideInputCache) CacheMetrics() CacheMetrics {
+	return c.metrics

Review Comment:
   This method needs to be under the lock's critical section.
   
   Add
   
   ```
   	c.mu.Lock()
   	defer c.mu.Unlock()
   ```
   
   The return will provide a copy, but the copy could happen while things are mutating, causing a data race.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org