Posted to github@beam.apache.org by "rohdesamuel (via GitHub)" <gi...@apache.org> on 2023/02/06 20:51:42 UTC

[GitHub] [beam] rohdesamuel opened a new pull request, #25354: Data sampling java

rohdesamuel opened a new pull request, #25354:
URL: https://github.com/apache/beam/pull/25354

   **Please** add a meaningful description for your change here
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] rohdesamuel commented on a diff in pull request #25354: PCollection data sampling for Java SDK harness #25064

Posted by "rohdesamuel (via GitHub)" <gi...@apache.org>.
rohdesamuel commented on code in PR #25354:
URL: https://github.com/apache/beam/pull/25354#discussion_r1106543369


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+  /**
+   * Creates a DataSampler to sample every 1000 elements while keeping a maximum of 10 in memory.
+   */
+  public DataSampler() {
+    this.maxSamples = 10;
+    this.sampleEveryN = 1000;
+  }
+
+  /**
+   * @param maxSamples Sets the maximum number of samples held in memory at once.
+   * @param sampleEveryN Sets how often to sample.
+   */
+  public DataSampler(int maxSamples, int sampleEveryN) {

Review Comment:
   done





[GitHub] [beam] rohdesamuel commented on a diff in pull request #25354: PCollection data sampling for Java SDK harness #25064

Posted by "rohdesamuel (via GitHub)" <gi...@apache.org>.
rohdesamuel commented on code in PR #25354:
URL: https://github.com/apache/beam/pull/25354#discussion_r1113663848


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java:
##########
@@ -91,10 +94,13 @@ public void sample(T element) {
   /**
    * Clears samples at end of call. This is to help mitigate memory use.
    *
+   * <p>This method is invoked by a thread handling a data sampling request in parallel to any calls
+   * to {@link #sample}.
+   *
    * @return samples taken since last call.
    */
-  public List<byte[]> samples() {
-    List<byte[]> ret = new ArrayList<>();
+  public List<BeamFnApi.SampledElement> samples() throws IOException {
+    List<BeamFnApi.SampledElement> ret = new ArrayList<>();
 
     // Serializing can take a lot of CPU time for larger or complex elements. Copy the array here
     // so as to not slow down the main processing hot path.

Review Comment:
   Done! 
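
The copy-then-serialize idea described in the hunk's comment can be sketched as below. This is a minimal illustration under stated assumptions, not the code in the PR: `CopyThenEncodeSampler` is a hypothetical stand-in for `OutputSampler`, and the `Function<T, byte[]>` encoder stands in for a Beam `Coder`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sampler: buffers raw elements and serializes them only when asked. */
final class CopyThenEncodeSampler<T> {
  private final List<T> buffer = new ArrayList<>();
  // Assumption: a plain function replaces the Beam Coder to keep the sketch self-contained.
  private final Function<T, byte[]> encoder;

  CopyThenEncodeSampler(Function<T, byte[]> encoder) {
    this.encoder = encoder;
  }

  /** Hot path: only appends to the buffer while holding the lock. */
  void sample(T element) {
    synchronized (this) {
      buffer.add(element);
    }
  }

  /** Request path: copies and clears under the lock, then serializes outside of it. */
  List<byte[]> samples() {
    List<T> copy;
    synchronized (this) {
      copy = new ArrayList<>(buffer);
      buffer.clear();
    }
    // The potentially expensive encoding runs without blocking sample().
    List<byte[]> encoded = new ArrayList<>(copy.size());
    for (T element : copy) {
      encoded.add(encoder.apply(element));
    }
    return encoded;
  }
}
```

Holding the lock only for the copy keeps the time that `sample` can be blocked proportional to the buffer size rather than to the cost of encoding.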





[GitHub] [beam] lukecwik commented on pull request #25354: PCollection data sampling for Java SDK harness #25064

Posted by "lukecwik (via GitHub)" <gi...@apache.org>.
lukecwik commented on PR #25354:
URL: https://github.com/apache/beam/pull/25354#issuecomment-1428837599

   Can you rebase on top of master and then I can do the next deep review round?




[GitHub] [beam] lukecwik commented on pull request #25354: PCollection data sampling for Java SDK harness #25064

Posted by "lukecwik (via GitHub)" <gi...@apache.org>.
lukecwik commented on PR #25354:
URL: https://github.com/apache/beam/pull/25354#issuecomment-1421402599

   Can you address the checkstyle failures?
   see https://ci-beam.apache.org/job/beam_PreCommit_Spotless_Commit/25154/checkstyle/new/#issuesContent
   
   ```
   Details | File | Package | Category | Type | Severity | Age
   -- | -- | -- | -- | -- | -- | --
     | ProcessBundleHandler.java:23 | org.apache.beam.fn.harness.control | Imports | AvoidStarImportCheck | Error | 1
   Using the '.*' form of import should be avoided - java.util.*. Since Checkstyle 3.0. Checks that there are no import statements that use the * notation. Rationale: Importing all classes from a package or static members from a class leads to tight coupling between packages or classes and might lead to problems when a new version of a library introduces name clashes.
     | DataSamplingDescriptorModifier.java:1 | org.apache.beam.fn.harness.debug | Javadoc | JavadocPackageCheck | Error | 1
   Missing package-info.java file. Since Checkstyle 5.0. Checks that each Java package has a Javadoc file used for commenting. By default it only allows a package-info.java file, but can be configured to allow a package.html file. An error will be reported if both files exist as this is not allowed by the Javadoc tool.
     | ProcessBundleDescriptorModifier.java:40 | org.apache.beam.fn.harness | Naming | MethodNameCheck | Error | 1
   Name 'ModifyProcessBundleDescriptor' must match pattern '^[a-z][a-zA-Z0-9]*(_[a-zA-Z0-9]+)*. Checks that method names conform to a format specified by the format property. Also checks if a method name has the same name as the residing class. The default is false (it is not allowed). It is legal in Java to have a method with the same name as a class. As long as a return type is specified it is a method and not a constructor, which it could easily be confused with. Does not check the name of overridden methods because the developer does not have a choice in renaming such methods.
     | DataSamplerTest.java:27 | org.apache.beam.fn.harness.debug | Imports | AvoidStarImportCheck | Error | 1
     | DataSamplingDescriptorModifierTest.java:41 | org.apache.beam.fn.harness.debug | Naming | LocalFinalVariableNameCheck | Error | 1
   Name 'PCOLLECTION_ID_A' must match pattern '^[a-z][a-zA-Z0-9]*. Checks that local final variable names conform to a format specified by the format property. A catch parameter and resources in try statements are considered to be local, final variables.
     | DataSamplingDescriptorModifierTest.java:42 | org.apache.beam.fn.harness.debug | Naming | LocalFinalVariableNameCheck | Error | 1
     | DataSamplingDescriptorModifierTest.java:43 | org.apache.beam.fn.harness.debug | Naming | LocalFinalVariableNameCheck | Error | 1
     | DataSamplingDescriptorModifierTest.java:44 | org.apache.beam.fn.harness.debug | Naming | LocalFinalVariableNameCheck | Error | 1
     | DataSamplingDescriptorModifierTest.java:71 | org.apache.beam.fn.harness.debug | Naming | LocalFinalVariableNameCheck | Error | 1
     | DataSamplingDescriptorModifierTest.java:72 | org.apache.beam.fn.harness.debug | Naming | LocalFinalVariableNameCheck | Error | 1
     | DataSamplingFnRunnerTest.java:22 | org.apache.beam.fn.harness.debug | Imports | AvoidStarImportCheck | Error | 1
     | DataSamplingFnRunnerTest.java:25 | org.apache.beam.fn.harness.debug | Imports | AvoidStarImportCheck | Error | 1
   ```
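
For readers unfamiliar with these checks, the sketch below shows code that would satisfy three of the reported categories: explicit imports instead of `java.util.*`, a lowerCamelCase method name, and a lowerCamelCase local final variable. The class name and method body are hypothetical and only illustrate the naming rules; they are not taken from the PR. The `JavadocPackageCheck` failure is addressed separately by adding a `package-info.java` file to the package.

```java
// AvoidStarImportCheck: import each class explicitly rather than java.util.*.
import java.util.ArrayList;
import java.util.List;

/** Hypothetical class used only to illustrate the naming rules. */
final class CheckstyleCompliantExample {

  // MethodNameCheck: method names start with a lowercase letter.
  static List<String> modifyProcessBundleDescriptor(String descriptorId) {
    // LocalFinalVariableNameCheck: local finals use lowerCamelCase, not CONSTANT_CASE.
    final String pcollectionIdA = descriptorId + "-a";
    final String pcollectionIdB = descriptorId + "-b";

    List<String> ids = new ArrayList<>();
    ids.add(pcollectionIdA);
    ids.add(pcollectionIdB);
    return ids;
  }
}
```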




[GitHub] [beam] lukecwik commented on pull request #25354: PCollection data sampling for Java SDK harness #25064

Posted by "lukecwik (via GitHub)" <gi...@apache.org>.
lukecwik commented on PR #25354:
URL: https://github.com/apache/beam/pull/25354#issuecomment-1440453832

   Java PreCommit passed, just GH UI failed to update in a timely fashion: https://ci-beam.apache.org/job/beam_PreCommit_Java_Phrase/5948/




[GitHub] [beam] lukecwik commented on a diff in pull request #25354: PCollection data sampling for Java SDK harness #25064

Posted by "lukecwik (via GitHub)" <gi...@apache.org>.
lukecwik commented on code in PR #25354:
URL: https://github.com/apache/beam/pull/25354#discussion_r1102048741


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+  /** Creates a DataSampler to sample every 1000 elements while keeping a maximum of 10 in memory. */
+  public DataSampler() {}
+
+  /**
+   * @param maxSamples Sets the maximum number of samples held in memory at once.
+   * @param sampleEveryN Sets how often to sample.
+   */
+  public DataSampler(int maxSamples, int sampleEveryN) {
+    this.maxSamples = maxSamples;
+    this.sampleEveryN = sampleEveryN;
+  }
+
+  // Maximum number of elements in buffer.
+  private int maxSamples = 10;
+
+  // Sampling rate.
+  private int sampleEveryN = 1000;
+
+  // The fully-qualified type is: Map[ProcessBundleDescriptorId, [PCollectionId, OutputSampler]].
+  // The DataSampler object lives on the same level of the FnHarness. This means that many threads
+  // can and will
+  // access this simultaneously. However, ProcessBundleDescriptors are unique per thread, so only

Review Comment:
   I see what you mean by using a DataSampler for each bundle, but do you really want to sample independently for each bundle, taking N elements and then 1 element for every 1000? This would be an issue for streaming, since streaming has 100s of work items executing in parallel with pretty small bundles. If you're sampling at the bundle level, you're likely going to oversample by a lot and store it all in memory at the same time.
   
   I was thinking that you would effectively have a single object per PCollection that stores all the samples, so you wouldn't have to worry about memory growth beyond the fixed size of the N objects per PCollection. You would still make the decision whether to sample independently for each bundle being processed, so you don't add synchronization overhead when not sampling.
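
One way to read this suggestion is sketched below. It is only an illustration under stated assumptions: `SharedSampleBuffer` and `BundleSampler` are hypothetical names, and the PR's actual classes may be structured differently. The shared buffer is bounded per PCollection, while each bundle keeps its own unsynchronized counter and takes the lock only for the elements it decides to sample.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical shared buffer: one instance per PCollection, bounded to maxSamples. */
final class SharedSampleBuffer<T> {
  private final int maxSamples;
  private final Deque<T> buffer = new ArrayDeque<>();

  SharedSampleBuffer(int maxSamples) {
    this.maxSamples = maxSamples;
  }

  /** Called only for sampled elements, so the lock is taken rarely. */
  synchronized void add(T element) {
    if (buffer.size() == maxSamples) {
      buffer.removeFirst(); // Drop the oldest sample so memory stays bounded.
    }
    buffer.addLast(element);
  }

  /** Returns the samples taken since the last call and clears the buffer. */
  synchronized List<T> drain() {
    List<T> out = new ArrayList<>(buffer);
    buffer.clear();
    return out;
  }
}

/** Hypothetical per-bundle view: the counter is private to one bundle, so it needs no lock. */
final class BundleSampler<T> {
  private final SharedSampleBuffer<T> shared;
  private final int sampleEveryN;
  private long seen = 0;

  BundleSampler(SharedSampleBuffer<T> shared, int sampleEveryN) {
    this.shared = shared;
    this.sampleEveryN = sampleEveryN;
  }

  /** Decides locally whether to sample; touches shared state only when it does. */
  void sample(T element) {
    seen++;
    if (seen % sampleEveryN == 0) {
      shared.add(element);
    }
  }
}
```

With this split, the number of samples held in memory depends on the number of PCollections rather than on the number of bundles executing in parallel.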





[GitHub] [beam] rohdesamuel commented on a diff in pull request #25354: PCollection data sampling for Java SDK harness #25064

Posted by "rohdesamuel (via GitHub)" <gi...@apache.org>.
rohdesamuel commented on code in PR #25354:
URL: https://github.com/apache/beam/pull/25354#discussion_r1103248650


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSamplingDescriptorModifier.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import org.apache.beam.fn.harness.ProcessBundleDescriptorModifier;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+
+/**
+ * Modifies the given ProcessBundleDescriptor by adding a DataSampling operation as a consumer to
+ * every PCollection.
+ */
+public class DataSamplingDescriptorModifier implements ProcessBundleDescriptorModifier {

Review Comment:
   Redone with atomics:
   
   **yes data sampling (no atomics)**
   Benchmark                                Mode  Cnt     Score    Error  Units
   ProcessBundleBenchmark.testLargeBundle  thrpt   15  1150.680 ± 25.650  ops/s
   :sdks:java:harness:jmh:jmh (Thread[Execution worker Thread 7,5,main]) completed. Took 5 mins 9.453 secs.
   
   **yes data sampling (with atomics)**
   Benchmark                                Mode  Cnt     Score    Error  Units
   ProcessBundleBenchmark.testLargeBundle  thrpt   15  1547.291 ± 50.887  ops/s
   :sdks:java:harness:jmh:jmh (Thread[Execution worker Thread 6,5,main]) completed. Took 5 mins 8.804 secs.
   
   
   **no data sampling**
   Benchmark                                Mode  Cnt     Score    Error  Units
   ProcessBundleBenchmark.testLargeBundle  thrpt   15  1697.123 ± 74.466  ops/s
   :sdks:java:harness:jmh:jmh (Thread[Execution worker Thread 3,5,main]) completed. Took 5 mins 8.911 secs.
   
   Benchmark                                Mode  Cnt     Score    Error  Units
   ProcessBundleBenchmark.testLargeBundle  thrpt   15  1709.427 ± 38.715  ops/s
   :sdks:java:harness:jmh:jmh (Thread[included builds,5,main]) completed. Took 5 mins 8.804 secs.
   
   **At head**
   Benchmark                                Mode  Cnt     Score    Error  Units
   ProcessBundleBenchmark.testLargeBundle  thrpt   15  1807.738 ± 15.295  ops/s
   :sdks:java:harness:jmh:jmh (Thread[Execution worker Thread 2,5,main]) completed. Took 5 mins 8.921 secs.
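
To put the scores above on a common scale, the small helper below computes how much slower each configuration is relative to the "At head" score. The helper name is hypothetical; the inputs are the mean scores reported in this comment, and the error bounds are ignored. Relative to head, the runs come out at roughly 36% slower without atomics, about 14% slower with atomics, and about 5 to 6% slower with sampling disabled.

```java
/** Hypothetical helper for comparing the JMH throughput scores quoted above. */
final class ThroughputDelta {

  /** Returns how much slower score is than baseline, as a percentage of the baseline. */
  static double percentSlower(double baseline, double score) {
    return (baseline - score) / baseline * 100.0;
  }

  public static void main(String[] args) {
    double head = 1807.738; // "At head" mean score, ops/s.
    double[] scores = {1150.680, 1547.291, 1697.123, 1709.427};
    String[] labels = {
      "sampling (no atomics)", "sampling (with atomics)", "no sampling, run 1", "no sampling, run 2"
    };
    for (int i = 0; i < scores.length; i++) {
      System.out.printf("%s: %.1f%% slower than head%n", labels[i], percentSlower(head, scores[i]));
    }
  }
}
```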
   





[GitHub] [beam] rohdesamuel commented on a diff in pull request #25354: PCollection data sampling for Java SDK harness #25064

Posted by "rohdesamuel (via GitHub)" <gi...@apache.org>.
rohdesamuel commented on code in PR #25354:
URL: https://github.com/apache/beam/pull/25354#discussion_r1113664617


##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/DataSamplerTest.java:
##########
@@ -193,4 +221,44 @@ public void testFiltersMultiplePCollectionIds() throws Exception {
     assertHasSamples(samples, "a", ImmutableList.of(encodeString("a1"), encodeString("a2")));
     assertHasSamples(samples, "c", ImmutableList.of(encodeString("c1"), encodeString("c2")));
   }
+
+  /**
+   * Test that samples can be taken from the DataSampler while adding new OutputSamplers. This fails
+   * with a ConcurrentModificationException if there is a bug.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testConcurrentNewSampler() throws Exception {
+    DataSampler sampler = new DataSampler();
+    VarIntCoder coder = VarIntCoder.of();
+
+    // Create a thread that constantly creates new samplers.
+    Thread sampleThread =
+        new Thread(
+            () -> {
+              for (int i = 0; i < 1000000; i++) {
+                sampler.sampleOutput("pcollection-" + i, coder).sample(0);
+
+                // This sleep is here to allow for the test to stop this thread.
+                try {
+                  Thread.sleep(0);
+                } catch (InterruptedException e) {
+                  return;
+                }
+              }
+            });
+
+    sampleThread.start();
+
+    for (int i = 0; i < 20; i++) {
+      sampler.handleDataSampleRequest(
+          BeamFnApi.InstructionRequest.newBuilder()
+              .setSampleData(BeamFnApi.SampleDataRequest.newBuilder())
+              .build());
+    }
+
+    sampleThread.interrupt();

Review Comment:
   Ahh cool
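
The failure mode this test guards against, iterating a map of samplers while another thread registers new ones, can be avoided with a `ConcurrentHashMap`, whose iterators are weakly consistent and do not throw `ConcurrentModificationException`. The sketch below is an assumption about the general approach, not the PR's implementation: `SamplerRegistry` is a hypothetical class, and an `AtomicLong` counter stands in for an `OutputSampler`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical registry keyed by PCollection id; counters stand in for samplers. */
final class SamplerRegistry {
  private final Map<String, AtomicLong> samplers = new ConcurrentHashMap<>();

  /** Atomically returns the existing entry for the id or creates a new one. */
  AtomicLong samplerFor(String pcollectionId) {
    return samplers.computeIfAbsent(pcollectionId, id -> new AtomicLong());
  }

  /** Safe to call while other threads register entries; it sees a weakly consistent view. */
  long totalSampled() {
    long total = 0;
    for (AtomicLong count : samplers.values()) {
      total += count.get();
    }
    return total;
  }
}
```

Replacing the map with a plain `HashMap` would make `totalSampled` unsafe to call while `samplerFor` runs on another thread, which is the kind of bug the test above is designed to surface.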





[GitHub] [beam] rohdesamuel commented on a diff in pull request #25354: PCollection data sampling for Java SDK harness #25064

Posted by "rohdesamuel (via GitHub)" <gi...@apache.org>.
rohdesamuel commented on code in PR #25354:
URL: https://github.com/apache/beam/pull/25354#discussion_r1113713921


##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java:
##########
@@ -80,14 +84,50 @@ public void testActsLikeCircularBuffer() throws Exception {
     // The first 10 are always sampled, but with maxSamples = 5, the first ten are downsampled to
     // 4..9 inclusive. Then,
     // the 20th element is sampled (19) and every 20 after.
-    List<byte[]> expected = new ArrayList<>();
+    List<BeamFnApi.SampledElement> expected = new ArrayList<>();
     expected.add(encodeInt(19));
     expected.add(encodeInt(39));
     expected.add(encodeInt(59));
     expected.add(encodeInt(79));
     expected.add(encodeInt(99));
 
-    List<byte[]> samples = outputSampler.samples();
+    List<BeamFnApi.SampledElement> samples = outputSampler.samples();
     assertThat(samples, containsInAnyOrder(expected.toArray()));
   }
+
+  /**
+   * Test that sampling a PCollection while retrieving samples from multiple threads is ok.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testConcurrentSamples() throws Exception {
+    VarIntCoder coder = VarIntCoder.of();
+    OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 100000, 1);
+
+    // Iteration count was empirically chosen to have a high probability of failure without the
+    // test going for too long.
+    Thread sampleThreadA =
+        new Thread(
+            () -> {
+              for (int i = 0; i < 10000000; i++) {
+                outputSampler.sample(i);
+              }
+            });
+
+    Thread sampleThreadB =
+        new Thread(
+            () -> {
+              for (int i = 0; i < 10000000; i++) {
+                outputSampler.sample(i);
+              }
+            });
+
+    sampleThreadA.start();
+    sampleThreadB.start();
+
+    for (int i = 0; i < 10000; i++) {
+      outputSampler.samples();
+    }

Review Comment:
   Done





[GitHub] [beam] lukecwik commented on a diff in pull request #25354: PCollection data sampling for Java SDK harness #25064

Posted by "lukecwik (via GitHub)" <gi...@apache.org>.
lukecwik commented on code in PR #25354:
URL: https://github.com/apache/beam/pull/25354#discussion_r1103400750


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSamplingDescriptorModifier.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import org.apache.beam.fn.harness.ProcessBundleDescriptorModifier;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+
+/**
+ * Modifies the given ProcessBundleDescriptor by adding a DataSampling operation as a consumer to
+ * every PCollection.
+ */
+public class DataSamplingDescriptorModifier implements ProcessBundleDescriptorModifier {

Review Comment:
   This looks much better since the variation in the runs is smaller, but 5% is still not small.
   
   Hopefully marking the fields final will address that, but if not we might want to go with the subclass route:
   ```
   class SamplingAndMetricTrackingFnDataReceiver<T> extends MetricTrackingFnDataReceiver<T> {
   
     // Sample the element first, then delegate to the metric-tracking receiver.
     @Override
     public void accept(WindowedValue<T> value) throws Exception {
       sample(value);
       super.accept(value);
     }
   }
   ```
   
   Or take a look at the profiling information. If you follow the instructions here to install cloud profiler locally
   https://github.com/apache/beam/blob/b64526a3a2429d30535d5cd52d99521878dd3d2c/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy#L1549
   and then authenticate with gcloud, you can run the following command (replacing the GCP project with one that you have access to), which will upload the results to Google Cloud Profiler:
   ./gradlew -info :sdks:java:harness:jmh:jmh -Pbenchmark=ProcessBundleBenchmark.testLargeBundle -PgcpProject=...





[GitHub] [beam] rohdesamuel commented on a diff in pull request #25354: PCollection data sampling for Java SDK harness #25064

Posted by "rohdesamuel (via GitHub)" <gi...@apache.org>.
rohdesamuel commented on code in PR #25354:
URL: https://github.com/apache/beam/pull/25354#discussion_r1105099337


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSamplingDescriptorModifier.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import org.apache.beam.fn.harness.ProcessBundleDescriptorModifier;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+
+/**
+ * Modifies the given ProcessBundleDescriptor by adding a DataSampling operation as a consumer to
+ * every PCollection.
+ */
+public class DataSamplingDescriptorModifier implements ProcessBundleDescriptorModifier {

Review Comment:
   I think the problem was that I was measuring the null hypothesis at upstream head, instead of origin head. I re-ran the benchmark and saw that the benchmark at origin is the same as data sampling enabled. Maybe there were some changes that resulted in some performance differences?
   
   **yes sampling (with atomics)**
   Benchmark                                Mode  Cnt     Score    Error  Units
   ProcessBundleBenchmark.testLargeBundle  thrpt   15  1584.137 ± 18.961  ops/s
   :sdks:java:harness:jmh:jmh (Thread[included builds,5,main]) completed. Took 5 mins 11.907 secs.
   
   **no data sampling**
   Benchmark                                Mode  Cnt     Score    Error  Units
   ProcessBundleBenchmark.testLargeBundle  thrpt   15  1699.238 ± 16.382  ops/s
   :sdks:java:harness:jmh:jmh (Thread[Execution worker Thread 4,5,main]) completed. Took 5 mins 11.424 secs.
   
   **at upstream head**
   Benchmark                                Mode  Cnt     Score    Error  Units
   ProcessBundleBenchmark.testLargeBundle  thrpt   15  1737.774 ± 34.448  ops/s
   :sdks:java:harness:jmh:jmh (Thread[Execution worker Thread 4,5,main]) completed. Took 5 mins 8.815 secs.
   
   **at origin head**
   Benchmark                                Mode  Cnt     Score    Error  Units
   ProcessBundleBenchmark.testLargeBundle  thrpt   15  1698.047 ± 17.290  ops/s
   :sdks:java:harness:jmh:jmh (Thread[Execution worker Thread 4,5,main]) completed. Took 5 mins 10.494 secs.
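
   To put the scores above side by side, here is a small sketch that computes relative throughput differences. `ThroughputDelta` and `percentSlower` are hypothetical helper names; the numbers are copied from the JMH output quoted above. With these inputs, sampling comes out roughly 6.8% below the no-sampling run, and origin head roughly 2.3% below upstream head.

   ```java
   final class ThroughputDelta {
     /** Returns how much slower the candidate is than the baseline, as a percentage of baseline. */
     public static double percentSlower(double baselineOpsPerSec, double candidateOpsPerSec) {
       return 100.0 * (baselineOpsPerSec - candidateOpsPerSec) / baselineOpsPerSec;
     }

     public static void main(String[] args) {
       // Scores copied from the benchmark output in this comment.
       double noSampling = 1699.238;
       double withSampling = 1584.137;
       double upstreamHead = 1737.774;
       double originHead = 1698.047;

       System.out.printf("sampling vs no sampling: %.2f%%%n", percentSlower(noSampling, withSampling));
       System.out.printf("origin vs upstream head: %.2f%%%n", percentSlower(upstreamHead, originHead));
       System.out.printf("origin vs no sampling:   %.2f%%%n", percentSlower(noSampling, originHead));
     }
   }
   ```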





[GitHub] [beam] rohdesamuel commented on a diff in pull request #25354: PCollection data sampling for Java SDK harness #25064

Posted by "rohdesamuel (via GitHub)" <gi...@apache.org>.
rohdesamuel commented on code in PR #25354:
URL: https://github.com/apache/beam/pull/25354#discussion_r1113713598


##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/DataSamplerTest.java:
##########
@@ -193,4 +221,44 @@ public void testFiltersMultiplePCollectionIds() throws Exception {
     assertHasSamples(samples, "a", ImmutableList.of(encodeString("a1"), encodeString("a2")));
     assertHasSamples(samples, "c", ImmutableList.of(encodeString("c1"), encodeString("c2")));
   }
+
+  /**
+   * Test that samples can be taken from the DataSampler while adding new OutputSamplers. This fails
+   * with a ConcurrentModificationException if there is a bug.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testConcurrentNewSampler() throws Exception {
+    DataSampler sampler = new DataSampler();
+    VarIntCoder coder = VarIntCoder.of();
+
+    // Create a thread that constantly creates new samplers.
+    Thread sampleThread =
+        new Thread(
+            () -> {
+              for (int i = 0; i < 1000000; i++) {
+                sampler.sampleOutput("pcollection-" + i, coder).sample(0);
+
+                // This sleep is here to allow for the test to stop this thread.
+                try {
+                  Thread.sleep(0);
+                } catch (InterruptedException e) {
+                  return;
+                }
+              }
+            });
+
+    sampleThread.start();
+
+    for (int i = 0; i < 20; i++) {
+      sampler.handleDataSampleRequest(

Review Comment:
   Cool, used a CountDownLatch
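
   The updated test is not quoted in this message, so the following is only a sketch of how a CountDownLatch can coordinate a writer thread with a reader, using plain JDK types. `LatchCoordinationSketch`, `snapshotWhileWriting`, and the latch names are hypothetical. The reader waits until the writer has definitely started, takes its snapshots while writes continue, and then releases the writer.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.TimeUnit;

   final class LatchCoordinationSketch {
     /**
      * Runs a writer thread that keeps registering keys while the caller takes snapshots.
      * Returns the number of non-empty snapshots, or -1 if coordination failed.
      */
     public static int snapshotWhileWriting(int snapshots) {
       Map<String, Integer> registry = new ConcurrentHashMap<>();
       CountDownLatch writerStarted = new CountDownLatch(1);
       CountDownLatch readerDone = new CountDownLatch(1);

       Thread writer =
           new Thread(
               () -> {
                 int i = 0;
                 // Keep writing until the reader signals that it has finished.
                 while (readerDone.getCount() > 0) {
                   registry.put("pcollection-" + i, i);
                   i = (i + 1) % 1000; // bound the key space so the map stays small
                   writerStarted.countDown(); // no-op after the first call
                 }
               });
       writer.start();

       int taken = 0;
       try {
         // Wait until the writer has inserted at least one entry before reading.
         if (!writerStarted.await(10, TimeUnit.SECONDS)) {
           return -1;
         }
         for (int i = 0; i < snapshots; i++) {
           // ConcurrentHashMap views are weakly consistent, so copying the key set
           // during writes does not throw ConcurrentModificationException.
           List<String> keys = new ArrayList<>(registry.keySet());
           if (!keys.isEmpty()) {
             taken++;
           }
         }
         readerDone.countDown();
         writer.join(10_000);
         return writer.isAlive() ? -1 : taken;
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         return -1;
       } finally {
         readerDone.countDown(); // always release the writer, even on failure
       }
     }
   }
   ```

   Compared with `Thread.sleep(0)` plus interruption, the latch makes the start and stop conditions explicit, so the test does not depend on scheduler timing.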





[GitHub] [beam] rohdesamuel commented on a diff in pull request #25354: PCollection data sampling for Java SDK harness #25064

Posted by "rohdesamuel (via GitHub)" <gi...@apache.org>.
rohdesamuel commented on code in PR #25354:
URL: https://github.com/apache/beam/pull/25354#discussion_r1106533556


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java:
##########
@@ -349,6 +386,10 @@ public void accept(WindowedValue<T> input) throws Exception {
       // when we have window optimization.
       this.sampledByteSizeDistribution.tryUpdate(input.getValue(), coder);
 
+      if (outputSampler != null) {
+        outputSampler.sample(input.getValue());

Review Comment:
   I do, but I don't want to alter the encoding of the sampled bytes when sending back to the runner. It's not impossible, but I've found that there are a lot of mines when changing encodings.



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java:
##########
@@ -89,7 +92,9 @@ public class FnHarness {
   private static final String STATUS_API_SERVICE_DESCRIPTOR = "STATUS_API_SERVICE_DESCRIPTOR";
   private static final String PIPELINE_OPTIONS = "PIPELINE_OPTIONS";
   private static final String RUNNER_CAPABILITIES = "RUNNER_CAPABILITIES";
+  private static final String ENABLE_DATA_SAMPLING_EXPERIMENT = "enable_data_sampling";
   private static final Logger LOG = LoggerFactory.getLogger(FnHarness.class);
+  private static final DataSampler dataSampler = new DataSampler();

Review Comment:
   sg, done



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+  /**
+   * Creates a DataSampler to sample every 1000 elements while keeping a maximum of 10 in memory.
+   */
+  public DataSampler() {
+    this.maxSamples = 10;
+    this.sampleEveryN = 1000;
+  }
+
+  /**
+   * @param maxSamples Sets the maximum number of samples held in memory at once.
+   * @param sampleEveryN Sets how often to sample.
+   */
+  public DataSampler(int maxSamples, int sampleEveryN) {
+    this.maxSamples = maxSamples;
+    this.sampleEveryN = sampleEveryN;
+  }
+
+  // Maximum number of elements in buffer.
+  private final int maxSamples;
+
+  // Sampling rate.
+  private final int sampleEveryN;
+
+  // The fully-qualified type is: Map[PCollectionId, OutputSampler]. In order to sample
+  // on a PCollection-basis and not per-bundle, this keeps track of shared samples between states.
+  private final Map<String, OutputSampler<?>> outputSamplers = new ConcurrentHashMap<>();
+
+  /**
+   * Creates and returns a class to sample the given PCollection in the given
+   * ProcessBundleDescriptor. Uses the given coder encode samples as bytes when responding to a
+   * SampleDataRequest.
+   *
+   * @param pcollectionId The PCollection to take intermittent samples from.
+   * @param coder The coder associated with the PCollection. Coder may be from a nested context.
+   * @param <T> The type of element contained in the PCollection.
+   * @return the OutputSampler corresponding to the unique PBD and PCollection.
+   */
+  public <T> OutputSampler<T> sampleOutput(String pcollectionId, Coder<T> coder) {
+    outputSamplers.putIfAbsent(

Review Comment:
   done



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+  /**
+   * Creates a DataSampler to sample every 1000 elements while keeping a maximum of 10 in memory.
+   */
+  public DataSampler() {
+    this.maxSamples = 10;
+    this.sampleEveryN = 1000;
+  }
+
+  /**
+   * @param maxSamples Sets the maximum number of samples held in memory at once.
+   * @param sampleEveryN Sets how often to sample.
+   */
+  public DataSampler(int maxSamples, int sampleEveryN) {
+    this.maxSamples = maxSamples;
+    this.sampleEveryN = sampleEveryN;
+  }
+
+  // Maximum number of elements in buffer.
+  private final int maxSamples;
+
+  // Sampling rate.
+  private final int sampleEveryN;
+
+  // The fully-qualified type is: Map[PCollectionId, OutputSampler]. In order to sample
+  // on a PCollection-basis and not per-bundle, this keeps track of shared samples between states.
+  private final Map<String, OutputSampler<?>> outputSamplers = new ConcurrentHashMap<>();
+
+  /**
+   * Creates and returns a class to sample the given PCollection in the given
+   * ProcessBundleDescriptor. Uses the given coder encode samples as bytes when responding to a
+   * SampleDataRequest.
+   *
+   * @param pcollectionId The PCollection to take intermittent samples from.
+   * @param coder The coder associated with the PCollection. Coder may be from a nested context.
+   * @param <T> The type of element contained in the PCollection.
+   * @return the OutputSampler corresponding to the unique PBD and PCollection.
+   */
+  public <T> OutputSampler<T> sampleOutput(String pcollectionId, Coder<T> coder) {
+    outputSamplers.putIfAbsent(
+        pcollectionId, new OutputSampler<>(coder, this.maxSamples, this.sampleEveryN));
+    return (OutputSampler<T>) outputSamplers.get(pcollectionId);
+  }
+
+  /**
+   * Returns all collected samples. Thread-safe.
+   *
+   * @param request The instruction request from the FnApi. Filters based on the given
+   *     SampleDataRequest.
+   * @return Returns all collected samples.
+   */
+  public BeamFnApi.InstructionResponse.Builder handleDataSampleRequest(
+      BeamFnApi.InstructionRequest request) {
+    BeamFnApi.SampleDataRequest sampleDataRequest = request.getSampleData();
+
+    Map<String, List<byte[]>> responseSamples =
+        samplesFor(sampleDataRequest.getPcollectionIdsList());
+
+    BeamFnApi.SampleDataResponse.Builder response = BeamFnApi.SampleDataResponse.newBuilder();
+    for (String pcollectionId : responseSamples.keySet()) {
+      ElementList.Builder elementList = ElementList.newBuilder();
+      for (byte[] sample : responseSamples.get(pcollectionId)) {
+        elementList.addElements(
+            SampledElement.newBuilder().setElement(ByteString.copyFrom(sample)).build());
+      }
+      response.putElementSamples(pcollectionId, elementList.build());
+    }
+
+    return BeamFnApi.InstructionResponse.newBuilder().setSampleData(response);
+  }
+
+  /**
+   * Returns a map from PCollection to its samples. Samples are filtered on
+   * ProcessBundleDescriptorIds and PCollections. Thread-safe.
+   *
+   * @param pcollections Filters all PCollections on this set. If empty, allows all PCollections.
+   * @return a map from PCollection to its samples.
+   */
+  private Map<String, List<byte[]>> samplesFor(List<String> pcollections) {

Review Comment:
   done



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.beam.sdk.coders.Coder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class holds samples for a single PCollection until queried by the parent DataSampler. This
+ * class is meant to hold only a limited number of elements in memory. So old values are constantly
+ * being overridden in a circular buffer.
+ *
+ * @param <T> the element type of the PCollection.
+ */
+public class OutputSampler<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(OutputSampler.class);
+
+  // Temporarily holds elements until the SDK receives a sample data request.
+  private final List<T> buffer;
+
+  // Maximum number of elements in buffer.
+  private final int maxElements;
+
+  // Sampling rate.
+  private final int sampleEveryN;
+
+  // Total number of samples taken.
+  private final AtomicLong numSamples = new AtomicLong();
+
+  // Index into the buffer of where to overwrite samples.
+  private int resampleIndex = 0;
+
+  private final Coder<T> coder;
+
+  public OutputSampler(Coder<T> coder, int maxElements, int sampleEveryN) {
+    this.coder = coder;
+    this.maxElements = maxElements;
+    this.sampleEveryN = sampleEveryN;
+    this.buffer = new ArrayList<>(this.maxElements);
+  }
+
+  /**
+   * Samples every 1000th element or if it is part of the first 10 in the (local) PCollection.
+   *
+   * @param element the element to sample.
+   */
+  public void sample(T element) {
+    // Only sample the first 10 elements then after every `sampleEveryN`th element.
+    long samples = numSamples.get() + 1;
+
+    // This has eventual consistency. If there are many threads lazy setting, this will be set to
+    // the slowest thread accessing the atomic. But over time, it will still increase. This is ok
+    // because this is a debugging feature and doesn't need strict atomics.
+    numSamples.lazySet(samples);
+    if (samples > 10 && samples % sampleEveryN != 0) {
+      return;
+    }
+
+    synchronized (this) {
+      // Fill buffer until maxElements.
+      if (buffer.size() < maxElements) {
+        buffer.add(element);
+      } else {
+        // Then rewrite sampled elements as a circular buffer.
+        buffer.set(resampleIndex, element);
+        resampleIndex = (resampleIndex + 1) % maxElements;
+      }
+    }
+  }
+
+  /**
+   * Clears samples at end of call. This is to help mitigate memory use.
+   *
+   * @return samples taken since last call.
+   */
+  public List<byte[]> samples() {
+    List<byte[]> ret = new ArrayList<>();
+
+    // Serializing can take a lot of CPU time for larger or complex elements. Copy the array here
+    // so as to not slow down the main processing hot path.
+    List<T> copiedBuffer;
+    synchronized (this) {
+      copiedBuffer = new ArrayList<>(buffer);
+      buffer.clear();
+      resampleIndex = 0;
+    }
+
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();

Review Comment:
   done, also changed to return SampledElements



##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/DataSamplerTest.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class DataSamplerTest {
+  byte[] encodeInt(Integer i) throws IOException {
+    VarIntCoder coder = VarIntCoder.of();
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    coder.encode(i, stream);

Review Comment:
   done, added a test for this case





[GitHub] [beam] github-actions[bot] commented on pull request #25354: PCollection data sampling for Java SDK harness #25064

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #25354:
URL: https://github.com/apache/beam/pull/25354#issuecomment-1421428228

   Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment `assign set of reviewers`




[GitHub] [beam] rohdesamuel commented on a diff in pull request #25354: PCollection data sampling for Java SDK harness #25064

Posted by "rohdesamuel (via GitHub)" <gi...@apache.org>.
rohdesamuel commented on code in PR #25354:
URL: https://github.com/apache/beam/pull/25354#discussion_r1105125507


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSamplingDescriptorModifier.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import org.apache.beam.fn.harness.ProcessBundleDescriptorModifier;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+
+/**
+ * Modifies the given ProcessBundleDescriptor by adding a DataSampling operation as a consumer to
+ * every PCollection.
+ */
+public class DataSamplingDescriptorModifier implements ProcessBundleDescriptorModifier {

Review Comment:
   I also ran with the profiling enabled but wasn't able to identify any more points for improvement.





[GitHub] [beam] rohdesamuel commented on pull request #25354: PCollection data sampling for Java SDK harness #25064

Posted by "rohdesamuel (via GitHub)" <gi...@apache.org>.
rohdesamuel commented on PR #25354:
URL: https://github.com/apache/beam/pull/25354#issuecomment-1430671365

   Added the requested tests; let me know if I can improve them. I don't have a lot of experience with testing multi-threaded Java code.




[GitHub] [beam] lukecwik commented on a diff in pull request #25354: PCollection data sampling for Java SDK harness #25064

Posted by "lukecwik (via GitHub)" <gi...@apache.org>.
lukecwik commented on code in PR #25354:
URL: https://github.com/apache/beam/pull/25354#discussion_r1114675284


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java:
##########
@@ -104,19 +104,22 @@ public List<BeamFnApi.SampledElement> samples() throws IOException {
 
     // Serializing can take a lot of CPU time for larger or complex elements. Copy the array here
     // so as to not slow down the main processing hot path.
-    List<T> copiedBuffer;
+    List<T> bufferToSend;
+    int sampleIndex = 0;
     synchronized (this) {
-      copiedBuffer = new ArrayList<>(buffer);
-      buffer.clear();
+      bufferToSend = buffer;
+      sampleIndex = resampleIndex;
+      buffer = new ArrayList<>(maxElements);
       resampleIndex = 0;
     }
 
     ByteStringOutputStream stream = new ByteStringOutputStream();
-    for (T el : copiedBuffer) {
+    for (int i = 0; i < bufferToSend.size(); i++) {
+      int index = (sampleIndex + i) % bufferToSend.size();

Review Comment:
   The specification doesn't say anything about having these ordered in the response based upon oldest to newest.
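
   For context, a sketch of what the index arithmetic in the hunk above does, assuming the buffer is full and `sampleIndex` points at the next slot to overwrite (which is also the oldest element). `CircularOrderSketch` and `oldestFirst` are hypothetical names. If ordering is not required, this loop can be replaced by a plain iteration over the buffer.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   final class CircularOrderSketch {
     /**
      * Returns the buffer contents ordered oldest to newest, where nextOverwriteIndex is the
      * slot that would be overwritten next. For a buffer that never wrapped, pass 0.
      */
     public static <T> List<T> oldestFirst(List<T> buffer, int nextOverwriteIndex) {
       List<T> ordered = new ArrayList<>(buffer.size());
       for (int i = 0; i < buffer.size(); i++) {
         ordered.add(buffer.get((nextOverwriteIndex + i) % buffer.size()));
       }
       return ordered;
     }
   }
   ```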





[GitHub] [beam] lukecwik merged pull request #25354: PCollection data sampling for Java SDK harness #25064

Posted by "lukecwik (via GitHub)" <gi...@apache.org>.
lukecwik merged PR #25354:
URL: https://github.com/apache/beam/pull/25354




[GitHub] [beam] lukecwik commented on a diff in pull request #25354: PCollection data sampling for Java SDK harness #25064

Posted by "lukecwik (via GitHub)" <gi...@apache.org>.
lukecwik commented on code in PR #25354:
URL: https://github.com/apache/beam/pull/25354#discussion_r1113627698


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -35,22 +34,22 @@
  * simultaneously, even if computing the same logical PCollection.
  */
 public class DataSampler {
+  private static final Logger LOG = LoggerFactory.getLogger(DataSampler.class);
 
   /**
    * Creates a DataSampler to sample every 1000 elements while keeping a maximum of 10 in memory.
    */
   public DataSampler() {
-    this.maxSamples = 10;
-    this.sampleEveryN = 1000;
+    this(10, 1000);
   }
 
   /**
    * @param maxSamples Sets the maximum number of samples held in memory at once.
    * @param sampleEveryN Sets how often to sample.
    */
   public DataSampler(int maxSamples, int sampleEveryN) {
-    this.maxSamples = maxSamples;
-    this.sampleEveryN = sampleEveryN;
+    this.maxSamples = maxSamples <= 0 ? 10 : maxSamples;
+    this.sampleEveryN = sampleEveryN <= 0 ? 1000 : sampleEveryN;

Review Comment:
   It's usually better to throw an IllegalArgumentException in these cases instead of silently behaving differently than before.
   
   ```suggestion
       checkArgument(maxSamples > 0, "Expected positive number of samples, did you mean to disable data sampling?");
       checkArgument(sampleEveryN > 0, "Expected positive number for sampling period, did you mean to disable data sampling?");
       this.maxSamples = maxSamples;
       this.sampleEveryN = sampleEveryN;
   ```
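
   As a self-contained illustration of the fail-fast behavior, here is a JDK-only sketch. `SamplerConfigSketch` is a hypothetical class, and the explicit `if`/`throw` stands in for Guava's `checkArgument`, which throws the same exception type.

   ```java
   final class SamplerConfigSketch {
     public final int maxSamples;
     public final int sampleEveryN;

     public SamplerConfigSketch(int maxSamples, int sampleEveryN) {
       // Reject invalid values instead of silently substituting defaults.
       if (maxSamples <= 0) {
         throw new IllegalArgumentException(
             "Expected positive number of samples, got " + maxSamples);
       }
       if (sampleEveryN <= 0) {
         throw new IllegalArgumentException(
             "Expected positive number for sampling period, got " + sampleEveryN);
       }
       this.maxSamples = maxSamples;
       this.sampleEveryN = sampleEveryN;
     }
   }
   ```

   A caller that passes 0 by mistake then sees the error at construction time rather than a sampler running with settings it never asked for.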



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java:
##########
@@ -91,10 +94,13 @@ public void sample(T element) {
   /**
    * Clears samples at end of call. This is to help mitigate memory use.
    *
+   * <p>This method is invoked by a thread handling a data sampling request in parallel to any calls
+   * to {@link #sample}.
+   *
    * @return samples taken since last call.
    */
-  public List<byte[]> samples() {
-    List<byte[]> ret = new ArrayList<>();
+  public List<BeamFnApi.SampledElement> samples() throws IOException {
+    List<BeamFnApi.SampledElement> ret = new ArrayList<>();
 
     // Serializing can take a lot of CPU time for larger or complex elements. Copy the array here
     // so as to not slow down the main processing hot path.

Review Comment:
   Since you're only ever accessing buffer under a synchronized block, it will be better to do a buffer swap than to copy the buffer.
   
   e.g.
   ```
       List<T> bufferToSend;
       synchronized (this) {
         bufferToSend = buffer;
         buffer = new ArrayList<>(maxElements);
         resampleIndex = 0;
       }
   ```
   
   Saves on copying the elements and clearing the original.
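
   A fuller, JDK-only sketch of the swap, for illustration. `SwapBufferSketch`, `add`, and `drain` are hypothetical names, and the fill-then-overwrite logic is a simplified version of the circular buffer in this PR. The lock is held only for the reference swap, so any slow work on the drained list happens outside the synchronized block.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   final class SwapBufferSketch<T> {
     private final int maxElements;
     private List<T> buffer; // guarded by this
     private int resampleIndex = 0; // guarded by this

     public SwapBufferSketch(int maxElements) {
       this.maxElements = maxElements;
       this.buffer = new ArrayList<>(maxElements);
     }

     /** Fills the buffer up to maxElements, then overwrites slots in circular order. */
     public synchronized void add(T element) {
       if (buffer.size() < maxElements) {
         buffer.add(element);
       } else {
         buffer.set(resampleIndex, element);
         resampleIndex = (resampleIndex + 1) % maxElements;
       }
     }

     /** Hands the current buffer to the caller and installs a fresh one. */
     public List<T> drain() {
       List<T> bufferToSend;
       synchronized (this) {
         bufferToSend = buffer;
         buffer = new ArrayList<>(maxElements);
         resampleIndex = 0;
       }
       // The caller now owns bufferToSend exclusively; no copy or clear was needed.
       return bufferToSend;
     }
   }
   ```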



##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/DataSamplerTest.java:
##########
@@ -117,6 +127,24 @@ public void testSingleOutput() throws Exception {
     assertHasSamples(samples, "pcollection-id", Collections.singleton(encodeInt(1)));
   }
 
+  /**
+   * Smoke test that a samples show in the output map.

Review Comment:
   ```suggestion
      * Smoke test that a sample shows in the output map.
   ```



##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/DataSamplerTest.java:
##########
@@ -117,6 +127,24 @@ public void testSingleOutput() throws Exception {
     assertHasSamples(samples, "pcollection-id", Collections.singleton(encodeInt(1)));
   }
 
+  /**
+   * Smoke test that a samples show in the output map.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testNestedContext() throws Exception {
+    DataSampler sampler = new DataSampler();
+
+    String rawString = "hello";
+    byte[] byteArray = rawString.getBytes(Charset.forName("ASCII"));

Review Comment:
   ```suggestion
       byte[] byteArray = rawString.getBytes(StandardCharsets.US_ASCII);
   ```
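
   A short sketch of the difference, with hypothetical names (`CharsetSketch`, `viaLookup`, `viaConstant`). Both calls yield the same bytes, but the constant avoids a name lookup at runtime and cannot fail on a misspelled charset name.

   ```java
   import java.nio.charset.Charset;
   import java.nio.charset.StandardCharsets;

   final class CharsetSketch {
     /** Resolves the charset by name at runtime; a typo would throw at this call. */
     public static byte[] viaLookup(String s) {
       return s.getBytes(Charset.forName("ASCII"));
     }

     /** Uses the JDK constant; checked by the compiler and needs no lookup. */
     public static byte[] viaConstant(String s) {
       return s.getBytes(StandardCharsets.US_ASCII);
     }
   }
   ```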



##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java:
##########
@@ -80,14 +84,50 @@ public void testActsLikeCircularBuffer() throws Exception {
     // The first 10 are always sampled, but with maxSamples = 5, the first ten are downsampled to
     // 4..9 inclusive. Then,
     // the 20th element is sampled (19) and every 20 after.
-    List<byte[]> expected = new ArrayList<>();
+    List<BeamFnApi.SampledElement> expected = new ArrayList<>();
     expected.add(encodeInt(19));
     expected.add(encodeInt(39));
     expected.add(encodeInt(59));
     expected.add(encodeInt(79));
     expected.add(encodeInt(99));
 
-    List<byte[]> samples = outputSampler.samples();
+    List<BeamFnApi.SampledElement> samples = outputSampler.samples();
     assertThat(samples, containsInAnyOrder(expected.toArray()));
   }
+
+  /**
+   * Test that sampling a PCollection while retrieving samples from multiple threads is ok.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testConcurrentSamples() throws Exception {
+    VarIntCoder coder = VarIntCoder.of();
+    OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 100000, 1);

Review Comment:
   ```suggestion
       OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 100000, 1);
   ```
   
   You can use a small maxElements size like 10. Using a bigger number may decrease contention. Also, using a sampleEveryN of 2 means that we frequently alternate between sampling and not sampling, instead of sampling every element.



##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java:
##########
@@ -80,14 +84,50 @@ public void testActsLikeCircularBuffer() throws Exception {
     // The first 10 are always sampled, but with maxSamples = 5, the first ten are downsampled to
     // 4..9 inclusive. Then,
     // the 20th element is sampled (19) and every 20 after.
-    List<byte[]> expected = new ArrayList<>();
+    List<BeamFnApi.SampledElement> expected = new ArrayList<>();
     expected.add(encodeInt(19));
     expected.add(encodeInt(39));
     expected.add(encodeInt(59));
     expected.add(encodeInt(79));
     expected.add(encodeInt(99));
 
-    List<byte[]> samples = outputSampler.samples();
+    List<BeamFnApi.SampledElement> samples = outputSampler.samples();
     assertThat(samples, containsInAnyOrder(expected.toArray()));
   }
+
+  /**
+   * Test that sampling a PCollection while retrieving samples from multiple threads is ok.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testConcurrentSamples() throws Exception {
+    VarIntCoder coder = VarIntCoder.of();
+    OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 100000, 1);
+
+    // Iteration count was empirically chosen to have a high probability of failure without the
+    // test going for too long.
+    Thread sampleThreadA =
+        new Thread(
+            () -> {
+              for (int i = 0; i < 10000000; i++) {
+                outputSampler.sample(i);
+              }
+            });
+
+    Thread sampleThreadB =
+        new Thread(
+            () -> {
+              for (int i = 0; i < 10000000; i++) {
+                outputSampler.sample(i);
+              }
+            });
+
+    sampleThreadA.start();
+    sampleThreadB.start();
+
+    for (int i = 0; i < 10000; i++) {
+      outputSampler.samples();
+    }

Review Comment:
   Grab samples until both of the above threads are done; there is no need to use a fixed iteration count.
   
   Also perform the validation that I described above on each of the output samples.



##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java:
##########
@@ -80,14 +84,50 @@ public void testActsLikeCircularBuffer() throws Exception {
     // The first 10 are always sampled, but with maxSamples = 5, the first ten are downsampled to
     // 4..9 inclusive. Then,
     // the 20th element is sampled (19) and every 20 after.
-    List<byte[]> expected = new ArrayList<>();
+    List<BeamFnApi.SampledElement> expected = new ArrayList<>();
     expected.add(encodeInt(19));
     expected.add(encodeInt(39));
     expected.add(encodeInt(59));
     expected.add(encodeInt(79));
     expected.add(encodeInt(99));
 
-    List<byte[]> samples = outputSampler.samples();
+    List<BeamFnApi.SampledElement> samples = outputSampler.samples();
     assertThat(samples, containsInAnyOrder(expected.toArray()));
   }
+
+  /**
+   * Test that sampling a PCollection while retrieving samples from multiple threads is ok.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testConcurrentSamples() throws Exception {
+    VarIntCoder coder = VarIntCoder.of();
+    OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 100000, 1);
+
+    // Iteration count was empirically chosen to have a high probability of failure without the
+    // test going for too long.
+    Thread sampleThreadA =

Review Comment:
   You should use a CountDownLatch to increase contention.
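
One way to read this suggestion is as a start gate: every worker blocks on a shared latch, and the test thread releases them at once so their loops overlap as much as possible. The sketch below is an assumption about what is meant, with a hypothetical `StartGateSketch` class and an `AtomicInteger` standing in for the sampler.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

final class StartGateSketch {
  /** Starts numThreads workers, releases them together, and returns the final count. */
  static int runTogether(int numThreads, int incrementsPerThread) {
    CountDownLatch startGate = new CountDownLatch(1);
    AtomicInteger counter = new AtomicInteger();
    Thread[] threads = new Thread[numThreads];
    for (int i = 0; i < numThreads; i++) {
      threads[i] =
          new Thread(
              () -> {
                try {
                  startGate.await(); // park until the test thread opens the gate
                } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
                  return;
                }
                for (int j = 0; j < incrementsPerThread; j++) {
                  counter.incrementAndGet();
                }
              });
      threads[i].start();
    }
    startGate.countDown(); // workers already parked on the gate proceed together
    for (Thread thread : threads) {
      try {
        thread.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("interrupted while joining", e);
      }
    }
    return counter.get();
  }

  public static void main(String[] args) {
    System.out.println(runTogether(4, 1000));
  }
}
```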



##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/DataSamplerTest.java:
##########
@@ -193,4 +221,44 @@ public void testFiltersMultiplePCollectionIds() throws Exception {
     assertHasSamples(samples, "a", ImmutableList.of(encodeString("a1"), encodeString("a2")));
     assertHasSamples(samples, "c", ImmutableList.of(encodeString("c1"), encodeString("c2")));
   }
+
+  /**
+   * Test that samples can be taken from the DataSampler while adding new OutputSamplers. This fails
+   * with a ConcurrentModificationException if there is a bug.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testConcurrentNewSampler() throws Exception {
+    DataSampler sampler = new DataSampler();
+    VarIntCoder coder = VarIntCoder.of();
+
+    // Create a thread that constantly creates new samplers.
+    Thread sampleThread =
+        new Thread(
+            () -> {
+              for (int i = 0; i < 1000000; i++) {
+                sampler.sampleOutput("pcollection-" + i, coder).sample(0);
+
+                // This sleep is here to allow for the test to stop this thread.
+                try {
+                  Thread.sleep(0);
+                } catch (InterruptedException e) {
+                  return;
+                }
+              }
+            });
+
+    sampleThread.start();
+
+    for (int i = 0; i < 20; i++) {
+      sampler.handleDataSampleRequest(

Review Comment:
   This tests concurrency between a single sampling thread and getting the progress request.
   
   You'll want to update this test to cover multiple threads (e.g. 100), all creating the same set of 100 output samplers. You can have them all wait on a CountDownLatch that you release from the test thread before the output sampler creation starts, so that all the threads are ready to go (as done in https://github.com/apache/beam/blob/679d30256c6bd64d9760702c667d7d355e70166b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ExecutionStateSamplerTest.java#L94).
   
   At the same time, you should continuously call handleDataSampleRequest until every thread's state has transitioned to TERMINATED (note that checking isAlive() can expose you to a race, since start() doesn't mean that the thread is alive, just that it is scheduled to become alive at some point in the future). You can also ensure the threads are alive by using another CountDownLatch to block the test thread from advancing to the liveness check until the sampler-creating threads are running. Futures are another option.
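
The coordination described here can be sketched with two latches and a polling loop. This is an illustration under assumptions, not the test from the PR: `ReadyStartGateSketch`, `runAndPoll`, `work`, and `poll` are hypothetical names, with `work` standing in for the sampler creation and `poll` for the `handleDataSampleRequest` call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

final class ReadyStartGateSketch {
  /**
   * Runs work on numThreads threads that are released together, and calls poll repeatedly until
   * every thread is TERMINATED. Returns how many times poll ran.
   */
  static int runAndPoll(int numThreads, Runnable work, Runnable poll) {
    CountDownLatch ready = new CountDownLatch(numThreads);
    CountDownLatch start = new CountDownLatch(1);
    List<Thread> threads = new ArrayList<>();
    for (int i = 0; i < numThreads; i++) {
      Thread thread =
          new Thread(
              () -> {
                ready.countDown(); // this thread is now actually running
                try {
                  start.await();
                } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
                  return;
                }
                work.run();
              });
      threads.add(thread);
      thread.start();
    }
    try {
      ready.await(); // do not proceed until every worker has really started
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for workers", e);
    }
    start.countDown();

    int polls = 0;
    boolean allDone;
    do {
      allDone = true;
      for (Thread thread : threads) {
        if (thread.getState() != Thread.State.TERMINATED) {
          allDone = false;
        }
      }
      // Polling after the check guarantees one last poll once all workers are done.
      poll.run();
      polls++;
    } while (!allDone);
    return polls;
  }

  public static void main(String[] args) {
    AtomicInteger created = new AtomicInteger();
    int polls = runAndPoll(100, created::incrementAndGet, () -> {});
    System.out.println("workers finished: " + created.get() + ", polls: " + polls);
  }
}
```

Checking for `TERMINATED` rather than `isAlive()` avoids treating a thread that has been started but not yet scheduled as finished.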
   
   



##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/DataSamplerTest.java:
##########
@@ -193,4 +221,44 @@ public void testFiltersMultiplePCollectionIds() throws Exception {
     assertHasSamples(samples, "a", ImmutableList.of(encodeString("a1"), encodeString("a2")));
     assertHasSamples(samples, "c", ImmutableList.of(encodeString("c1"), encodeString("c2")));
   }
+
+  /**
+   * Test that samples can be taken from the DataSampler while adding new OutputSamplers. This fails
+   * with a ConcurrentModificationException if there is a bug.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testConcurrentNewSampler() throws Exception {
+    DataSampler sampler = new DataSampler();
+    VarIntCoder coder = VarIntCoder.of();
+
+    // Create a thread that constantly creates new samplers.
+    Thread sampleThread =
+        new Thread(
+            () -> {
+              for (int i = 0; i < 1000000; i++) {
+                sampler.sampleOutput("pcollection-" + i, coder).sample(0);
+
+                // This sleep is here to allow for the test to stop this thread.
+                try {
+                  Thread.sleep(0);
+                } catch (InterruptedException e) {
+                  return;
+                }

Review Comment:
   This and the interrupt below are unnecessary. Your test should be able to join all the sampler creating threads.
   ```suggestion
   ```



##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java:
##########
@@ -80,14 +84,50 @@ public void testActsLikeCircularBuffer() throws Exception {
     // The first 10 are always sampled, but with maxSamples = 5, the first ten are downsampled to
     // 4..9 inclusive. Then,
     // the 20th element is sampled (19) and every 20 after.
-    List<byte[]> expected = new ArrayList<>();
+    List<BeamFnApi.SampledElement> expected = new ArrayList<>();
     expected.add(encodeInt(19));
     expected.add(encodeInt(39));
     expected.add(encodeInt(59));
     expected.add(encodeInt(79));
     expected.add(encodeInt(99));
 
-    List<byte[]> samples = outputSampler.samples();
+    List<BeamFnApi.SampledElement> samples = outputSampler.samples();
     assertThat(samples, containsInAnyOrder(expected.toArray()));
   }
+
+  /**
+   * Test that sampling a PCollection while retrieving samples from multiple threads is ok.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testConcurrentSamples() throws Exception {
+    VarIntCoder coder = VarIntCoder.of();
+    OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 100000, 1);
+
+    // Iteration count was empirically chosen to have a high probability of failure without the
+    // test going for too long.
+    Thread sampleThreadA =
+        new Thread(
+            () -> {
+              for (int i = 0; i < 10000000; i++) {

Review Comment:
   If you have this thread produce elements between 1 and 100000 and the other thread produce elements between 100000 and 200000, then you can validate below that, each time you get the samples, the numbers from series A and from series B are always greater than the largest seen in the previous sample. The series could be negative and positive numbers, or even and odd numbers: just something that lets you know which thread an element came from so you can perform the validation on it.
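
The validation can be sketched as a small stateful checker. The version below assumes the even/odd variant (thread A emits non-negative even numbers, thread B emits positive odd numbers, each in increasing order); `SeriesOrderCheckSketch` and `checkBatch` are hypothetical names, and the batches are plain integers rather than encoded samples.

```java
import java.util.Arrays;
import java.util.List;

final class SeriesOrderCheckSketch {
  // Largest value seen so far from each producer, across all earlier batches.
  private int largestFromA = -1; // thread A emits even numbers
  private int largestFromB = -1; // thread B emits odd numbers

  /**
   * Returns false if any value in the batch is not greater than the largest value from the same
   * series in earlier batches. Order inside one batch is not checked, because a circular buffer
   * does not keep elements sorted.
   */
  boolean checkBatch(List<Integer> batch) {
    int batchMaxA = largestFromA;
    int batchMaxB = largestFromB;
    for (int value : batch) {
      if (value % 2 == 0) {
        if (value <= largestFromA) {
          return false;
        }
        batchMaxA = Math.max(batchMaxA, value);
      } else {
        if (value <= largestFromB) {
          return false;
        }
        batchMaxB = Math.max(batchMaxB, value);
      }
    }
    largestFromA = batchMaxA;
    largestFromB = batchMaxB;
    return true;
  }

  public static void main(String[] args) {
    SeriesOrderCheckSketch check = new SeriesOrderCheckSketch();
    System.out.println(check.checkBatch(Arrays.asList(4, 1, 2, 3)));
    System.out.println(check.checkBatch(Arrays.asList(6, 5)));
    System.out.println(check.checkBatch(Arrays.asList(8, 3)));
  }
}
```

In a test, `checkBatch` would be called on the decoded contents of every `samples()` result, and a `false` return would fail the test.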



##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/DataSamplerTest.java:
##########
@@ -193,4 +221,44 @@ public void testFiltersMultiplePCollectionIds() throws Exception {
     assertHasSamples(samples, "a", ImmutableList.of(encodeString("a1"), encodeString("a2")));
     assertHasSamples(samples, "c", ImmutableList.of(encodeString("c1"), encodeString("c2")));
   }
+
+  /**
+   * Test that samples can be taken from the DataSampler while adding new OutputSamplers. This fails
+   * with a ConcurrentModificationException if there is a bug.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testConcurrentNewSampler() throws Exception {
+    DataSampler sampler = new DataSampler();
+    VarIntCoder coder = VarIntCoder.of();
+
+    // Create a thread that constantly creates new samplers.
+    Thread sampleThread =
+        new Thread(
+            () -> {
+              for (int i = 0; i < 1000000; i++) {
+                sampler.sampleOutput("pcollection-" + i, coder).sample(0);
+
+                // This sleep is here to allow for the test to stop this thread.
+                try {
+                  Thread.sleep(0);
+                } catch (InterruptedException e) {
+                  return;
+                }
+              }
+            });
+
+    sampleThread.start();
+
+    for (int i = 0; i < 20; i++) {
+      sampler.handleDataSampleRequest(
+          BeamFnApi.InstructionRequest.newBuilder()
+              .setSampleData(BeamFnApi.SampleDataRequest.newBuilder())
+              .build());
+    }
+
+    sampleThread.interrupt();

Review Comment:
   I don't think you need to interrupt since the thread will finish by itself. You should just invoke join below.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] lukecwik commented on a diff in pull request #25354: PCollection data sampling for Java SDK harness #25064

Posted by "lukecwik (via GitHub)" <gi...@apache.org>.
lukecwik commented on code in PR #25354:
URL: https://github.com/apache/beam/pull/25354#discussion_r1103381141


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java:
##########
@@ -333,12 +369,14 @@ public void accept(WindowedValue<T> input) throws Exception {
     private final BundleCounter elementCountCounter;
     private final SampleByteSizeDistribution<T> sampledByteSizeDistribution;
     private final Coder<T> coder;
+    private @Nullable OutputSampler<T> outputSampler = null;

Review Comment:
   ```suggestion
       private final @Nullable OutputSampler<T> outputSampler;
   ```



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.beam.sdk.coders.Coder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class holds samples for a single PCollection until queried by the parent DataSampler. This
+ * class is meant to hold only a limited number of elements in memory. So old values are constantly
+ * being overridden in a circular buffer.
+ *
+ * @param <T> the element type of the PCollection.
+ */
+public class OutputSampler<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(OutputSampler.class);
+
+  // Temporarily holds elements until the SDK receives a sample data request.
+  private final List<T> buffer;
+
+  // Maximum number of elements in buffer.
+  private final int maxElements;
+
+  // Sampling rate.
+  private final int sampleEveryN;
+
+  // Total number of samples taken.
+  private final AtomicLong numSamples = new AtomicLong();
+
+  // Index into the buffer of where to overwrite samples.
+  private int resampleIndex = 0;
+
+  private final Coder<T> coder;
+
+  public OutputSampler(Coder<T> coder, int maxElements, int sampleEveryN) {
+    this.coder = coder;
+    this.maxElements = maxElements;
+    this.sampleEveryN = sampleEveryN;
+    this.buffer = new ArrayList<>(this.maxElements);
+  }
+
+  /**
+   * Samples every 1000th element or if it is part of the first 10 in the (local) PCollection.
+   *
+   * @param element the element to sample.
+   */
+  public void sample(T element) {
+    // Only sample the first 10 elements then after every `sampleEveryN`th element.
+    long samples = numSamples.get() + 1;
+
+    // This has eventual consistency. If there are many threads lazy setting, this will be set to
+    // the slowest thread accessing the atomic. But over time, it will still increase. This is ok
+    // because this is a debugging feature and doesn't need strict atomics.
+    numSamples.lazySet(samples);
+    if (samples > 10 && samples % sampleEveryN != 0) {
+      return;
+    }
+
+    synchronized (this) {
+      // Fill buffer until maxElements.
+      if (buffer.size() < maxElements) {
+        buffer.add(element);
+      } else {
+        // Then rewrite sampled elements as a circular buffer.
+        buffer.set(resampleIndex, element);
+        resampleIndex = (resampleIndex + 1) % maxElements;
+      }
+    }
+  }
+
+  /**
+   * Clears samples at end of call. This is to help mitigate memory use.
+   *
+   * @return samples taken since last call.
+   */
+  public List<byte[]> samples() {
+    List<byte[]> ret = new ArrayList<>();
+
+    // Serializing can take a lot of CPU time for larger or complex elements. Copy the array here
+    // so as to not slow down the main processing hot path.
+    List<T> copiedBuffer;
+    synchronized (this) {
+      copiedBuffer = new ArrayList<>(buffer);
+      clear();
+    }
+
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    for (T el : copiedBuffer) {
+      try {
+        // This is deprecated, but until this is fully removed, this specifically needs the nested
+        // context. This is because the SDK will need to decode the sampled elements with the
+        // ToStringFn.
+        coder.encode(el, stream, Coder.Context.NESTED);
+        ret.add(stream.toByteArray());
+      } catch (Exception exception) {
+        LOG.warn("Could not encode element \"" + el + "\" to bytes: " + exception);
+      } finally {
+        stream.reset();
+      }
+    }
+
+    return ret;
+  }
+
+  private void clear() {
+    buffer.clear();
+    resampleIndex = 0;
+  }

Review Comment:
   It's easier this way: locking is much easier to reason about, and you no longer have to remember that calling clear() requires already holding the lock.
   
   ```suggestion
         buffer.clear();
         resampleIndex = 0;
       }
   
       ByteArrayOutputStream stream = new ByteArrayOutputStream();
       for (T el : copiedBuffer) {
         try {
           // This is deprecated, but until this is fully removed, this specifically needs the nested
           // context. This is because the SDK will need to decode the sampled elements with the
           // ToStringFn.
           coder.encode(el, stream, Coder.Context.NESTED);
           ret.add(stream.toByteArray());
         } catch (Exception exception) {
           LOG.warn("Could not encode element \"" + el + "\" to bytes: " + exception);
         } finally {
           stream.reset();
         }
       }
   
       return ret;
     }
   ```



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java:
##########
@@ -250,12 +277,14 @@ public FnDataReceiver<WindowedValue<?>> getMultiplexingConsumer(String pCollecti
     private final Coder<T> coder;
     private final MetricsContainer metricsContainer;
     private final MetricsEnvironmentState metricsEnvironmentState;
+    private OutputSampler<T> outputSampler;

Review Comment:
   ```suggestion
       private final @Nullable OutputSampler<T> outputSampler;
   ```





[GitHub] [beam] rohdesamuel commented on a diff in pull request #25354: PCollection data sampling for Java SDK harness #25064

Posted by "rohdesamuel (via GitHub)" <gi...@apache.org>.
rohdesamuel commented on code in PR #25354:
URL: https://github.com/apache/beam/pull/25354#discussion_r1102423922


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSamplingDescriptorModifier.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import org.apache.beam.fn.harness.ProcessBundleDescriptorModifier;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+
+/**
+ * Modifies the given ProcessBundleDescriptor by adding a DataSampling operation as a consumer to
+ * every PCollection.
+ */
+public class DataSamplingDescriptorModifier implements ProcessBundleDescriptorModifier {

Review Comment:
   Gotcha, that makes sense. I changed to the suggested implementation and ran the benchmark:
   
   **Benchmark with this PR (data sampling not enabled)**
   Benchmark                                Mode  Cnt     Score     Error  Units
   ProcessBundleBenchmark.testLargeBundle  thrpt   15  1640.020 ± 236.240  ops/s
   :sdks:java:harness:jmh:jmh (Thread[Execution worker Thread 3,5,main]) completed. Took 5 mins 9.573 secs.
   
   **Benchmark with this PR (data sampling enabled)**
   Benchmark                                Mode  Cnt     Score    Error  Units
   ProcessBundleBenchmark.testLargeBundle  thrpt   15  1094.943 ± 99.748  ops/s
   :sdks:java:harness:jmh:jmh (Thread[Execution worker Thread 4,5,main]) completed. Took 5 mins 9.603 secs.
   
   **Benchmark at upstream/master**
   Benchmark                                Mode  Cnt     Score    Error  Units
   ProcessBundleBenchmark.testLargeBundle  thrpt   15  1808.108 ± 30.062  ops/s
   :sdks:java:harness:jmh:jmh (Thread[Execution worker Thread 5,5,main]) completed. Took 5 mins 9.064 secs.
   
   
   Clearly, there are some environmental factors affecting the benchmarking. But I think it can at least be said that this PR with data sampling disabled won't completely hinder performance.
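
The three scores above can be compared with a little arithmetic. The sketch below is only a reading aid: it treats the ± column as a symmetric interval around each score (an assumption about how to read it), and `BenchmarkComparisonSketch` with its two helpers is hypothetical.

```java
final class BenchmarkComparisonSketch {
  /** Fractional change of score relative to baseline; negative means lower throughput. */
  static double relativeChange(double score, double baseline) {
    return (score - baseline) / baseline;
  }

  /** True if [a - errA, a + errA] and [b - errB, b + errB] share at least one point. */
  static boolean intervalsOverlap(double a, double errA, double b, double errB) {
    return a - errA <= b + errB && b - errB <= a + errA;
  }

  public static void main(String[] args) {
    // Scores and errors in ops/s, copied from the benchmark output above.
    double master = 1808.108;
    double masterErr = 30.062;
    double disabled = 1640.020;
    double disabledErr = 236.240;
    double enabled = 1094.943;
    double enabledErr = 99.748;

    System.out.printf(
        "disabled vs master: %.1f%%, intervals overlap: %b%n",
        100 * relativeChange(disabled, master),
        intervalsOverlap(disabled, disabledErr, master, masterErr));
    System.out.printf(
        "enabled vs disabled: %.1f%%, intervals overlap: %b%n",
        100 * relativeChange(enabled, disabled),
        intervalsOverlap(enabled, enabledErr, disabled, disabledErr));
  }
}
```

Under that reading, the disabled run is roughly 9% below master with overlapping intervals, which fits the remark about environmental noise, while the enabled run is roughly 33% below the disabled run with no overlap.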
   



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+  /** Creates a DataSampler to sample every 10 elements while keeping a maximum of 10 in memory. */
+  public DataSampler() {}
+
+  /**
+   * @param maxSamples Sets the maximum number of samples held in memory at once.
+   * @param sampleEveryN Sets how often to sample.
+   */
+  public DataSampler(int maxSamples, int sampleEveryN) {
+    this.maxSamples = maxSamples;
+    this.sampleEveryN = sampleEveryN;
+  }
+
+  // Maximum number of elements in buffer.
+  private int maxSamples = 10;
+
+  // Sampling rate.
+  private int sampleEveryN = 1000;
+
+  // The fully-qualified type is: Map[ProcessBundleDescriptorId, [PCollectionId, OutputSampler]].
+  // The DataSampler object lives on the same level of the FnHarness. This means that many threads
+  // can and will
+  // access this simultaneously. However, ProcessBundleDescriptors are unique per thread, so only

Review Comment:
   Yeah, those are really good points, and I'm surprised I didn't implement it that way, given that it is the implementation in the runner. I changed it to be a class shared only between PCollections. I don't like having the SampleDataRequest get samples for a specific PBD id, so I'll remove that in a PR.





[GitHub] [beam] lukecwik commented on a diff in pull request #25354: PCollection data sampling for Java SDK harness #25064

Posted by "lukecwik (via GitHub)" <gi...@apache.org>.
lukecwik commented on code in PR #25354:
URL: https://github.com/apache/beam/pull/25354#discussion_r1102048741


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+  /** Creates a DataSampler to sample every 10 elements while keeping a maximum of 10 in memory. */
+  public DataSampler() {}
+
+  /**
+   * @param maxSamples Sets the maximum number of samples held in memory at once.
+   * @param sampleEveryN Sets how often to sample.
+   */
+  public DataSampler(int maxSamples, int sampleEveryN) {
+    this.maxSamples = maxSamples;
+    this.sampleEveryN = sampleEveryN;
+  }
+
+  // Maximum number of elements in buffer.
+  private int maxSamples = 10;
+
+  // Sampling rate.
+  private int sampleEveryN = 1000;
+
+  // The fully-qualified type is: Map[ProcessBundleDescriptorId, [PCollectionId, OutputSampler]].
+  // The DataSampler object lives on the same level of the FnHarness. This means that many threads
+  // can and will
+  // access this simultaneously. However, ProcessBundleDescriptors are unique per thread, so only

Review Comment:
   I see what you mean by using a DataSampler for each bundle, but do you really want to sample independently for each bundle, taking N elements and then 1 element for each 1000? This would be an issue for streaming, since streaming has 100s of work items executing in parallel with pretty small bundles, so if you're sampling at the bundle level you're likely going to oversample by a lot.
   
   I was thinking that you would effectively have a single object per PCollection that stored all the samples, so you wouldn't have to worry about memory growth beyond the fixed size of N objects per PCollection.
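
The "single object per PCollection" idea can be sketched with a concurrent map keyed by PCollection id, so that every bundle processing the same PCollection shares one bounded sampler. This is an illustration only: `PerPCollectionRegistrySketch`, `Sampler`, and `samplerFor` are hypothetical names, and the sampler here just counts elements.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

final class PerPCollectionRegistrySketch {
  /** Hypothetical stand-in for a bounded sampler; it only counts what it has seen. */
  static final class Sampler {
    final AtomicLong seen = new AtomicLong();
  }

  private final Map<String, Sampler> samplers = new ConcurrentHashMap<>();

  /** Returns the one sampler for this PCollection, creating it on first use. */
  Sampler samplerFor(String pcollectionId) {
    // computeIfAbsent is atomic on ConcurrentHashMap, so racing threads share one instance.
    return samplers.computeIfAbsent(pcollectionId, id -> new Sampler());
  }

  int size() {
    return samplers.size();
  }

  public static void main(String[] args) {
    PerPCollectionRegistrySketch registry = new PerPCollectionRegistrySketch();
    registry.samplerFor("pcollection-a").seen.incrementAndGet();
    registry.samplerFor("pcollection-a").seen.incrementAndGet();
    registry.samplerFor("pcollection-b").seen.incrementAndGet();
    System.out.println(registry.size());
    System.out.println(registry.samplerFor("pcollection-a").seen.get());
  }
}
```

With this shape, memory use is bounded by the number of PCollections times the per-sampler limit, regardless of how many bundles run in parallel.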





[GitHub] [beam] lukecwik commented on a diff in pull request #25354: PCollection data sampling for Java SDK harness #25064

Posted by "lukecwik (via GitHub)" <gi...@apache.org>.
lukecwik commented on code in PR #25354:
URL: https://github.com/apache/beam/pull/25354#discussion_r1106205495


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+  /**
+   * Creates a DataSampler to sample every 1000 elements while keeping a maximum of 10 in memory.
+   */
+  public DataSampler() {
+    this.maxSamples = 10;
+    this.sampleEveryN = 1000;
+  }
+
+  /**
+   * @param maxSamples Sets the maximum number of samples held in memory at once.
+   * @param sampleEveryN Sets how often to sample.
+   */
+  public DataSampler(int maxSamples, int sampleEveryN) {
+    this.maxSamples = maxSamples;
+    this.sampleEveryN = sampleEveryN;
+  }
+
+  // Maximum number of elements in buffer.
+  private final int maxSamples;
+
+  // Sampling rate.
+  private final int sampleEveryN;
+
+  // The fully-qualified type is: Map[PCollectionId, OutputSampler]. In order to sample
+  // on a PCollection-basis and not per-bundle, this keeps track of shared samples between states.
+  private final Map<String, OutputSampler<?>> outputSamplers = new ConcurrentHashMap<>();
+
+  /**
+   * Creates and returns a class to sample the given PCollection in the given
+   * ProcessBundleDescriptor. Uses the given coder encode samples as bytes when responding to a
+   * SampleDataRequest.
+   *
+   * @param pcollectionId The PCollection to take intermittent samples from.
+   * @param coder The coder associated with the PCollection. Coder may be from a nested context.
+   * @param <T> The type of element contained in the PCollection.
+   * @return the OutputSampler corresponding to the unique PBD and PCollection.
+   */
+  public <T> OutputSampler<T> sampleOutput(String pcollectionId, Coder<T> coder) {
+    outputSamplers.putIfAbsent(

Review Comment:
   nit: consider using computeIfAbsent, which only invokes the passed-in function to create a new OutputSampler if there isn't one already.
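   
   To illustrate the difference, here is a small stand-alone sketch. `ComputeIfAbsentDemo` and its placeholder `Sampler` class are hypothetical and only count constructions; they are not part of the PR. With `putIfAbsent` the value argument is built on every call even when the key is already present, while `computeIfAbsent` only runs the factory when the key is missing.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; Sampler stands in for OutputSampler.
final class ComputeIfAbsentDemo {
  // Counts how many placeholder samplers were constructed.
  static final AtomicInteger constructed = new AtomicInteger();

  static final class Sampler {
    Sampler() {
      constructed.incrementAndGet();
    }
  }

  static int constructionsWithPutIfAbsent(int lookups) {
    constructed.set(0);
    Map<String, Sampler> samplers = new ConcurrentHashMap<>();
    for (int i = 0; i < lookups; i++) {
      // The argument is evaluated before the call, so a Sampler is built every time.
      samplers.putIfAbsent("pc1", new Sampler());
      samplers.get("pc1");
    }
    return constructed.get();
  }

  static int constructionsWithComputeIfAbsent(int lookups) {
    constructed.set(0);
    Map<String, Sampler> samplers = new ConcurrentHashMap<>();
    for (int i = 0; i < lookups; i++) {
      // The lambda only runs when "pc1" has no mapping yet.
      samplers.computeIfAbsent("pc1", id -> new Sampler());
    }
    return constructed.get();
  }
}
```

   `computeIfAbsent` also returns the mapped value directly, so the separate `get` call is no longer needed.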



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java:
##########
@@ -248,6 +253,15 @@ public static void main(
 
       FinalizeBundleHandler finalizeBundleHandler = new FinalizeBundleHandler(executorService);
 
+      // Create the sampler, if the experiment is enabled.
+      Optional<List<String>> experimentList =
+          Optional.ofNullable(options.as(ExperimentalOptions.class).getExperiments());
+      boolean shouldSample =
+          experimentList.isPresent()
+              && experimentList.get().contains(ENABLE_DATA_SAMPLING_EXPERIMENT);

Review Comment:
   ```suggestion
         boolean shouldSample = ExperimentalOptions.hasExperiment(options, ENABLE_DATA_SAMPLING_EXPERIMENT);
   ```



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+  /**
+   * Creates a DataSampler to sample every 1000 elements while keeping a maximum of 10 in memory.
+   */
+  public DataSampler() {
+    this.maxSamples = 10;
+    this.sampleEveryN = 1000;

Review Comment:
   ```suggestion
       this(10, 1000);
   ```



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.beam.sdk.coders.Coder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class holds samples for a single PCollection until queried by the parent DataSampler. This
+ * class is meant to hold only a limited number of elements in memory. So old values are constantly
+ * being overridden in a circular buffer.
+ *
+ * @param <T> the element type of the PCollection.
+ */
+public class OutputSampler<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(OutputSampler.class);
+
+  // Temporarily holds elements until the SDK receives a sample data request.
+  private final List<T> buffer;
+
+  // Maximum number of elements in buffer.
+  private final int maxElements;
+
+  // Sampling rate.
+  private final int sampleEveryN;
+
+  // Total number of samples taken.
+  private final AtomicLong numSamples = new AtomicLong();
+
+  // Index into the buffer of where to overwrite samples.
+  private int resampleIndex = 0;
+
+  private final Coder<T> coder;
+
+  public OutputSampler(Coder<T> coder, int maxElements, int sampleEveryN) {
+    this.coder = coder;
+    this.maxElements = maxElements;
+    this.sampleEveryN = sampleEveryN;
+    this.buffer = new ArrayList<>(this.maxElements);
+  }
+
+  /**
+   * Samples every 1000th element or if it is part of the first 10 in the (local) PCollection.

Review Comment:
   ```suggestion
      * Samples every {@code sampleEveryN}th element or if it is part of the first 10 in the (local) PCollection.
   ```



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+  /**
+   * Creates a DataSampler to sample every 1000 elements while keeping a maximum of 10 in memory.
+   */
+  public DataSampler() {
+    this.maxSamples = 10;
+    this.sampleEveryN = 1000;
+  }
+
+  /**
+   * @param maxSamples Sets the maximum number of samples held in memory at once.
+   * @param sampleEveryN Sets how often to sample.
+   */
+  public DataSampler(int maxSamples, int sampleEveryN) {
+    this.maxSamples = maxSamples;
+    this.sampleEveryN = sampleEveryN;
+  }
+
+  // Maximum number of elements in buffer.
+  private final int maxSamples;
+
+  // Sampling rate.
+  private final int sampleEveryN;
+
+  // The fully-qualified type is: Map[PCollectionId, OutputSampler]. In order to sample
+  // on a PCollection-basis and not per-bundle, this keeps track of shared samples between states.
+  private final Map<String, OutputSampler<?>> outputSamplers = new ConcurrentHashMap<>();
+
+  /**
+   * Creates and returns a class to sample the given PCollection in the given
+   * ProcessBundleDescriptor. Uses the given coder encode samples as bytes when responding to a
+   * SampleDataRequest.
+   *
+   * @param pcollectionId The PCollection to take intermittent samples from.
+   * @param coder The coder associated with the PCollection. Coder may be from a nested context.
+   * @param <T> The type of element contained in the PCollection.
+   * @return the OutputSampler corresponding to the unique PBD and PCollection.
+   */
+  public <T> OutputSampler<T> sampleOutput(String pcollectionId, Coder<T> coder) {
+    outputSamplers.putIfAbsent(
+        pcollectionId, new OutputSampler<>(coder, this.maxSamples, this.sampleEveryN));
+    return (OutputSampler<T>) outputSamplers.get(pcollectionId);
+  }
+
+  /**
+   * Returns all collected samples. Thread-safe.
+   *
+   * @param request The instruction request from the FnApi. Filters based on the given
+   *     SampleDataRequest.
+   * @return Returns all collected samples.
+   */
+  public BeamFnApi.InstructionResponse.Builder handleDataSampleRequest(

Review Comment:
   Let's make this a little simpler by allowing only one thread to request samples at a time instead of several.
   ```suggestion
     public synchronized BeamFnApi.InstructionResponse.Builder handleDataSampleRequest(
   ```



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.beam.sdk.coders.Coder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class holds samples for a single PCollection until queried by the parent DataSampler. This
+ * class is meant to hold only a limited number of elements in memory. So old values are constantly
+ * being overridden in a circular buffer.
+ *
+ * @param <T> the element type of the PCollection.
+ */
+public class OutputSampler<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(OutputSampler.class);
+
+  // Temporarily holds elements until the SDK receives a sample data request.
+  private final List<T> buffer;
+
+  // Maximum number of elements in buffer.
+  private final int maxElements;
+
+  // Sampling rate.
+  private final int sampleEveryN;
+
+  // Total number of samples taken.
+  private final AtomicLong numSamples = new AtomicLong();
+
+  // Index into the buffer of where to overwrite samples.
+  private int resampleIndex = 0;
+
+  private final Coder<T> coder;
+
+  public OutputSampler(Coder<T> coder, int maxElements, int sampleEveryN) {
+    this.coder = coder;
+    this.maxElements = maxElements;
+    this.sampleEveryN = sampleEveryN;
+    this.buffer = new ArrayList<>(this.maxElements);
+  }
+
+  /**
+   * Samples every 1000th element or if it is part of the first 10 in the (local) PCollection.
+   *

Review Comment:
   ```suggestion
      *
      * <p>This method is invoked in parallel by multiple bundle processing threads and in parallel to any {@link #samples} being returned to a thread handling a sample request.
      *
   ```



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.beam.sdk.coders.Coder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class holds samples for a single PCollection until queried by the parent DataSampler. This
+ * class is meant to hold only a limited number of elements in memory. So old values are constantly
+ * being overridden in a circular buffer.
+ *
+ * @param <T> the element type of the PCollection.
+ */
+public class OutputSampler<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(OutputSampler.class);
+
+  // Temporarily holds elements until the SDK receives a sample data request.
+  private final List<T> buffer;
+
+  // Maximum number of elements in buffer.
+  private final int maxElements;
+
+  // Sampling rate.
+  private final int sampleEveryN;
+
+  // Total number of samples taken.
+  private final AtomicLong numSamples = new AtomicLong();
+
+  // Index into the buffer of where to overwrite samples.
+  private int resampleIndex = 0;
+
+  private final Coder<T> coder;
+
+  public OutputSampler(Coder<T> coder, int maxElements, int sampleEveryN) {
+    this.coder = coder;
+    this.maxElements = maxElements;
+    this.sampleEveryN = sampleEveryN;
+    this.buffer = new ArrayList<>(this.maxElements);
+  }
+
+  /**
+   * Samples every 1000th element or if it is part of the first 10 in the (local) PCollection.
+   *
+   * @param element the element to sample.
+   */
+  public void sample(T element) {
+    // Only sample the first 10 elements then after every `sampleEveryN`th element.
+    long samples = numSamples.get() + 1;
+
+    // This has eventual consistency. If there are many threads lazy setting, this will be set to
+    // the slowest thread accessing the atomic. But over time, it will still increase. This is ok
+    // because this is a debugging feature and doesn't need strict atomics.
+    numSamples.lazySet(samples);
+    if (samples > 10 && samples % sampleEveryN != 0) {
+      return;
+    }
+
+    synchronized (this) {
+      // Fill buffer until maxElements.
+      if (buffer.size() < maxElements) {
+        buffer.add(element);
+      } else {
+        // Then rewrite sampled elements as a circular buffer.
+        buffer.set(resampleIndex, element);
+        resampleIndex = (resampleIndex + 1) % maxElements;
+      }
+    }
+  }
+
+  /**
+   * Clears samples at end of call. This is to help mitigate memory use.
+   *

Review Comment:
   ```suggestion
      *
      * <p>This method is invoked by a thread handling a data sampling request in parallel to any calls to {@link #sample}.
      *
   ```



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java:
##########
@@ -349,6 +386,10 @@ public void accept(WindowedValue<T> input) throws Exception {
       // when we have window optimization.
       this.sampledByteSizeDistribution.tryUpdate(input.getValue(), coder);
 
+      if (outputSampler != null) {
+        outputSampler.sample(input.getValue());

Review Comment:
   Don't you want to sample the value with the windowing information attached?
   
   The runner would then be responsible for pulling out whichever sampled attributes can be introspected and/or saved.



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.beam.sdk.coders.Coder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class holds samples for a single PCollection until queried by the parent DataSampler. This
+ * class is meant to hold only a limited number of elements in memory. So old values are constantly
+ * being overridden in a circular buffer.
+ *
+ * @param <T> the element type of the PCollection.
+ */
+public class OutputSampler<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(OutputSampler.class);
+
+  // Temporarily holds elements until the SDK receives a sample data request.
+  private final List<T> buffer;
+
+  // Maximum number of elements in buffer.
+  private final int maxElements;
+
+  // Sampling rate.
+  private final int sampleEveryN;
+
+  // Total number of samples taken.
+  private final AtomicLong numSamples = new AtomicLong();
+
+  // Index into the buffer of where to overwrite samples.
+  private int resampleIndex = 0;
+
+  private final Coder<T> coder;
+
+  public OutputSampler(Coder<T> coder, int maxElements, int sampleEveryN) {
+    this.coder = coder;
+    this.maxElements = maxElements;
+    this.sampleEveryN = sampleEveryN;
+    this.buffer = new ArrayList<>(this.maxElements);
+  }
+
+  /**
+   * Samples every 1000th element or if it is part of the first 10 in the (local) PCollection.
+   *
+   * @param element the element to sample.
+   */
+  public void sample(T element) {
+    // Only sample the first 10 elements then after every `sampleEveryN`th element.
+    long samples = numSamples.get() + 1;
+
+    // This has eventual consistency. If there are many threads lazy setting, this will be set to
+    // the slowest thread accessing the atomic. But over time, it will still increase. This is ok
+    // because this is a debugging feature and doesn't need strict atomics.
+    numSamples.lazySet(samples);
+    if (samples > 10 && samples % sampleEveryN != 0) {
+      return;
+    }
+
+    synchronized (this) {
+      // Fill buffer until maxElements.
+      if (buffer.size() < maxElements) {
+        buffer.add(element);
+      } else {
+        // Then rewrite sampled elements as a circular buffer.
+        buffer.set(resampleIndex, element);
+        resampleIndex = (resampleIndex + 1) % maxElements;
+      }
+    }
+  }
+
+  /**
+   * Clears samples at end of call. This is to help mitigate memory use.
+   *
+   * @return samples taken since last call.
+   */
+  public List<byte[]> samples() {
+    List<byte[]> ret = new ArrayList<>();
+
+    // Serializing can take a lot of CPU time for larger or complex elements. Copy the array here
+    // so as to not slow down the main processing hot path.
+    List<T> copiedBuffer;
+    synchronized (this) {
+      copiedBuffer = new ArrayList<>(buffer);
+      buffer.clear();
+      resampleIndex = 0;
+    }
+
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();

Review Comment:
   Use ByteStringOutputStream (https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java) and `toByteStringAndReset()`
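   
   The reset-and-reuse pattern behind this suggestion can be sketched with the standard library alone. The example below is an assumption-laden stand-in: it uses `ByteArrayOutputStream` with `toByteArray()` followed by `reset()` in place of Beam's `ByteStringOutputStream` and `toByteStringAndReset()`, and `ReusedStreamDemo` and `encodeAll` are hypothetical names. It only shows one stream being reused across elements rather than a new stream being allocated per element.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in: stdlib stream reuse, not Beam's ByteStringOutputStream.
final class ReusedStreamDemo {
  static List<byte[]> encodeAll(List<String> elements) {
    List<byte[]> encoded = new ArrayList<>();
    ByteArrayOutputStream stream = new ByteArrayOutputStream();
    for (String element : elements) {
      byte[] bytes = element.getBytes(StandardCharsets.UTF_8);
      stream.write(bytes, 0, bytes.length);
      // Snapshot what was written for this element, then empty the stream for the next one.
      encoded.add(stream.toByteArray());
      stream.reset();
    }
    return encoded;
  }
}
```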



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java:
##########
@@ -89,7 +92,9 @@ public class FnHarness {
   private static final String STATUS_API_SERVICE_DESCRIPTOR = "STATUS_API_SERVICE_DESCRIPTOR";
   private static final String PIPELINE_OPTIONS = "PIPELINE_OPTIONS";
   private static final String RUNNER_CAPABILITIES = "RUNNER_CAPABILITIES";
+  private static final String ENABLE_DATA_SAMPLING_EXPERIMENT = "enable_data_sampling";
   private static final Logger LOG = LoggerFactory.getLogger(FnHarness.class);
+  private static final DataSampler dataSampler = new DataSampler();

Review Comment:
   I don't see the value of making this static here; just define it as a local variable within the method instead, similar to how we define other things like FinalizeBundleHandler.



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+  /**
+   * Creates a DataSampler to sample every 1000 elements while keeping a maximum of 10 in memory.
+   */
+  public DataSampler() {
+    this.maxSamples = 10;
+    this.sampleEveryN = 1000;
+  }
+
+  /**
+   * @param maxSamples Sets the maximum number of samples held in memory at once.
+   * @param sampleEveryN Sets how often to sample.
+   */
+  public DataSampler(int maxSamples, int sampleEveryN) {

Review Comment:
   nit: validate the arguments passed in.
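   
   One possible shape for that validation, combined with the constructor chaining suggested earlier for the no-argument constructor, is sketched below. `SamplerConfig` is a hypothetical stand-in for `DataSampler`, and the plain `IllegalArgumentException` checks are an assumption; a precondition helper would work equally well.

```java
// Hypothetical stand-in for DataSampler's constructors.
final class SamplerConfig {
  final int maxSamples;
  final int sampleEveryN;

  // Defaults delegate to the validating constructor instead of assigning fields twice.
  SamplerConfig() {
    this(10, 1000);
  }

  SamplerConfig(int maxSamples, int sampleEveryN) {
    if (maxSamples <= 0) {
      throw new IllegalArgumentException("maxSamples must be positive, got " + maxSamples);
    }
    if (sampleEveryN <= 0) {
      throw new IllegalArgumentException("sampleEveryN must be positive, got " + sampleEveryN);
    }
    this.maxSamples = maxSamples;
    this.sampleEveryN = sampleEveryN;
  }
}
```

   Rejecting non-positive values up front matters here because the sampling code later takes a remainder by `sampleEveryN` and by the buffer size, and a zero in either place would fail at runtime.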



##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/DataSamplerTest.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class DataSamplerTest {
+  byte[] encodeInt(Integer i) throws IOException {
+    VarIntCoder coder = VarIntCoder.of();
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    coder.encode(i, stream);

Review Comment:
   You need to specify the encoding context as nested.
   
   This test passes only because the varint encoding is identical in the outer and nested contexts. You might want an explicit test that uses something like `byte[]`, whose encoding differs depending on the context.
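   
   Why `byte[]` makes a better test case can be sketched without any Beam dependency. The code below is a hand-written illustration, not Beam's coder implementation: `ContextEncodingDemo`, `encodeOuter`, and `encodeNested` are hypothetical names, and the layout (raw bytes in the outer context, a varint length prefix followed by the bytes in the nested context) is stated here as an assumption about how a byte-array coder behaves.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical illustration of outer vs. nested encodings for a byte array.
final class ContextEncodingDemo {
  // Outer context: the value is the last thing in the stream, so the raw bytes suffice.
  static byte[] encodeOuter(byte[] value) {
    return value.clone();
  }

  // Nested context: more data may follow, so a varint length prefix marks where the value ends.
  static byte[] encodeNested(byte[] value) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    int length = value.length;
    // Emit 7 bits at a time, setting the high bit while more bits remain.
    while ((length & ~0x7F) != 0) {
      out.write((length & 0x7F) | 0x80);
      length >>>= 7;
    }
    out.write(length);
    out.write(value, 0, value.length);
    return out.toByteArray();
  }
}
```

   A test that compares sampled bytes against the outer encoding of a `byte[]` would fail if the sampler encodes in the nested context, which is exactly the mismatch a varint-only test cannot detect.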



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+  /**
+   * Creates a DataSampler to sample every 1000 elements while keeping a maximum of 10 in memory.
+   */
+  public DataSampler() {
+    this.maxSamples = 10;
+    this.sampleEveryN = 1000;
+  }
+
+  /**
+   * @param maxSamples Sets the maximum number of samples held in memory at once.
+   * @param sampleEveryN Sets how often to sample.
+   */
+  public DataSampler(int maxSamples, int sampleEveryN) {
+    this.maxSamples = maxSamples;
+    this.sampleEveryN = sampleEveryN;
+  }
+
+  // Maximum number of elements in buffer.
+  private final int maxSamples;
+
+  // Sampling rate.
+  private final int sampleEveryN;
+
+  // The fully-qualified type is: Map[PCollectionId, OutputSampler]. In order to sample
+  // on a PCollection-basis and not per-bundle, this keeps track of shared samples between states.
+  private final Map<String, OutputSampler<?>> outputSamplers = new ConcurrentHashMap<>();
+
+  /**
+   * Creates and returns a class to sample the given PCollection in the given
+   * ProcessBundleDescriptor. Uses the given coder encode samples as bytes when responding to a
+   * SampleDataRequest.
+   *
+   * @param pcollectionId The PCollection to take intermittent samples from.
+   * @param coder The coder associated with the PCollection. Coder may be from a nested context.
+   * @param <T> The type of element contained in the PCollection.
+   * @return the OutputSampler corresponding to the unique PBD and PCollection.
+   */
+  public <T> OutputSampler<T> sampleOutput(String pcollectionId, Coder<T> coder) {
+    outputSamplers.putIfAbsent(
+        pcollectionId, new OutputSampler<>(coder, this.maxSamples, this.sampleEveryN));
+    return (OutputSampler<T>) outputSamplers.get(pcollectionId);
+  }
+
+  /**
+   * Returns all collected samples. Thread-safe.
+   *
+   * @param request The instruction request from the FnApi. Filters based on the given
+   *     SampleDataRequest.
+   * @return Returns all collected samples.
+   */
+  public BeamFnApi.InstructionResponse.Builder handleDataSampleRequest(
+      BeamFnApi.InstructionRequest request) {
+    BeamFnApi.SampleDataRequest sampleDataRequest = request.getSampleData();
+
+    Map<String, List<byte[]>> responseSamples =
+        samplesFor(sampleDataRequest.getPcollectionIdsList());
+
+    BeamFnApi.SampleDataResponse.Builder response = BeamFnApi.SampleDataResponse.newBuilder();
+    for (String pcollectionId : responseSamples.keySet()) {
+      ElementList.Builder elementList = ElementList.newBuilder();
+      for (byte[] sample : responseSamples.get(pcollectionId)) {
+        elementList.addElements(
+            SampledElement.newBuilder().setElement(ByteString.copyFrom(sample)).build());
+      }
+      response.putElementSamples(pcollectionId, elementList.build());
+    }
+
+    return BeamFnApi.InstructionResponse.newBuilder().setSampleData(response);
+  }
+
+  /**
+   * Returns a map from PCollection to its samples. Samples are filtered on
+   * ProcessBundleDescriptorIds and PCollections. Thread-safe.
+   *
+   * @param pcollections Filters all PCollections on this set. If empty, allows all PCollections.
+   * @return a map from PCollection to its samples.
+   */
+  private Map<String, List<byte[]>> samplesFor(List<String> pcollections) {

Review Comment:
   You could inline this method into the caller above, which would make it clear that this logic only runs while synchronized.



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+  /**
+   * Creates a DataSampler to sample every 1000 elements while keeping a maximum of 10 in memory.
+   */
+  public DataSampler() {
+    this.maxSamples = 10;
+    this.sampleEveryN = 1000;
+  }
+
+  /**
+   * @param maxSamples Sets the maximum number of samples held in memory at once.
+   * @param sampleEveryN Sets how often to sample.
+   */
+  public DataSampler(int maxSamples, int sampleEveryN) {
+    this.maxSamples = maxSamples;
+    this.sampleEveryN = sampleEveryN;
+  }
+
+  // Maximum number of elements in buffer.
+  private final int maxSamples;
+
+  // Sampling rate.
+  private final int sampleEveryN;
+
+  // The fully-qualified type is: Map[PCollectionId, OutputSampler]. In order to sample
+  // on a PCollection-basis and not per-bundle, this keeps track of shared samples between states.
+  private final Map<String, OutputSampler<?>> outputSamplers = new ConcurrentHashMap<>();
+
+  /**
+   * Creates and returns a class to sample the given PCollection in the given
+   * ProcessBundleDescriptor. Uses the given coder encode samples as bytes when responding to a
+   * SampleDataRequest.
+   *

Review Comment:
   ```suggestion
      *
      * <p>Invoked by multiple bundle processing threads in parallel when a new bundle processor is being instantiated.
      *
   ```
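
Since this method is reached from several bundle processing threads at once, it may also be worth creating the sampler atomically. The sketch below is only an illustration and is not the PR's code: `SamplerRegistry` and `Sampler` are hypothetical stand-ins for `DataSampler` and `OutputSampler`. It uses `ConcurrentHashMap.computeIfAbsent`, which runs the mapping function at most once per key, so no throwaway sampler is constructed on repeat calls and no separate `get` is needed.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for DataSampler; only the lookup-or-create path is shown. */
final class SamplerRegistry {
  /** Hypothetical stand-in for OutputSampler. */
  static final class Sampler {
    final String pcollectionId;

    Sampler(String pcollectionId) {
      this.pcollectionId = pcollectionId;
    }
  }

  private final Map<String, Sampler> samplers = new ConcurrentHashMap<>();

  // Counts constructions so the "at most once per key" behaviour can be observed.
  final AtomicInteger created = new AtomicInteger();

  /** Returns the single sampler for the given PCollection id, creating it on first use. */
  Sampler samplerFor(String pcollectionId) {
    return samplers.computeIfAbsent(
        pcollectionId,
        id -> {
          created.incrementAndGet();
          return new Sampler(id);
        });
  }
}
```

Every thread asking for the same PCollection id gets the same instance back, which is the sharing the field comment describes.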



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] rohdesamuel commented on a diff in pull request #25354: PCollection data sampling for Java SDK harness #25064

Posted by "rohdesamuel (via GitHub)" <gi...@apache.org>.
rohdesamuel commented on code in PR #25354:
URL: https://github.com/apache/beam/pull/25354#discussion_r1114770702


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java:
##########
@@ -104,19 +104,22 @@ public List<BeamFnApi.SampledElement> samples() throws IOException {
 
     // Serializing can take a lot of CPU time for larger or complex elements. Copy the array here
     // so as to not slow down the main processing hot path.
-    List<T> copiedBuffer;
+    List<T> bufferToSend;
+    int sampleIndex = 0;
     synchronized (this) {
-      copiedBuffer = new ArrayList<>(buffer);
-      buffer.clear();
+      bufferToSend = buffer;
+      sampleIndex = resampleIndex;
+      buffer = new ArrayList<>(maxElements);
       resampleIndex = 0;
     }
 
     ByteStringOutputStream stream = new ByteStringOutputStream();
-    for (T el : copiedBuffer) {
+    for (int i = 0; i < bufferToSend.size(); i++) {
+      int index = (sampleIndex + i) % bufferToSend.size();

Review Comment:
   True, but this did make testing it easier. Without this, the elements are read out of order because old samples are overwritten in the array.
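
To illustrate the ordering point, here is a minimal sketch of reading a circular buffer oldest-first. It is not the `OutputSampler` code itself: `RingBuffer`, `nextWrite`, and `inInsertionOrder` are hypothetical names, and the class is deliberately not thread-safe. The read loop uses the same `(start + i) % size` indexing as the diff above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical fixed-capacity buffer that overwrites its oldest element once full. */
final class RingBuffer<T> {
  private final int capacity;
  private final List<T> buffer;
  // Index the next add() writes to; once the buffer is full this is also the oldest element.
  private int nextWrite = 0;

  RingBuffer(int capacity) {
    if (capacity <= 0) {
      throw new IllegalArgumentException("capacity must be positive");
    }
    this.capacity = capacity;
    this.buffer = new ArrayList<>(capacity);
  }

  void add(T element) {
    if (buffer.size() < capacity) {
      buffer.add(element);
    } else {
      buffer.set(nextWrite, element);
    }
    nextWrite = (nextWrite + 1) % capacity;
  }

  /** Returns the retained elements from oldest to newest. */
  List<T> inInsertionOrder() {
    List<T> ordered = new ArrayList<>(buffer.size());
    for (int i = 0; i < buffer.size(); i++) {
      // Start at the oldest slot and wrap around the end of the backing list.
      ordered.add(buffer.get((nextWrite + i) % buffer.size()));
    }
    return ordered;
  }
}
```

Iterating the backing list directly would instead return elements in slot order, which mixes new and old samples after the first wrap-around.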





[GitHub] [beam] rohdesamuel commented on pull request #25354: PCollection data sampling for Java SDK harness #25064

Posted by "rohdesamuel (via GitHub)" <gi...@apache.org>.
rohdesamuel commented on PR #25354:
URL: https://github.com/apache/beam/pull/25354#issuecomment-1439293677

   Run Java_PVR_Flink_Docker PreCommit




[GitHub] [beam] rohdesamuel commented on a diff in pull request #25354: PCollection data sampling for Java SDK harness #25064

Posted by "rohdesamuel (via GitHub)" <gi...@apache.org>.
rohdesamuel commented on code in PR #25354:
URL: https://github.com/apache/beam/pull/25354#discussion_r1100893265


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+  /** Creates a DataSampler to sample every 10 elements while keeping a maximum of 10 in memory. */
+  public DataSampler() {}
+
+  /**
+   * @param maxSamples Sets the maximum number of samples held in memory at once.
+   * @param sampleEveryN Sets how often to sample.
+   */
+  public DataSampler(int maxSamples, int sampleEveryN) {
+    this.maxSamples = maxSamples;
+    this.sampleEveryN = sampleEveryN;
+  }
+
+  // Maximum number of elements in buffer.
+  private int maxSamples = 10;
+
+  // Sampling rate.
+  private int sampleEveryN = 1000;
+
+  // The fully-qualified type is: Map[ProcessBundleDescriptorId, [PCollectionId, OutputSampler]].
+  // The DataSampler object lives on the same level of the FnHarness. This means that many threads
+  // can and will
+  // access this simultaneously. However, ProcessBundleDescriptors are unique per thread, so only
+  // synchronization
+  // is needed on the outermost map.
+  private final Map<String, Map<String, OutputSampler<?>>> outputSamplers =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Creates and returns a class to sample the given PCollection in the given
+   * ProcessBundleDescriptor. Uses the given coder encode samples as bytes when responding to a
+   * SampleDataRequest.
+   *
+   * @param processBundleDescriptorId The PBD to sample from.
+   * @param pcollectionId The PCollection to take intermittent samples from.
+   * @param coder The coder associated with the PCollection. Coder may be from a nested context.
+   * @return the OutputSampler corresponding to the unique PBD and PCollection.
+   * @param <T> The type of element contained in the PCollection.
+   */
+  public <T> OutputSampler<T> sampleOutput(
+      String processBundleDescriptorId, String pcollectionId, Coder<T> coder) {
+    outputSamplers.putIfAbsent(processBundleDescriptorId, new HashMap<>());
+    Map<String, OutputSampler<?>> samplers = outputSamplers.get(processBundleDescriptorId);
+    samplers.putIfAbsent(
+        pcollectionId, new OutputSampler<T>(coder, this.maxSamples, this.sampleEveryN));
+
+    return (OutputSampler<T>) samplers.get(pcollectionId);
+  }
+
+  /**
+   * Returns all collected samples. Thread-safe.
+   *
+   * @param request The instruction request from the FnApi. Filters based on the given
+   *     SampleDataRequest.
+   * @return Returns all collected samples.
+   */
+  public BeamFnApi.InstructionResponse.Builder handleDataSampleRequest(
+      BeamFnApi.InstructionRequest request) {
+    BeamFnApi.SampleDataRequest sampleDataRequest = request.getSample();
+
+    Map<String, List<byte[]>> responseSamples =
+        samplesFor(
+            ImmutableSet.copyOf(sampleDataRequest.getProcessBundleDescriptorIdsList()),
+            ImmutableSet.copyOf(sampleDataRequest.getPcollectionIdsList()));
+
+    BeamFnApi.SampleDataResponse.Builder response = BeamFnApi.SampleDataResponse.newBuilder();
+    for (String pcollectionId : responseSamples.keySet()) {
+      ElementList.Builder elementList = ElementList.newBuilder();
+      for (byte[] sample : responseSamples.get(pcollectionId)) {
+        elementList.addElements(
+            SampledElement.newBuilder().setElement(ByteString.copyFrom(sample)).build());
+      }
+      response.putElementSamples(pcollectionId, elementList.build());
+    }
+
+    return BeamFnApi.InstructionResponse.newBuilder().setSample(response);
+  }
+
+  /**
+   * Returns a map from PCollection to its samples. Samples are filtered on
+   * ProcessBundleDescriptorIds and PCollections. Thread-safe.
+   *
+   * @param descriptors PCollections under each PBD id will be unioned. If empty, allows all
+   *     descriptors.
+   * @param pcollections Filters all PCollections on this set. If empty, allows all PCollections.
+   * @return a map from PCollection to its samples.
+   */
+  public Map<String, List<byte[]>> samplesFor(Set<String> descriptors, Set<String> pcollections) {

Review Comment:
   Done



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+  /** Creates a DataSampler to sample every 10 elements while keeping a maximum of 10 in memory. */
+  public DataSampler() {}
+
+  /**
+   * @param maxSamples Sets the maximum number of samples held in memory at once.
+   * @param sampleEveryN Sets how often to sample.
+   */
+  public DataSampler(int maxSamples, int sampleEveryN) {
+    this.maxSamples = maxSamples;
+    this.sampleEveryN = sampleEveryN;
+  }
+
+  // Maximum number of elements in buffer.
+  private int maxSamples = 10;
+
+  // Sampling rate.
+  private int sampleEveryN = 1000;
+
+  // The fully-qualified type is: Map[ProcessBundleDescriptorId, [PCollectionId, OutputSampler]].
+  // The DataSampler object lives on the same level of the FnHarness. This means that many threads
+  // can and will
+  // access this simultaneously. However, ProcessBundleDescriptors are unique per thread, so only

Review Comment:
   I think I do want the ProcessBundleIds, right? I want the least amount of synchronization, and if each ProcessBundleId has its own sampler per PCollection, this should be correct?



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+  /** Creates a DataSampler to sample every 10 elements while keeping a maximum of 10 in memory. */
+  public DataSampler() {}
+
+  /**
+   * @param maxSamples Sets the maximum number of samples held in memory at once.
+   * @param sampleEveryN Sets how often to sample.
+   */
+  public DataSampler(int maxSamples, int sampleEveryN) {
+    this.maxSamples = maxSamples;
+    this.sampleEveryN = sampleEveryN;
+  }
+
+  // Maximum number of elements in buffer.
+  private int maxSamples = 10;
+
+  // Sampling rate.
+  private int sampleEveryN = 1000;
+
+  // The fully-qualified type is: Map[ProcessBundleDescriptorId, [PCollectionId, OutputSampler]].
+  // The DataSampler object lives on the same level of the FnHarness. This means that many threads
+  // can and will
+  // access this simultaneously. However, ProcessBundleDescriptors are unique per thread, so only
+  // synchronization
+  // is needed on the outermost map.
+  private final Map<String, Map<String, OutputSampler<?>>> outputSamplers =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Creates and returns a class to sample the given PCollection in the given
+   * ProcessBundleDescriptor. Uses the given coder encode samples as bytes when responding to a
+   * SampleDataRequest.
+   *
+   * @param processBundleDescriptorId The PBD to sample from.
+   * @param pcollectionId The PCollection to take intermittent samples from.
+   * @param coder The coder associated with the PCollection. Coder may be from a nested context.
+   * @return the OutputSampler corresponding to the unique PBD and PCollection.
+   * @param <T> The type of element contained in the PCollection.
+   */
+  public <T> OutputSampler<T> sampleOutput(
+      String processBundleDescriptorId, String pcollectionId, Coder<T> coder) {
+    outputSamplers.putIfAbsent(processBundleDescriptorId, new HashMap<>());
+    Map<String, OutputSampler<?>> samplers = outputSamplers.get(processBundleDescriptorId);
+    samplers.putIfAbsent(
+        pcollectionId, new OutputSampler<T>(coder, this.maxSamples, this.sampleEveryN));
+
+    return (OutputSampler<T>) samplers.get(pcollectionId);
+  }
+
+  /**
+   * Returns all collected samples. Thread-safe.
+   *
+   * @param request The instruction request from the FnApi. Filters based on the given
+   *     SampleDataRequest.
+   * @return Returns all collected samples.
+   */
+  public BeamFnApi.InstructionResponse.Builder handleDataSampleRequest(
+      BeamFnApi.InstructionRequest request) {
+    BeamFnApi.SampleDataRequest sampleDataRequest = request.getSample();
+
+    Map<String, List<byte[]>> responseSamples =
+        samplesFor(
+            ImmutableSet.copyOf(sampleDataRequest.getProcessBundleDescriptorIdsList()),

Review Comment:
   Change parameter type to take a list



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+  /** Creates a DataSampler to sample every 10 elements while keeping a maximum of 10 in memory. */
+  public DataSampler() {}
+
+  /**
+   * @param maxSamples Sets the maximum number of samples held in memory at once.
+   * @param sampleEveryN Sets how often to sample.
+   */
+  public DataSampler(int maxSamples, int sampleEveryN) {
+    this.maxSamples = maxSamples;
+    this.sampleEveryN = sampleEveryN;
+  }
+
+  // Maximum number of elements in buffer.
+  private int maxSamples = 10;
+
+  // Sampling rate.
+  private int sampleEveryN = 1000;
+
+  // The fully-qualified type is: Map[ProcessBundleDescriptorId, [PCollectionId, OutputSampler]].
+  // The DataSampler object lives on the same level of the FnHarness. This means that many threads
+  // can and will
+  // access this simultaneously. However, ProcessBundleDescriptors are unique per thread, so only
+  // synchronization
+  // is needed on the outermost map.
+  private final Map<String, Map<String, OutputSampler<?>>> outputSamplers =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Creates and returns a class to sample the given PCollection in the given
+   * ProcessBundleDescriptor. Uses the given coder encode samples as bytes when responding to a
+   * SampleDataRequest.
+   *
+   * @param processBundleDescriptorId The PBD to sample from.
+   * @param pcollectionId The PCollection to take intermittent samples from.
+   * @param coder The coder associated with the PCollection. Coder may be from a nested context.
+   * @return the OutputSampler corresponding to the unique PBD and PCollection.
+   * @param <T> The type of element contained in the PCollection.
+   */
+  public <T> OutputSampler<T> sampleOutput(
+      String processBundleDescriptorId, String pcollectionId, Coder<T> coder) {
+    outputSamplers.putIfAbsent(processBundleDescriptorId, new HashMap<>());
+    Map<String, OutputSampler<?>> samplers = outputSamplers.get(processBundleDescriptorId);
+    samplers.putIfAbsent(
+        pcollectionId, new OutputSampler<T>(coder, this.maxSamples, this.sampleEveryN));
+
+    return (OutputSampler<T>) samplers.get(pcollectionId);
+  }
+
+  /**
+   * Returns all collected samples. Thread-safe.
+   *
+   * @param request The instruction request from the FnApi. Filters based on the given
+   *     SampleDataRequest.
+   * @return Returns all collected samples.
+   */
+  public BeamFnApi.InstructionResponse.Builder handleDataSampleRequest(
+      BeamFnApi.InstructionRequest request) {
+    BeamFnApi.SampleDataRequest sampleDataRequest = request.getSample();
+
+    Map<String, List<byte[]>> responseSamples =
+        samplesFor(
+            ImmutableSet.copyOf(sampleDataRequest.getProcessBundleDescriptorIdsList()),
+            ImmutableSet.copyOf(sampleDataRequest.getPcollectionIdsList()));
+
+    BeamFnApi.SampleDataResponse.Builder response = BeamFnApi.SampleDataResponse.newBuilder();
+    for (String pcollectionId : responseSamples.keySet()) {
+      ElementList.Builder elementList = ElementList.newBuilder();
+      for (byte[] sample : responseSamples.get(pcollectionId)) {
+        elementList.addElements(
+            SampledElement.newBuilder().setElement(ByteString.copyFrom(sample)).build());
+      }
+      response.putElementSamples(pcollectionId, elementList.build());
+    }
+
+    return BeamFnApi.InstructionResponse.newBuilder().setSample(response);
+  }
+
+  /**
+   * Returns a map from PCollection to its samples. Samples are filtered on
+   * ProcessBundleDescriptorIds and PCollections. Thread-safe.
+   *
+   * @param descriptors PCollections under each PBD id will be unioned. If empty, allows all
+   *     descriptors.
+   * @param pcollections Filters all PCollections on this set. If empty, allows all PCollections.
+   * @return a map from PCollection to its samples.
+   */
+  public Map<String, List<byte[]>> samplesFor(Set<String> descriptors, Set<String> pcollections) {
+    Map<String, List<byte[]>> samples = new HashMap<>();
+
+    // Safe to iterate as the ConcurrentHashMap will return each element at most once and will not
+    // throw
+    // ConcurrentModificationException.
+    outputSamplers.forEach(
+        (descriptorId, samplers) -> {
+          if (!descriptors.isEmpty() && !descriptors.contains(descriptorId)) {
+            return;
+          }
+
+          samplers.forEach(
+              (pcollectionId, outputSampler) -> {
+                if (!pcollections.isEmpty() && !pcollections.contains(pcollectionId)) {
+                  return;
+                }
+
+                samples.putIfAbsent(pcollectionId, new ArrayList<>());
+                samples.get(pcollectionId).addAll(outputSampler.samples());
+              });
+        });
+
+    return samples;
+  }
+
+  /** @return samples from all PBDs and all PCollections. */
+  public Map<String, List<byte[]>> allSamples() {
+    return samplesFor(ImmutableSet.of(), ImmutableSet.of());
+  }
+
+  /**
+   * @param descriptors PBDs to filter on.
+   * @return samples only from the given descriptors.
+   */
+  public Map<String, List<byte[]>> samplesForDescriptors(Set<String> descriptors) {
+    return samplesFor(descriptors, ImmutableSet.of());
+  }
+
+  /**
+   * @param pcollections PCollection ids to filter on.
+   * @return samples only from the given PCollections.
+   */
+  public Map<String, List<byte[]>> samplesForPCollections(Set<String> pcollections) {
+    return samplesFor(ImmutableSet.of(), pcollections);
+  }

Review Comment:
   Done



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class holds samples for a single PCollection until queried by the parent DataSampler. This
+ * class is meant to hold only a limited number of elements in memory. So old values are constantly
+ * being overridden in a circular buffer.
+ *
+ * @param <T> the element type of the PCollection.
+ */
+public class OutputSampler<T> {
+  private final Coder<T> coder;
+  private final List<T> buffer = new ArrayList<>();
+  private static final Logger LOG = LoggerFactory.getLogger(OutputSampler.class);
+
+  // Maximum number of elements in buffer.
+  private int maxElements = 10;
+
+  // Sampling rate.
+  private int sampleEveryN = 1000;

Review Comment:
   Yes, removed the default constructor



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSamplingDescriptorModifier.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import org.apache.beam.fn.harness.ProcessBundleDescriptorModifier;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+
+/**
+ * Modifies the given ProcessBundleDescriptor by adding a DataSampling operation as a consumer to
+ * every PCollection.
+ */
+public class DataSamplingDescriptorModifier implements ProcessBundleDescriptorModifier {

Review Comment:
   The intent is to allow the SDK to define how it wants to implement sampling, which simplifies the protocol implementation. The runner PBD modification suggestion implies that an SDK needs to understand a new well-known transform in order to implement this sampling protocol. That is not impossible, but given that the original requirement for sampling was to sample everything when the pipeline option is set, I thought this would add complexity.
   
   I modified the graph in this way because it was the simplest way to add future-proofed data sampling. It cuts down on the amount of code changes needed for every receiver type, both now and in the future. I hadn't thought about this forcing the use of the multi-consumer variant.
   
   Would an alternative be to create a new ExecutionState implementation (or ExecutionStateImpl subclass) that also adds DataSampling? That way the implementation can switch when it sees that data sampling is enabled.



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class holds samples for a single PCollection until queried by the parent DataSampler. This
+ * class is meant to hold only a limited number of elements in memory. So old values are constantly
+ * being overridden in a circular buffer.
+ *
+ * @param <T> the element type of the PCollection.
+ */
+public class OutputSampler<T> {
+  private final Coder<T> coder;
+  private final List<T> buffer = new ArrayList<>();
+  private static final Logger LOG = LoggerFactory.getLogger(OutputSampler.class);
+
+  // Maximum number of elements in buffer.
+  private int maxElements = 10;
+
+  // Sampling rate.
+  private int sampleEveryN = 1000;
+
+  // Total number of samples taken.
+  private long numSamples = 0;
+
+  // Index into the buffer of where to overwrite samples.
+  private int resampleIndex = 0;
+
+  public OutputSampler(Coder<T> coder) {
+    this.coder = coder;
+  }
+
+  public OutputSampler(Coder<T> coder, int maxElements, int sampleEveryN) {
+    this(coder);
+    this.maxElements = maxElements;
+    this.sampleEveryN = sampleEveryN;
+  }
+
+  /**
+   * Samples every 1000th element or if it is part of the first 10 in the (local) PCollection.
+   *
+   * @param element the element to sample.
+   */
+  public void sample(T element) {
+    // Only sample the first 10 elements then after every `sampleEveryN`th element.
+    numSamples += 1;
+    if (numSamples > 10 && numSamples % sampleEveryN != 0) {
+      return;
+    }
+
+    // Fill buffer until maxElements.
+    if (buffer.size() < maxElements) {
+      buffer.add(element);
+    } else {
+      // Then rewrite sampled elements as a circular buffer.
+      buffer.set(resampleIndex, element);
+      resampleIndex = (resampleIndex + 1) % maxElements;
+    }
+  }
+
+  /**
+   * Clears samples at end of call. This is to help mitigate memory use.
+   *
+   * @return samples taken since last call.
+   */
+  public List<byte[]> samples() {

Review Comment:
   Yikes, thank you for catching this. I added synchronization between the interface methods.





[GitHub] [beam] lukecwik commented on a diff in pull request #25354: PCollection data sampling for Java SDK harness #25064

Posted by "lukecwik (via GitHub)" <gi...@apache.org>.
lukecwik commented on code in PR #25354:
URL: https://github.com/apache/beam/pull/25354#discussion_r1099191235


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java:
##########
@@ -166,6 +157,8 @@ public class ProcessBundleHandler {
   @VisibleForTesting final BundleProcessorCache bundleProcessorCache;
   private final Set<String> runnerCapabilities;
 
+  private DataSampler dataSampler;

Review Comment:
   ```suggestion
     private final DataSampler dataSampler;
   ```



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+  /** Creates a DataSampler to sample every 10 elements while keeping a maximum of 10 in memory. */
+  public DataSampler() {}
+
+  /**
+   * @param maxSamples Sets the maximum number of samples held in memory at once.
+   * @param sampleEveryN Sets how often to sample.
+   */
+  public DataSampler(int maxSamples, int sampleEveryN) {
+    this.maxSamples = maxSamples;
+    this.sampleEveryN = sampleEveryN;
+  }
+
+  // Maximum number of elements in buffer.
+  private int maxSamples = 10;

Review Comment:
   ```suggestion
     private final int maxSamples = 10;
   ```
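   
   One caveat when applying this suggestion: the two-argument constructor in the hunk above assigns `this.maxSamples`, and Java rejects an assignment to a `final` field that already has an initializer. A minimal sketch of one way to keep the fields `final`, assuming the defaults move into a delegating no-argument constructor (the class name `SamplerConfig` and its accessors are hypothetical, not part of this PR):
   
   ```java
   /** Hypothetical stand-in for the two configuration fields of DataSampler. */
   class SamplerConfig {
     // No initializers here: each final field is assigned exactly once, in a constructor.
     private final int maxSamples;
     private final int sampleEveryN;

     /** Uses the defaults from the hunk above: 10 samples kept, every 1000th element sampled. */
     SamplerConfig() {
       this(10, 1000);
     }

     SamplerConfig(int maxSamples, int sampleEveryN) {
       this.maxSamples = maxSamples;
       this.sampleEveryN = sampleEveryN;
     }

     int maxSamples() {
       return maxSamples;
     }

     int sampleEveryN() {
       return sampleEveryN;
     }
   }
   ```
   
   The same pattern would apply to `sampleEveryN`, which the constructor also assigns.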



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+  /** Creates a DataSampler to sample every 10 elements while keeping a maximum of 10 in memory. */
+  public DataSampler() {}
+
+  /**
+   * @param maxSamples Sets the maximum number of samples held in memory at once.
+   * @param sampleEveryN Sets how often to sample.
+   */
+  public DataSampler(int maxSamples, int sampleEveryN) {
+    this.maxSamples = maxSamples;
+    this.sampleEveryN = sampleEveryN;
+  }
+
+  // Maximum number of elements in buffer.
+  private int maxSamples = 10;
+
+  // Sampling rate.
+  private int sampleEveryN = 1000;
+
+  // The fully-qualified type is: Map[ProcessBundleDescriptorId, [PCollectionId, OutputSampler]].
+  // The DataSampler object lives on the same level of the FnHarness. This means that many threads
+  // can and will
+  // access this simultaneously. However, ProcessBundleDescriptors are unique per thread, so only
+  // synchronization
+  // is needed on the outermost map.
+  private final Map<String, Map<String, OutputSampler<?>>> outputSamplers =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Creates and returns a class to sample the given PCollection in the given
+   * ProcessBundleDescriptor. Uses the given coder encode samples as bytes when responding to a
+   * SampleDataRequest.
+   *
+   * @param processBundleDescriptorId The PBD to sample from.
+   * @param pcollectionId The PCollection to take intermittent samples from.
+   * @param coder The coder associated with the PCollection. Coder may be from a nested context.
+   * @return the OutputSampler corresponding to the unique PBD and PCollection.
+   * @param <T> The type of element contained in the PCollection.
+   */
+  public <T> OutputSampler<T> sampleOutput(
+      String processBundleDescriptorId, String pcollectionId, Coder<T> coder) {
+    outputSamplers.putIfAbsent(processBundleDescriptorId, new HashMap<>());
+    Map<String, OutputSampler<?>> samplers = outputSamplers.get(processBundleDescriptorId);
+    samplers.putIfAbsent(
+        pcollectionId, new OutputSampler<T>(coder, this.maxSamples, this.sampleEveryN));
+
+    return (OutputSampler<T>) samplers.get(pcollectionId);
+  }
+
+  /**
+   * Returns all collected samples. Thread-safe.
+   *
+   * @param request The instruction request from the FnApi. Filters based on the given
+   *     SampleDataRequest.
+   * @return Returns all collected samples.
+   */
+  public BeamFnApi.InstructionResponse.Builder handleDataSampleRequest(
+      BeamFnApi.InstructionRequest request) {
+    BeamFnApi.SampleDataRequest sampleDataRequest = request.getSample();
+
+    Map<String, List<byte[]>> responseSamples =
+        samplesFor(
+            ImmutableSet.copyOf(sampleDataRequest.getProcessBundleDescriptorIdsList()),

Review Comment:
   Why copy?



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+  /** Creates a DataSampler to sample every 10 elements while keeping a maximum of 10 in memory. */
+  public DataSampler() {}
+
+  /**
+   * @param maxSamples Sets the maximum number of samples held in memory at once.
+   * @param sampleEveryN Sets how often to sample.
+   */
+  public DataSampler(int maxSamples, int sampleEveryN) {
+    this.maxSamples = maxSamples;
+    this.sampleEveryN = sampleEveryN;
+  }
+
+  // Maximum number of elements in buffer.
+  private int maxSamples = 10;
+
+  // Sampling rate.
+  private int sampleEveryN = 1000;
+
+  // The fully-qualified type is: Map[ProcessBundleDescriptorId, [PCollectionId, OutputSampler]].
+  // The DataSampler object lives on the same level of the FnHarness. This means that many threads
+  // can and will
+  // access this simultaneously. However, ProcessBundleDescriptors are unique per thread, so only

Review Comment:
   This is not true: ProcessBundleDescriptors can be reused across multiple process bundle requests. We see this regularly on Dataflow streaming pipelines and, under some circumstances, on batch pipelines.
   
   It is true for ProcessBundleIds, but I don't think that is what you want.
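   
   If several threads can reach the same ProcessBundleDescriptor id, the inner `HashMap` is shared between them as well. A minimal sketch of one possible fix, assuming both map levels become `ConcurrentHashMap` and entries are created with `computeIfAbsent` (the class `SamplerRegistry` and its method are hypothetical, not part of this PR):
   
   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.function.Supplier;

   /** Hypothetical registry keyed by PBD id, then by PCollection id. */
   class SamplerRegistry<S> {
     // Both levels are concurrent, so two threads sharing a PBD id never touch a plain HashMap.
     private final Map<String, Map<String, S>> samplers = new ConcurrentHashMap<>();

     /** Returns the existing sampler for the pair, or atomically creates one with the factory. */
     S getOrCreate(String descriptorId, String pcollectionId, Supplier<S> factory) {
       return samplers
           .computeIfAbsent(descriptorId, unused -> new ConcurrentHashMap<>())
           .computeIfAbsent(pcollectionId, unused -> factory.get());
     }
   }
   ```
   
   `computeIfAbsent` also avoids allocating a new map or sampler on every call, which `putIfAbsent(key, new ...)` does even when the key is already present.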



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+  /** Creates a DataSampler to sample every 10 elements while keeping a maximum of 10 in memory. */
+  public DataSampler() {}
+
+  /**
+   * @param maxSamples Sets the maximum number of samples held in memory at once.
+   * @param sampleEveryN Sets how often to sample.
+   */
+  public DataSampler(int maxSamples, int sampleEveryN) {
+    this.maxSamples = maxSamples;
+    this.sampleEveryN = sampleEveryN;
+  }
+
+  // Maximum number of elements in buffer.
+  private int maxSamples = 10;
+
+  // Sampling rate.
+  private int sampleEveryN = 1000;

Review Comment:
   ```suggestion
     private final int sampleEveryN = 1000;
   ```



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class holds samples for a single PCollection until queried by the parent DataSampler. This
+ * class is meant to hold only a limited number of elements in memory. So old values are constantly
+ * being overridden in a circular buffer.
+ *
+ * @param <T> the element type of the PCollection.
+ */
+public class OutputSampler<T> {
+  private final Coder<T> coder;
+  private final List<T> buffer = new ArrayList<>();
+  private static final Logger LOG = LoggerFactory.getLogger(OutputSampler.class);
+
+  // Maximum number of elements in buffer.
+  private int maxElements = 10;
+
+  // Sampling rate.
+  private int sampleEveryN = 1000;
+
+  // Total number of samples taken.
+  private long numSamples = 0;
+
+  // Index into the buffer of where to overwrite samples.
+  private int resampleIndex = 0;
+
+  public OutputSampler(Coder<T> coder) {
+    this.coder = coder;
+  }
+
+  public OutputSampler(Coder<T> coder, int maxElements, int sampleEveryN) {
+    this(coder);
+    this.maxElements = maxElements;
+    this.sampleEveryN = sampleEveryN;
+  }
+
+  /**
+   * Samples every 1000th element or if it is part of the first 10 in the (local) PCollection.
+   *
+   * @param element the element to sample.
+   */
+  public void sample(T element) {
+    // Only sample the first 10 elements then after every `sampleEveryN`th element.
+    numSamples += 1;
+    if (numSamples > 10 && numSamples % sampleEveryN != 0) {
+      return;
+    }
+
+    // Fill buffer until maxElements.
+    if (buffer.size() < maxElements) {
+      buffer.add(element);
+    } else {
+      // Then rewrite sampled elements as a circular buffer.
+      buffer.set(resampleIndex, element);
+      resampleIndex = (resampleIndex + 1) % maxElements;
+    }
+  }
+
+  /**
+   * Clears samples at end of call. This is to help mitigate memory use.
+   *
+   * @return samples taken since last call.
+   */
+  public List<byte[]> samples() {

Review Comment:
   Note that sample() and samples() will be invoked by two different threads (say T1 and T2).
   
   sample() is invoked a lot and samples() is invoked rarely. Currently there is no synchronization between T1 and T2, which means that changes made by T1 are not guaranteed to be visible to T2 and vice versa. Whether they are visible will depend on the CPU architecture and the contents of the registers/L* caches.
   
   Note that clear() by T2 in the current implementation is unlikely to be seen by T1 and will be overwritten whenever T1 adds a value.
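   
   A minimal sketch of one way to make the two methods safe to call from T1 and T2, assuming plain `synchronized` methods are acceptable on the hot path. The class `SynchronizedSampler` and the method name `drain` are hypothetical, it returns raw elements rather than encoded bytes, and it uses `maxElements` where the hunk above uses the literal 10:
   
   ```java
   import java.util.ArrayList;
   import java.util.List;

   /** Hypothetical stand-in for OutputSampler; not the code from this PR. */
   class SynchronizedSampler<T> {
     private final int maxElements;
     private final int sampleEveryN;
     private final List<T> buffer;
     private long numSeen = 0;
     private int resampleIndex = 0;

     SynchronizedSampler(int maxElements, int sampleEveryN) {
       this.maxElements = maxElements;
       this.sampleEveryN = sampleEveryN;
       this.buffer = new ArrayList<>(maxElements);
     }

     /** Called often by the bundle-processing thread (T1). */
     synchronized void sample(T element) {
       numSeen += 1;
       // Keep the first maxElements elements, then only every sampleEveryN-th one.
       if (numSeen > maxElements && numSeen % sampleEveryN != 0) {
         return;
       }
       if (buffer.size() < maxElements) {
         buffer.add(element);
       } else {
         // Buffer is full: overwrite the oldest slot, circular-buffer style.
         buffer.set(resampleIndex, element);
         resampleIndex = (resampleIndex + 1) % maxElements;
       }
     }

     /** Called rarely by the thread answering sample requests (T2). */
     synchronized List<T> drain() {
       List<T> out = new ArrayList<>(buffer);
       buffer.clear();
       resampleIndex = 0;
       return out;
     }
   }
   ```
   
   Because both methods lock the same monitor, a clear() performed inside drain() happens-before the next sample() call, so T1 cannot keep writing into a stale view of the buffer. Whether the lock cost is acceptable for every element is a separate question that this sketch does not measure.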



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSamplingDescriptorModifier.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import org.apache.beam.fn.harness.ProcessBundleDescriptorModifier;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+
+/**
+ * Modifies the given ProcessBundleDescriptor by adding a DataSampling operation as a consumer to
+ * every PCollection.
+ */
+public class DataSamplingDescriptorModifier implements ProcessBundleDescriptorModifier {

Review Comment:
   Why did you want to insert this as a separate transform in the graph instead of modifying https://github.com/apache/beam/blob/d20d0b01c3c6bcde551420f36e13d794c930f1e2/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java#L174 to support passing elements to the OutputSampler or being an OutputSampler itself?
   
   Also, typically we would ask the runner to do these PBD modifications itself so it could selectively choose which PCollections to sample and which not to. This would also remove the need for the SDK to check for an experiment, and allow the runner to choose which to sample without needing to pass in PCollection ids (the PBD ids would be enough). There is a cost though, since transforms impose msec and state transition bookkeeping overhead which is not insignificant, and you'll make the current single consumer hot-path in PCollectionConsumerRegistry always use the multi-consumer variant, which has additional overhead.
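   
   To illustrate the first alternative, here is a minimal sketch of sampling at the consumer rather than as an extra transform. It assumes `java.util.function.Consumer` as a stand-in for the registry's receiver type, which is not shown in this thread; the class `SamplingConsumer` is hypothetical:
   
   ```java
   import java.util.function.Consumer;

   /** Hypothetical wrapper: hands each element to a sampler, then to the real consumer. */
   class SamplingConsumer<T> implements Consumer<T> {
     private final Consumer<T> delegate;
     private final Consumer<T> sampler;

     SamplingConsumer(Consumer<T> delegate, Consumer<T> sampler) {
       this.delegate = delegate;
       this.sampler = sampler;
     }

     @Override
     public void accept(T element) {
       // The sampler only observes the element; the downstream consumer still receives it.
       sampler.accept(element);
       delegate.accept(element);
     }
   }
   ```
   
   With a wrapper like this the PCollection keeps a single registered consumer, so no extra transform is added to the graph.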



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class holds samples for a single PCollection until queried by the parent DataSampler. This
+ * class is meant to hold only a limited number of elements in memory. So old values are constantly
+ * being overridden in a circular buffer.
+ *
+ * @param <T> the element type of the PCollection.
+ */
+public class OutputSampler<T> {
+  private final Coder<T> coder;
+  private final List<T> buffer = new ArrayList<>();

Review Comment:
   Set up the capacity of the buffer so we don't pay any resizing costs.
   ```suggestion
     private final List<T> buffer = new ArrayList<>(maxElements);
   ```
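   
   One caveat: in the hunk above `maxElements` is declared after `buffer`, and Java does not allow a field initializer to read, by simple name, an instance field declared later in the class. Even with the fields reordered, the initializer would run before the three-argument constructor assigns its own `maxElements`. A minimal sketch that sizes the list in the constructor instead (the class `PresizedBuffer` and its methods are hypothetical, not part of this PR):
   
   ```java
   import java.util.ArrayList;
   import java.util.List;

   /** Hypothetical holder showing a buffer sized from a constructor argument. */
   class PresizedBuffer<T> {
     private final int maxElements;
     private final List<T> buffer;

     PresizedBuffer(int maxElements) {
       this.maxElements = maxElements;
       // Allocate once with the final capacity so add() never has to grow the backing array.
       this.buffer = new ArrayList<>(maxElements);
     }

     /** Adds the element if there is room; returns whether it was stored. */
     boolean offer(T element) {
       if (buffer.size() >= maxElements) {
         return false;
       }
       return buffer.add(element);
     }

     int size() {
       return buffer.size();
     }
   }
   ```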



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+  /** Creates a DataSampler to sample every 10 elements while keeping a maximum of 10 in memory. */
+  public DataSampler() {}
+
+  /**
+   * @param maxSamples Sets the maximum number of samples held in memory at once.
+   * @param sampleEveryN Sets how often to sample.
+   */
+  public DataSampler(int maxSamples, int sampleEveryN) {
+    this.maxSamples = maxSamples;
+    this.sampleEveryN = sampleEveryN;
+  }
+
+  // Maximum number of elements in buffer.
+  private int maxSamples = 10;
+
+  // Sampling rate.
+  private int sampleEveryN = 1000;
+
+  // The fully-qualified type is: Map[ProcessBundleDescriptorId, [PCollectionId, OutputSampler]].
+  // The DataSampler object lives on the same level of the FnHarness. This means that many threads
+  // can and will
+  // access this simultaneously. However, ProcessBundleDescriptors are unique per thread, so only
+  // synchronization
+  // is needed on the outermost map.
+  private final Map<String, Map<String, OutputSampler<?>>> outputSamplers =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Creates and returns a class to sample the given PCollection in the given
+   * ProcessBundleDescriptor. Uses the given coder encode samples as bytes when responding to a
+   * SampleDataRequest.
+   *
+   * @param processBundleDescriptorId The PBD to sample from.
+   * @param pcollectionId The PCollection to take intermittent samples from.
+   * @param coder The coder associated with the PCollection. Coder may be from a nested context.
+   * @return the OutputSampler corresponding to the unique PBD and PCollection.
+   * @param <T> The type of element contained in the PCollection.
+   */
+  public <T> OutputSampler<T> sampleOutput(
+      String processBundleDescriptorId, String pcollectionId, Coder<T> coder) {
+    outputSamplers.putIfAbsent(processBundleDescriptorId, new HashMap<>());
+    Map<String, OutputSampler<?>> samplers = outputSamplers.get(processBundleDescriptorId);
+    samplers.putIfAbsent(
+        pcollectionId, new OutputSampler<T>(coder, this.maxSamples, this.sampleEveryN));
+
+    return (OutputSampler<T>) samplers.get(pcollectionId);
+  }
+
+  /**
+   * Returns all collected samples. Thread-safe.
+   *
+   * @param request The instruction request from the FnApi. Filters based on the given
+   *     SampleDataRequest.
+   * @return Returns all collected samples.
+   */
+  public BeamFnApi.InstructionResponse.Builder handleDataSampleRequest(
+      BeamFnApi.InstructionRequest request) {
+    BeamFnApi.SampleDataRequest sampleDataRequest = request.getSample();
+
+    Map<String, List<byte[]>> responseSamples =
+        samplesFor(
+            ImmutableSet.copyOf(sampleDataRequest.getProcessBundleDescriptorIdsList()),
+            ImmutableSet.copyOf(sampleDataRequest.getPcollectionIdsList()));
+
+    BeamFnApi.SampleDataResponse.Builder response = BeamFnApi.SampleDataResponse.newBuilder();
+    for (String pcollectionId : responseSamples.keySet()) {
+      ElementList.Builder elementList = ElementList.newBuilder();
+      for (byte[] sample : responseSamples.get(pcollectionId)) {
+        elementList.addElements(
+            SampledElement.newBuilder().setElement(ByteString.copyFrom(sample)).build());
+      }
+      response.putElementSamples(pcollectionId, elementList.build());
+    }
+
+    return BeamFnApi.InstructionResponse.newBuilder().setSample(response);
+  }
+
+  /**
+   * Returns a map from PCollection to its samples. Samples are filtered on
+   * ProcessBundleDescriptorIds and PCollections. Thread-safe.
+   *
+   * @param descriptors PCollections under each PBD id will be unioned. If empty, allows all
+   *     descriptors.
+   * @param pcollections Filters all PCollections on this set. If empty, allows all PCollections.
+   * @return a map from PCollection to its samples.
+   */
+  public Map<String, List<byte[]>> samplesFor(Set<String> descriptors, Set<String> pcollections) {
+    Map<String, List<byte[]>> samples = new HashMap<>();
+
+    // Safe to iterate as the ConcurrentHashMap will return each element at most once and will not
+    // throw
+    // ConcurrentModificationException.
+    outputSamplers.forEach(
+        (descriptorId, samplers) -> {
+          if (!descriptors.isEmpty() && !descriptors.contains(descriptorId)) {
+            return;
+          }
+
+          samplers.forEach(
+              (pcollectionId, outputSampler) -> {
+                if (!pcollections.isEmpty() && !pcollections.contains(pcollectionId)) {
+                  return;
+                }
+
+                samples.putIfAbsent(pcollectionId, new ArrayList<>());
+                samples.get(pcollectionId).addAll(outputSampler.samples());
+              });
+        });
+
+    return samples;
+  }
+
+  /** @return samples from all PBDs and all PCollections. */
+  public Map<String, List<byte[]>> allSamples() {
+    return samplesFor(ImmutableSet.of(), ImmutableSet.of());
+  }
+
+  /**
+   * @param descriptors PBDs to filter on.
+   * @return samples only from the given descriptors.
+   */
+  public Map<String, List<byte[]>> samplesForDescriptors(Set<String> descriptors) {
+    return samplesFor(descriptors, ImmutableSet.of());
+  }
+
+  /**
+   * @param pcollections PCollection ids to filter on.
+   * @return samples only from the given PCollections.
+   */
+  public Map<String, List<byte[]>> samplesForPCollections(Set<String> pcollections) {
+    return samplesFor(ImmutableSet.of(), pcollections);
+  }

Review Comment:
   These are only used during testing; move them into the test class.



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+  /** Creates a DataSampler to sample every 10 elements while keeping a maximum of 10 in memory. */
+  public DataSampler() {}
+
+  /**
+   * @param maxSamples Sets the maximum number of samples held in memory at once.
+   * @param sampleEveryN Sets how often to sample.
+   */
+  public DataSampler(int maxSamples, int sampleEveryN) {
+    this.maxSamples = maxSamples;
+    this.sampleEveryN = sampleEveryN;
+  }
+
+  // Maximum number of elements in buffer.
+  private int maxSamples = 10;
+
+  // Sampling rate.
+  private int sampleEveryN = 1000;
+
+  // The fully-qualified type is: Map[ProcessBundleDescriptorId, [PCollectionId, OutputSampler]].
+  // The DataSampler object lives on the same level of the FnHarness. This means that many threads
+  // can and will
+  // access this simultaneously. However, ProcessBundleDescriptors are unique per thread, so only
+  // synchronization
+  // is needed on the outermost map.
+  private final Map<String, Map<String, OutputSampler<?>>> outputSamplers =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Creates and returns a class to sample the given PCollection in the given
+   * ProcessBundleDescriptor. Uses the given coder to encode samples as bytes when responding to a
+   * SampleDataRequest.
+   *
+   * @param processBundleDescriptorId The PBD to sample from.
+   * @param pcollectionId The PCollection to take intermittent samples from.
+   * @param coder The coder associated with the PCollection. Coder may be from a nested context.
+   * @return the OutputSampler corresponding to the unique PBD and PCollection.
+   * @param <T> The type of element contained in the PCollection.
+   */
+  public <T> OutputSampler<T> sampleOutput(
+      String processBundleDescriptorId, String pcollectionId, Coder<T> coder) {
+    outputSamplers.putIfAbsent(processBundleDescriptorId, new HashMap<>());
+    Map<String, OutputSampler<?>> samplers = outputSamplers.get(processBundleDescriptorId);
+    samplers.putIfAbsent(
+        pcollectionId, new OutputSampler<T>(coder, this.maxSamples, this.sampleEveryN));
+
+    return (OutputSampler<T>) samplers.get(pcollectionId);
+  }
+
+  /**
+   * Returns all collected samples. Thread-safe.
+   *
+   * @param request The instruction request from the FnApi. Filters based on the given
+   *     SampleDataRequest.
+   * @return Returns all collected samples.
+   */
+  public BeamFnApi.InstructionResponse.Builder handleDataSampleRequest(
+      BeamFnApi.InstructionRequest request) {
+    BeamFnApi.SampleDataRequest sampleDataRequest = request.getSample();
+
+    Map<String, List<byte[]>> responseSamples =
+        samplesFor(
+            ImmutableSet.copyOf(sampleDataRequest.getProcessBundleDescriptorIdsList()),
+            ImmutableSet.copyOf(sampleDataRequest.getPcollectionIdsList()));
+
+    BeamFnApi.SampleDataResponse.Builder response = BeamFnApi.SampleDataResponse.newBuilder();
+    for (String pcollectionId : responseSamples.keySet()) {
+      ElementList.Builder elementList = ElementList.newBuilder();
+      for (byte[] sample : responseSamples.get(pcollectionId)) {
+        elementList.addElements(
+            SampledElement.newBuilder().setElement(ByteString.copyFrom(sample)).build());
+      }
+      response.putElementSamples(pcollectionId, elementList.build());
+    }
+
+    return BeamFnApi.InstructionResponse.newBuilder().setSample(response);
+  }
+
+  /**
+   * Returns a map from PCollection to its samples. Samples are filtered on
+   * ProcessBundleDescriptorIds and PCollections. Thread-safe.
+   *
+   * @param descriptors PCollections under each PBD id will be unioned. If empty, allows all
+   *     descriptors.
+   * @param pcollections Filters all PCollections on this set. If empty, allows all PCollections.
+   * @return a map from PCollection to its samples.
+   */
+  public Map<String, List<byte[]>> samplesFor(Set<String> descriptors, Set<String> pcollections) {

Review Comment:
   Don't make this public. Instead, have the test create a request, pass it to the handler, and validate the returned response itself.



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+  /** Creates a DataSampler to sample every 10 elements while keeping a maximum of 10 in memory. */
+  public DataSampler() {}
+
+  /**
+   * @param maxSamples Sets the maximum number of samples held in memory at once.
+   * @param sampleEveryN Sets how often to sample.
+   */
+  public DataSampler(int maxSamples, int sampleEveryN) {
+    this.maxSamples = maxSamples;
+    this.sampleEveryN = sampleEveryN;
+  }
+
+  // Maximum number of elements in buffer.
+  private int maxSamples = 10;
+
+  // Sampling rate.
+  private int sampleEveryN = 1000;
+
+  // The fully-qualified type is: Map[ProcessBundleDescriptorId, [PCollectionId, OutputSampler]].
+  // The DataSampler object lives on the same level of the FnHarness. This means that many threads
+  // can and will
+  // access this simultaneously. However, ProcessBundleDescriptors are unique per thread, so only
+  // synchronization
+  // is needed on the outermost map.
+  private final Map<String, Map<String, OutputSampler<?>>> outputSamplers =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Creates and returns a class to sample the given PCollection in the given
+   * ProcessBundleDescriptor. Uses the given coder to encode samples as bytes when responding to a
+   * SampleDataRequest.
+   *
+   * @param processBundleDescriptorId The PBD to sample from.
+   * @param pcollectionId The PCollection to take intermittent samples from.
+   * @param coder The coder associated with the PCollection. Coder may be from a nested context.
+   * @return the OutputSampler corresponding to the unique PBD and PCollection.
+   * @param <T> The type of element contained in the PCollection.
+   */
+  public <T> OutputSampler<T> sampleOutput(
+      String processBundleDescriptorId, String pcollectionId, Coder<T> coder) {
+    outputSamplers.putIfAbsent(processBundleDescriptorId, new HashMap<>());
+    Map<String, OutputSampler<?>> samplers = outputSamplers.get(processBundleDescriptorId);
+    samplers.putIfAbsent(
+        pcollectionId, new OutputSampler<T>(coder, this.maxSamples, this.sampleEveryN));
+
+    return (OutputSampler<T>) samplers.get(pcollectionId);
+  }
+
+  /**
+   * Returns all collected samples. Thread-safe.
+   *
+   * @param request The instruction request from the FnApi. Filters based on the given
+   *     SampleDataRequest.
+   * @return Returns all collected samples.
+   */
+  public BeamFnApi.InstructionResponse.Builder handleDataSampleRequest(
+      BeamFnApi.InstructionRequest request) {
+    BeamFnApi.SampleDataRequest sampleDataRequest = request.getSample();
+
+    Map<String, List<byte[]>> responseSamples =
+        samplesFor(
+            ImmutableSet.copyOf(sampleDataRequest.getProcessBundleDescriptorIdsList()),
+            ImmutableSet.copyOf(sampleDataRequest.getPcollectionIdsList()));
+
+    BeamFnApi.SampleDataResponse.Builder response = BeamFnApi.SampleDataResponse.newBuilder();
+    for (String pcollectionId : responseSamples.keySet()) {
+      ElementList.Builder elementList = ElementList.newBuilder();
+      for (byte[] sample : responseSamples.get(pcollectionId)) {
+        elementList.addElements(
+            SampledElement.newBuilder().setElement(ByteString.copyFrom(sample)).build());
+      }
+      response.putElementSamples(pcollectionId, elementList.build());
+    }
+
+    return BeamFnApi.InstructionResponse.newBuilder().setSample(response);
+  }
+
+  /**
+   * Returns a map from PCollection to its samples. Samples are filtered on
+   * ProcessBundleDescriptorIds and PCollections. Thread-safe.
+   *
+   * @param descriptors PCollections under each PBD id will be unioned. If empty, allows all
+   *     descriptors.
+   * @param pcollections Filters all PCollections on this set. If empty, allows all PCollections.
+   * @return a map from PCollection to its samples.
+   */
+  public Map<String, List<byte[]>> samplesFor(Set<String> descriptors, Set<String> pcollections) {
+    Map<String, List<byte[]>> samples = new HashMap<>();
+
+    // Safe to iterate as the ConcurrentHashMap will return each element at most once and will not
+    // throw
+    // ConcurrentModificationException.
+    outputSamplers.forEach(
+        (descriptorId, samplers) -> {
+          if (!descriptors.isEmpty() && !descriptors.contains(descriptorId)) {
+            return;
+          }
+
+          samplers.forEach(
+              (pcollectionId, outputSampler) -> {
+                if (!pcollections.isEmpty() && !pcollections.contains(pcollectionId)) {
+                  return;
+                }
+
+                samples.putIfAbsent(pcollectionId, new ArrayList<>());

Review Comment:
   ```suggestion
                   samples.putIfAbsent(pcollectionId, Collections.EMPTY_LIST);
   ```
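One caveat with the suggestion above, offered as a hedged sketch rather than a statement about the final code: `Collections.EMPTY_LIST` is immutable, so if the next line still calls `addAll` on the stored value (as the quoted diff does), adding a non-empty collection would throw `UnsupportedOperationException`. A common alternative is `Map.computeIfAbsent`, which allocates a mutable list only when the key is missing. The class and method names below are hypothetical and not part of the PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the PR: merges samples into a per-PCollection map.
final class SampleMerger {
  private SampleMerger() {}

  // Appends newSamples to the list stored under pcollectionId, creating a mutable
  // list only if the key is absent.
  static void merge(
      Map<String, List<byte[]>> samples, String pcollectionId, List<byte[]> newSamples) {
    samples.computeIfAbsent(pcollectionId, unused -> new ArrayList<>()).addAll(newSamples);
  }

  // Shows that the immutable empty list rejects additions.
  static boolean emptyListRejectsAdds() {
    List<byte[]> shared = Collections.emptyList();
    try {
      shared.addAll(Collections.singletonList(new byte[] {1}));
      return false;
    } catch (UnsupportedOperationException e) {
      return true;
    }
  }
}
```

With this shape the `putIfAbsent` and `get` pair collapses into one call, and no list is allocated for keys that already exist.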



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class holds samples for a single PCollection until queried by the parent DataSampler. This
+ * class is meant to hold only a limited number of elements in memory. So old values are constantly
+ * being overridden in a circular buffer.
+ *
+ * @param <T> the element type of the PCollection.
+ */
+public class OutputSampler<T> {
+  private final Coder<T> coder;
+  private final List<T> buffer = new ArrayList<>();
+  private static final Logger LOG = LoggerFactory.getLogger(OutputSampler.class);
+
+  // Maximum number of elements in buffer.
+  private int maxElements = 10;
+
+  // Sampling rate.
+  private int sampleEveryN = 1000;

Review Comment:
   Shouldn't this be passed in by DataSampler?
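A minimal sketch of what passing the settings in could look like, assuming the sampler keeps the first `maxElements` elements and afterwards overwrites the oldest slot with every `sampleEveryN`-th element. That policy is an assumption drawn from the class comment about a circular buffer. `ConfiguredSampler` is a hypothetical, simplified stand-in for `OutputSampler` (the coder is omitted), not the PR's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for OutputSampler; the coder is omitted.
final class ConfiguredSampler<T> {
  // Both limits are final and supplied by the owner, so there are no duplicated defaults.
  private final int maxElements;
  private final int sampleEveryN;
  private final List<T> buffer = new ArrayList<>();
  private long seen = 0;
  private int resampleIndex = 0;

  ConfiguredSampler(int maxElements, int sampleEveryN) {
    if (maxElements <= 0 || sampleEveryN <= 0) {
      throw new IllegalArgumentException("maxElements and sampleEveryN must be positive");
    }
    this.maxElements = maxElements;
    this.sampleEveryN = sampleEveryN;
  }

  // Fills the buffer first, then replaces the oldest slot on every sampleEveryN-th element.
  void sample(T element) {
    seen++;
    if (buffer.size() < maxElements) {
      buffer.add(element);
      return;
    }
    if (seen % sampleEveryN == 0) {
      buffer.set(resampleIndex, element);
      resampleIndex = (resampleIndex + 1) % maxElements;
    }
  }

  // Returns a copy so callers cannot mutate the internal buffer.
  List<T> samples() {
    return new ArrayList<>(buffer);
  }
}
```

Making the fields `final` and constructor-only means the values configured on the parent object are the single source of truth.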



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] lukecwik commented on a diff in pull request #25354: PCollection data sampling for Java SDK harness #25064

Posted by "lukecwik (via GitHub)" <gi...@apache.org>.
lukecwik commented on code in PR #25354:
URL: https://github.com/apache/beam/pull/25354#discussion_r1102048741


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+  /** Creates a DataSampler to sample every 10 elements while keeping a maximum of 10 in memory. */
+  public DataSampler() {}
+
+  /**
+   * @param maxSamples Sets the maximum number of samples held in memory at once.
+   * @param sampleEveryN Sets how often to sample.
+   */
+  public DataSampler(int maxSamples, int sampleEveryN) {
+    this.maxSamples = maxSamples;
+    this.sampleEveryN = sampleEveryN;
+  }
+
+  // Maximum number of elements in buffer.
+  private int maxSamples = 10;
+
+  // Sampling rate.
+  private int sampleEveryN = 1000;
+
+  // The fully-qualified type is: Map[ProcessBundleDescriptorId, [PCollectionId, OutputSampler]].
+  // The DataSampler object lives on the same level of the FnHarness. This means that many threads
+  // can and will
+  // access this simultaneously. However, ProcessBundleDescriptors are unique per thread, so only

Review Comment:
   I see what you mean by using a DataSampler for each bundle, but do you really want to sample independently for each bundle (N elements up front, then 1 element for every 1000)? This would be an issue for streaming, which has hundreds of work items executing in parallel with fairly small bundles. If you're sampling at the bundle level, you're likely to oversample by a lot and hold all of it in memory at the same time.
   
   I was thinking that you would have a single object per PCollection that stores all the samples, so you wouldn't have to worry about memory growth beyond the fixed size of N objects per PCollection.
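The idea of one bounded object per PCollection can be sketched as follows. This is an illustration under assumptions, not the design that was eventually merged: `PerPCollectionSamples` and `BoundedBuffer` are hypothetical names, and the overwrite-oldest policy is a guess at what "fixed size" would mean in practice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: one bounded buffer per PCollection id, shared by all bundles.
final class PerPCollectionSamples<T> {
  private final int maxSamples;
  private final Map<String, BoundedBuffer<T>> buffers = new ConcurrentHashMap<>();

  PerPCollectionSamples(int maxSamples) {
    this.maxSamples = maxSamples;
  }

  // Any bundle thread may call this; the buffer for a given id is created exactly once.
  void add(String pcollectionId, T element) {
    buffers.computeIfAbsent(pcollectionId, unused -> new BoundedBuffer<>(maxSamples)).add(element);
  }

  // Returns a copy of the current samples, or an empty list for an unknown id.
  List<T> samples(String pcollectionId) {
    BoundedBuffer<T> buffer = buffers.get(pcollectionId);
    return buffer == null ? new ArrayList<>() : buffer.snapshot();
  }

  // Fixed-capacity buffer; once full, new elements overwrite the oldest slot.
  private static final class BoundedBuffer<E> {
    private final int capacity;
    private final List<E> elements = new ArrayList<>();
    private int next = 0;

    BoundedBuffer(int capacity) {
      this.capacity = capacity;
    }

    synchronized void add(E element) {
      if (elements.size() < capacity) {
        elements.add(element);
      } else {
        elements.set(next, element);
        next = (next + 1) % capacity;
      }
    }

    synchronized List<E> snapshot() {
      return new ArrayList<>(elements);
    }
  }
}
```

Memory is then bounded by `maxSamples` per PCollection regardless of how many bundles run in parallel, at the cost of a short synchronized section on each sampled element.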





[GitHub] [beam] lukecwik commented on a diff in pull request #25354: PCollection data sampling for Java SDK harness #25064

Posted by "lukecwik (via GitHub)" <gi...@apache.org>.
lukecwik commented on code in PR #25354:
URL: https://github.com/apache/beam/pull/25354#discussion_r1102053721


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSamplingDescriptorModifier.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import org.apache.beam.fn.harness.ProcessBundleDescriptorModifier;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+
+/**
+ * Modifies the given ProcessBundleDescriptor by adding a DataSampling operation as a consumer to
+ * every PCollection.
+ */
+public class DataSamplingDescriptorModifier implements ProcessBundleDescriptorModifier {

Review Comment:
   I was thinking that you would update `PCollectionConsumerRegistry#MetricTrackingFnDataReceiver` and `PCollectionConsumerRegistry#MultiplexingMetricTrackingFnDataReceiver` and add an `if (sampling) { doSampling }` to them. The `if` should be really cheap: the value is always either true or false, so the JVM's ability to predict the branch should be very high. Alternatively, you could create a subclass of the three receivers there to add sampling as necessary.
   
   You can validate the `if` approach by running the JMH benchmark:
   ```
   ./gradlew -info :sdks:java:harness:jmh:jmh -Pbenchmark=ProcessBundleBenchmark.testLargeBundle
   ```
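For readers following along, the `if (sampling)` idea might look roughly like the sketch below. It uses a plain `java.util.function.Consumer` as a hypothetical stand-in for the receiver classes named above. It is not the actual `PCollectionConsumerRegistry` code, and the unbounded list is only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a data receiver; not the PCollectionConsumerRegistry classes.
final class SamplingReceiver<T> implements Consumer<T> {
  private final Consumer<T> delegate;
  // Fixed at construction, so the branch in accept() always goes the same way.
  private final boolean sampling;
  private final List<T> sampled = new ArrayList<>();

  SamplingReceiver(Consumer<T> delegate, boolean sampling) {
    this.delegate = delegate;
    this.sampling = sampling;
  }

  @Override
  public void accept(T element) {
    if (sampling) {
      // A real sampler would bound this and sample only every Nth element.
      sampled.add(element);
    }
    delegate.accept(element);
  }

  // Returns a copy of the elements recorded so far.
  List<T> sampled() {
    return new ArrayList<>(sampled);
  }
}
```

Whether the extra branch is measurable is an empirical question, which is what the benchmark command above is meant to answer.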





[GitHub] [beam] github-actions[bot] commented on pull request #25354: PCollection data sampling for Java SDK harness #25064

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #25354:
URL: https://github.com/apache/beam/pull/25354#issuecomment-1425416774

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @lukecwik for label java.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).




[GitHub] [beam] rohdesamuel commented on pull request #25354: PCollection data sampling for Java SDK harness #25064

Posted by "rohdesamuel (via GitHub)" <gi...@apache.org>.
rohdesamuel commented on PR #25354:
URL: https://github.com/apache/beam/pull/25354#issuecomment-1428873966

   Rebased with master, thanks!
   




[GitHub] [beam] rohdesamuel commented on a diff in pull request #25354: PCollection data sampling for Java SDK harness #25064

Posted by "rohdesamuel (via GitHub)" <gi...@apache.org>.
rohdesamuel commented on code in PR #25354:
URL: https://github.com/apache/beam/pull/25354#discussion_r1102436772


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+  /** Creates a DataSampler to sample every 10 elements while keeping a maximum of 10 in memory. */
+  public DataSampler() {}
+
+  /**
+   * @param maxSamples Sets the maximum number of samples held in memory at once.
+   * @param sampleEveryN Sets how often to sample.
+   */
+  public DataSampler(int maxSamples, int sampleEveryN) {
+    this.maxSamples = maxSamples;
+    this.sampleEveryN = sampleEveryN;
+  }
+
+  // Maximum number of elements in buffer.
+  private int maxSamples = 10;
+
+  // Sampling rate.
+  private int sampleEveryN = 1000;
+
+  // The fully-qualified type is: Map[ProcessBundleDescriptorId, [PCollectionId, OutputSampler]].
+  // The DataSampler object lives on the same level of the FnHarness. This means that many threads
+  // can and will
+  // access this simultaneously. However, ProcessBundleDescriptors are unique per thread, so only

Review Comment:
   Made https://github.com/apache/beam/pull/25421





[GitHub] [beam] lukecwik commented on a diff in pull request #25354: PCollection data sampling for Java SDK harness #25064

Posted by "lukecwik (via GitHub)" <gi...@apache.org>.
lukecwik commented on code in PR #25354:
URL: https://github.com/apache/beam/pull/25354#discussion_r1105110229


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSamplingDescriptorModifier.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import org.apache.beam.fn.harness.ProcessBundleDescriptorModifier;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+
+/**
+ * Modifies the given ProcessBundleDescriptor by adding a DataSampling operation as a consumer to
+ * every PCollection.
+ */
+public class DataSamplingDescriptorModifier implements ProcessBundleDescriptorModifier {

Review Comment:
   Good to know, and yes, I recently added some performance optimizations on these code paths.





[GitHub] [beam] rohdesamuel commented on a diff in pull request #25354: PCollection data sampling for Java SDK harness #25064

Posted by "rohdesamuel (via GitHub)" <gi...@apache.org>.
rohdesamuel commented on code in PR #25354:
URL: https://github.com/apache/beam/pull/25354#discussion_r1113713709


##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java:
##########
@@ -80,14 +84,50 @@ public void testActsLikeCircularBuffer() throws Exception {
     // The first 10 are always sampled, but with maxSamples = 5, the first ten are downsampled to
     // 4..9 inclusive. Then,
     // the 20th element is sampled (19) and every 20 after.
-    List<byte[]> expected = new ArrayList<>();
+    List<BeamFnApi.SampledElement> expected = new ArrayList<>();
     expected.add(encodeInt(19));
     expected.add(encodeInt(39));
     expected.add(encodeInt(59));
     expected.add(encodeInt(79));
     expected.add(encodeInt(99));
 
-    List<byte[]> samples = outputSampler.samples();
+    List<BeamFnApi.SampledElement> samples = outputSampler.samples();
     assertThat(samples, containsInAnyOrder(expected.toArray()));
   }
+
+  /**
+   * Test that sampling a PCollection while retrieving samples from multiple threads is ok.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testConcurrentSamples() throws Exception {
+    VarIntCoder coder = VarIntCoder.of();
+    OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 100000, 1);

Review Comment:
   Done
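For readers following the `testActsLikeCircularBuffer` expectations in the hunk above, the behaviour can be approximated with a small sketch. This is an assumption-laden illustration, not the `OutputSampler` implementation: `CircularSampler`, its fields, and the hard-coded "first 10 always sampled" threshold are hypothetical, chosen only to reproduce the expected values 19, 39, 59, 79, 99 for 100 inputs with a capacity of 5 and a sampling period of 20.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical fixed-capacity sampler (not Beam's OutputSampler). It samples the first 10
 * elements, then every sampleEveryN-th element, overwriting the oldest slot once full.
 */
final class CircularSampler<T> {
  private static final int ALWAYS_SAMPLE_FIRST = 10; // assumed from the test comment

  private final int maxSamples;
  private final int sampleEveryN;
  private final List<T> buffer = new ArrayList<>();
  private long seen = 0; // number of elements observed so far
  private int next = 0; // slot that the next sample will occupy

  CircularSampler(int maxSamples, int sampleEveryN) {
    this.maxSamples = maxSamples;
    this.sampleEveryN = sampleEveryN;
  }

  /** Observes one element; synchronized so it can race safely with samples(). */
  synchronized void sample(T element) {
    seen++;
    if (seen > ALWAYS_SAMPLE_FIRST && seen % sampleEveryN != 0) {
      return;
    }
    if (buffer.size() < maxSamples) {
      buffer.add(element);
    } else {
      buffer.set(next, element); // overwrite the oldest sample
    }
    next = (next + 1) % maxSamples;
  }

  /** Returns a snapshot of the current samples, in slot order. */
  synchronized List<T> samples() {
    return new ArrayList<>(buffer);
  }
}

final class CircularSamplerDemo {
  public static void main(String[] args) {
    CircularSampler<Integer> sampler = new CircularSampler<>(5, 20);
    for (int i = 0; i < 100; i++) {
      sampler.sample(i);
    }
    System.out.println(sampler.samples());
  }
}
```

The `synchronized` methods are the simplest way to make concurrent `sample` and `samples` calls safe in a sketch like this; the actual class may well use a different strategy.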



##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java:
##########
@@ -80,14 +84,50 @@ public void testActsLikeCircularBuffer() throws Exception {
     // The first 10 are always sampled, but with maxSamples = 5, the first ten are downsampled to
     // 4..9 inclusive. Then,
     // the 20th element is sampled (19) and every 20 after.
-    List<byte[]> expected = new ArrayList<>();
+    List<BeamFnApi.SampledElement> expected = new ArrayList<>();
     expected.add(encodeInt(19));
     expected.add(encodeInt(39));
     expected.add(encodeInt(59));
     expected.add(encodeInt(79));
     expected.add(encodeInt(99));
 
-    List<byte[]> samples = outputSampler.samples();
+    List<BeamFnApi.SampledElement> samples = outputSampler.samples();
     assertThat(samples, containsInAnyOrder(expected.toArray()));
   }
+
+  /**
+   * Test that sampling a PCollection while retrieving samples from multiple threads is ok.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testConcurrentSamples() throws Exception {
+    VarIntCoder coder = VarIntCoder.of();
+    OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 100000, 1);
+
+    // Iteration count was empirically chosen to have a high probability of failure without the
+    // test going for too long.
+    Thread sampleThreadA =

Review Comment:
   Done





[GitHub] [beam] rohdesamuel commented on pull request #25354: PCollection data sampling for Java SDK harness #25064

Posted by "rohdesamuel (via GitHub)" <gi...@apache.org>.
rohdesamuel commented on PR #25354:
URL: https://github.com/apache/beam/pull/25354#issuecomment-1439321824

   Run Java PreCommit

