You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2023/02/20 00:57:29 UTC

[beam] 07/13: Stop paying the iterator object creation tax in MultiplexingMetricTrackingFnDataReceiver (#25540)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch prism-jobservices
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 3e321d159342b0edbf5911a18ed7dafc467041ca
Author: Luke Cwik <lu...@gmail.com>
AuthorDate: Fri Feb 17 16:59:05 2023 -0800

    Stop paying the iterator object creation tax in MultiplexingMetricTrackingFnDataReceiver (#25540)
    
    * Stop paying the iterator object creation tax in MultiplexingMetricTrackingFnDataReceiver
    
    This removes the overhead shown in https://user-images.githubusercontent.com/10078956/219762523-1e76e849-18b9-4c40-a513-000364baea52.png
    
    This is for #21250
---
 .../beam/fn/harness/data/PCollectionConsumerRegistry.java      | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
index 45298a68d98..60b25d8b137 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
@@ -48,7 +48,6 @@ import org.apache.beam.sdk.metrics.Distribution;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 
 /**
  * The {@code PCollectionConsumerRegistry} is used to maintain a collection of consuming
@@ -209,8 +208,7 @@ public class PCollectionConsumerRegistry {
             come up in the existing SDF expansion, but might be useful to support fused SDF nodes.
             This would require dedicated delivery of the split results to each of the consumers
             separately. */
-            return new MultiplexingMetricTrackingFnDataReceiver(
-                pcId, coder, ImmutableList.copyOf(consumerAndMetadatas));
+            return new MultiplexingMetricTrackingFnDataReceiver(pcId, coder, consumerAndMetadatas);
           }
         });
   }
@@ -351,8 +349,10 @@ public class PCollectionConsumerRegistry {
 
       // Use the ExecutionStateTracker and enter an appropriate state to track the
       // Process Bundle Execution time metric and also ensure user counters can get an appropriate
-      // metrics container.
-      for (ConsumerAndMetadata consumerAndMetadata : consumerAndMetadatas) {
+      // metrics container. We specifically don't use a for-each loop since it creates an iterator
+      // on a hot path.
+      for (int size = consumerAndMetadatas.size(), i = 0; i < size; ++i) {
+        ConsumerAndMetadata consumerAndMetadata = consumerAndMetadatas.get(i);
         ExecutionState state = consumerAndMetadata.getExecutionState();
         state.activate();
         try {