Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/12/14 17:06:26 UTC

[GitHub] [beam] lukecwik opened a new pull request #16230: [BEAM-13015] Update FakeBeamFnStateClient to generate elements that stop at the next block boundary instead of continuing into additional blocks.

lukecwik opened a new pull request #16230:
URL: https://github.com/apache/beam/pull/16230


   This is in preparation for caching blocks of data from state. It contains test-only changes.
   
   See https://s.apache.org/beam-fn-api-send-and-receive-data#heading=h.akxviyj4m0f0
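
A minimal sketch of the chunking behaviour this change introduces, as read from the diff quoted later in this thread. It uses only the JDK: `ChunkingSketch`, `encode`, and `toChunks` are hypothetical names, and the single-byte length prefix is an assumed stand-in for a real Beam `Coder`. Whenever the buffered bytes reach `chunkSize` after an element is encoded, they are flushed as full chunks plus one short remainder chunk, so the next element starts in a fresh chunk.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration only; not the Beam FakeBeamFnStateClient. */
final class ChunkingSketch {

  /** Assumed stand-in for a coder: one length byte followed by the UTF-8 bytes (short strings only). */
  static void encode(String value, ByteArrayOutputStream out) {
    byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
    out.write(utf8.length);
    out.write(utf8, 0, utf8.length);
  }

  /** Encodes the values and cuts the bytes into chunks that end on an element boundary. */
  static List<byte[]> toChunks(List<String> values, int chunkSize) {
    List<byte[]> chunks = new ArrayList<>();
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (String value : values) {
      encode(value, out);
      if (out.size() >= chunkSize) {
        byte[] pending = out.toByteArray();
        int i = 0;
        // Emit as many full-size chunks as the pending bytes allow.
        for (; i + chunkSize <= pending.length; i += chunkSize) {
          chunks.add(Arrays.copyOfRange(pending, i, i + chunkSize));
        }
        // Emit the short remainder so that the next element starts a new chunk.
        if (i < pending.length) {
          chunks.add(Arrays.copyOfRange(pending, i, pending.length));
        }
        out.reset();
      }
    }
    // Whatever is still buffered becomes the last chunk.
    if (out.size() > 0) {
      chunks.add(out.toByteArray());
    }
    return chunks;
  }
}
```

With a chunk size of 6, a 9-byte encoded element followed by a 2-byte one would come out as chunks of 6, 3, and 2 bytes under this sketch: the 3-byte remainder is closed off instead of being topped up with bytes from the next element.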
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on a change in pull request #16230: [BEAM-13015] Update FakeBeamFnStateClient to generate elements that stop at the next block boundary instead of continuing into additional blocks.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #16230:
URL: https://github.com/apache/beam/pull/16230#discussion_r770105066



##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java
##########
@@ -31,25 +36,82 @@
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest.RequestCase;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 
 /** A fake implementation of a {@link BeamFnStateClient} to aid with testing. */
 public class FakeBeamFnStateClient implements BeamFnStateClient {
-  private final Map<StateKey, ByteString> data;
-  private final int chunkSize;
+  private static final int DEFAULT_CHUNK_SIZE = 6;
+  private final Map<StateKey, List<ByteString>> data;
   private int currentId;
 
-  public FakeBeamFnStateClient(Map<StateKey, ByteString> initialData) {
-    this(initialData, 6);
+  public <V> FakeBeamFnStateClient(Coder<V> valueCoder, Map<StateKey, List<V>> initialData) {
+    this(valueCoder, initialData, DEFAULT_CHUNK_SIZE);
   }
 
-  public FakeBeamFnStateClient(Map<StateKey, ByteString> initialData, int chunkSize) {
-    this.data = new ConcurrentHashMap<>(initialData);
-    this.chunkSize = chunkSize;
+  public <V> FakeBeamFnStateClient(
+      Coder<V> valueCoder, Map<StateKey, List<V>> initialData, int chunkSize) {
+    this(Maps.transformValues(initialData, (value) -> KV.of(valueCoder, value)), chunkSize);
+  }
+
+  public FakeBeamFnStateClient(Map<StateKey, KV<Coder<?>, List<?>>> initialData) {
+    this(initialData, DEFAULT_CHUNK_SIZE);
+  }
+
+  public FakeBeamFnStateClient(Map<StateKey, KV<Coder<?>, List<?>>> initialData, int chunkSize) {
+    Map<StateKey, List<ByteString>> encodedData =
+        new HashMap<>(
+            Maps.transformValues(
+                initialData,
+                (KV<Coder<?>, List<?>> coderAndValues) -> {
+                  List<ByteString> chunks = new ArrayList<>();
+                  ByteString.Output output = ByteString.newOutput();
+                  for (Object value : coderAndValues.getValue()) {
+                    try {
+                      ((Coder<Object>) coderAndValues.getKey()).encode(value, output);
+                    } catch (IOException e) {
+                      throw new RuntimeException(e);
+                    }
+                    if (output.size() >= chunkSize) {
+                      ByteString chunk = output.toByteString();
+                      int i = 0;
+                      for (; i + chunkSize <= chunk.size(); i += chunkSize) {
+                        // We specifically use a copy of the bytes instead of a proper substring
+                        // so that debugging is easier since we don't have to worry about the
+                        // substring being a view over the original string.
+                        chunks.add(
+                            ByteString.copyFrom(chunk.substring(i, i + chunkSize).toByteArray()));
+                      }
+                      if (i < chunk.size()) {
+                        chunks.add(
+                            ByteString.copyFrom(chunk.substring(i, chunk.size()).toByteArray()));
+                      }
+                      output.reset();
+                    }
+                  }
+                  // Add the last chunk
+                  if (output.size() > 0) {
+                    chunks.add(output.toByteString());
+                  }
+                  return chunks;
+                }));
+    this.data =
+        new ConcurrentHashMap<>(
+            Maps.filterValues(encodedData, byteStrings -> !byteStrings.isEmpty()));

Review comment:
       Yup.

##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
##########
@@ -301,14 +303,17 @@ public void testUsingUserState() throws Exception {
       assertThat(mainOutputValues, empty());
 
       assertEquals(
-          ImmutableMap.<StateKey, ByteString>builder()
-              .put(bagUserStateKey("value", "X"), encode("X2"))
-              .put(bagUserStateKey("bag", "X"), encode("X0", "X1", "X2"))
-              .put(bagUserStateKey("combine", "X"), encode("X0X1X2"))
-              .put(bagUserStateKey("value", "Y"), encode("Y2"))
-              .put(bagUserStateKey("bag", "Y"), encode("Y1", "Y2"))
-              .put(bagUserStateKey("combine", "Y"), encode("Y1Y2"))
-              .build(),
+          new FakeBeamFnStateClient(
+                  StringUtf8Coder.of(),
+                  ImmutableMap.<StateKey, List<String>>builder()
+                      .put(bagUserStateKey("value", "X"), asList("X2"))
+                      .put(bagUserStateKey("bag", "X"), asList("X0", "X1", "X2"))
+                      .put(bagUserStateKey("combine", "X"), asList("X0X1X2"))
+                      .put(bagUserStateKey("value", "Y"), asList("Y2"))
+                      .put(bagUserStateKey("bag", "Y"), asList("Y1", "Y2"))
+                      .put(bagUserStateKey("combine", "Y"), asList("Y1Y2"))
+                      .build())
+              .getData(),

Review comment:
       I wanted the comparison to be based on the internal representation that FakeBeamFnStateClient produces. That keeps future changes simpler overall, because the caller does not need to know the exact ordering or ByteString layout.
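
To illustrate the point, here is a small JDK-only sketch. `FakeStateStore` is a hypothetical, heavily simplified stand-in and not the Beam class; its fixed-width string chunking is an assumed layout chosen only for the example. A test builds the expected state from logical values through the same constructor and compares `getData()` results, so it never spells out the chunk layout itself.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified stand-in for a fake state client. */
final class FakeStateStore {
  private final Map<String, List<String>> data = new LinkedHashMap<>();

  FakeStateStore(Map<String, List<String>> initialValues, int chunkSize) {
    for (Map.Entry<String, List<String>> entry : initialValues.entrySet()) {
      List<String> chunks = toChunks(entry.getValue(), chunkSize);
      // Keys without any data are dropped, mirroring the filter in the diff above.
      if (!chunks.isEmpty()) {
        data.put(entry.getKey(), chunks);
      }
    }
  }

  /** Internal layout (assumed): concatenated values cut into fixed-width pieces. */
  private static List<String> toChunks(List<String> values, int chunkSize) {
    String joined = String.join("", values);
    List<String> chunks = new ArrayList<>();
    for (int i = 0; i < joined.length(); i += chunkSize) {
      chunks.add(joined.substring(i, Math.min(joined.length(), i + chunkSize)));
    }
    return chunks;
  }

  /** Read-only view of the internal representation, used for comparisons in tests. */
  Map<String, List<String>> getData() {
    return Collections.unmodifiableMap(data);
  }
}
```

A test written this way would assert `new FakeStateStore(expectedValues, n).getData().equals(actual.getData())`. If `toChunks` later changes its layout, both sides change together and the assertion does not need to be rewritten.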

##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java
##########
@@ -31,25 +36,82 @@
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest.RequestCase;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 
 /** A fake implementation of a {@link BeamFnStateClient} to aid with testing. */
 public class FakeBeamFnStateClient implements BeamFnStateClient {
-  private final Map<StateKey, ByteString> data;
-  private final int chunkSize;
+  private static final int DEFAULT_CHUNK_SIZE = 6;
+  private final Map<StateKey, List<ByteString>> data;
   private int currentId;
 
-  public FakeBeamFnStateClient(Map<StateKey, ByteString> initialData) {
-    this(initialData, 6);
+  public <V> FakeBeamFnStateClient(Coder<V> valueCoder, Map<StateKey, List<V>> initialData) {
+    this(valueCoder, initialData, DEFAULT_CHUNK_SIZE);
   }
 
-  public FakeBeamFnStateClient(Map<StateKey, ByteString> initialData, int chunkSize) {
-    this.data = new ConcurrentHashMap<>(initialData);
-    this.chunkSize = chunkSize;
+  public <V> FakeBeamFnStateClient(
+      Coder<V> valueCoder, Map<StateKey, List<V>> initialData, int chunkSize) {
+    this(Maps.transformValues(initialData, (value) -> KV.of(valueCoder, value)), chunkSize);
+  }
+
+  public FakeBeamFnStateClient(Map<StateKey, KV<Coder<?>, List<?>>> initialData) {

Review comment:
       The factory method idea is much cleaner; I will use it in a future PR.







[GitHub] [beam] lukecwik commented on pull request #16230: [BEAM-13015] Update FakeBeamFnStateClient to generate elements that stop at the next block boundary instead of continuing into additional blocks.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #16230:
URL: https://github.com/apache/beam/pull/16230#issuecomment-995192907


   R: @TheNeuralBit 
   
   Looks like Yichi is OOO.





[GitHub] [beam] TheNeuralBit commented on a change in pull request #16230: [BEAM-13015] Update FakeBeamFnStateClient to generate elements that stop at the next block boundary instead of continuing into additional blocks.

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #16230:
URL: https://github.com/apache/beam/pull/16230#discussion_r770063300



##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java
##########
@@ -31,25 +36,82 @@
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest.RequestCase;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 
 /** A fake implementation of a {@link BeamFnStateClient} to aid with testing. */
 public class FakeBeamFnStateClient implements BeamFnStateClient {
-  private final Map<StateKey, ByteString> data;
-  private final int chunkSize;
+  private static final int DEFAULT_CHUNK_SIZE = 6;
+  private final Map<StateKey, List<ByteString>> data;
   private int currentId;
 
-  public FakeBeamFnStateClient(Map<StateKey, ByteString> initialData) {
-    this(initialData, 6);
+  public <V> FakeBeamFnStateClient(Coder<V> valueCoder, Map<StateKey, List<V>> initialData) {
+    this(valueCoder, initialData, DEFAULT_CHUNK_SIZE);
   }
 
-  public FakeBeamFnStateClient(Map<StateKey, ByteString> initialData, int chunkSize) {
-    this.data = new ConcurrentHashMap<>(initialData);
-    this.chunkSize = chunkSize;
+  public <V> FakeBeamFnStateClient(
+      Coder<V> valueCoder, Map<StateKey, List<V>> initialData, int chunkSize) {
+    this(Maps.transformValues(initialData, (value) -> KV.of(valueCoder, value)), chunkSize);
+  }
+
+  public FakeBeamFnStateClient(Map<StateKey, KV<Coder<?>, List<?>>> initialData) {

Review comment:
       It's confusing to use a `KV` here. Could you just make a quick `@AutoValue CoderAndData<T>` instead?
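
A rough idea of what such a holder might look like, written as a plain hand-rolled class so that it runs with the JDK alone. Everything here is an assumption for illustration: the real suggestion is an `@AutoValue` class, which would also generate `equals`, `hashCode`, and `toString`, and `Function<T, byte[]>` stands in for Beam's `Coder<T>`. The point is that a shared type parameter `T` ties the coder to its data, so no unchecked cast is needed when encoding.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

/** Hypothetical hand-written stand-in for an {@code @AutoValue CoderAndData<T>}. */
final class CoderAndData<T> {
  // Stand-in for Coder<T>: turns one element into its encoded bytes.
  private final Function<T, byte[]> coder;
  private final List<T> data;

  private CoderAndData(Function<T, byte[]> coder, List<T> data) {
    this.coder = coder;
    this.data = data;
  }

  /** Static factory, in the style AutoValue classes usually expose. */
  static <T> CoderAndData<T> of(Function<T, byte[]> coder, List<T> data) {
    return new CoderAndData<>(coder, Collections.unmodifiableList(new ArrayList<>(data)));
  }

  Function<T, byte[]> getCoder() {
    return coder;
  }

  List<T> getData() {
    return data;
  }

  /** Encodes every element; T guarantees the coder matches the element type. */
  List<byte[]> encodeAll() {
    List<byte[]> encoded = new ArrayList<>();
    for (T value : data) {
      encoded.add(coder.apply(value));
    }
    return encoded;
  }
}
```

Compared with `KV<Coder<?>, List<?>>`, the accessor names say what each half is, and a map of `CoderAndData<?>` values can still hold a different element type per key.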

##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java
##########
@@ -31,25 +36,82 @@
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest.RequestCase;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 
 /** A fake implementation of a {@link BeamFnStateClient} to aid with testing. */
 public class FakeBeamFnStateClient implements BeamFnStateClient {
-  private final Map<StateKey, ByteString> data;
-  private final int chunkSize;
+  private static final int DEFAULT_CHUNK_SIZE = 6;
+  private final Map<StateKey, List<ByteString>> data;
   private int currentId;
 
-  public FakeBeamFnStateClient(Map<StateKey, ByteString> initialData) {
-    this(initialData, 6);
+  public <V> FakeBeamFnStateClient(Coder<V> valueCoder, Map<StateKey, List<V>> initialData) {
+    this(valueCoder, initialData, DEFAULT_CHUNK_SIZE);
   }
 
-  public FakeBeamFnStateClient(Map<StateKey, ByteString> initialData, int chunkSize) {
-    this.data = new ConcurrentHashMap<>(initialData);
-    this.chunkSize = chunkSize;
+  public <V> FakeBeamFnStateClient(
+      Coder<V> valueCoder, Map<StateKey, List<V>> initialData, int chunkSize) {
+    this(Maps.transformValues(initialData, (value) -> KV.of(valueCoder, value)), chunkSize);
+  }
+
+  public FakeBeamFnStateClient(Map<StateKey, KV<Coder<?>, List<?>>> initialData) {
+    this(initialData, DEFAULT_CHUNK_SIZE);
+  }
+
+  public FakeBeamFnStateClient(Map<StateKey, KV<Coder<?>, List<?>>> initialData, int chunkSize) {
+    Map<StateKey, List<ByteString>> encodedData =
+        new HashMap<>(
+            Maps.transformValues(
+                initialData,
+                (KV<Coder<?>, List<?>> coderAndValues) -> {
+                  List<ByteString> chunks = new ArrayList<>();
+                  ByteString.Output output = ByteString.newOutput();
+                  for (Object value : coderAndValues.getValue()) {
+                    try {
+                      ((Coder<Object>) coderAndValues.getKey()).encode(value, output);
+                    } catch (IOException e) {
+                      throw new RuntimeException(e);
+                    }
+                    if (output.size() >= chunkSize) {
+                      ByteString chunk = output.toByteString();
+                      int i = 0;
+                      for (; i + chunkSize <= chunk.size(); i += chunkSize) {
+                        // We specifically use a copy of the bytes instead of a proper substring
+                        // so that debugging is easier since we don't have to worry about the
+                        // substring being a view over the original string.
+                        chunks.add(
+                            ByteString.copyFrom(chunk.substring(i, i + chunkSize).toByteArray()));
+                      }
+                      if (i < chunk.size()) {
+                        chunks.add(
+                            ByteString.copyFrom(chunk.substring(i, chunk.size()).toByteArray()));
+                      }
+                      output.reset();
+                    }
+                  }
+                  // Add the last chunk
+                  if (output.size() > 0) {
+                    chunks.add(output.toByteString());
+                  }
+                  return chunks;
+                }));
+    this.data =
+        new ConcurrentHashMap<>(
+            Maps.filterValues(encodedData, byteStrings -> !byteStrings.isEmpty()));

Review comment:
       nit: consider doing this work in a factory method rather than in a constructor.
   
   I think the reasoning behind that guideline has to do with testability, though. Since this is a test utility, maybe that's a moot point.
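
The sketch below shows how the constructor's chunking work might move into static factory methods. It assumes plain Java with no Beam or protobuf dependency: `ChunkedValues` and `fromUtf8Strings` are hypothetical names, and values are written as raw UTF-8 bytes rather than through a Beam `Coder`. The chunking loop follows the logic in the diff above. Once the buffered bytes reach `chunkSize`, the buffer is cut into `chunkSize`-sized pieces plus a remainder, so a chunk boundary always coincides with the end of an element.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical holder for pre-chunked state data, built only through factories.
final class ChunkedValues {
  static final int DEFAULT_CHUNK_SIZE = 6;

  private final List<byte[]> chunks;

  private ChunkedValues(List<byte[]> chunks) {
    this.chunks = Collections.unmodifiableList(chunks);
  }

  static ChunkedValues fromUtf8Strings(List<String> values) {
    return fromUtf8Strings(values, DEFAULT_CHUNK_SIZE);
  }

  static ChunkedValues fromUtf8Strings(List<String> values, int chunkSize) {
    List<byte[]> chunks = new ArrayList<>();
    ByteArrayOutputStream pending = new ByteArrayOutputStream();
    for (String value : values) {
      byte[] encoded = value.getBytes(StandardCharsets.UTF_8);
      pending.write(encoded, 0, encoded.length);
      if (pending.size() >= chunkSize) {
        // Flush at an element boundary: full-size pieces first, then the remainder.
        byte[] block = pending.toByteArray();
        int i = 0;
        for (; i + chunkSize <= block.length; i += chunkSize) {
          chunks.add(Arrays.copyOfRange(block, i, i + chunkSize));
        }
        if (i < block.length) {
          chunks.add(Arrays.copyOfRange(block, i, block.length));
        }
        pending.reset();
      }
    }
    // Whatever is left never reached chunkSize; emit it as the last chunk.
    if (pending.size() > 0) {
      chunks.add(pending.toByteArray());
    }
    return new ChunkedValues(chunks);
  }

  List<byte[]> getChunks() {
    return chunks;
  }
}
```

A caller would then write `ChunkedValues.fromUtf8Strings(values, 4)` instead of choosing among overloaded constructors, and the private constructor stays trivial.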
   
   

##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
##########
@@ -301,14 +303,17 @@ public void testUsingUserState() throws Exception {
       assertThat(mainOutputValues, empty());
 
       assertEquals(
-          ImmutableMap.<StateKey, ByteString>builder()
-              .put(bagUserStateKey("value", "X"), encode("X2"))
-              .put(bagUserStateKey("bag", "X"), encode("X0", "X1", "X2"))
-              .put(bagUserStateKey("combine", "X"), encode("X0X1X2"))
-              .put(bagUserStateKey("value", "Y"), encode("Y2"))
-              .put(bagUserStateKey("bag", "Y"), encode("Y1", "Y2"))
-              .put(bagUserStateKey("combine", "Y"), encode("Y1Y2"))
-              .build(),
+          new FakeBeamFnStateClient(
+                  StringUtf8Coder.of(),
+                  ImmutableMap.<StateKey, List<String>>builder()
+                      .put(bagUserStateKey("value", "X"), asList("X2"))
+                      .put(bagUserStateKey("bag", "X"), asList("X0", "X1", "X2"))
+                      .put(bagUserStateKey("combine", "X"), asList("X0X1X2"))
+                      .put(bagUserStateKey("value", "Y"), asList("Y2"))
+                      .put(bagUserStateKey("bag", "Y"), asList("Y1", "Y2"))
+                      .put(bagUserStateKey("combine", "Y"), asList("Y1Y2"))
+                      .build())
+              .getData(),

Review comment:
       Wouldn't it be preferable to keep the old assertion, since it's explicitly constructing the expected value?

##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java
##########
@@ -31,25 +36,82 @@
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest.RequestCase;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 
 /** A fake implementation of a {@link BeamFnStateClient} to aid with testing. */
 public class FakeBeamFnStateClient implements BeamFnStateClient {
-  private final Map<StateKey, ByteString> data;
-  private final int chunkSize;
+  private static final int DEFAULT_CHUNK_SIZE = 6;
+  private final Map<StateKey, List<ByteString>> data;
   private int currentId;
 
-  public FakeBeamFnStateClient(Map<StateKey, ByteString> initialData) {
-    this(initialData, 6);
+  public <V> FakeBeamFnStateClient(Coder<V> valueCoder, Map<StateKey, List<V>> initialData) {
+    this(valueCoder, initialData, DEFAULT_CHUNK_SIZE);
   }
 
-  public FakeBeamFnStateClient(Map<StateKey, ByteString> initialData, int chunkSize) {
-    this.data = new ConcurrentHashMap<>(initialData);
-    this.chunkSize = chunkSize;
+  public <V> FakeBeamFnStateClient(
+      Coder<V> valueCoder, Map<StateKey, List<V>> initialData, int chunkSize) {
+    this(Maps.transformValues(initialData, (value) -> KV.of(valueCoder, value)), chunkSize);
+  }
+
+  public FakeBeamFnStateClient(Map<StateKey, KV<Coder<?>, List<?>>> initialData) {
+    this(initialData, DEFAULT_CHUNK_SIZE);
+  }
+
+  public FakeBeamFnStateClient(Map<StateKey, KV<Coder<?>, List<?>>> initialData, int chunkSize) {
+    Map<StateKey, List<ByteString>> encodedData =
+        new HashMap<>(
+            Maps.transformValues(
+                initialData,
+                (KV<Coder<?>, List<?>> coderAndValues) -> {
+                  List<ByteString> chunks = new ArrayList<>();
+                  ByteString.Output output = ByteString.newOutput();
+                  for (Object value : coderAndValues.getValue()) {
+                    try {
+                      ((Coder<Object>) coderAndValues.getKey()).encode(value, output);
+                    } catch (IOException e) {
+                      throw new RuntimeException(e);
+                    }
+                    if (output.size() >= chunkSize) {
+                      ByteString chunk = output.toByteString();
+                      int i = 0;
+                      for (; i + chunkSize <= chunk.size(); i += chunkSize) {
+                        // We specifically use a copy of the bytes instead of a proper substring
+                        // so that debugging is easier since we don't have to worry about the
+                        // substring being a view over the original string.
+                        chunks.add(
+                            ByteString.copyFrom(chunk.substring(i, i + chunkSize).toByteArray()));
+                      }
+                      if (i < chunk.size()) {
+                        chunks.add(
+                            ByteString.copyFrom(chunk.substring(i, chunk.size()).toByteArray()));
+                      }
+                      output.reset();
+                    }
+                  }
+                  // Add the last chunk
+                  if (output.size() > 0) {
+                    chunks.add(output.toByteString());
+                  }
+                  return chunks;
+                }));
+    this.data =
+        new ConcurrentHashMap<>(
+            Maps.filterValues(encodedData, byteStrings -> !byteStrings.isEmpty()));

Review comment:
       I think it would improve readability, though, to have a set of distinct factory methods rather than a mix of calls to `new FakeBeamFnStateClient`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #16230: [BEAM-13015] Update FakeBeamFnStateClient to generate elements that stop at the next block boundary instead of continuing into additional blocks.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #16230:
URL: https://github.com/apache/beam/pull/16230#issuecomment-993791103


   R: @y1chi 





[GitHub] [beam] lukecwik commented on pull request #16230: [BEAM-13015] Update FakeBeamFnStateClient to generate elements that stop at the next block boundary instead of continuing into additional blocks.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #16230:
URL: https://github.com/apache/beam/pull/16230#issuecomment-994122738


   Run Java PreCommit





[GitHub] [beam] lukecwik commented on pull request #16230: [BEAM-13015] Update FakeBeamFnStateClient to generate elements that stop at the next block boundary instead of continuing into additional blocks.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #16230:
URL: https://github.com/apache/beam/pull/16230#issuecomment-995283485


   I have a few other PRs that rely on this format. I'll swap to the factory approach once those are in.





[GitHub] [beam] lukecwik merged pull request #16230: [BEAM-13015] Update FakeBeamFnStateClient to generate elements that stop at the next block boundary instead of continuing into additional blocks.

Posted by GitBox <gi...@apache.org>.
lukecwik merged pull request #16230:
URL: https://github.com/apache/beam/pull/16230


   





[GitHub] [beam] lukecwik commented on a change in pull request #16230: [BEAM-13015] Update FakeBeamFnStateClient to generate elements that stop at the next block boundary instead of continuing into additional blocks.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #16230:
URL: https://github.com/apache/beam/pull/16230#discussion_r768873740



##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/MultimapSideInputTest.java
##########
@@ -32,48 +36,58 @@
 /** Tests for {@link MultimapSideInput}. */
 @RunWith(JUnit4.class)
 public class MultimapSideInputTest {

Review comment:
       I swapped to bytes here to ensure that we perform comparisons using structural values rather than Java equality.
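
One reading of this comment, sketched in plain Java: `byte[]` inherits identity-based `equals` from `Object`, so two arrays with the same contents are not equal under Java equality. A test that uses byte values therefore passes only if the code under test compares contents. The class and method names below are hypothetical. Wrapping in `ByteBuffer` is just one content-comparable representation chosen for this illustration and is not the mechanism Beam itself uses.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class StructuralEquality {
  private StructuralEquality() {}

  // Java equality on arrays: true only when both references are the same object.
  static boolean javaEquals(byte[] a, byte[] b) {
    return a.equals(b);
  }

  // Structural comparison: true when lengths and contents match.
  static boolean contentEquals(byte[] a, byte[] b) {
    return Arrays.equals(a, b);
  }

  // Wraps each array in a ByteBuffer, whose equals() compares the remaining bytes,
  // so the resulting lists can be compared with List.equals().
  static List<ByteBuffer> asComparable(List<byte[]> values) {
    List<ByteBuffer> wrapped = new ArrayList<>();
    for (byte[] value : values) {
      wrapped.add(ByteBuffer.wrap(value));
    }
    return wrapped;
  }
}
```

With `String` values the two notions of equality coincide, so a test built on strings could not tell them apart.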



