You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2020/04/01 23:32:00 UTC

[jira] [Work logged] (BEAM-4374) Update existing metrics in the FN API to use new Metric Schema

     [ https://issues.apache.org/jira/browse/BEAM-4374?focusedWorklogId=414381&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-414381 ]

ASF GitHub Bot logged work on BEAM-4374:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Apr/20 23:31
            Start Date: 01/Apr/20 23:31
    Worklog Time Spent: 10m 
      Work Description: TheNeuralBit commented on pull request #11231: [BEAM-4374] Shortids for the Go SDK
URL: https://github.com/apache/beam/pull/11231#discussion_r401969044
 
 

 ##########
 File path: sdks/go/pkg/beam/core/runtime/harness/monitoring.go
 ##########
 @@ -28,10 +31,180 @@ import (
 	"github.com/golang/protobuf/ptypes"
 )
 
-func monitoring(p *exec.Plan) (*fnpb.Metrics, []*ppb.MonitoringInfo) {
+type mUrn uint32
+
+// TODO: Pull these from the protos.
+var sUrns = [...]string{
+	"beam:metric:user:sum_int64:v1",
+	"beam:metric:user:sum_double:v1",
+	"beam:metric:user:distribution_int64:v1",
+	"beam:metric:user:distribution_double:v1",
+	"beam:metric:user:latest_int64:v1",
+	"beam:metric:user:latest_double:v1",
+	"beam:metric:user:top_n_int64:v1",
+	"beam:metric:user:top_n_double:v1",
+	"beam:metric:user:bottom_n_int64:v1",
+	"beam:metric:user:bottom_n_double:v1",
+
+	"beam:metric:element_count:v1",
+	"beam:metric:sampled_byte_size:v1",
+
+	"beam:metric:pardo_execution_time:start_bundle_msecs:v1",
+	"beam:metric:pardo_execution_time:process_bundle_msecs:v1",
+	"beam:metric:pardo_execution_time:finish_bundle_msecs:v1",
+	"beam:metric:ptransform_execution_time:total_msecs:v1",
+
+	"beam:metric:ptransform_progress:remaining:v1",
+	"beam:metric:ptransform_progress:completed:v1",
+
+	"TestingSentinelUrn", // Must remain last.
+}
+
+const (
+	urnUserSumInt64 mUrn = iota
+	urnUserSumFloat64
+	urnUserDistInt64
+	urnUserDistFloat64
+	urnUserLatestMsInt64
+	urnUserLatestMsFloat64
+	urnUserTopNInt64
+	urnUserTopNFloat64
+	urnUserBottomNInt64
+	urnUserBottomNFloat64
+
+	urnElementCount
+	urnSampledByteSize
+
+	urnStartBundle
+	urnProcessBundle
+	urnFinishBundle
+	urnTransformTotalTime
+
+	urnProgressRemaining
+	urnProgressCompleted
+
+	urnTestSentinel // Must remain last.
+)
+
+// urnToType maps the urn to its encoding type.
+// This function is written to be inlinable by the compiler.
+func urnToType(u mUrn) string {
+	switch u {
+	case urnUserSumInt64, urnElementCount, urnStartBundle, urnProcessBundle, urnFinishBundle, urnTransformTotalTime:
+		return "beam:metrics:sum_int64:v1"
+	case urnUserSumFloat64:
+		return "beam:metrics:sum_double:v1"
+	case urnUserDistInt64, urnSampledByteSize:
+		return "beam:metrics:distribution_int64:v1"
+	case urnUserDistFloat64:
+		return "beam:metrics:distribution_double:v1"
+	case urnUserLatestMsInt64:
+		return "beam:metrics:latest_int64:v1"
+	case urnUserLatestMsFloat64:
+		return "beam:metrics:latest_double:v1"
+	case urnUserTopNInt64:
+		return "beam:metrics:top_n_int64:v1"
+	case urnUserTopNFloat64:
+		return "beam:metrics:top_n_double:v1"
+	case urnUserBottomNInt64:
+		return "beam:metrics:bottom_n_int64:v1"
+	case urnUserBottomNFloat64:
+		return "beam:metrics:bottom_n_double:v1"
+
+	case urnProgressRemaining, urnProgressCompleted:
+		return "beam:metrics:progress:v1"
+
+	// Monitoring Table isn't currently in the protos.
+	// case ???:
+	//	return "beam:metrics:monitoring_table:v1"
+
+	case urnTestSentinel:
+		return "TestingSentinelType"
+
+	default:
+		panic("metric urn without specified type" + sUrns[u])
+	}
+}
+
+type shortKey struct {
+	metrics.Labels
+	Urn mUrn // Urns fully specify their type.
+}
+
+// shortIDCache retains lookup caches for short ids to the full monitoring
+// info metadata.
+//
+// TODO: 2020/03/26 - measure mutex overhead vs sync.Map for this case.
+// sync.Map might have lower contention for this read heavy load.
+type shortIDCache struct {
+	mu              sync.Mutex
+	labels2ShortIds map[shortKey]string
+	shortIds2Infos  map[string]*ppb.MonitoringInfo
+
+	lastShortID int64
+}
+
+func newShortIDCache() *shortIDCache {
+	return &shortIDCache{
+		labels2ShortIds: make(map[shortKey]string),
+		shortIds2Infos:  make(map[string]*ppb.MonitoringInfo),
+	}
+}
+
+func (c *shortIDCache) getNextShortID() string {
+	id := atomic.AddInt64(&c.lastShortID, 1)
+	// No reason not to use the smallest string short ids possible.
+	return strconv.FormatInt(id, 36)
+}
+
+// getShortID returns the short id for the given metric, and if
+// it doesn't exist yet, stores the metadata.
+// Assumes c.mu lock is held.
+func (c *shortIDCache) getShortID(l metrics.Labels, urn mUrn) string {
+	k := shortKey{l, urn}
+	s, ok := c.labels2ShortIds[k]
+	if ok {
+		return s
+	}
+	s = c.getNextShortID()
+	c.labels2ShortIds[k] = s
+	c.shortIds2Infos[s] = &ppb.MonitoringInfo{
+		Urn:    sUrns[urn],
+		Type:   urnToType(urn),
+		Labels: userLabels(l),
+	}
+	return s
+}
+
+func (c *shortIDCache) shortIdsToInfos(shortids []string) map[string]*ppb.MonitoringInfo {
+	c.mu.Lock()
+	defer c.mu.Unlock()
 
 Review comment:
   Is the locking correct here? I'd think you'd want to acquire the lock in `getShortID`, since that's where the race condition could occur. This function only reads `shortIds2Infos`, which should only gain more entries over time, and existing entries are never modified, so I'd think it would be thread-safe even without acquiring the lock. Am I missing something?
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 414381)
    Time Spent: 35h 40m  (was: 35.5h)

> Update existing metrics in the FN API to use new Metric Schema
> --------------------------------------------------------------
>
>                 Key: BEAM-4374
>                 URL: https://issues.apache.org/jira/browse/BEAM-4374
>             Project: Beam
>          Issue Type: New Feature
>          Components: beam-model
>            Reporter: Alex Amato
>            Priority: Major
>          Time Spent: 35h 40m
>  Remaining Estimate: 0h
>
> Update existing metrics to use the new proto and cataloging schema defined in:
> [_https://s.apache.org/beam-fn-api-metrics_]
>  * Check in new protos
>  * Define catalog file for metrics
>  * Port existing metrics to use this new format, based on catalog names+metadata



--
This message was sent by Atlassian Jira
(v8.3.4#803005)