Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/12/03 19:15:35 UTC

[GitHub] [beam] lukecwik opened a new pull request #16130: [BEAM-13015] Start integrating a process wide cache.

lukecwik opened a new pull request #16130:
URL: https://github.com/apache/beam/pull/16130


   This initial implementation limits the process-wide cache to 100 MB, using JAMM to measure object sizes, and stores the bundle descriptors in it.
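
To illustrate what a weight limit means here, the following is a minimal sketch, assuming a plain access-ordered `LinkedHashMap` with LRU eviction in place of cache2k and a caller-supplied `sizer` function in place of a JAMM deep measurement. `WeightBoundedCacheSketch`, `sizer`, and `clampToInt` are hypothetical names used for illustration only; they are not part of this PR.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.ToLongBiFunction;

/** Hypothetical weight-bounded LRU cache; NOT the cache2k-based implementation in this PR. */
final class WeightBoundedCacheSketch<K, V> {
  // Access-ordered map: iteration starts at the least recently used entry.
  private final LinkedHashMap<K, V> entries = new LinkedHashMap<>(16, 0.75f, true);
  // Assumption: stands in for a JAMM-style deep size measurement of key and value.
  private final ToLongBiFunction<K, V> sizer;
  private final long maxWeight;
  private long currentWeight;

  WeightBoundedCacheSketch(long maxWeight, ToLongBiFunction<K, V> sizer) {
    this.maxWeight = maxWeight;
    this.sizer = sizer;
  }

  /** Saturates a long size to int, as a weigher with an int return type has to. */
  static int clampToInt(long size) {
    return (int) Math.min(size, Integer.MAX_VALUE);
  }

  private int weigh(K key, V value) {
    return clampToInt(sizer.applyAsLong(key, value));
  }

  synchronized void put(K key, V value) {
    V previous = entries.put(key, value);
    if (previous != null) {
      currentWeight -= weigh(key, previous);
    }
    currentWeight += weigh(key, value);
    // Evict least recently used entries until the total weight fits the budget.
    Iterator<Map.Entry<K, V>> lru = entries.entrySet().iterator();
    while (currentWeight > maxWeight && lru.hasNext()) {
      Map.Entry<K, V> eldest = lru.next();
      currentWeight -= weigh(eldest.getKey(), eldest.getValue());
      lru.remove();
    }
  }

  synchronized V peek(K key) {
    return entries.get(key);
  }

  synchronized int size() {
    return entries.size();
  }
}
```

With a budget of `100 * 1024L * 1024L` and a sizer that reports bytes, this would approximate a 100 MB limit. An entry that is heavier than the whole budget is evicted immediately after being inserted.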
   
   It also provides utility operations for creating sub-cache views over a cache instance, so that callers can scope their entries locally within the shared cache.
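
The sub-cache idea can be sketched as a view that namespaces every key with a fixed prefix before touching a shared map. This is a minimal illustration under stated assumptions, not the `SubCache` class from this PR: `PrefixedViewSketch` is a hypothetical name, and a `List<Object>` stands in for the composite key type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/** Hypothetical prefix-scoped view over a shared map; NOT the SubCache class from this PR. */
final class PrefixedViewSketch<K, V> {
  private final Map<List<Object>, Object> shared;
  private final List<Object> prefix;

  PrefixedViewSketch(Map<List<Object>, Object> shared, Object... prefix) {
    this.shared = shared;
    this.prefix = Arrays.asList(prefix.clone());
  }

  // Builds the key actually stored in the shared map: prefix elements followed by the key.
  private List<Object> compositeKey(K key) {
    List<Object> composite = new ArrayList<>(prefix);
    composite.add(key);
    return composite;
  }

  void put(K key, V value) {
    shared.put(compositeKey(key), value);
  }

  @SuppressWarnings("unchecked")
  V peek(K key) {
    return (V) shared.get(compositeKey(key));
  }

  /** Removes only the entries whose composite key starts with this view's prefix. */
  void clear() {
    shared
        .keySet()
        .removeIf(
            k -> k.size() > prefix.size() && k.subList(0, prefix.size()).equals(prefix));
  }
}
```

Two views created with different prefixes over the same `ConcurrentHashMap` can use the same key without colliding, and clearing one view leaves the other view's entries in place.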
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] ben-manes commented on a change in pull request #16130: [BEAM-13015] Start integrating a process wide cache.

Posted by GitBox <gi...@apache.org>.
ben-manes commented on a change in pull request #16130:
URL: https://github.com/apache/beam/pull/16130#discussion_r765611431



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.SdkHarnessOptions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
+import org.cache2k.Cache2kBuilder;
+import org.cache2k.operation.Weigher;
+import org.github.jamm.MemoryMeter;
+
+/** Utility methods used to instantiate and operate over cache instances. */
+@SuppressWarnings("nullness")
+public final class Caches {
+
+  /** A cache that never stores any values. */
+  public static <K, V> Cache<K, V> noop() {
+    return new SubCacheable<K, V>() {
+      @Override
+      public V peek(K key) {
+        return null;
+      }
+
+      @Override
+      public V computeIfAbsent(K key, Function<K, V> loadingFunction) {
+        return loadingFunction.apply(key);
+      }
+
+      @Override
+      public void put(K key, V value) {}
+
+      @Override
+      public void clear() {}
+
+      @Override
+      public void remove(K key) {}
+
+      @Override
+      public Set<K> keys() {
+        return Collections.emptySet();
+      }
+    };
+  }
+
+  /**
+   * Uses the specified {@link PipelineOptions} to configure and return a cache instance based upon
+   * parameters within {@link SdkHarnessOptions}.
+   */
+  public static <K, V> Cache<K, V> fromOptions(PipelineOptions options) {
+    // We specifically use cache2k since it allows for recursive computeIfAbsent calls
+    // preventing deadlock from occurring when a loading function mutates the underlying cache
+    org.cache2k.Cache<Object, Object> cache =
+        Cache2kBuilder.forUnknownTypes()
+            .maximumWeight(
+                options.as(SdkHarnessOptions.class).getMaxCacheMemoryUsageMb() * 1024L * 1024L)
+            .weigher(
+                new Weigher<Object, Object>() {
+                  private final MemoryMeter memoryMeter = MemoryMeter.builder().build();
+
+                  @Override
+                  public int weigh(Object key, Object value) {
+                    long size = memoryMeter.measureDeep(key) + memoryMeter.measureDeep(value);
+                    return size > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) size;
+                  }
+                })
+            .storeByReference(true)
+            .executor(MoreExecutors.directExecutor())
+            .build();
+
+    return (Cache<K, V>) forCache(cache);
+  }
+
+  /**
+   * Returns a view of a cache that operates on keys with a specified key prefix.
+   *
+   * <p>All lookups, insertions, and removals into the parent {@link Cache} will be prefixed by the
+   * specified prefixes.
+   *
+   * <p>Operations that act on the entire cache's contents, such as {@link Cache#clear}, only
+   * operate over keys with the specified prefixes.
+   */
+  public static <K, V> Cache<K, V> subCache(
+      Cache<?, ?> cache, Object keyPrefix, Object... additionalKeyPrefix) {
+    if (cache instanceof SubCache) {
+      return new SubCache<>(
+          ((SubCache<?, ?>) cache).cache,
+          ((SubCache<?, ?>) cache).keyPrefix.subKey(keyPrefix, additionalKeyPrefix));
+    } else if (cache instanceof SubCacheable) {
+      return new SubCache<>(
+          (SubCacheable<?, ?>) cache, CompositeKey.ROOT.subKey(keyPrefix, additionalKeyPrefix));
+    }
+    throw new UnsupportedOperationException("An unsupported type of cache was passed.");
+  }
+
+  /** A cache that never evicts any values. */
+  public static <K, V> Cache<K, V> eternal() {
+    // We specifically use cache2k since it allows for recursive computeIfAbsent calls

Review comment:
       Guava does allow recursive calls, but it is not supported as a feature. It will detect a simple deadlock/livelock when the caller loads a key that it had already started computing (A->B->A). An out-of-order concurrent deadlock could still occur in any recursive case, though. All three libraries handle this because they compute entries outside of the hash table lock.
   
   Unless this is an advertised feature, this assumption is dangerous. No library advertises it: Guava discourages it, Caffeine warns against it and describes an explicit approach, and cache2k doesn’t seem to be aware of this possibility (it is neither documented nor tested for).
   
   The more I think about this, the more I think you need much more rigorous testing and assurance before relying on this in production-facing code. I don’t think this should be left as is; it should perhaps be reverted until it is better understood.
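
One way to avoid depending on undocumented recursion support is to keep the loading function out of every map operation. The following is a minimal stdlib-only sketch of that idea, assuming a `ConcurrentHashMap` of `CompletableFuture` values. It is not the approach taken in this PR and not any cache library's API; `RecursiveLoadSketch` and `fib` are hypothetical names chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical memoizer whose loader may recurse into the same cache. */
final class RecursiveLoadSketch {
  private final ConcurrentHashMap<Integer, CompletableFuture<Long>> cache =
      new ConcurrentHashMap<>();

  long fib(int n) {
    CompletableFuture<Long> mine = new CompletableFuture<>();
    // putIfAbsent only installs a placeholder; no user code runs while the map bin is locked.
    CompletableFuture<Long> existing = cache.putIfAbsent(n, mine);
    if (existing != null) {
      // Another caller owns this key; wait for (or read) its result.
      return existing.join();
    }
    try {
      // The recursive loads happen outside of any map operation.
      long value = n < 2 ? n : fib(n - 1) + fib(n - 2);
      mine.complete(value);
      return value;
    } catch (RuntimeException e) {
      // Do not cache failures: drop the placeholder and release any waiters.
      cache.remove(n, mine);
      mine.completeExceptionally(e);
      throw e;
    }
  }
}
```

This only helps when the dependencies between keys are acyclic. A cycle such as A->B->A would still block forever, because the caller would end up joining on its own incomplete future.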







[GitHub] [beam] lukecwik removed a comment on pull request #16130: [BEAM-13015] Start integrating a process wide cache.

Posted by GitBox <gi...@apache.org>.
lukecwik removed a comment on pull request #16130:
URL: https://github.com/apache/beam/pull/16130#issuecomment-989131837


   Run Java_Examples_Dataflow PreCommit





[GitHub] [beam] lukecwik commented on pull request #16130: [BEAM-13015] Start integrating a process wide cache.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #16130:
URL: https://github.com/apache/beam/pull/16130#issuecomment-989154101


   Run Java_Examples_Dataflow PreCommit





[GitHub] [beam] lukecwik commented on pull request #16130: [BEAM-13015] Start integrating a process wide cache.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #16130:
URL: https://github.com/apache/beam/pull/16130#issuecomment-985765542


   R: @ibzib 





[GitHub] [beam] ben-manes commented on a change in pull request #16130: [BEAM-13015] Start integrating a process wide cache.

Posted by GitBox <gi...@apache.org>.
ben-manes commented on a change in pull request #16130:
URL: https://github.com/apache/beam/pull/16130#discussion_r765411720



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.SdkHarnessOptions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
+import org.cache2k.Cache2kBuilder;
+import org.cache2k.operation.Weigher;
+import org.github.jamm.MemoryMeter;
+
+/** Utility methods used to instantiate and operate over cache instances. */
+@SuppressWarnings("nullness")
+public final class Caches {
+
+  /** A cache that never stores any values. */
+  public static <K, V> Cache<K, V> noop() {
+    return new SubCacheable<K, V>() {
+      @Override
+      public V peek(K key) {
+        return null;
+      }
+
+      @Override
+      public V computeIfAbsent(K key, Function<K, V> loadingFunction) {
+        return loadingFunction.apply(key);
+      }
+
+      @Override
+      public void put(K key, V value) {}
+
+      @Override
+      public void clear() {}
+
+      @Override
+      public void remove(K key) {}
+
+      @Override
+      public Set<K> keys() {
+        return Collections.emptySet();
+      }
+    };
+  }
+
+  /**
+   * Uses the specified {@link PipelineOptions} to configure and return a cache instance based upon
+   * parameters within {@link SdkHarnessOptions}.
+   */
+  public static <K, V> Cache<K, V> fromOptions(PipelineOptions options) {
+    // We specifically use cache2k since it allows for recursive computeIfAbsent calls
+    // preventing deadlock from occurring when a loading function mutates the underlying cache
+    org.cache2k.Cache<Object, Object> cache =
+        Cache2kBuilder.forUnknownTypes()
+            .maximumWeight(
+                options.as(SdkHarnessOptions.class).getMaxCacheMemoryUsageMb() * 1024L * 1024L)
+            .weigher(
+                new Weigher<Object, Object>() {
+                  private final MemoryMeter memoryMeter = MemoryMeter.builder().build();
+
+                  @Override
+                  public int weigh(Object key, Object value) {
+                    long size = memoryMeter.measureDeep(key) + memoryMeter.measureDeep(value);
+                    return size > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) size;
+                  }
+                })
+            .storeByReference(true)
+            .executor(MoreExecutors.directExecutor())
+            .build();
+
+    return (Cache<K, V>) forCache(cache);
+  }
+
+  /**
+   * Returns a view of a cache that operates on keys with a specified key prefix.
+   *
+   * <p>All lookups, insertions, and removals into the parent {@link Cache} will be prefixed by the
+   * specified prefixes.
+   *
+   * <p>Operations which operate over the entire caches contents such as {@link Cache#clear} only
+   * operate over keys with the specified prefixes.
+   */
+  public static <K, V> Cache<K, V> subCache(
+      Cache<?, ?> cache, Object keyPrefix, Object... additionalKeyPrefix) {
+    if (cache instanceof SubCache) {
+      return new SubCache<>(
+          ((SubCache<?, ?>) cache).cache,
+          ((SubCache<?, ?>) cache).keyPrefix.subKey(keyPrefix, additionalKeyPrefix));
+    } else if (cache instanceof SubCacheable) {
+      return new SubCache<>((SubCacheable<?, ?>) cache, CompositeKey.ROOT.subKey(keyPrefix));
+    }
+    throw new UnsupportedOperationException("An unsupported type of cache was passed.");
+  }
+
+  /** A cache that never evicts any values. */
+  public static <K, V> Cache<K, V> eternal() {
+    // We specifically use cache2k since it allows for recursive computeIfAbsent calls

Review comment:
       FWIW, Caffeine handles this through `AsyncCache`, which does not need a separate thread to execute on. See the FAQ's [example](https://github.com/ben-manes/caffeine/wiki/Faq#recursive-computations), which is similar to what Apache Solr does. It is a simple workaround for these rare cases and makes what is happening more explicit, since recursive computes are often surprising and error-prone. I am not sure one can expect per-key linearizability with recursive writes, so whether this behavior is a plus or a minus depends on the use case.
   
   One very minor point to be aware of is that the `Cache#computeIfAbsent` used here does **not** follow the specification of `Map#computeIfAbsent`. In addition, until recently the `Map` implementation was broken and left the cache in an invalid state. See this [bug report](https://github.com/cache2k/cache2k/issues/174), where I found these issues in the simple single-threaded cases. Since these types of mistakes are sadly common and only ironed out by usage, you might want to spend a little extra time validating the behavior and guarding against regressions. Caffeine has had its share of bugs and it takes usage to wrangle them out, so early adopters should be a little more proactive in helping the library authors catch their oversights.
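
   A minimal sketch of the explicit approach mentioned above, using only the JDK rather than Caffeine's API: the map stores one `CompletableFuture` per key, so an entry is published with a non-blocking `putIfAbsent` and the recursive work runs outside any map lock. The class name `RecursiveMemo` and the Fibonacci workload are illustrative assumptions, not code from this PR or from the Caffeine FAQ.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical example: memoizes a recursive function by caching futures. */
final class RecursiveMemo {
  private final ConcurrentHashMap<Integer, CompletableFuture<Long>> cache =
      new ConcurrentHashMap<>();

  long fib(int n) {
    if (n < 2) {
      return n;
    }
    CompletableFuture<Long> mine = new CompletableFuture<>();
    // putIfAbsent returns immediately, so no map lock is held while we recurse.
    CompletableFuture<Long> existing = cache.putIfAbsent(n, mine);
    if (existing != null) {
      // Another caller already owns this key; read (or wait for) its result.
      return existing.join();
    }
    try {
      long value = fib(n - 1) + fib(n - 2);
      mine.complete(value);
      return value;
    } catch (RuntimeException | Error e) {
      // Do not leave a failed future behind for later callers.
      cache.remove(n, mine);
      mine.completeExceptionally(e);
      throw e;
    }
  }
}
```

   A cyclic dependency would still hang on `join()`, so this only helps when the dependency graph is known to be acyclic.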




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on a change in pull request #16130: [BEAM-13015] Start integrating a process wide cache.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #16130:
URL: https://github.com/apache/beam/pull/16130#discussion_r765104118



##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CachesTest.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link Caches}. */
+@RunWith(JUnit4.class)
+public class CachesTest {
+  @Test
+  public void testNoopCache() {
+    Cache<String, String> cache = Caches.noop();
+    cache.put("key", "value");
+    assertNull(cache.peek("key"));
+    assertEquals("value", cache.computeIfAbsent("key", (unused) -> "value"));
+    assertNull(cache.peek("key"));
+  }
+
+  @Test
+  public void testEternalCache() {
+    testCache(Caches.eternal());
+  }
+
+  @Test
+  public void testDefaultCache() {
+    testCache(Caches.fromOptions(PipelineOptionsFactory.create()));
+  }
+
+  @Test
+  public void testSubCache() {

Review comment:
       Done







[GitHub] [beam] lukecwik commented on pull request #16130: [BEAM-13015] Start integrating a process wide cache.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #16130:
URL: https://github.com/apache/beam/pull/16130#issuecomment-989131837


   Run Java_Examples_Dataflow PreCommit





[GitHub] [beam] lukecwik commented on a change in pull request #16130: [BEAM-13015] Start integrating a process wide cache.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #16130:
URL: https://github.com/apache/beam/pull/16130#discussion_r766161342



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.SdkHarnessOptions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
+import org.cache2k.Cache2kBuilder;
+import org.cache2k.operation.Weigher;
+import org.github.jamm.MemoryMeter;
+
+/** Utility methods used to instantiate and operate over cache instances. */
+@SuppressWarnings("nullness")
+public final class Caches {
+
+  /** A cache that never stores any values. */
+  public static <K, V> Cache<K, V> noop() {
+    return new SubCacheable<K, V>() {
+      @Override
+      public V peek(K key) {
+        return null;
+      }
+
+      @Override
+      public V computeIfAbsent(K key, Function<K, V> loadingFunction) {
+        return loadingFunction.apply(key);
+      }
+
+      @Override
+      public void put(K key, V value) {}
+
+      @Override
+      public void clear() {}
+
+      @Override
+      public void remove(K key) {}
+
+      @Override
+      public Set<K> keys() {
+        return Collections.emptySet();
+      }
+    };
+  }
+
+  /**
+   * Uses the specified {@link PipelineOptions} to configure and return a cache instance based upon
+   * parameters within {@link SdkHarnessOptions}.
+   */
+  public static <K, V> Cache<K, V> fromOptions(PipelineOptions options) {
+    // We specifically use cache2k since it allows for recursive computeIfAbsent calls
+    // preventing deadlock from occurring when a loading function mutates the underlying cache
+    org.cache2k.Cache<Object, Object> cache =
+        Cache2kBuilder.forUnknownTypes()
+            .maximumWeight(
+                options.as(SdkHarnessOptions.class).getMaxCacheMemoryUsageMb() * 1024L * 1024L)
+            .weigher(
+                new Weigher<Object, Object>() {
+                  private final MemoryMeter memoryMeter = MemoryMeter.builder().build();
+
+                  @Override
+                  public int weigh(Object key, Object value) {
+                    long size = memoryMeter.measureDeep(key) + memoryMeter.measureDeep(value);
+                    return size > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) size;
+                  }
+                })
+            .storeByReference(true)
+            .executor(MoreExecutors.directExecutor())
+            .build();
+
+    return (Cache<K, V>) forCache(cache);
+  }
+
+  /**
+   * Returns a view of a cache that operates on keys with a specified key prefix.
+   *
+   * <p>All lookups, insertions, and removals into the parent {@link Cache} will be prefixed by the
+   * specified prefixes.
+   *
+   * <p>Operations which operate over the entire caches contents such as {@link Cache#clear} only
+   * operate over keys with the specified prefixes.
+   */
+  public static <K, V> Cache<K, V> subCache(
+      Cache<?, ?> cache, Object keyPrefix, Object... additionalKeyPrefix) {
+    if (cache instanceof SubCache) {
+      return new SubCache<>(
+          ((SubCache<?, ?>) cache).cache,
+          ((SubCache<?, ?>) cache).keyPrefix.subKey(keyPrefix, additionalKeyPrefix));
+    } else if (cache instanceof SubCacheable) {
+      return new SubCache<>((SubCacheable<?, ?>) cache, CompositeKey.ROOT.subKey(keyPrefix));
+    }
+    throw new UnsupportedOperationException("An unsupported type of cache was passed.");
+  }
+
+  /** A cache that never evicts any values. */
+  public static <K, V> Cache<K, V> eternal() {
+    // We specifically use cache2k since it allows for recursive computeIfAbsent calls

Review comment:
       Yes, `computeIfAbsent` creating a loop is definitely an issue that could happen if this API were misused. The use case I'm building needs a chain like X -> Y -> Z, which is guaranteed not to create a loop: since I'm effectively caching pages and continuation tokens, a loop would cause an infinite loop outside of the cache as well. This is likely to change, though, as a different implementation won't need the recursive `computeIfAbsent`.
   
   I'll update the docs (and tests) in the future to reflect whether this is still needed, depending on that implementation.
   
   On a different note, I'm trying to ensure that the cache reduces the number of OOMs and maximizes the number of cache hits by using a single cache with a bunch of sub-caches that provide views over it for different key spaces. Our previous implementation used distinct cache objects, which required partitioning memory and giving each cache a limited amount of it. There didn't seem to be a good way to have the caches work together so that if one was unused the others could have more memory. Note that all of these caches used only weight-based eviction.
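
   To illustrate the idea of sub-caches as views over one shared store, here is a simplified sketch. It is not the `SubCache` class from this PR: `PrefixedView` is a hypothetical name, a `ConcurrentHashMap` stands in for the weight-bounded cache, and keys are plain `List<Object>` tuples instead of `CompositeKey`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/** Hypothetical example: a view that scopes every key under a fixed prefix. */
final class PrefixedView<K, V> {
  private final Map<List<Object>, Object> shared;
  private final List<Object> prefix;

  PrefixedView(Map<List<Object>, Object> shared, Object... prefix) {
    this.shared = shared;
    this.prefix = new ArrayList<>(Arrays.asList(prefix));
  }

  // Builds the key actually stored in the shared map: prefix parts + user key.
  private List<Object> fullKey(K key) {
    List<Object> full = new ArrayList<>(prefix);
    full.add(key);
    return full;
  }

  @SuppressWarnings("unchecked")
  V peek(K key) {
    return (V) shared.get(fullKey(key));
  }

  void put(K key, V value) {
    shared.put(fullKey(key), value);
  }

  // Removes only the entries whose key starts with this view's prefix.
  void clear() {
    shared.keySet().removeIf(
        k -> k.size() > prefix.size() && k.subList(0, prefix.size()).equals(prefix));
  }
}
```

   Because every view writes into the same map, a single size or weight limit on that map is shared by all key spaces, and an idle view leaves its share available to the others.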







[GitHub] [beam] lukecwik merged pull request #16130: [BEAM-13015] Start integrating a process wide cache.

Posted by GitBox <gi...@apache.org>.
lukecwik merged pull request #16130:
URL: https://github.com/apache/beam/pull/16130


   





[GitHub] [beam] ben-manes commented on a change in pull request #16130: [BEAM-13015] Start integrating a process wide cache.

Posted by GitBox <gi...@apache.org>.
ben-manes commented on a change in pull request #16130:
URL: https://github.com/apache/beam/pull/16130#discussion_r765611431



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.SdkHarnessOptions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
+import org.cache2k.Cache2kBuilder;
+import org.cache2k.operation.Weigher;
+import org.github.jamm.MemoryMeter;
+
+/** Utility methods used to instantiate and operate over cache instances. */
+@SuppressWarnings("nullness")
+public final class Caches {
+
+  /** A cache that never stores any values. */
+  public static <K, V> Cache<K, V> noop() {
+    return new SubCacheable<K, V>() {
+      @Override
+      public V peek(K key) {
+        return null;
+      }
+
+      @Override
+      public V computeIfAbsent(K key, Function<K, V> loadingFunction) {
+        return loadingFunction.apply(key);
+      }
+
+      @Override
+      public void put(K key, V value) {}
+
+      @Override
+      public void clear() {}
+
+      @Override
+      public void remove(K key) {}
+
+      @Override
+      public Set<K> keys() {
+        return Collections.emptySet();
+      }
+    };
+  }
+
+  /**
+   * Uses the specified {@link PipelineOptions} to configure and return a cache instance based upon
+   * parameters within {@link SdkHarnessOptions}.
+   */
+  public static <K, V> Cache<K, V> fromOptions(PipelineOptions options) {
+    // We specifically use cache2k since it allows for recursive computeIfAbsent calls
+    // preventing deadlock from occurring when a loading function mutates the underlying cache
+    org.cache2k.Cache<Object, Object> cache =
+        Cache2kBuilder.forUnknownTypes()
+            .maximumWeight(
+                options.as(SdkHarnessOptions.class).getMaxCacheMemoryUsageMb() * 1024L * 1024L)
+            .weigher(
+                new Weigher<Object, Object>() {
+                  private final MemoryMeter memoryMeter = MemoryMeter.builder().build();
+
+                  @Override
+                  public int weigh(Object key, Object value) {
+                    long size = memoryMeter.measureDeep(key) + memoryMeter.measureDeep(value);
+                    return size > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) size;
+                  }
+                })
+            .storeByReference(true)
+            .executor(MoreExecutors.directExecutor())
+            .build();
+
+    return (Cache<K, V>) forCache(cache);
+  }
+
+  /**
+   * Returns a view of a cache that operates on keys with a specified key prefix.
+   *
+   * <p>All lookups, insertions, and removals into the parent {@link Cache} will be prefixed by the
+   * specified prefixes.
+   *
+   * <p>Operations which operate over the entire caches contents such as {@link Cache#clear} only
+   * operate over keys with the specified prefixes.
+   */
+  public static <K, V> Cache<K, V> subCache(
+      Cache<?, ?> cache, Object keyPrefix, Object... additionalKeyPrefix) {
+    if (cache instanceof SubCache) {
+      return new SubCache<>(
+          ((SubCache<?, ?>) cache).cache,
+          ((SubCache<?, ?>) cache).keyPrefix.subKey(keyPrefix, additionalKeyPrefix));
+    } else if (cache instanceof SubCacheable) {
+      return new SubCache<>((SubCacheable<?, ?>) cache, CompositeKey.ROOT.subKey(keyPrefix));
+    }
+    throw new UnsupportedOperationException("An unsupported type of cache was passed.");
+  }
+
+  /** A cache that never evicts any values. */
+  public static <K, V> Cache<K, V> eternal() {
+    // We specifically use cache2k since it allows for recursive computeIfAbsent calls

Review comment:
       Guava does allow recursive calls, but it is not supported as a feature. It will detect a simple deadlock/livelock when the caller loads a key it previously started computing (A->B->A). An out-of-order concurrent deadlock could occur in any recursive case, though. All three libraries handle this because they compute entries outside of the hash table lock.
   
   Unless this is an advertised feature, relying on it is dangerous. No library advertises it: Guava discourages it, Caffeine warns against it and describes an explicit approach, and cache2k doesn’t seem to be aware of this possibility (it is not documented or tested for).
   
   The more I think about this, the more I think you need much more rigorous testing and assurance to rely on this working in production-facing code. I don’t think this should be left as is.
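
   As a rough illustration of the kind of guard and test this would call for, the sketch below detects a same-thread A->B->A re-entry and fails fast instead of hanging. It is an assumption-laden example, not how Guava, Caffeine, or cache2k implement detection: `GuardedLoader` is a hypothetical class, loaders must return non-null values, and two threads racing on the same key may both run the loader.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical example: rejects recursive loads of a key on the same thread. */
final class GuardedLoader<K, V> {
  private final Map<K, V> values = new ConcurrentHashMap<>();
  // Keys this thread is currently loading, used to spot A -> B -> A cycles.
  private final ThreadLocal<Set<K>> inFlight = ThreadLocal.withInitial(HashSet::new);

  V computeIfAbsent(K key, Function<K, V> loader) {
    V cached = values.get(key);
    if (cached != null) {
      return cached;
    }
    Set<K> loading = inFlight.get();
    if (!loading.add(key)) {
      throw new IllegalStateException("Recursive load of key " + key);
    }
    try {
      // Runs outside any map lock, so it may load other keys recursively.
      V loaded = loader.apply(key);
      V raced = values.putIfAbsent(key, loaded);
      return raced != null ? raced : loaded;
    } finally {
      loading.remove(key);
    }
  }
}
```

   A chain such as X -> Y -> Z passes through this guard, while a cycle surfaces as an `IllegalStateException`. It does not cover cross-thread ordering problems, which would need separate concurrent tests.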







[GitHub] [beam] ibzib commented on a change in pull request #16130: [BEAM-13015] Start integrating a process wide cache.

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #16130:
URL: https://github.com/apache/beam/pull/16130#discussion_r764277870



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.SdkHarnessOptions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
+import org.cache2k.Cache2kBuilder;
+import org.cache2k.operation.Weigher;
+import org.github.jamm.MemoryMeter;
+
+/** Utility methods used to instantiate and operate over cache instances. */
+@SuppressWarnings("nullness")
+public final class Caches {
+
+  /** A cache that never stores any values. */
+  public static <K, V> Cache<K, V> noop() {

Review comment:
       Why do we need this?

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.SdkHarnessOptions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
+import org.cache2k.Cache2kBuilder;
+import org.cache2k.operation.Weigher;
+import org.github.jamm.MemoryMeter;
+
+/** Utility methods used to instantiate and operate over cache instances. */
+@SuppressWarnings("nullness")
+public final class Caches {
+
+  /** A cache that never stores any values. */
+  public static <K, V> Cache<K, V> noop() {
+    return new SubCacheable<K, V>() {
+      @Override
+      public V peek(K key) {
+        return null;
+      }
+
+      @Override
+      public V computeIfAbsent(K key, Function<K, V> loadingFunction) {
+        return loadingFunction.apply(key);
+      }
+
+      @Override
+      public void put(K key, V value) {}
+
+      @Override
+      public void clear() {}
+
+      @Override
+      public void remove(K key) {}
+
+      @Override
+      public Set<K> keys() {
+        return Collections.emptySet();
+      }
+    };
+  }
+
+  /**
+   * Uses the specified {@link PipelineOptions} to configure and return a cache instance based upon
+   * parameters within {@link SdkHarnessOptions}.
+   */
+  public static <K, V> Cache<K, V> fromOptions(PipelineOptions options) {
+    // We specifically use cache2k since it allows for recursive computeIfAbsent calls
+    // preventing deadlock from occurring when a loading function mutates the underlying cache
+    org.cache2k.Cache<Object, Object> cache =
+        Cache2kBuilder.forUnknownTypes()
+            .maximumWeight(
+                options.as(SdkHarnessOptions.class).getMaxCacheMemoryUsageMb() * 1024L * 1024L)
+            .weigher(
+                new Weigher<Object, Object>() {
+                  private final MemoryMeter memoryMeter = MemoryMeter.builder().build();
+
+                  @Override
+                  public int weigh(Object key, Object value) {
+                    long size = memoryMeter.measureDeep(key) + memoryMeter.measureDeep(value);
+                    return size > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) size;
+                  }
+                })
+            .storeByReference(true)
+            .executor(MoreExecutors.directExecutor())
+            .build();
+
+    return (Cache<K, V>) forCache(cache);
+  }
+
+  /**
+   * Returns a view of a cache that operates on keys with a specified key prefix.
+   *
+   * <p>All lookups, insertions, and removals into the parent {@link Cache} will be prefixed by the
+   * specified prefixes.
+   *
+   * <p>Operations which operate over the entire caches contents such as {@link Cache#clear} only
+   * operate over keys with the specified prefixes.
+   */
+  public static <K, V> Cache<K, V> subCache(
+      Cache<?, ?> cache, Object keyPrefix, Object... additionalKeyPrefix) {
+    if (cache instanceof SubCache) {
+      return new SubCache<>(
+          ((SubCache<?, ?>) cache).cache,
+          ((SubCache<?, ?>) cache).keyPrefix.subKey(keyPrefix, additionalKeyPrefix));
+    } else if (cache instanceof SubCacheable) {
+      return new SubCache<>((SubCacheable<?, ?>) cache, CompositeKey.ROOT.subKey(keyPrefix));
+    }
+    throw new UnsupportedOperationException("An unsupported type of cache was passed.");
+  }
+
+  /** A cache that never evicts any values. */
+  public static <K, V> Cache<K, V> eternal() {
+    // We specifically use cache2k since it allows for recursive computeIfAbsent calls
+    // preventing deadlock from occurring when a loading function mutates the underlying cache
+    org.cache2k.Cache<Object, Object> cache =
+        Cache2kBuilder.forUnknownTypes()
+            .entryCapacity(Long.MAX_VALUE)
+            .storeByReference(true)
+            .executor(MoreExecutors.directExecutor())
+            .build();
+    return (Cache<K, V>) forCache(cache);
+  }
+
+  private static Cache<Object, Object> forCache(org.cache2k.Cache<Object, Object> cache) {
+    return new SubCacheable<Object, Object>() {
+      @Override
+      public Object peek(Object key) {
+        return cache.get(key);
+      }
+
+      @Override
+      public Object computeIfAbsent(Object key, Function<Object, Object> loadingFunction) {
+        return cache.computeIfAbsent(key, loadingFunction);
+      }
+
+      @Override
+      public void put(Object key, Object value) {
+        cache.put(key, value);
+      }
+
+      @Override
+      public void clear() {
+        cache.clear();
+      }
+
+      @Override
+      public void remove(Object key) {
+        cache.remove(key);
+      }
+
+      @Override
+      public Set<Object> keys() {
+        return cache.keys();
+      }
+    };
+  }
+
+  /**
+   * Additional operations necessary for a top level cache to support sub-caching.
+   *
+   * <p>Visibility is restricted to prevent usage of these methods outside of this class.
+   */
+  private interface SubCacheable<K, V> extends Cache<K, V> {
+    Set<K> keys();
+  }
+
+  /**
+   * A view of a cache that operates on keys with a specified key prefix.
+   *
+   * <p>All lookups, insertions, and removals into the parent {@link Cache} will be prefixed by the
+   * specified prefixes.
+   *
+   * <p>Operations which operate over the entire caches contents such as {@link Cache#clear} only
+   * operate over keys with the specified prefixes.
+   */
+  private static class SubCache<K, V> implements Cache<K, V> {
+    private final SubCacheable<CompositeKey, Object> cache;
+    private final CompositeKey keyPrefix;
+
+    SubCache(SubCacheable<?, ?> cache, CompositeKey keyPrefix) {
+      this.cache = (SubCacheable<CompositeKey, Object>) cache;
+      this.keyPrefix = keyPrefix;
+    }
+
+    @Override
+    public V peek(K key) {
+      return (V) cache.peek(keyPrefix.subKey(key));
+    }
+
+    @Override
+    public V computeIfAbsent(K key, Function<K, V> loadingFunction) {
+      return (V)
+          cache.computeIfAbsent(
+              keyPrefix.subKey(key),
+              new Function<CompositeKey, Object>() {
+
+                @Override
+                public Object apply(CompositeKey o) {
+                  return loadingFunction.apply((K) o.keyParts[o.keyParts.length - 1]);
+                }
+              });
+    }
+
+    @Override
+    public void put(K key, V value) {
+      cache.put(keyPrefix.subKey(key), value);
+    }
+
+    @Override
+    public void clear() {
+      for (Object key : Sets.filter(cache.keys(), (Object o) -> keyPrefix.isProperPrefixOf(o))) {
+        cache.remove((CompositeKey) key);
+      }
+    }
+
+    @Override
+    public void remove(K key) {
+      cache.remove(keyPrefix.subKey(key));
+    }
+  }
+
+  /** A tuple of key parts used to represent a key within a cache. */
+  @VisibleForTesting
+  static class CompositeKey {
+    public static final CompositeKey ROOT = new CompositeKey(new Object[0]);
+    Object[] keyParts;
+
+    private CompositeKey(Object[] keyParts) {
+      this.keyParts = keyParts;
+    }
+
+    CompositeKey subKey(Object suffix, Object... additionalSuffixes) {
+      Object[] subKey = new Object[keyParts.length + 1 + additionalSuffixes.length];
+      System.arraycopy(keyParts, 0, subKey, 0, keyParts.length);
+      subKey[keyParts.length] = suffix;
+      System.arraycopy(
+          additionalSuffixes, 0, subKey, keyParts.length + 1, additionalSuffixes.length);
+      return new CompositeKey(subKey);
+    }
+
+    boolean isProperPrefixOf(Object possiblySuffix) {

Review comment:
       Nit: I don't think "suffix" is the right word for the argument here (if A is a prefix of B, then `B = A + suffix`) but I'm unsure of what the right word is. See https://english.stackexchange.com/questions/421424/what-is-the-opposite-of-a-prefix

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.SdkHarnessOptions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
+import org.cache2k.Cache2kBuilder;
+import org.cache2k.operation.Weigher;
+import org.github.jamm.MemoryMeter;
+
+/** Utility methods used to instantiate and operate over cache instances. */
+@SuppressWarnings("nullness")
+public final class Caches {
+
+  /** A cache that never stores any values. */
+  public static <K, V> Cache<K, V> noop() {
+    return new SubCacheable<K, V>() {
+      @Override
+      public V peek(K key) {
+        return null;
+      }
+
+      @Override
+      public V computeIfAbsent(K key, Function<K, V> loadingFunction) {
+        return loadingFunction.apply(key);
+      }
+
+      @Override
+      public void put(K key, V value) {}
+
+      @Override
+      public void clear() {}
+
+      @Override
+      public void remove(K key) {}
+
+      @Override
+      public Set<K> keys() {
+        return Collections.emptySet();
+      }
+    };
+  }
+
+  /**
+   * Uses the specified {@link PipelineOptions} to configure and return a cache instance based upon
+   * parameters within {@link SdkHarnessOptions}.
+   */
+  public static <K, V> Cache<K, V> fromOptions(PipelineOptions options) {
+    // We specifically use cache2k since it allows for recursive computeIfAbsent calls
+    // preventing deadlock from occurring when a loading function mutates the underlying cache
+    org.cache2k.Cache<Object, Object> cache =
+        Cache2kBuilder.forUnknownTypes()
+            .maximumWeight(
+                options.as(SdkHarnessOptions.class).getMaxCacheMemoryUsageMb() * 1024L * 1024L)
+            .weigher(
+                new Weigher<Object, Object>() {
+                  private final MemoryMeter memoryMeter = MemoryMeter.builder().build();
+
+                  @Override
+                  public int weigh(Object key, Object value) {
+                    long size = memoryMeter.measureDeep(key) + memoryMeter.measureDeep(value);
+                    return size > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) size;
+                  }
+                })
+            .storeByReference(true)
+            .executor(MoreExecutors.directExecutor())
+            .build();
+
+    return (Cache<K, V>) forCache(cache);
+  }
+
+  /**
+   * Returns a view of a cache that operates on keys with a specified key prefix.
+   *
+   * <p>All lookups, insertions, and removals into the parent {@link Cache} will be prefixed by the
+   * specified prefixes.
+   *
+   * <p>Operations which operate over the entire caches contents such as {@link Cache#clear} only
+   * operate over keys with the specified prefixes.
+   */
+  public static <K, V> Cache<K, V> subCache(
+      Cache<?, ?> cache, Object keyPrefix, Object... additionalKeyPrefix) {
+    if (cache instanceof SubCache) {
+      return new SubCache<>(
+          ((SubCache<?, ?>) cache).cache,
+          ((SubCache<?, ?>) cache).keyPrefix.subKey(keyPrefix, additionalKeyPrefix));
+    } else if (cache instanceof SubCacheable) {
+      return new SubCache<>((SubCacheable<?, ?>) cache, CompositeKey.ROOT.subKey(keyPrefix));
+    }
+    throw new UnsupportedOperationException("An unsupported type of cache was passed.");
+  }
+
+  /** A cache that never evicts any values. */
+  public static <K, V> Cache<K, V> eternal() {
+    // We specifically use cache2k since it allows for recursive computeIfAbsent calls

Review comment:
       I assume this issue has been known to happen with the Guava cache implementation?

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.SdkHarnessOptions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
+import org.cache2k.Cache2kBuilder;
+import org.cache2k.operation.Weigher;
+import org.github.jamm.MemoryMeter;
+
+/** Utility methods used to instantiate and operate over cache instances. */
+@SuppressWarnings("nullness")
+public final class Caches {
+
+  /** A cache that never stores any values. */
+  public static <K, V> Cache<K, V> noop() {
+    return new SubCacheable<K, V>() {
+      @Override
+      public V peek(K key) {
+        return null;
+      }
+
+      @Override
+      public V computeIfAbsent(K key, Function<K, V> loadingFunction) {
+        return loadingFunction.apply(key);
+      }
+
+      @Override
+      public void put(K key, V value) {}
+
+      @Override
+      public void clear() {}
+
+      @Override
+      public void remove(K key) {}
+
+      @Override
+      public Set<K> keys() {
+        return Collections.emptySet();
+      }
+    };
+  }
+
+  /**
+   * Uses the specified {@link PipelineOptions} to configure and return a cache instance based upon
+   * parameters within {@link SdkHarnessOptions}.
+   */
+  public static <K, V> Cache<K, V> fromOptions(PipelineOptions options) {
+    // We specifically use cache2k since it allows for recursive computeIfAbsent calls
+    // preventing deadlock from occurring when a loading function mutates the underlying cache
+    org.cache2k.Cache<Object, Object> cache =
+        Cache2kBuilder.forUnknownTypes()
+            .maximumWeight(
+                options.as(SdkHarnessOptions.class).getMaxCacheMemoryUsageMb() * 1024L * 1024L)
+            .weigher(
+                new Weigher<Object, Object>() {
+                  private final MemoryMeter memoryMeter = MemoryMeter.builder().build();
+
+                  @Override
+                  public int weigh(Object key, Object value) {
+                    long size = memoryMeter.measureDeep(key) + memoryMeter.measureDeep(value);
+                    return size > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) size;
+                  }
+                })
+            .storeByReference(true)
+            .executor(MoreExecutors.directExecutor())
+            .build();
+
+    return (Cache<K, V>) forCache(cache);
+  }
+
+  /**
+   * Returns a view of a cache that operates on keys with a specified key prefix.
+   *
+   * <p>All lookups, insertions, and removals into the parent {@link Cache} will be prefixed by the
+   * specified prefixes.
+   *
+   * <p>Operations which operate over the entire caches contents such as {@link Cache#clear} only
+   * operate over keys with the specified prefixes.
+   */
+  public static <K, V> Cache<K, V> subCache(
+      Cache<?, ?> cache, Object keyPrefix, Object... additionalKeyPrefix) {
+    if (cache instanceof SubCache) {
+      return new SubCache<>(
+          ((SubCache<?, ?>) cache).cache,
+          ((SubCache<?, ?>) cache).keyPrefix.subKey(keyPrefix, additionalKeyPrefix));
+    } else if (cache instanceof SubCacheable) {
+      return new SubCache<>((SubCacheable<?, ?>) cache, CompositeKey.ROOT.subKey(keyPrefix));
+    }
+    throw new UnsupportedOperationException("An unsupported type of cache was passed.");

Review comment:
       Let's include the name of the unsupported type in the error message.
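
A minimal sketch of what this suggestion could look like, assuming a hypothetical helper class `CacheTypeErrors` (not part of the PR). It only shows how to build the message from the runtime class of the rejected argument; the PR may well inline this at the `throw` site instead.

```java
/** Hypothetical helper, not part of the PR: builds the error for an unsupported cache type. */
final class CacheTypeErrors {
  private CacheTypeErrors() {}

  /** Returns an exception whose message names the runtime type of the rejected cache. */
  static UnsupportedOperationException unsupportedCache(Object cache) {
    // Guard against null so that building the message never throws on its own.
    String typeName = cache == null ? "null" : cache.getClass().getName();
    return new UnsupportedOperationException(
        "An unsupported type of cache was passed: " + typeName);
  }
}
```

With this, `throw CacheTypeErrors.unsupportedCache(cache);` would report the offending class name rather than a fixed string.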

##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CachesTest.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link Caches}. */
+@RunWith(JUnit4.class)
+public class CachesTest {
+  @Test
+  public void testNoopCache() {
+    Cache<String, String> cache = Caches.noop();
+    cache.put("key", "value");
+    assertNull(cache.peek("key"));
+    assertEquals("value", cache.computeIfAbsent("key", (unused) -> "value"));
+    assertNull(cache.peek("key"));
+  }
+
+  @Test
+  public void testEternalCache() {
+    testCache(Caches.eternal());
+  }
+
+  @Test
+  public void testDefaultCache() {
+    testCache(Caches.fromOptions(PipelineOptionsFactory.create()));
+  }
+
+  @Test
+  public void testSubCache() {

Review comment:
       Can we test nested subcaches as well?
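
A rough sketch of the behavior a nested-subcache test would probably want to pin down. It uses a simplified stand-in, not the PR's `SubCache` or `CompositeKey`: the class `PrefixedView` and its methods are hypothetical, and keys are modeled as plain lists over a shared `HashMap`. The point is only the expected scoping: clearing a nested view must leave the parent's own entries alone, while clearing the parent must also drop the nested entries.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified, hypothetical stand-in for a prefixed view over one shared map. */
final class PrefixedView {
  private final Map<List<Object>, Object> shared;
  private final List<Object> prefix;

  private PrefixedView(Map<List<Object>, Object> shared, Object... prefix) {
    this.shared = shared;
    this.prefix = Arrays.asList(prefix.clone());
  }

  /** Creates a root view with an empty prefix over a fresh backing map. */
  static PrefixedView root() {
    return new PrefixedView(new HashMap<>());
  }

  /** Returns a nested view whose prefix extends this view's prefix by one part. */
  PrefixedView sub(Object part) {
    List<Object> extended = new ArrayList<>(prefix);
    extended.add(part);
    return new PrefixedView(shared, extended.toArray());
  }

  private List<Object> fullKey(Object key) {
    List<Object> full = new ArrayList<>(prefix);
    full.add(key);
    return full;
  }

  void put(Object key, Object value) {
    shared.put(fullKey(key), value);
  }

  Object peek(Object key) {
    return shared.get(fullKey(key));
  }

  /** Removes only entries whose key strictly extends this view's prefix. */
  void clear() {
    shared
        .keySet()
        .removeIf(k -> k.size() > prefix.size() && k.subList(0, prefix.size()).equals(prefix));
  }
}
```

A nested test against the real classes would follow the same shape: put under `a` and under `a -> b`, clear the inner view and check the outer entry survives, then clear the outer view and check both are gone.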




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #16130: [BEAM-13015] Start integrating a process wide cache.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #16130:
URL: https://github.com/apache/beam/pull/16130#issuecomment-989131559


   Run Java PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] ben-manes commented on a change in pull request #16130: [BEAM-13015] Start integrating a process wide cache.

Posted by GitBox <gi...@apache.org>.
ben-manes commented on a change in pull request #16130:
URL: https://github.com/apache/beam/pull/16130#discussion_r765611431



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.SdkHarnessOptions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
+import org.cache2k.Cache2kBuilder;
+import org.cache2k.operation.Weigher;
+import org.github.jamm.MemoryMeter;
+
+/** Utility methods used to instantiate and operate over cache instances. */
+@SuppressWarnings("nullness")
+public final class Caches {
+
+  /** A cache that never stores any values. */
+  public static <K, V> Cache<K, V> noop() {
+    return new SubCacheable<K, V>() {
+      @Override
+      public V peek(K key) {
+        return null;
+      }
+
+      @Override
+      public V computeIfAbsent(K key, Function<K, V> loadingFunction) {
+        return loadingFunction.apply(key);
+      }
+
+      @Override
+      public void put(K key, V value) {}
+
+      @Override
+      public void clear() {}
+
+      @Override
+      public void remove(K key) {}
+
+      @Override
+      public Set<K> keys() {
+        return Collections.emptySet();
+      }
+    };
+  }
+
+  /**
+   * Uses the specified {@link PipelineOptions} to configure and return a cache instance based upon
+   * parameters within {@link SdkHarnessOptions}.
+   */
+  public static <K, V> Cache<K, V> fromOptions(PipelineOptions options) {
+    // We specifically use cache2k since it allows for recursive computeIfAbsent calls
+    // preventing deadlock from occurring when a loading function mutates the underlying cache
+    org.cache2k.Cache<Object, Object> cache =
+        Cache2kBuilder.forUnknownTypes()
+            .maximumWeight(
+                options.as(SdkHarnessOptions.class).getMaxCacheMemoryUsageMb() * 1024L * 1024L)
+            .weigher(
+                new Weigher<Object, Object>() {
+                  private final MemoryMeter memoryMeter = MemoryMeter.builder().build();
+
+                  @Override
+                  public int weigh(Object key, Object value) {
+                    long size = memoryMeter.measureDeep(key) + memoryMeter.measureDeep(value);
+                    return size > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) size;
+                  }
+                })
+            .storeByReference(true)
+            .executor(MoreExecutors.directExecutor())
+            .build();
+
+    return (Cache<K, V>) forCache(cache);
+  }
+
+  /**
+   * Returns a view of a cache that operates on keys with a specified key prefix.
+   *
+   * <p>All lookups, insertions, and removals into the parent {@link Cache} will be prefixed by the
+   * specified prefixes.
+   *
+   * <p>Operations which operate over the entire caches contents such as {@link Cache#clear} only
+   * operate over keys with the specified prefixes.
+   */
+  public static <K, V> Cache<K, V> subCache(
+      Cache<?, ?> cache, Object keyPrefix, Object... additionalKeyPrefix) {
+    if (cache instanceof SubCache) {
+      return new SubCache<>(
+          ((SubCache<?, ?>) cache).cache,
+          ((SubCache<?, ?>) cache).keyPrefix.subKey(keyPrefix, additionalKeyPrefix));
+    } else if (cache instanceof SubCacheable) {
+      return new SubCache<>((SubCacheable<?, ?>) cache, CompositeKey.ROOT.subKey(keyPrefix));
+    }
+    throw new UnsupportedOperationException("An unsupported type of cache was passed.");
+  }
+
+  /** A cache that never evicts any values. */
+  public static <K, V> Cache<K, V> eternal() {
+    // We specifically use cache2k since it allows for recursive computeIfAbsent calls

Review comment:
       Guava does allow recursive calls, but this is not supported as a feature. It will detect a simple deadlock/livelock when the caller loads a key it previously started computing (A->B->A). An out-of-order deadlock could occur in any recursive case, though. All three libraries handle this because they compute entries outside of the hash table lock.
   
   Unless this is an advertised feature, this assumption is dangerous. No library advertises it: Guava discourages it, Caffeine warns against it and describes an explicit approach, and cache2k doesn’t seem to be aware of this possibility (not documented or tested for).
   
   The more I think about this, the more I think you need much more rigorous testing and assurance to rely on this working in production-facing code. I don’t think this should be left as is.
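
For contrast with the cache libraries discussed above, the JDK's own `ConcurrentHashMap` runs the mapping function while holding the bin lock, and its Javadoc says the function must not update other mappings of the map. The sketch below is a small probe of that behavior, not a test of Guava, Caffeine, or cache2k. The class name `RecursiveComputeDemo` is hypothetical, and the `IllegalStateException` is an assumption tied to Java 9 and later (on Java 8 the same call can hang instead of failing).

```java
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical demo class; not part of Beam, Guava, Caffeine, or cache2k. */
final class RecursiveComputeDemo {
  private RecursiveComputeDemo() {}

  /**
   * Returns true if a loader that re-enters computeIfAbsent for the key it is already
   * computing is rejected with IllegalStateException (assumes Java 9 or later).
   */
  static boolean recursiveUpdateRejected() {
    ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
    try {
      // The outer loader asks the map to compute the same key again (A -> A).
      map.computeIfAbsent("a", k -> map.computeIfAbsent("a", k2 -> 1) + 1);
      return false;
    } catch (IllegalStateException expected) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println("recursive update rejected: " + recursiveUpdateRejected());
  }
}
```

A probe of this shape, pointed at whichever cache the harness ends up using and extended to chains such as A -> B -> A, is one way to turn the recursion assumption into a regression test.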







[GitHub] [beam] ben-manes commented on a change in pull request #16130: [BEAM-13015] Start integrating a process wide cache.

Posted by GitBox <gi...@apache.org>.
ben-manes commented on a change in pull request #16130:
URL: https://github.com/apache/beam/pull/16130#discussion_r765410980



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.SdkHarnessOptions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
+import org.cache2k.Cache2kBuilder;
+import org.cache2k.operation.Weigher;
+import org.github.jamm.MemoryMeter;
+
+/** Utility methods used to instantiate and operate over cache instances. */
+@SuppressWarnings("nullness")
+public final class Caches {
+
+  /** A cache that never stores any values. */
+  public static <K, V> Cache<K, V> noop() {
+    // We specifically use cache2k since it allows for recursive computeIfAbsent calls
+    // preventing deadlock from occurring when a loading function mutates the underlying cache

Review comment:
       fwiw, Caffeine handles this through `AsyncCache`, where you do not need a separate thread to execute on. See the FAQ's [example](https://github.com/ben-manes/caffeine/wiki/Faq#recursive-computations), which is similar to what Apache Solr does in their usage. It is a simple workaround for these rare cases and makes what is happening more explicit, as recursive computes are often surprising and can be error prone. I am not sure if one can expect per-key linearizability with recursive writes, so the plus/minus of this behavior depends on the use-case.
   
   One very minor topic to be aware of is that the `Cache#computeIfAbsent` used here does **not** follow the specification for `Map`. In addition, until recently the `Map` implementation was broken and caused the cache to be in an invalid state. See this [bug report](https://github.com/cache2k/cache2k/issues/174) where I found these issues for the simple single threaded cases. Since these types of mistakes are sadly common and only ironed out through usage, you might want to spend a little extra time validating the behavior and avoiding regressions. Caffeine has had its share of bugs and it takes usage to wrangle them out, so early adopters should be a little more proactive to help the library authors catch their oversights.
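
The future-based idea described above can be sketched with the JDK alone. This is not Caffeine's `AsyncCache` API; `FutureMemoizer` and `FutureMemoizerDemo` are hypothetical names, and the map here never evicts. The sketch only illustrates the mechanism: the map stores a placeholder future, and the loader runs outside any map lock, so a loader may look up other keys in the same structure.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical helper; a JDK-only sketch, not Caffeine's or cache2k's API. */
final class FutureMemoizer<K, V> {
  private final ConcurrentHashMap<K, CompletableFuture<V>> entries = new ConcurrentHashMap<>();

  V get(K key, Function<K, V> loader) {
    CompletableFuture<V> created = new CompletableFuture<>();
    // putIfAbsent only publishes the placeholder; the loader never runs under a map lock.
    CompletableFuture<V> existing = entries.putIfAbsent(key, created);
    if (existing != null) {
      // Another caller owns the computation. Note: a true cycle (A -> B -> A) on one
      // thread would wait here forever, so callers must keep the dependency chain acyclic.
      return existing.join();
    }
    try {
      V value = loader.apply(key);
      created.complete(value);
      return value;
    } catch (RuntimeException | Error e) {
      // Do not cache failures: drop the placeholder and wake any waiters with the error.
      entries.remove(key, created);
      created.completeExceptionally(e);
      throw e;
    }
  }
}

/** Hypothetical demo: a recursive, acyclic computation that re-enters the memoizer. */
final class FutureMemoizerDemo {
  private FutureMemoizerDemo() {}

  static long fib(FutureMemoizer<Integer, Long> memo, int n) {
    if (n < 2) {
      return n;
    }
    // The loader for n looks up n - 1 and n - 2 in the same memoizer.
    return memo.get(n, k -> fib(memo, k - 1) + fib(memo, k - 2));
  }

  static long fib(int n) {
    return fib(new FutureMemoizer<>(), n);
  }
}
```

The design choice is the trade-off mentioned in the comment: recursion becomes explicit and works for acyclic chains, but cycle detection and per-key ordering guarantees remain the caller's responsibility.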







[GitHub] [beam] lukecwik commented on pull request #16130: [BEAM-13015] Start integrating a process wide cache.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #16130:
URL: https://github.com/apache/beam/pull/16130#issuecomment-989300671


   Run Portable_Python PreCommit





[GitHub] [beam] lukecwik commented on a change in pull request #16130: [BEAM-13015] Start integrating a process wide cache.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #16130:
URL: https://github.com/apache/beam/pull/16130#discussion_r766161342



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.SdkHarnessOptions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
+import org.cache2k.Cache2kBuilder;
+import org.cache2k.operation.Weigher;
+import org.github.jamm.MemoryMeter;
+
+/** Utility methods used to instantiate and operate over cache instances. */
+@SuppressWarnings("nullness")
+public final class Caches {
+
+  /** A cache that never stores any values. */
+  public static <K, V> Cache<K, V> noop() {
+    return new SubCacheable<K, V>() {
+      @Override
+      public V peek(K key) {
+        return null;
+      }
+
+      @Override
+      public V computeIfAbsent(K key, Function<K, V> loadingFunction) {
+        return loadingFunction.apply(key);
+      }
+
+      @Override
+      public void put(K key, V value) {}
+
+      @Override
+      public void clear() {}
+
+      @Override
+      public void remove(K key) {}
+
+      @Override
+      public Set<K> keys() {
+        return Collections.emptySet();
+      }
+    };
+  }
+
+  /**
+   * Uses the specified {@link PipelineOptions} to configure and return a cache instance based upon
+   * parameters within {@link SdkHarnessOptions}.
+   */
+  public static <K, V> Cache<K, V> fromOptions(PipelineOptions options) {
+    // We specifically use cache2k since it allows for recursive computeIfAbsent calls
+    // preventing deadlock from occurring when a loading function mutates the underlying cache
+    org.cache2k.Cache<Object, Object> cache =
+        Cache2kBuilder.forUnknownTypes()
+            .maximumWeight(
+                options.as(SdkHarnessOptions.class).getMaxCacheMemoryUsageMb() * 1024L * 1024L)
+            .weigher(
+                new Weigher<Object, Object>() {
+                  private final MemoryMeter memoryMeter = MemoryMeter.builder().build();
+
+                  @Override
+                  public int weigh(Object key, Object value) {
+                    long size = memoryMeter.measureDeep(key) + memoryMeter.measureDeep(value);
+                    return size > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) size;
+                  }
+                })
+            .storeByReference(true)
+            .executor(MoreExecutors.directExecutor())
+            .build();
+
+    return (Cache<K, V>) forCache(cache);
+  }
+
+  /**
+   * Returns a view of a cache that operates on keys with a specified key prefix.
+   *
+   * <p>All lookups, insertions, and removals into the parent {@link Cache} will be prefixed by the
+   * specified prefixes.
+   *
+   * <p>Operations which operate over the entire caches contents such as {@link Cache#clear} only
+   * operate over keys with the specified prefixes.
+   */
+  public static <K, V> Cache<K, V> subCache(
+      Cache<?, ?> cache, Object keyPrefix, Object... additionalKeyPrefix) {
+    if (cache instanceof SubCache) {
+      return new SubCache<>(
+          ((SubCache<?, ?>) cache).cache,
+          ((SubCache<?, ?>) cache).keyPrefix.subKey(keyPrefix, additionalKeyPrefix));
+    } else if (cache instanceof SubCacheable) {
+      return new SubCache<>((SubCacheable<?, ?>) cache, CompositeKey.ROOT.subKey(keyPrefix));
+    }
+    throw new UnsupportedOperationException("An unsupported type of cache was passed.");
+  }
+
+  /** A cache that never evicts any values. */
+  public static <K, V> Cache<K, V> eternal() {
+    // We specifically use cache2k since it allows for recursive computeIfAbsent calls

Review comment:
       Yes, `computeIfAbsent` creating a loop is definitely an issue that could happen if this API was misused. The use case I'm building needs a chain like X -> Y -> Z. It is guaranteed not to create a loop, because a loop would also cause an infinite loop outside of the cache: I'm effectively caching pages and continuation tokens. This is likely to change though, as a different implementation won't need the recursive `computeIfAbsent`.
   
   I'll update the docs (and tests) in the future to state whether this is needed or not, depending on that different implementation.
   
   On a different note, I'm trying to ensure that the cache reduces the number of OOMs and maximizes the number of cache hits by having one cache that is used and a bunch of sub caches that provide views over the main cache for different key spaces. Our previous implementation used distinct cache objects, which required one to partition memory and give each cache a limited amount of memory. There didn't seem to be a good way to have the caches work together so that if one was unused the other could have more memory. Note that all of these caches used only weight based eviction.
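
To illustrate the shared-budget point, here is a small sketch under loud assumptions: `SharedWeightedLru` is a hypothetical, single-threaded class, it uses plain LRU order rather than cache2k's eviction policy, and it weighs a value by its `byte[]` length instead of measuring objects with JAMM. It only shows that with one weight limit across key spaces, an idle key space costs nothing and a busy one can use the whole budget.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: one weight budget shared by all key spaces (not thread safe). */
final class SharedWeightedLru {
  private final long maxWeight;
  private long currentWeight;
  // Access-ordered map: iteration starts at the least recently used entry.
  private final LinkedHashMap<String, byte[]> entries = new LinkedHashMap<>(16, 0.75f, true);

  SharedWeightedLru(long maxWeight) {
    this.maxWeight = maxWeight;
  }

  void put(String keySpace, String key, byte[] value) {
    byte[] previous = entries.put(keySpace + "/" + key, value);
    if (previous != null) {
      currentWeight -= previous.length;
    }
    currentWeight += value.length;
    // Evict least recently used entries, from any key space, until under budget.
    Iterator<Map.Entry<String, byte[]>> it = entries.entrySet().iterator();
    while (currentWeight > maxWeight && it.hasNext()) {
      Map.Entry<String, byte[]> eldest = it.next();
      currentWeight -= eldest.getValue().length;
      it.remove();
    }
  }

  byte[] peek(String keySpace, String key) {
    return entries.get(keySpace + "/" + key);
  }

  long weight() {
    return currentWeight;
  }
}
```

With separate fixed-size caches, key space `a` could never have held all four entries while `b` sat empty; with a shared limit it can, and `b` reclaims space only once it actually stores something.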







[GitHub] [beam] ben-manes commented on a change in pull request #16130: [BEAM-13015] Start integrating a process wide cache.

Posted by GitBox <gi...@apache.org>.
ben-manes commented on a change in pull request #16130:
URL: https://github.com/apache/beam/pull/16130#discussion_r766202605



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.SdkHarnessOptions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
+import org.cache2k.Cache2kBuilder;
+import org.cache2k.operation.Weigher;
+import org.github.jamm.MemoryMeter;
+
+/** Utility methods used to instantiate and operate over cache instances. */
+@SuppressWarnings("nullness")
+public final class Caches {
+
+  /** A cache that never stores any values. */
+  public static <K, V> Cache<K, V> noop() {
+    return new SubCacheable<K, V>() {
+      @Override
+      public V peek(K key) {
+        return null;
+      }
+
+      @Override
+      public V computeIfAbsent(K key, Function<K, V> loadingFunction) {
+        return loadingFunction.apply(key);
+      }
+
+      @Override
+      public void put(K key, V value) {}
+
+      @Override
+      public void clear() {}
+
+      @Override
+      public void remove(K key) {}
+
+      @Override
+      public Set<K> keys() {
+        return Collections.emptySet();
+      }
+    };
+  }
+
+  /**
+   * Uses the specified {@link PipelineOptions} to configure and return a cache instance based upon
+   * parameters within {@link SdkHarnessOptions}.
+   */
+  public static <K, V> Cache<K, V> fromOptions(PipelineOptions options) {
+    // We specifically use cache2k since it allows for recursive computeIfAbsent calls
+    // preventing deadlock from occurring when a loading function mutates the underlying cache
+    org.cache2k.Cache<Object, Object> cache =
+        Cache2kBuilder.forUnknownTypes()
+            .maximumWeight(
+                options.as(SdkHarnessOptions.class).getMaxCacheMemoryUsageMb() * 1024L * 1024L)
+            .weigher(
+                new Weigher<Object, Object>() {
+                  private final MemoryMeter memoryMeter = MemoryMeter.builder().build();
+
+                  @Override
+                  public int weigh(Object key, Object value) {
+                    long size = memoryMeter.measureDeep(key) + memoryMeter.measureDeep(value);
+                    return size > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) size;
+                  }
+                })
+            .storeByReference(true)
+            .executor(MoreExecutors.directExecutor())
+            .build();
+
+    return (Cache<K, V>) forCache(cache);
+  }
+
+  /**
+   * Returns a view of a cache that operates on keys with a specified key prefix.
+   *
+   * <p>All lookups, insertions, and removals into the parent {@link Cache} will be prefixed by the
+   * specified prefixes.
+   *
+   * <p>Operations which operate over the entire caches contents such as {@link Cache#clear} only
+   * operate over keys with the specified prefixes.
+   */
+  public static <K, V> Cache<K, V> subCache(
+      Cache<?, ?> cache, Object keyPrefix, Object... additionalKeyPrefix) {
+    if (cache instanceof SubCache) {
+      return new SubCache<>(
+          ((SubCache<?, ?>) cache).cache,
+          ((SubCache<?, ?>) cache).keyPrefix.subKey(keyPrefix, additionalKeyPrefix));
+    } else if (cache instanceof SubCacheable) {
+      return new SubCache<>((SubCacheable<?, ?>) cache, CompositeKey.ROOT.subKey(keyPrefix));
+    }
+    throw new UnsupportedOperationException("An unsupported type of cache was passed.");
+  }
+
+  /** A cache that never evicts any values. */
+  public static <K, V> Cache<K, V> eternal() {
+    // We specifically use cache2k since it allows for recursive computeIfAbsent calls

Review comment:
       In this case I would recommend Guava's `get(key, callable)` because it does support X -> Y -> Z, has been used at scale with known robustness, and recursive loading is an unofficially supported behavior. Recursive loads were tested for, fixed, discussed, and relied upon internally (e.g. by Gerrit). Otherwise, an explicit lock object like the future example is ideal. I don't think you should rely on accidental behavior as your current implementation does. Note that Guava's `get(key, callable)` is very robust for this, but its `asMap().compute` methods were added semi-recently and likely are not.
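
A minimal sketch of the explicit, future-based approach mentioned above, using only `java.util.concurrent`. The class `FutureMemoizer` and its method names are hypothetical and are not part of Beam, Guava, Caffeine, or cache2k. The idea is to publish a per-key `CompletableFuture` with `putIfAbsent` and to run the loader outside of any map lock, so a loader may request other keys (X -> Y -> Z). It does not detect cycles: an A -> B -> A chain would block on its own future, and two threads loading in opposite orders could still deadlock.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical helper for illustration; not an API of any library discussed here. */
final class FutureMemoizer<K, V> {
  private final ConcurrentHashMap<K, CompletableFuture<V>> futures = new ConcurrentHashMap<>();

  /** Returns the memoized value for the key, running the loader at most once per key. */
  V get(K key, Function<K, V> loader) {
    CompletableFuture<V> created = new CompletableFuture<>();
    // putIfAbsent only locks the map for the insertion itself, not for the load.
    CompletableFuture<V> existing = futures.putIfAbsent(key, created);
    if (existing != null) {
      // Another caller owns the load; wait for its result.
      return existing.join();
    }
    try {
      // The loader runs outside of any map lock, so it may call get() for other keys.
      V value = loader.apply(key);
      created.complete(value);
      return value;
    } catch (RuntimeException | Error e) {
      // Do not memoize failures: remove the entry and unblock any waiters.
      futures.remove(key, created);
      created.completeExceptionally(e);
      throw e;
    }
  }

  /** Loads X, whose loader needs Y, whose loader needs Z. */
  static int nestedLoadDemo() {
    FutureMemoizer<String, Integer> cache = new FutureMemoizer<>();
    return cache.get("X", x -> 1 + cache.get("Y", y -> 1 + cache.get("Z", z -> 1)));
  }

  public static void main(String[] args) {
    System.out.println(nestedLoadDemo());
  }
}
```

A size-bounded variant would need to evict the futures as well, which this sketch leaves out.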
   
   I'm not familiar enough but would be happy to discuss over email or a video conf if I can be of help. You might be able to use a weak value for your view caches so that they discard when the entry is evicted by the primary cache. Other times I've used index caches that maintain a secondaryKey => primaryKey mapping for a primaryKey => value cache, so it takes two lookups. The callbacks and/or reference caching can help a lot in orchestrating these complex cases.
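
The index-cache idea above (a secondaryKey => primaryKey mapping in front of a primaryKey => value cache) could look roughly like the sketch below. It is only an illustration: `IndexedLookup` and its methods are hypothetical names, plain `HashMap`s stand in for real caches, and the class is not thread-safe.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical two-level lookup; HashMaps stand in for real cache instances. */
final class IndexedLookup<P, S, V> {
  private final Map<P, V> primary = new HashMap<>();
  private final Map<S, P> index = new HashMap<>();

  /** Stores the value under its primary key and records the secondary key as an alias. */
  void put(P primaryKey, S secondaryKey, V value) {
    primary.put(primaryKey, value);
    index.put(secondaryKey, primaryKey);
  }

  /** Two lookups: secondaryKey -> primaryKey, then primaryKey -> value. */
  V getBySecondary(S secondaryKey) {
    P primaryKey = index.get(secondaryKey);
    if (primaryKey == null) {
      return null;
    }
    V value = primary.get(primaryKey);
    if (value == null) {
      // The primary entry is gone (e.g. evicted), so drop the stale index entry.
      index.remove(secondaryKey);
    }
    return value;
  }

  /** Simulates the primary cache evicting an entry. */
  void evict(P primaryKey) {
    primary.remove(primaryKey);
  }
}
```

With a real cache, the stale-index cleanup would more likely be driven by a removal callback or by weak values, as suggested above, rather than by the lazy check used here.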




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #16130: [BEAM-13015] Start integrating a process wide cache.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #16130:
URL: https://github.com/apache/beam/pull/16130#issuecomment-989321136


   Run Java_Examples_Dataflow PreCommit





[GitHub] [beam] ben-manes commented on a change in pull request #16130: [BEAM-13015] Start integrating a process wide cache.

Posted by GitBox <gi...@apache.org>.
ben-manes commented on a change in pull request #16130:
URL: https://github.com/apache/beam/pull/16130#discussion_r770227943



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java
##########
@@ -0,0 +1,285 @@
+  /** A cache that never evicts any values. */
+  public static <K, V> Cache<K, V> eternal() {
+    // We specifically use cache2k since it allows for recursive computeIfAbsent calls

Review comment:
       Thank you @lukecwik! Sorry if I was a bit of a pain about this. 😄







[GitHub] [beam] lukecwik commented on a change in pull request #16130: [BEAM-13015] Start integrating a process wide cache.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #16130:
URL: https://github.com/apache/beam/pull/16130#discussion_r770226746



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java
##########
@@ -0,0 +1,285 @@
+  /** A cache that never evicts any values. */
+  public static <K, V> Cache<K, V> eternal() {
+    // We specifically use cache2k since it allows for recursive computeIfAbsent calls

Review comment:
       The swap to use guava cache is in https://github.com/apache/beam/pull/16252







[GitHub] [beam] ben-manes commented on a change in pull request #16130: [BEAM-13015] Start integrating a process wide cache.

Posted by GitBox <gi...@apache.org>.
ben-manes commented on a change in pull request #16130:
URL: https://github.com/apache/beam/pull/16130#discussion_r765611431



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java
##########
@@ -0,0 +1,285 @@
+  /** A cache that never evicts any values. */
+  public static <K, V> Cache<K, V> eternal() {
+    // We specifically use cache2k since it allows for recursive computeIfAbsent calls

Review comment:
       Guava does allow recursive loads, but this is not supported as a feature. It will detect a simple deadlock/livelock when the caller loads a key that it has already started computing (A->B->A). An out-of-order concurrent deadlock could occur in any recursive case, though. All three libraries handle this because they compute entries outside of the hash table lock.
   
   Unless this is an advertised feature, relying on it is a dangerous assumption. No library advertises it: Guava discourages it, Caffeine warns against it and describes an explicit approach, and cache2k doesn’t seem to be aware of this possibility (not documented or tested for).
   
   The more I think about this, the more I think you need much more rigorous testing and assurance before relying on this working in production-facing code. I don’t think this should be left as is; it should maybe be reverted until it is better understood.
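
To make the A->B->A case concrete, here is a small sketch of a same-thread cycle check. It is an illustration only and not how Guava, Caffeine, or cache2k implement it: `CycleDetectingLoader` is a hypothetical name, a `HashMap` stands in for the cache, and the check cannot catch the out-of-order deadlock between two threads described above.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical single-threaded sketch of detecting a cyclic load. */
final class CycleDetectingLoader<K, V> {
  private final Map<K, V> values = new HashMap<>();
  // Keys whose loaders are currently running on the calling thread.
  private final ThreadLocal<Set<K>> inFlight = ThreadLocal.withInitial(HashSet::new);

  V get(K key, Function<K, V> loader) {
    V cached = values.get(key);
    if (cached != null) {
      return cached;
    }
    // add() returns false when this thread is already loading the same key.
    if (!inFlight.get().add(key)) {
      throw new IllegalStateException("Cyclic load detected for key: " + key);
    }
    try {
      // Nested get() calls for other keys are allowed while this loader runs.
      V value = loader.apply(key);
      values.put(key, value);
      return value;
    } finally {
      inFlight.get().remove(key);
    }
  }
}
```

A chain such as A->B completes normally, while A->B->A fails fast with an exception instead of hanging.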







[GitHub] [beam] ben-manes commented on a change in pull request #16130: [BEAM-13015] Start integrating a process wide cache.

Posted by GitBox <gi...@apache.org>.
ben-manes commented on a change in pull request #16130:
URL: https://github.com/apache/beam/pull/16130#discussion_r765611431



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java
##########
@@ -0,0 +1,285 @@
+  /** A cache that never evicts any values. */
+  public static <K, V> Cache<K, V> eternal() {
+    // We specifically use cache2k since it allows for recursive computeIfAbsent calls

Review comment:
       Guava does allow recursive calls, but this is not supported as a feature. It will detect a simple deadlock/livelock when the caller loads a key that it has already started computing (A->B->A). An out-of-order deadlock could occur in any recursive case, though. All three libraries handle this because they compute entries outside of the hash table lock.
   
   Unless this is an advertised feature, relying on it is a dangerous assumption. No library advertises it: Guava discourages it, Caffeine warns against it and describes an explicit approach, and cache2k doesn’t seem to be aware of this possibility (not documented or tested for).
   
   The more I think about this, the more I think you need much more rigorous testing and assurance before relying on this working in production-facing code. I don’t think this should be left as is.







[GitHub] [beam] lukecwik commented on pull request #16130: [BEAM-13015] Start integrating a process wide cache.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #16130:
URL: https://github.com/apache/beam/pull/16130#issuecomment-989321329


   https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Commit/16564/ succeeded but writing the build result failed.





[GitHub] [beam] ben-manes commented on a change in pull request #16130: [BEAM-13015] Start integrating a process wide cache.

Posted by GitBox <gi...@apache.org>.
ben-manes commented on a change in pull request #16130:
URL: https://github.com/apache/beam/pull/16130#discussion_r765628733



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.SdkHarnessOptions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
+import org.cache2k.Cache2kBuilder;
+import org.cache2k.operation.Weigher;
+import org.github.jamm.MemoryMeter;
+
+/** Utility methods used to instantiate and operate over cache instances. */
+@SuppressWarnings("nullness")
+public final class Caches {
+
+  /** A cache that never stores any values. */
+  public static <K, V> Cache<K, V> noop() {
+    return new SubCacheable<K, V>() {
+      @Override
+      public V peek(K key) {
+        return null;
+      }
+
+      @Override
+      public V computeIfAbsent(K key, Function<K, V> loadingFunction) {
+        return loadingFunction.apply(key);
+      }
+
+      @Override
+      public void put(K key, V value) {}
+
+      @Override
+      public void clear() {}
+
+      @Override
+      public void remove(K key) {}
+
+      @Override
+      public Set<K> keys() {
+        return Collections.emptySet();
+      }
+    };
+  }
+
+  /**
+   * Uses the specified {@link PipelineOptions} to configure and return a cache instance based upon
+   * parameters within {@link SdkHarnessOptions}.
+   */
+  public static <K, V> Cache<K, V> fromOptions(PipelineOptions options) {
+    // We specifically use cache2k since it allows for recursive computeIfAbsent calls
+    // preventing deadlock from occurring when a loading function mutates the underlying cache
+    org.cache2k.Cache<Object, Object> cache =
+        Cache2kBuilder.forUnknownTypes()
+            .maximumWeight(
+                options.as(SdkHarnessOptions.class).getMaxCacheMemoryUsageMb() * 1024L * 1024L)
+            .weigher(
+                new Weigher<Object, Object>() {
+                  private final MemoryMeter memoryMeter = MemoryMeter.builder().build();
+
+                  @Override
+                  public int weigh(Object key, Object value) {
+                    long size = memoryMeter.measureDeep(key) + memoryMeter.measureDeep(value);
+                    return size > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) size;
+                  }
+                })
+            .storeByReference(true)
+            .executor(MoreExecutors.directExecutor())
+            .build();
+
+    return (Cache<K, V>) forCache(cache);
+  }
+
+  /**
+   * Returns a view of a cache that operates on keys with a specified key prefix.
+   *
+   * <p>All lookups, insertions, and removals into the parent {@link Cache} will be prefixed by the
+   * specified prefixes.
+   *
+   * <p>Operations which operate over the entire caches contents such as {@link Cache#clear} only
+   * operate over keys with the specified prefixes.
+   */
+  public static <K, V> Cache<K, V> subCache(
+      Cache<?, ?> cache, Object keyPrefix, Object... additionalKeyPrefix) {
+    if (cache instanceof SubCache) {
+      return new SubCache<>(
+          ((SubCache<?, ?>) cache).cache,
+          ((SubCache<?, ?>) cache).keyPrefix.subKey(keyPrefix, additionalKeyPrefix));
+    } else if (cache instanceof SubCacheable) {
+      return new SubCache<>((SubCacheable<?, ?>) cache, CompositeKey.ROOT.subKey(keyPrefix));
+    }
+    throw new UnsupportedOperationException("An unsupported type of cache was passed.");
+  }
+
+  /** A cache that never evicts any values. */
+  public static <K, V> Cache<K, V> eternal() {
+    // We specifically use cache2k since it allows for recursive computeIfAbsent calls

Review comment:
       /cc @cpovirk @lowasser maybe you can advise from the Google Java team’s recommendations for this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] ben-manes commented on a change in pull request #16130: [BEAM-13015] Start integrating a process wide cache.

Posted by GitBox <gi...@apache.org>.
ben-manes commented on a change in pull request #16130:
URL: https://github.com/apache/beam/pull/16130#discussion_r765411720



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java
##########
@@ -0,0 +1,285 @@
+  /** A cache that never evicts any values. */
+  public static <K, V> Cache<K, V> eternal() {
+    // We specifically use cache2k since it allows for recursive computeIfAbsent calls

Review comment:
       FWIW, Caffeine handles this through `AsyncCache`, where you do not need a separate thread to execute on. See the FAQ's [example](https://github.com/ben-manes/caffeine/wiki/Faq#recursive-computations), which is similar to what Apache Solr does in its usage. It is a simple workaround for these rare cases and makes what is happening more explicit, as recursive computes are often surprising and can be error-prone. I am not sure whether one can expect per-key linearizability with recursive writes, so whether this behavior is a plus or a minus depends on the use case.
   
   One very minor topic to be aware of is that the `Cache#computeIfAbsent` used here does **not** follow the specification of `Map#computeIfAbsent`. In addition, until recently its `Map` implementation was broken and caused the cache to be in an invalid state. See this [bug report](https://github.com/cache2k/cache2k/issues/174), where I found these issues in simple single-threaded cases. Since these types of mistakes are sadly common and only ironed out by usage, you might want to spend a little extra time validating the behavior and guarding against regressions. Caffeine has had its share of bugs and it takes usage to wrangle them out, so early adopters should be a little more proactive in helping the library authors catch their oversights.
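
   As a rough illustration of the future-based workaround mentioned above, here is a minimal sketch that uses only `java.util.concurrent` rather than Caffeine itself. A `ConcurrentHashMap` holding `CompletableFuture` values stands in for the map view of an `AsyncCache`; the class `RecursiveMemo` and its methods are hypothetical names and are not part of Beam, cache2k, or Caffeine. The key is reserved with `putIfAbsent`, so the recursive computation runs outside of any map lock:

```java
import java.math.BigInteger;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical illustration only; not an API of Beam, cache2k, or Caffeine. */
final class RecursiveMemo {
  // Values are futures, so reserving a key never runs user code inside the map.
  private final ConcurrentMap<Integer, CompletableFuture<BigInteger>> cache =
      new ConcurrentHashMap<>();

  /** Computes n! and memoizes every intermediate result along the way. */
  BigInteger factorial(int n) {
    CompletableFuture<BigInteger> mine = new CompletableFuture<>();
    CompletableFuture<BigInteger> prior = cache.putIfAbsent(n, mine);
    if (prior != null) {
      // Another caller already owns this key: wait for (or reuse) its result.
      return prior.join();
    }
    try {
      // We own the key. The recursive call touches the map again, which is safe
      // because no compute function is active on the map at this point.
      BigInteger result =
          n <= 1 ? BigInteger.ONE : BigInteger.valueOf(n).multiply(factorial(n - 1));
      mine.complete(result);
      return result;
    } catch (RuntimeException | Error e) {
      // Do not leave a failed reservation behind; unblock any waiters.
      cache.remove(n, mine);
      mine.completeExceptionally(e);
      throw e;
    }
  }

  /** Number of memoized keys, including in-flight reservations. */
  int size() {
    return cache.size();
  }
}
```

   Calling `new RecursiveMemo().factorial(5)` populates the keys 1 through 5 in a single pass. Note that this sketch only avoids re-entrancy into a map compute function: a computation that depends on its own key, directly or through a cycle, would still block forever in `join()`.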







[GitHub] [beam] lukecwik commented on a change in pull request #16130: [BEAM-13015] Start integrating a process wide cache.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #16130:
URL: https://github.com/apache/beam/pull/16130#discussion_r765069337



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java
##########
@@ -0,0 +1,285 @@
+  /** A cache that never evicts any values. */
+  public static <K, V> Cache<K, V> eternal() {
+    // We specifically use cache2k since it allows for recursive computeIfAbsent calls

Review comment:
       This deadlock on recursive `computeIfAbsent` calls happens with ConcurrentHashMap, Guava, and Caffeine.

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java
##########
@@ -0,0 +1,285 @@
+  /** A cache that never stores any values. */
+  public static <K, V> Cache<K, V> noop() {

Review comment:
       It's useful during the migration, while certain APIs are being updated, and also for tests.







[GitHub] [beam] lukecwik commented on pull request #16130: [BEAM-13015] Start integrating a process wide cache.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #16130:
URL: https://github.com/apache/beam/pull/16130#issuecomment-989050016


   @ibzib PTAL





[GitHub] [beam] cpovirk commented on a change in pull request #16130: [BEAM-13015] Start integrating a process wide cache.

Posted by GitBox <gi...@apache.org>.
cpovirk commented on a change in pull request #16130:
URL: https://github.com/apache/beam/pull/16130#discussion_r770933541



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java
##########
@@ -0,0 +1,285 @@
+  /** A cache that never evicts any values. */
+  public static <K, V> Cache<K, V> eternal() {
+    // We specifically use cache2k since it allows for recursive computeIfAbsent calls

Review comment:
       (Sorry for not replying. As a general rule, I endorse doing what Ben says; the details of our own caching library (let alone the others out there) are mostly beyond me.)



