You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/10/13 13:48:00 UTC
[GitHub] [beam] jrmccluskey opened a new pull request #15717: [BEAM-11097] Update cache with metrics counting, windowing
jrmccluskey opened a new pull request #15717:
URL: https://github.com/apache/beam/pull/15717
Adds metrics counting for cache performance, windowing considerations for cached items, materializing side inputs to store in memory. Does **not** enable caching.
------------------------
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
- [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
`ValidatesRunner` compliance status (on master branch)
--------------------------------------------------------
<table>
<thead>
<tr>
<th>Lang</th>
<th>ULR</th>
<th>Dataflow</th>
<th>Flink</th>
<th>Samza</th>
<th>Spark</th>
<th>Twister2</th>
</tr>
</thead>
<tbody>
<tr>
<td>Go</td>
<td>---</td>
<td>
<a href="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon">
</a>
<td>
<a href="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon">
</a>
</td>
<td>
<a href="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Samza/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Samza/lastCompletedBuild/badge/icon">
</a>
</td>
<td>
<a href="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon">
</a>
</td>
<td>---</td>
</tr>
<tr>
<td>Java</td>
<td>
<a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_ULR/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_ULR/lastCompletedBuild/badge/icon">
</a>
</td>
<td>
<a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon?subject=V1">
</a><br>
<a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming/lastCompletedBuild/badge/icon?subject=V1+Streaming">
</a><br>
<a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon?subject=V1+Java+11">
</a><br>
<a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2/lastCompletedBuild/badge/icon?subject=V2">
</a><br>
<a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2_Streaming/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2_Streaming/lastCompletedBuild/badge/icon?subject=V2+Streaming">
</a><br>
</td>
<td>
<a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon?subject=Java+8">
</a><br>
<a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon?subject=Java+11">
</a><br>
<a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon?subject=Portable">
</a><br>
<a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon?subject=Portable+Streaming">
</a>
</td>
<td>
<a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon">
</a><br>
<a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Samza/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Samza/lastCompletedBuild/badge/icon?subject=Portable">
</a>
</td>
<td>
<a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon">
</a><br>
<a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon?subject=Portable">
</a><br>
<a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon?subject=Structured+Streaming">
</a>
</td>
<td>
<a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon">
</a>
</td>
</tr>
<tr>
<td>Python</td>
<td>---</td>
<td>
<a href="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon?subject=V1">
</a><br>
<a href="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon?subject=V2">
</a><br>
<a href="https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon?subject=ValCont">
</a>
</td>
<td>
<a href="https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/badge/icon?subject=Portable">
</a><br>
<a href="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/badge/icon">
</a>
</td>
<td>
<a href="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/lastCompletedBuild/badge/icon">
</a>
</td>
<td>
<a href="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon">
</a>
</td>
<td>---</td>
</tr>
<tr>
<td>XLang</td>
<td>
<a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/badge/icon">
</a>
</td>
<td>
<a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Dataflow/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Dataflow/lastCompletedBuild/badge/icon">
</a>
</td>
<td>
<a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon">
</a>
</td>
<td>
<a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/lastCompletedBuild/badge/icon">
</a>
</td>
<td>
<a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon">
</a>
</td>
<td>---</td>
</tr>
</tbody>
</table>
Examples testing status on various runners
--------------------------------------------------------
<table>
<thead>
<tr>
<th>Lang</th>
<th>ULR</th>
<th>Dataflow</th>
<th>Flink</th>
<th>Samza</th>
<th>Spark</th>
<th>Twister2</th>
</tr>
</thead>
<tbody>
<tr>
<td>Go</td>
<td>---</td>
<td>---</td>
<td>---</td>
<td>---</td>
<td>---</td>
<td>---</td>
<td>---</td>
</tr>
<tr>
<td>Java</td>
<td>---</td>
<td>
<a href="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Cron/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Cron/lastCompletedBuild/badge/icon?subject=V1">
</a><br>
<a href="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Java11_Cron/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Java11_Cron/lastCompletedBuild/badge/icon?subject=V1+Java11">
</a><br>
<a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_Examples_Dataflow_V2/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_Examples_Dataflow_V2/lastCompletedBuild/badge/icon?subject=V2">
</a><br>
</td>
<td>---</td>
<td>---</td>
<td>---</td>
<td>---</td>
<td>---</td>
</tr>
<tr>
<td>Python</td>
<td>---</td>
<td>---</td>
<td>---</td>
<td>---</td>
<td>---</td>
<td>---</td>
<td>---</td>
</tr>
<tr>
<td>XLang</td>
<td>---</td>
<td>---</td>
<td>---</td>
<td>---</td>
<td>---</td>
<td>---</td>
<td>---</td>
</tr>
</tbody>
</table>
Post-Commit SDK/Transform Integration Tests Status (on master branch)
------------------------------------------------------------------------------------------------
<table>
<thead>
<tr>
<th>Go</th>
<th>Java</th>
<th>Python</th>
</tr>
</thead>
<tbody>
<tr>
<td>
<a href="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon">
</a>
</td>
<td>
<a href="https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon">
</a>
</td>
<td>
<a href="https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon?subject=3.6">
</a><br>
<a href="https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon?subject=3.7">
</a><br>
<a href="https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/badge/icon?subject=3.8">
</a>
</td>
</tr>
</tbody>
</table>
Pre-Commit Tests Status (on master branch)
------------------------------------------------------------------------------------------------
<table>
<thead>
<tr>
<th>---</th>
<th>Java</th>
<th>Python</th>
<th>Go</th>
<th>Website</th>
<th>Whitespace</th>
<th>Typescript</th>
</tr>
</thead>
<tbody>
<tr>
<td>Non-portable</td>
<td>
<a href="https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon">
</a><br>
</td>
<td>
<a href="https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon?subject=Tests">
</a><br>
<a href="https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon?subject=Lint">
</a><br>
<a href="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/badge/icon?subject=Docker">
</a><br>
<a href="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/badge/icon?subject=Docs">
</a>
</td>
<td>
<a href="https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon">
</a>
</td>
<td>
<a href="https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon">
</a>
</td>
<td>
<a href="https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/badge/icon">
</a>
</td>
<td>
<a href="https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/badge/icon">
</a>
</td>
</tr>
<tr>
<td>Portable</td>
<td>---</td>
<td>
<a href="https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon">
</a>
</td>
<td>
<a href="https://ci-beam.apache.org/job/beam_PreCommit_GoPortable_Cron/lastCompletedBuild/">
<img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_GoPortable_Cron/lastCompletedBuild/badge/icon">
</a>
</td>
<td>---</td>
<td>---</td>
<td>---</td>
</tr>
</tbody>
</table>
See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
GitHub Actions Tests Status (on master branch)
------------------------------------------------------------------------------------------------
[![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
[![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
[![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [beam] lostluck commented on a change in pull request #15717: [BEAM-11097] Update cache with metrics counting, windowing
Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #15717:
URL: https://github.com/apache/beam/pull/15717#discussion_r728304133
##########
File path: sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go
##########
@@ -145,19 +150,24 @@ func (c *SideInputCache) makeAndValidateToken(transformID, sideInputID string) (
return tok, c.isValid(tok)
}
+func (c *SideInputCache) makeCacheKey(tok token, w, key []byte) cacheKey {
+ return cacheKey{tok: tok, win: string(w), key: string(key)}
+}
+
// QueryCache takes a transform ID and side input ID and checking if a corresponding side
// input has been cached. A query having a bad token (e.g. one that doesn't make a known
// token or one that makes a known but currently invalid token) is treated the same as a
// cache miss.
-func (c *SideInputCache) QueryCache(transformID, sideInputID string) exec.ReStream {
+func (c *SideInputCache) QueryCache(ctx context.Context, transformID, sideInputID string, win, key []byte) exec.ReStream {
c.mu.Lock()
defer c.mu.Unlock()
tok, ok := c.makeAndValidateToken(transformID, sideInputID)
if !ok {
return nil
}
+ keyString := c.makeCacheKey(tok, win, key)
Review comment:
nit: No longer a string, and the scope is 3 lines, so just call it `ck`. Ths is a Go Idiom, the shorter the distance a name needs to travel, the shorter it can be. The shortness indicates that it's definition is likely not far away or it's fairly standard (like using r and w for io.Reader and io.Writer values).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [beam] lostluck commented on a change in pull request #15717: [BEAM-11097] Update cache with metrics counting, windowing
Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #15717:
URL: https://github.com/apache/beam/pull/15717#discussion_r728225713
##########
File path: sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go
##########
@@ -44,18 +52,19 @@ type token string
type SideInputCache struct {
capacity int
mu sync.Mutex
- cache map[token]exec.ReStream
+ cache map[cacheKey]exec.ReStream
idsToTokens map[string]token
validTokens map[token]int8 // Maps tokens to active bundle counts
metrics CacheMetrics
}
// CacheMetrics stores metrics for the cache across a pipeline run.
type CacheMetrics struct {
- Hits int64
- Misses int64
- Evictions int64
- InUseEvictions int64
+ Hits *metrics.Counter
+ Misses *metrics.Counter
+ Evictions *metrics.Counter
+ InUseEvictions *metrics.Counter
+ ReStreamErrors *metrics.Counter
Review comment:
style nit: Go permits singlelining field declarations when the types are the same, for scenarios exactly like this.
`Hits, Misses, Evictions, InUseEvictions, ReStreamErrors int64`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [beam] jrmccluskey commented on a change in pull request #15717: [BEAM-11097] Update cache with metrics counting, windowing
Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on a change in pull request #15717:
URL: https://github.com/apache/beam/pull/15717#discussion_r728307026
##########
File path: sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go
##########
@@ -145,19 +150,24 @@ func (c *SideInputCache) makeAndValidateToken(transformID, sideInputID string) (
return tok, c.isValid(tok)
}
+func (c *SideInputCache) makeCacheKey(tok token, w, key []byte) cacheKey {
+ return cacheKey{tok: tok, win: string(w), key: string(key)}
+}
+
// QueryCache takes a transform ID and side input ID and checking if a corresponding side
// input has been cached. A query having a bad token (e.g. one that doesn't make a known
// token or one that makes a known but currently invalid token) is treated the same as a
// cache miss.
-func (c *SideInputCache) QueryCache(transformID, sideInputID string) exec.ReStream {
+func (c *SideInputCache) QueryCache(ctx context.Context, transformID, sideInputID string, win, key []byte) exec.ReStream {
c.mu.Lock()
defer c.mu.Unlock()
tok, ok := c.makeAndValidateToken(transformID, sideInputID)
if !ok {
return nil
}
+ keyString := c.makeCacheKey(tok, win, key)
Review comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [beam] jrmccluskey commented on a change in pull request #15717: [BEAM-11097] Update cache with metrics counting, windowing
Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on a change in pull request #15717:
URL: https://github.com/apache/beam/pull/15717#discussion_r728298792
##########
File path: sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go
##########
@@ -44,18 +52,19 @@ type token string
type SideInputCache struct {
capacity int
mu sync.Mutex
- cache map[token]exec.ReStream
+ cache map[cacheKey]exec.ReStream
idsToTokens map[string]token
validTokens map[token]int8 // Maps tokens to active bundle counts
metrics CacheMetrics
}
// CacheMetrics stores metrics for the cache across a pipeline run.
type CacheMetrics struct {
- Hits int64
- Misses int64
- Evictions int64
- InUseEvictions int64
+ Hits *metrics.Counter
+ Misses *metrics.Counter
+ Evictions *metrics.Counter
+ InUseEvictions *metrics.Counter
+ ReStreamErrors *metrics.Counter
Review comment:
Fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [beam] jrmccluskey commented on a change in pull request #15717: [BEAM-11097] Update cache with metrics counting, windowing
Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on a change in pull request #15717:
URL: https://github.com/apache/beam/pull/15717#discussion_r728298792
##########
File path: sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go
##########
@@ -44,18 +52,19 @@ type token string
type SideInputCache struct {
capacity int
mu sync.Mutex
- cache map[token]exec.ReStream
+ cache map[cacheKey]exec.ReStream
idsToTokens map[string]token
validTokens map[token]int8 // Maps tokens to active bundle counts
metrics CacheMetrics
}
// CacheMetrics stores metrics for the cache across a pipeline run.
type CacheMetrics struct {
- Hits int64
- Misses int64
- Evictions int64
- InUseEvictions int64
+ Hits *metrics.Counter
+ Misses *metrics.Counter
+ Evictions *metrics.Counter
+ InUseEvictions *metrics.Counter
+ ReStreamErrors *metrics.Counter
Review comment:
Fixed
##########
File path: sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go
##########
@@ -145,19 +150,24 @@ func (c *SideInputCache) makeAndValidateToken(transformID, sideInputID string) (
return tok, c.isValid(tok)
}
+func (c *SideInputCache) makeCacheKey(tok token, w, key []byte) cacheKey {
+ return cacheKey{tok: tok, win: string(w), key: string(key)}
+}
+
// QueryCache takes a transform ID and side input ID and checking if a corresponding side
// input has been cached. A query having a bad token (e.g. one that doesn't make a known
// token or one that makes a known but currently invalid token) is treated the same as a
// cache miss.
-func (c *SideInputCache) QueryCache(transformID, sideInputID string) exec.ReStream {
+func (c *SideInputCache) QueryCache(ctx context.Context, transformID, sideInputID string, win, key []byte) exec.ReStream {
c.mu.Lock()
defer c.mu.Unlock()
tok, ok := c.makeAndValidateToken(transformID, sideInputID)
if !ok {
return nil
}
+ keyString := c.makeCacheKey(tok, win, key)
Review comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [beam] lostluck commented on a change in pull request #15717: [BEAM-11097] Update cache with metrics counting, windowing
Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #15717:
URL: https://github.com/apache/beam/pull/15717#discussion_r728225713
##########
File path: sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go
##########
@@ -44,18 +52,19 @@ type token string
type SideInputCache struct {
capacity int
mu sync.Mutex
- cache map[token]exec.ReStream
+ cache map[cacheKey]exec.ReStream
idsToTokens map[string]token
validTokens map[token]int8 // Maps tokens to active bundle counts
metrics CacheMetrics
}
// CacheMetrics stores metrics for the cache across a pipeline run.
type CacheMetrics struct {
- Hits int64
- Misses int64
- Evictions int64
- InUseEvictions int64
+ Hits *metrics.Counter
+ Misses *metrics.Counter
+ Evictions *metrics.Counter
+ InUseEvictions *metrics.Counter
+ ReStreamErrors *metrics.Counter
Review comment:
style nit: Go permits singlelining field declarations when the types are the same, for scenarios exactly like this.
`Hits, Misses, Evictions, InUseEvictions, ReStreamErrors int64`
##########
File path: sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go
##########
@@ -145,19 +150,24 @@ func (c *SideInputCache) makeAndValidateToken(transformID, sideInputID string) (
return tok, c.isValid(tok)
}
+func (c *SideInputCache) makeCacheKey(tok token, w, key []byte) cacheKey {
+ return cacheKey{tok: tok, win: string(w), key: string(key)}
+}
+
// QueryCache takes a transform ID and side input ID and checking if a corresponding side
// input has been cached. A query having a bad token (e.g. one that doesn't make a known
// token or one that makes a known but currently invalid token) is treated the same as a
// cache miss.
-func (c *SideInputCache) QueryCache(transformID, sideInputID string) exec.ReStream {
+func (c *SideInputCache) QueryCache(ctx context.Context, transformID, sideInputID string, win, key []byte) exec.ReStream {
c.mu.Lock()
defer c.mu.Unlock()
tok, ok := c.makeAndValidateToken(transformID, sideInputID)
if !ok {
return nil
}
+ keyString := c.makeCacheKey(tok, win, key)
Review comment:
nit: No longer a string, and the scope is 3 lines, so just call it `ck`. Ths is a Go Idiom, the shorter the distance a name needs to travel, the shorter it can be. The shortness indicates that it's definition is likely not far away or it's fairly standard (like using r and w for io.Reader and io.Writer values).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [beam] jrmccluskey commented on pull request #15717: [BEAM-11097] Update cache with metrics counting, windowing
Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on pull request #15717:
URL: https://github.com/apache/beam/pull/15717#issuecomment-942408788
R: @lostluck
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [beam] lostluck merged pull request #15717: [BEAM-11097] Update cache with metrics counting, windowing
Posted by GitBox <gi...@apache.org>.
lostluck merged pull request #15717:
URL: https://github.com/apache/beam/pull/15717
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [beam] lostluck merged pull request #15717: [BEAM-11097] Update cache with metrics counting, windowing
Posted by GitBox <gi...@apache.org>.
lostluck merged pull request #15717:
URL: https://github.com/apache/beam/pull/15717
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org