Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/05/27 03:23:14 UTC

[GitHub] [beam] lukecwik opened a new pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

lukecwik opened a new pull request #11821:
URL: https://github.com/apache/beam/pull/11821


   This does not impact non-portable Dataflow, since it overrides the PTransform expansion for PCollection views.
   This currently has little benefit for other runners, since they still treat all views as in-memory iterables of values, but it opens the door for them to provide meaningfully optimized versions.
   
   This builds on prior work for exposing iterable and multimap views to runners in BEAM-3419.
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11821:
URL: https://github.com/apache/beam/pull/11821#issuecomment-635613803


   Run Spark ValidatesRunner





[GitHub] [beam] lukecwik commented on pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11821:
URL: https://github.com/apache/beam/pull/11821#issuecomment-634312177


   R: @mxm @tweise 





[GitHub] [beam] lukecwik removed a comment on pull request #11821: [WIP] [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik removed a comment on pull request #11821:
URL: https://github.com/apache/beam/pull/11821#issuecomment-634312177


   R: @mxm @tweise 





[GitHub] [beam] lukecwik commented on pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11821:
URL: https://github.com/apache/beam/pull/11821#issuecomment-635503892


   R: @mxm @iemejia 





[GitHub] [beam] lukecwik commented on a change in pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11821:
URL: https://github.com/apache/beam/pull/11821#discussion_r440427931



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
##########
@@ -533,4 +1251,141 @@ public String toString() {
       return Collections.singletonMap(tag, pCollection);
     }
   }
+
+  /** A {@link MultimapView} to {@link Map Map<K, V>} adapter. */
+  private static class MultimapViewToMapAdapter<K, V> extends AbstractMap<K, V> {
+    private final MultimapView<K, V> primitiveViewT;
+    private final Supplier<Integer> size;
+
+    private MultimapViewToMapAdapter(MultimapView<K, V> primitiveViewT) {
+      this.primitiveViewT = primitiveViewT;
+      this.size = Suppliers.memoize(() -> Iterables.size(primitiveViewT.get()));
+    }
+
+    @Override
+    public boolean containsKey(Object key) {
+      return primitiveViewT.get((K) key).iterator().hasNext();
+    }
+
+    @Override
+    public V get(Object key) {
+      Iterator<V> iterator = primitiveViewT.get((K) key).iterator();
+      if (!iterator.hasNext()) {
+        return null;
+      }
+      V value = iterator.next();
+      if (iterator.hasNext()) {
+        throw new IllegalArgumentException("Duplicate values for " + key);
+      }
+      return value;
+    }
+
+    @Override
+    public int size() {
+      return size.get();
+    }
+
+    @Override
+    public Set<Entry<K, V>> entrySet() {
+      return new AbstractSet<Entry<K, V>>() {
+        @Override
+        public Iterator<Entry<K, V>> iterator() {
+          return FluentIterable.from(primitiveViewT.get())
+              .<Entry<K, V>>transform((K key) -> new SimpleEntry<>(key, get(key)))
+              .iterator();
+        }
+
+        @Override
+        public boolean contains(Object o) {
+          if (!(o instanceof Entry)) {
+            return false;
+          }
+          Entry<?, ?> entry = (Entry<?, ?>) o;
+          // We treat the absence of the key in the map as a difference in these abstract sets. The
+          // underlying primitive view represents missing keys as empty iterables so we use this
+          // to check if the map contains the key first before comparing values.
+          Iterable<V> value = primitiveViewT.get((K) entry.getKey());
+          if (!value.iterator().hasNext()) {
+            return false;
+          }
+          return Objects.equals(entry.getValue(), value.iterator().next());
+        }
+
+        @Override
+        public int size() {
+          return size.get();
+        }
+      };
+    }
+  }
+
+  /** A {@link MultimapView} to {@link Map Map<K, Iterable<V>>} adapter. */
+  private static class MultimapViewToMultimapAdapter<K, V> extends AbstractMap<K, Iterable<V>> {
+    private final MultimapView<K, V> primitiveViewT;
+    private final Supplier<Integer> size;
+
+    private MultimapViewToMultimapAdapter(MultimapView<K, V> primitiveViewT) {
+      this.primitiveViewT = primitiveViewT;
+      this.size = Suppliers.memoize(() -> Iterables.size(primitiveViewT.get()));
+    }
+
+    @Override
+    public boolean containsKey(Object key) {
+      return primitiveViewT.get((K) key).iterator().hasNext();
+    }
+
+    @Override
+    public Iterable<V> get(Object key) {
+      Iterable<V> values = primitiveViewT.get((K) key);
+      // The only way for the values iterable to be empty is for us to have never seen such a key

Review comment:
       Done
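
The adapter idea in the diff above, exposing a multimap-style lookup through the standard `java.util.Map` interface, can be sketched without any Beam dependency. This is only an illustration under stated assumptions: `KeyedLookup`, `MapBackedLookup`, and `LookupToMapAdapter` are hypothetical names invented here and are not Beam's `MultimapView` or the PR's classes. As in the diff, a missing key is modelled as an empty iterable.

```java
import java.util.AbstractMap;
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for a multimap-style primitive view (not Beam's MultimapView). */
interface KeyedLookup<K, V> {
  Iterable<K> keys(); // all distinct keys

  Iterable<V> values(K key); // empty iterable when the key is absent
}

/** Hypothetical in-memory KeyedLookup used only to exercise the adapter. */
class MapBackedLookup<K, V> implements KeyedLookup<K, V> {
  private final Map<K, List<V>> data;

  MapBackedLookup(Map<K, List<V>> data) {
    this.data = data;
  }

  public Iterable<K> keys() {
    return data.keySet();
  }

  public Iterable<V> values(K key) {
    return data.getOrDefault(key, Collections.emptyList());
  }
}

/** Read-only Map facade over a KeyedLookup that allows at most one value per key. */
class LookupToMapAdapter<K, V> extends AbstractMap<K, V> {
  private final KeyedLookup<K, V> lookup;

  LookupToMapAdapter(KeyedLookup<K, V> lookup) {
    this.lookup = lookup;
  }

  @Override
  @SuppressWarnings("unchecked")
  public boolean containsKey(Object key) {
    // A key is present exactly when its values iterable is non-empty.
    return lookup.values((K) key).iterator().hasNext();
  }

  @Override
  @SuppressWarnings("unchecked")
  public V get(Object key) {
    Iterator<V> it = lookup.values((K) key).iterator();
    if (!it.hasNext()) {
      return null;
    }
    V value = it.next();
    if (it.hasNext()) {
      throw new IllegalArgumentException("Duplicate values for " + key);
    }
    return value;
  }

  @Override
  public Set<Map.Entry<K, V>> entrySet() {
    return new AbstractSet<Map.Entry<K, V>>() {
      @Override
      public Iterator<Map.Entry<K, V>> iterator() {
        Iterator<K> keys = lookup.keys().iterator();
        return new Iterator<Map.Entry<K, V>>() {
          public boolean hasNext() {
            return keys.hasNext();
          }

          public Map.Entry<K, V> next() {
            K key = keys.next();
            return new AbstractMap.SimpleImmutableEntry<>(key, get(key));
          }
        };
      }

      @Override
      public int size() {
        // Counts keys on every call; the diff memoizes this instead.
        int n = 0;
        for (K ignored : lookup.keys()) {
          n++;
        }
        return n;
      }
    };
  }
}
```

Only `entrySet()` has to be supplied to `AbstractMap`; `containsKey` and `get` are overridden so that point lookups go straight to the underlying lookup rather than iterating over every entry.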







[GitHub] [beam] amaliujia commented on pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #11821:
URL: https://github.com/apache/beam/pull/11821#issuecomment-641558615


   Thanks for adding me as a reviewer. I tried to go through this PR and couldn't come up with valuable comments. (in fact, it was a good learning process for me to read these code changes).





[GitHub] [beam] lukecwik commented on pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11821:
URL: https://github.com/apache/beam/pull/11821#issuecomment-635664997


   Run Spark ValidatesRunner





[GitHub] [beam] lukecwik commented on pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11821:
URL: https://github.com/apache/beam/pull/11821#issuecomment-635630717


   Run Spark ValidatesRunner





[GitHub] [beam] lukecwik commented on a change in pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11821:
URL: https://github.com/apache/beam/pull/11821#discussion_r440436806



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
##########
@@ -496,6 +622,22 @@ public void processElement(@Element T element, OutputReceiver<KV<Void, T>> r) {
     }
   }
 
+  /** An identity {@link PTransform}. */
+  private static class IdentityTransform<T> extends PTransform<PCollection<T>, PCollection<T>> {

Review comment:
       Not any more. It was used in a much earlier version.







[GitHub] [beam] lukecwik commented on a change in pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11821:
URL: https://github.com/apache/beam/pull/11821#discussion_r440423092



##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionViewsTest.java
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.values;
+
+import static org.apache.beam.sdk.values.PCollectionViews.computeOverlappingRanges;
+import static org.apache.beam.sdk.values.PCollectionViews.computePositionForIndex;
+import static org.apache.beam.sdk.values.PCollectionViews.computeTotalNumElements;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThrows;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeSet;
+import java.util.stream.IntStream;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ListMultimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link PCollectionViews}. */
+@RunWith(JUnit4.class)
+public class PCollectionViewsTest {
+  @Test
+  public void testEmpty() {
+    Iterable<OffsetRange> ranges = Collections.emptyList();
+
+    Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition =
+        computeOverlappingRanges(ranges);
+    assertEquals(nonOverlappingRangesToNumElementsPerPosition, Collections.emptyMap());
+    assertEquals(0, computeTotalNumElements(nonOverlappingRangesToNumElementsPerPosition));
+    assertThrows(
+        IndexOutOfBoundsException.class,
+        () -> computePositionForIndex(nonOverlappingRangesToNumElementsPerPosition, 0));
+  }
+
+  @Test
+  public void testNoOverlapping() {
+    Iterable<OffsetRange> ranges = Arrays.asList(range(0, 2), range(4, 6));
+
+    Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition =
+        computeOverlappingRanges(ranges);
+    assertEquals(
+        ImmutableMap.of(range(0, 2), 1, range(4, 6), 1),
+        nonOverlappingRangesToNumElementsPerPosition);
+    assertNonEmptyRangesAndPositions(ranges, nonOverlappingRangesToNumElementsPerPosition);
+  }
+
+  @Test
+  public void testOnlyTouchingRanges() {
+    Iterable<OffsetRange> ranges = Arrays.asList(range(0, 4), range(4, 8), range(8, 12));
+
+    Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition =
+        computeOverlappingRanges(ranges);
+    assertEquals(
+        ImmutableMap.of(range(0, 4), 1, range(4, 8), 1, range(8, 12), 1),
+        nonOverlappingRangesToNumElementsPerPosition);
+    assertNonEmptyRangesAndPositions(ranges, nonOverlappingRangesToNumElementsPerPosition);
+  }
+
+  @Test
+  public void testRangesWithAtMostOneOverlap() {
+    Iterable<OffsetRange> ranges = Arrays.asList(range(0, 6), range(4, 10), range(8, 12));
+
+    Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition =
+        computeOverlappingRanges(ranges);
+    assertEquals(
+        ImmutableMap.builder()
+            .put(range(0, 4), 1)
+            .put(range(4, 6), 2)
+            .put(range(6, 8), 1)
+            .put(range(8, 10), 2)
+            .put(range(10, 12), 1)
+            .build(),
+        nonOverlappingRangesToNumElementsPerPosition);
+    assertNonEmptyRangesAndPositions(ranges, nonOverlappingRangesToNumElementsPerPosition);
+  }
+
+  @Test
+  public void testOverlappingFroms() {
+    Iterable<OffsetRange> ranges = Arrays.asList(range(0, 4), range(0, 8), range(0, 12));
+
+    Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition =
+        computeOverlappingRanges(ranges);
+    assertEquals(
+        ImmutableMap.builder().put(range(0, 4), 3).put(range(4, 8), 2).put(range(8, 12), 1).build(),
+        nonOverlappingRangesToNumElementsPerPosition);
+    assertNonEmptyRangesAndPositions(ranges, nonOverlappingRangesToNumElementsPerPosition);
+  }
+
+  @Test
+  public void testOverlappingTos() {
+    Iterable<OffsetRange> ranges = Arrays.asList(range(0, 12), range(4, 12), range(8, 12));
+
+    Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition =
+        computeOverlappingRanges(ranges);
+    assertEquals(
+        ImmutableMap.builder().put(range(0, 4), 1).put(range(4, 8), 2).put(range(8, 12), 3).build(),
+        nonOverlappingRangesToNumElementsPerPosition);
+    assertNonEmptyRangesAndPositions(ranges, nonOverlappingRangesToNumElementsPerPosition);
+  }
+
+  @Test
+  public void testOverlappingFromsAndTos() {
+    Iterable<OffsetRange> ranges = Arrays.asList(range(0, 4), range(0, 4), range(0, 4));
+
+    Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition =
+        computeOverlappingRanges(ranges);
+    assertEquals(
+        ImmutableMap.builder().put(range(0, 4), 3).build(),
+        nonOverlappingRangesToNumElementsPerPosition);
+    assertNonEmptyRangesAndPositions(ranges, nonOverlappingRangesToNumElementsPerPosition);
+  }
+
+  @Test
+  public void testMultipleOverlapsForTheSameRange() {
+    Iterable<OffsetRange> ranges =
+        Arrays.asList(
+            range(0, 4),
+            range(0, 8),
+            range(0, 12),
+            range(0, 12),
+            range(4, 12),
+            range(8, 12),
+            range(0, 4),
+            range(0, 8),
+            range(0, 12),
+            range(0, 12),
+            range(4, 12),
+            range(8, 12));
+
+    Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition =
+        computeOverlappingRanges(ranges);
+    assertEquals(
+        ImmutableMap.builder().put(range(0, 4), 8).put(range(4, 8), 8).put(range(8, 12), 8).build(),
+        nonOverlappingRangesToNumElementsPerPosition);
+    assertNonEmptyRangesAndPositions(ranges, nonOverlappingRangesToNumElementsPerPosition);
+  }
+
+  @Test
+  public void testIncreasingOverlaps() {
+    Iterable<OffsetRange> ranges =
+        Arrays.asList(range(0, 4), range(1, 5), range(2, 6), range(3, 7), range(4, 8), range(5, 9));
+
+    Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition =
+        computeOverlappingRanges(ranges);
+    assertEquals(
+        ImmutableMap.builder()
+            .put(range(0, 1), 1)
+            .put(range(1, 2), 2)
+            .put(range(2, 3), 3)
+            .put(range(3, 4), 4)
+            .put(range(4, 5), 4)
+            .put(range(5, 6), 4)
+            .put(range(6, 7), 3)
+            .put(range(7, 8), 2)
+            .put(range(8, 9), 1)
+            .build(),
+        nonOverlappingRangesToNumElementsPerPosition);
+    assertNonEmptyRangesAndPositions(ranges, nonOverlappingRangesToNumElementsPerPosition);
+  }
+
+  @Test
+  public void testNestedOverlaps() {
+    Iterable<OffsetRange> ranges =
+        Arrays.asList(range(0, 8), range(1, 7), range(2, 6), range(3, 5));
+
+    Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition =
+        computeOverlappingRanges(ranges);
+    assertEquals(
+        ImmutableMap.builder()
+            .put(range(0, 1), 1)
+            .put(range(1, 2), 2)
+            .put(range(2, 3), 3)
+            .put(range(3, 5), 4)
+            .put(range(5, 6), 3)
+            .put(range(6, 7), 2)
+            .put(range(7, 8), 1)
+            .build(),
+        nonOverlappingRangesToNumElementsPerPosition);
+    assertNonEmptyRangesAndPositions(ranges, nonOverlappingRangesToNumElementsPerPosition);
+  }
+
+  @Test
+  public void testRandomRanges() {
+    Random random = new Random(123892154890L);

Review comment:
       Nowhere; I just wanted a stable random seed so that any failure would be repeatable.
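
To illustrate both the fixed-seed point and the behaviour the tests above expect, here is a minimal sketch that counts overlaps with a sweep over sorted range boundaries and checks the result against a brute-force count on seeded random ranges. It is an assumption about one way to get those results, not the implementation of `computeOverlappingRanges` in the PR (which is not shown in this thread); `OverlapSketch` and its methods are hypothetical names, and ranges are plain half-open `{from, to}` pairs rather than `OffsetRange`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

class OverlapSketch {
  /** Splits half-open [from, to) ranges into disjoint pieces, each mapped to its cover count. */
  static Map<List<Long>, Integer> countOverlaps(List<long[]> ranges) {
    // +1 at every start and -1 at every end; the TreeMap keeps boundaries sorted.
    TreeMap<Long, Integer> deltas = new TreeMap<>();
    for (long[] r : ranges) {
      deltas.merge(r[0], 1, Integer::sum);
      deltas.merge(r[1], -1, Integer::sum);
    }
    Map<List<Long>, Integer> pieces = new LinkedHashMap<>();
    long previous = 0;
    int active = 0; // number of ranges covering [previous, current boundary)
    for (Map.Entry<Long, Integer> boundary : deltas.entrySet()) {
      if (active > 0) {
        pieces.put(Arrays.asList(previous, boundary.getKey()), active);
      }
      previous = boundary.getKey();
      active += boundary.getValue();
    }
    return pieces;
  }

  /** Brute-force count of the ranges that contain the given position. */
  static int coverage(List<long[]> ranges, long position) {
    int count = 0;
    for (long[] r : ranges) {
      if (r[0] <= position && position < r[1]) {
        count++;
      }
    }
    return count;
  }

  /** Generates non-empty ranges; the same seed always yields the same ranges. */
  static List<long[]> randomRanges(long seed, int n) {
    Random random = new Random(seed);
    List<long[]> ranges = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      long from = random.nextInt(50);
      ranges.add(new long[] {from, from + 1 + random.nextInt(10)});
    }
    return ranges;
  }
}
```

Because `java.util.Random` is deterministic for a given seed, a failing run of such a randomized check can be replayed with exactly the same input.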







[GitHub] [beam] lukecwik commented on a change in pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11821:
URL: https://github.com/apache/beam/pull/11821#discussion_r440422483



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
##########
@@ -157,7 +166,10 @@ private View() {}
    * PCollectionView} mapping each window to a {@link List} containing all of the elements in the
    * window.
    *
-   * <p>Unlike with {@link #asIterable}, the resulting list is required to fit in memory.
+   * <p>This view should only be used if random access and/or size of the PCollection is required.
+   * {@link #asIterable()} will perform significantly better for sequential access.

Review comment:
       No. Most runners will be able to easily support an efficient iterable representation even if they only support an efficient multimap representation, since the mapping on the runner's side is trivial. The converse is not true.
   
   Also, the list side input format adds overhead that the iterable format does not.
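
One way to read the asymmetry described in this comment is sketched below, assuming every element is stored under a single constant key. A multimap then yields an iterable with one lookup, whereas answering a keyed lookup from a bare iterable means scanning every element. `ViewMappingSketch`, the `ALL` key, and both methods are hypothetical and do not reflect any runner's actual code.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ViewMappingSketch {
  /** Hypothetical constant key under which every element is stored. */
  static final String ALL = "all";

  /** Multimap to iterable: a single lookup of the constant key. */
  static <V> Iterable<V> iterableFromMultimap(Map<String, List<V>> multimap) {
    return multimap.getOrDefault(ALL, Collections.emptyList());
  }

  /** Iterable to multimap lookup: with no index, each lookup scans every element. */
  static <K, V> List<V> lookupByScanning(Iterable<Map.Entry<K, V>> elements, K key) {
    List<V> matches = new ArrayList<>();
    for (Map.Entry<K, V> element : elements) {
      if (element.getKey().equals(key)) {
        matches.add(element.getValue());
      }
    }
    return matches;
  }
}
```

The first method does constant work per view, while the second does work proportional to the whole collection on every lookup, which is the sense in which the converse mapping is not cheap.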







[GitHub] [beam] lukecwik commented on pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11821:
URL: https://github.com/apache/beam/pull/11821#issuecomment-636027113


   The existing Spark failure is due to BEAM-10024.





[GitHub] [beam] ibzib commented on a change in pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #11821:
URL: https://github.com/apache/beam/pull/11821#discussion_r439607449



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
##########
@@ -157,7 +166,10 @@ private View() {}
    * PCollectionView} mapping each window to a {@link List} containing all of the elements in the
    * window.
    *
-   * <p>Unlike with {@link #asIterable}, the resulting list is required to fit in memory.
+   * <p>This view should only be used if random access and/or size of the PCollection is required.
+   * {@link #asIterable()} will perform significantly better for sequential access.

Review comment:
       Is the performance difference dependent on the runner implementation?

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
##########
@@ -151,13 +279,103 @@
   }
 
   /**
-   * Implementation which is able to adapt a multimap materialization to a {@code T}.
+   * Implementation which is able to adapt an iterable materialization to a {@code T}.
    *
    * <p>For internal use only.
    *
    * <p>Instantiate via {@link PCollectionViews#singletonView}.
    */
   @Experimental(Kind.CORE_RUNNERS_ONLY)
+  private static class SingletonViewFn2<T> extends ViewFn<IterableView<T>, T> {

Review comment:
       Nit: consider a more descriptive naming scheme and/or comments to differentiate these from the originals.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
##########
@@ -533,4 +1251,141 @@ public String toString() {
       return Collections.singletonMap(tag, pCollection);
     }
   }
+
+  /** A {@link MultimapView} to {@link Map Map<K, V>} adapter. */
+  private static class MultimapViewToMapAdapter<K, V> extends AbstractMap<K, V> {
+    private final MultimapView<K, V> primitiveViewT;
+    private final Supplier<Integer> size;
+
+    private MultimapViewToMapAdapter(MultimapView<K, V> primitiveViewT) {
+      this.primitiveViewT = primitiveViewT;
+      this.size = Suppliers.memoize(() -> Iterables.size(primitiveViewT.get()));
+    }
+
+    @Override
+    public boolean containsKey(Object key) {
+      return primitiveViewT.get((K) key).iterator().hasNext();
+    }
+
+    @Override
+    public V get(Object key) {
+      Iterator<V> iterator = primitiveViewT.get((K) key).iterator();
+      if (!iterator.hasNext()) {
+        return null;
+      }
+      V value = iterator.next();
+      if (iterator.hasNext()) {
+        throw new IllegalArgumentException("Duplicate values for " + key);
+      }
+      return value;
+    }
+
+    @Override
+    public int size() {
+      return size.get();
+    }
+
+    @Override
+    public Set<Entry<K, V>> entrySet() {
+      return new AbstractSet<Entry<K, V>>() {
+        @Override
+        public Iterator<Entry<K, V>> iterator() {
+          return FluentIterable.from(primitiveViewT.get())
+              .<Entry<K, V>>transform((K key) -> new SimpleEntry<>(key, get(key)))
+              .iterator();
+        }
+
+        @Override
+        public boolean contains(Object o) {
+          if (!(o instanceof Entry)) {
+            return false;
+          }
+          Entry<?, ?> entry = (Entry<?, ?>) o;
+          // We treat the absence of the key in the map as a difference in these abstract sets. The
+          // underlying primitive view represents missing keys as empty iterables so we use this
+          // to check if the map contains the key first before comparing values.
+          Iterable<V> value = primitiveViewT.get((K) entry.getKey());
+          if (value.iterator().hasNext()) {
+            return false;
+          }
+          return Objects.equals(entry.getValue(), value);
+        }
+
+        @Override
+        public int size() {
+          return size.get();
+        }
+      };
+    }
+  }
+
+  /** A {@link MultimapView} to {@link Map Map<K, Iterable<V>>} adapter. */
+  private static class MultimapViewToMultimapAdapter<K, V> extends AbstractMap<K, Iterable<V>> {
+    private final MultimapView<K, V> primitiveViewT;
+    private final Supplier<Integer> size;
+
+    private MultimapViewToMultimapAdapter(MultimapView<K, V> primitiveViewT) {
+      this.primitiveViewT = primitiveViewT;
+      this.size = Suppliers.memoize(() -> Iterables.size(primitiveViewT.get()));
+    }
+
+    @Override
+    public boolean containsKey(Object key) {
+      return primitiveViewT.get((K) key).iterator().hasNext();
+    }
+
+    @Override
+    public Iterable<V> get(Object key) {
+      Iterable<V> values = primitiveViewT.get((K) key);
+      // The only was for the values iterable to be empty is for us to have never seen such a key

Review comment:
       Nit: missing a noun here
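
   The adapters in the hunk above rely on the primitive view returning an empty iterable for a key it has never seen. A minimal stand-alone sketch of that lookup rule follows; it uses a plain `Map<K, List<V>>` as a hypothetical stand-in for `MultimapView`, and the class and method names are invented for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-alone sketch; not the Beam implementation. */
class SingleValueLookupSketch {
  /** A key is present exactly when its iterable of values is non-empty. */
  static <K, V> boolean containsKey(Map<K, List<V>> multimap, K key) {
    return multimap.getOrDefault(key, Collections.emptyList()).iterator().hasNext();
  }

  /** Returns the single value for the key, null if absent, and rejects duplicate values. */
  static <K, V> V getSingle(Map<K, List<V>> multimap, K key) {
    Iterator<V> iterator = multimap.getOrDefault(key, Collections.emptyList()).iterator();
    if (!iterator.hasNext()) {
      return null;
    }
    V value = iterator.next();
    if (iterator.hasNext()) {
      throw new IllegalArgumentException("Duplicate values for " + key);
    }
    return value;
  }

  public static void main(String[] args) {
    Map<String, List<Integer>> multimap = new HashMap<>();
    multimap.put("a", Arrays.asList(1));
    multimap.put("b", Arrays.asList(2, 3));
    System.out.println(getSingle(multimap, "a"));
    System.out.println(containsKey(multimap, "missing"));
  }
}
```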

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
##########
@@ -219,6 +231,16 @@ private View() {}
    * <p>Public only so a {@link PipelineRunner} may override its behavior.
    *
    * <p>See {@link View#asList()}.
+   *
+   * <p>The materialized format uses {@link Materializations#MULTIMAP_MATERIALIZATION_URN multimap}

Review comment:
       I'm not sure these implementation details belong at the javadoc level; consider moving them into the body of the class so it's clearer what it's describing.

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
##########
@@ -71,8 +79,23 @@ public FlinkSideInputReader(
             tag.getId(), new SideInputInitializer<>(view));
     T result = sideInputs.get(window);
     if (result == null) {
-      ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn();
-      result = viewFn.apply(InMemoryMultimapSideInputView.empty());

Review comment:
       This is unrelated to your changes, but I'm curious why we would return empty here instead of throwing an error?

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
##########
@@ -1302,13 +1303,35 @@ private GloballyAsSingletonView(
 
     @Override
     public PCollectionView<OutputT> expand(PCollection<InputT> input) {
+      // TODO(BEAM-10097): Make this the default expansion for all portable runners.
+      if (hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
+          && (hasExperiment(input.getPipeline().getOptions(), "use_runner_v2")

Review comment:
       Are `use_runner_v2` and `use_unified_worker` synonymous? If so, which is preferred?

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
##########
@@ -278,6 +528,398 @@ public IterableViewFn(TypeDescriptorSupplier<T> typeDescriptorSupplier) {
    * <p>Instantiate via {@link PCollectionViews#listView}.
    */
   @Experimental(Kind.CORE_RUNNERS_ONLY)
+  @VisibleForTesting
+  static class ListViewFn2<T> extends ViewFn<MultimapView<Long, MetaOr<T, OffsetRange>>, List<T>> {
+    private TypeDescriptorSupplier<T> typeDescriptorSupplier;
+
+    public ListViewFn2(TypeDescriptorSupplier<T> typeDescriptorSupplier) {
+      this.typeDescriptorSupplier = typeDescriptorSupplier;
+    }
+
+    @Override
+    public Materialization<MultimapView<Long, MetaOr<T, OffsetRange>>> getMaterialization() {
+      return Materializations.multimap();
+    }
+
+    @Override
+    public List<T> apply(MultimapView<Long, MetaOr<T, OffsetRange>> primitiveViewT) {
+      return Collections.unmodifiableList(new ListOverMultimapView<>(primitiveViewT));
+    }
+
+    @Override
+    public TypeDescriptor<List<T>> getTypeDescriptor() {
+      return TypeDescriptors.lists(typeDescriptorSupplier.get());
+    }
+
+    /**
+     * A {@link List} adapter over a {@link MultimapView}.
+     *
+     * <p>See {@link View.AsList} for a description of the materialized format and {@code index} to
+     * {@code (position, sub-position)} mapping details.
+     */
+    private static class ListOverMultimapView<T> extends AbstractList<T> implements RandomAccess {
+      private final MultimapView<Long, MetaOr<T, OffsetRange>> primitiveView;
+      /**
+       * A mapping from non-overlapping ranges to the number of elements at each position within
+       * that range. Ranges not specified in the mapping implicitly have 0 elements at those
+       * positions.
+       *
+       * <p>Used to quickly compute the {@code index} -> {@code (position, sub-position)} within the
+       * map.
+       */
+      private final Supplier<SortedMap<OffsetRange, Integer>>
+          nonOverlappingRangesToNumElementsPerPosition;
+
+      private final Supplier<Integer> size;
+
+      private ListOverMultimapView(MultimapView<Long, MetaOr<T, OffsetRange>> primitiveView) {
+        this.primitiveView = primitiveView;
+        this.nonOverlappingRangesToNumElementsPerPosition =
+            Suppliers.memoize(
+                () ->
+                    computeOverlappingRanges(
+                        Iterables.transform(
+                            primitiveView.get(Long.MIN_VALUE), (value) -> value.getMetadata())));
+        this.size =
+            Suppliers.memoize(
+                () -> computeTotalNumElements(nonOverlappingRangesToNumElementsPerPosition.get()));
+      }
+
+      @Override
+      public T get(int index) {
+        if (index < 0 || index >= size.get()) {
+          throw new IndexOutOfBoundsException();
+        }
+        KV<Long, Integer> position =
+            computePositionForIndex(nonOverlappingRangesToNumElementsPerPosition.get(), index);
+        return Iterables.get(primitiveView.get(position.getKey()), position.getValue()).get();
+      }
+
+      @Override
+      public int size() {
+        return size.get();
+      }
+
+      @Override
+      public Iterator<T> iterator() {
+        return listIterator();
+      }
+
+      @Override
+      public ListIterator<T> listIterator() {
+        return super.listIterator();
+      }
+
+      /** A {@link ListIterator} over {@link MultimapView} adapter. */
+      private class ListIteratorOverMultimapView implements ListIterator<T> {
+        private int position;
+
+        @Override
+        public boolean hasNext() {
+          return position < size();
+        }
+
+        @Override
+        public T next() {
+          if (!hasNext()) {
+            throw new NoSuchElementException();
+          }
+          T rval = get(position);
+          position += 1;
+          return rval;
+        }
+
+        @Override
+        public boolean hasPrevious() {
+          return position > 0;
+        }
+
+        @Override
+        public T previous() {
+          if (!hasPrevious()) {
+            throw new NoSuchElementException();
+          }
+          position -= 1;
+          return get(position);
+        }
+
+        @Override
+        public int nextIndex() {
+          return position;
+        }
+
+        @Override
+        public int previousIndex() {
+          return position - 1;
+        }
+
+        @Override
+        public void remove() {
+          throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void set(T e) {
+          throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void add(T e) {
+          throw new UnsupportedOperationException();
+        }
+      }
+    }
+  }
+
+  /**
+   * Compares {@link OffsetRange}s such that ranges are ordered by the smallest {@code from} and in
+   * case of a tie the smallest {@code to}.
+   */
+  @VisibleForTesting
+  static class OffsetRangeComparator implements Comparator<OffsetRange> {
+    private static final OffsetRangeComparator INSTANCE = new OffsetRangeComparator();
+
+    @Override
+    public int compare(OffsetRange o1, OffsetRange o2) {
+      int fromComparison = Longs.compare(o1.getFrom(), o2.getFrom());
+      if (fromComparison != 0) {
+        return fromComparison;
+      }
+      return Longs.compare(o1.getTo(), o2.getTo());
+    }
+  }
+
+  @VisibleForTesting
+  static SortedMap<OffsetRange, Integer> computeOverlappingRanges(Iterable<OffsetRange> ranges) {
+    ImmutableSortedMap.Builder<OffsetRange, Integer> rval =
+        ImmutableSortedMap.orderedBy(OffsetRangeComparator.INSTANCE);
+    List<OffsetRange> sortedRanges = Lists.newArrayList(ranges);
+    if (sortedRanges.isEmpty()) {
+      return rval.build();
+    }
+    Collections.sort(sortedRanges, OffsetRangeComparator.INSTANCE);
+
+    // Stores ranges in smallest 'from' and then smallest 'to' order
+    // e.g. [2, 7), [3, 4), [3, 5), [3, 5), [3, 6), [4, 0)
+    PriorityQueue<OffsetRange> rangesWithSameFrom =
+        new PriorityQueue<>(OffsetRangeComparator.INSTANCE);
+    Iterator<OffsetRange> iterator = sortedRanges.iterator();
+
+    // Stored in reverse sorted order so that when we iterate and re-add them back to
+    // overlappingRanges they are stored in sorted order from smallest to largest range.to
+    List<OffsetRange> rangesToProcess = new ArrayList<>();
+    while (iterator.hasNext()) {
+      OffsetRange current = iterator.next();
+      // Skip empty ranges
+      if (current.getFrom() == current.getTo()) {
+        continue;
+      }
+
+      // If the current range has a different 'from' than a prior range then we must produce
+      // ranges in [rangesWithSameFrom.from, current.from)
+      while (!rangesWithSameFrom.isEmpty()
+          && rangesWithSameFrom.peek().getFrom() != current.getFrom()) {
+        rangesToProcess.addAll(rangesWithSameFrom);
+        Collections.sort(rangesToProcess, OffsetRangeComparator.INSTANCE);
+        rangesWithSameFrom.clear();
+
+        int i = 0;
+        long lastTo = rangesToProcess.get(i).getFrom();
+        // Output all the ranges that are strictly less than current.from
+        // e.g. current.from := 7 for [3, 4), [3, 5), [3, 5), [3, 6) will produce
+        // [3, 4) := 4
+        // [4, 5) := 3
+        // [5, 6) := 1
+        for (; i < rangesToProcess.size(); ++i) {
+          if (rangesToProcess.get(i).getTo() > current.getFrom()) {
+            break;
+          }
+          // Output only the first of any subsequent duplicate ranges
+          if (i == 0 || rangesToProcess.get(i - 1).getTo() != rangesToProcess.get(i).getTo()) {
+            rval.put(
+                new OffsetRange(lastTo, rangesToProcess.get(i).getTo()),
+                rangesToProcess.size() - i);
+            lastTo = rangesToProcess.get(i).getTo();
+          }
+        }
+
+        // We exited the loop with 'to' > current.from, we must add the range [lastTo,
+        // current.from) if it is non-empty
+        if (lastTo < current.getFrom() && i != rangesToProcess.size()) {
+          rval.put(new OffsetRange(lastTo, current.getFrom()), rangesToProcess.size() - i);
+        }
+
+        // The remaining ranges have a 'to' that is greater than 'current.from' and will overlap
+        // with current so add them back to rangesWithSameFrom with the updated 'from'
+        for (; i < rangesToProcess.size(); ++i) {
+          rangesWithSameFrom.add(
+              new OffsetRange(current.getFrom(), rangesToProcess.get(i).getTo()));
+        }
+
+        rangesToProcess.clear();
+      }
+      rangesWithSameFrom.add(current);
+    }
+
+    // Process the last chunk of overlapping ranges
+    while (!rangesWithSameFrom.isEmpty()) {
+      // This range always represents the range with the smallest 'to'
+      OffsetRange current = rangesWithSameFrom.remove();
+
+      rangesToProcess.addAll(rangesWithSameFrom);
+      Collections.sort(rangesToProcess, OffsetRangeComparator.INSTANCE);
+      rangesWithSameFrom.clear();
+
+      rval.put(current, rangesToProcess.size() + 1 /* include current */);
+
+      // Shorten all the remaining ranges such that they start with current.to
+      for (OffsetRange rangeWithDifferentFrom : rangesToProcess) {
+        // Skip any duplicates of current
+        if (rangeWithDifferentFrom.getTo() > current.getTo()) {
+          rangesWithSameFrom.add(new OffsetRange(current.getTo(), rangeWithDifferentFrom.getTo()));
+        }
+      }
+      rangesToProcess.clear();
+    }
+    return rval.build();
+  }
+
+  @VisibleForTesting
+  static int computeTotalNumElements(
+      Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition) {
+    long sum = 0;
+    for (Map.Entry<OffsetRange, Integer> range :
+        nonOverlappingRangesToNumElementsPerPosition.entrySet()) {
+      sum +=
+          Math.multiplyExact(
+              Math.subtractExact(range.getKey().getTo(), range.getKey().getFrom()),
+              range.getValue());
+    }
+    return Ints.checkedCast(sum);
+  }
+
+  @VisibleForTesting
+  static KV<Long, Integer> computePositionForIndex(
+      Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition, int index) {
+    if (index < 0) {
+      throw new IndexOutOfBoundsException(
+          String.format(
+              "Position %s was out of bounds for ranges %s.",
+              index, nonOverlappingRangesToNumElementsPerPosition));
+    }
+    for (Map.Entry<OffsetRange, Integer> range :
+        nonOverlappingRangesToNumElementsPerPosition.entrySet()) {
+      int numElementsInRange =
+          Ints.checkedCast(
+              Math.multiplyExact(
+                  Math.subtractExact(range.getKey().getTo(), range.getKey().getFrom()),
+                  range.getValue()));
+      if (numElementsInRange <= index) {
+        index -= numElementsInRange;
+        continue;
+      }
+      long position = range.getKey().getFrom() + index / range.getValue();
+      int subPosition = index % range.getValue();
+      return KV.of(position, subPosition);
+    }
+    throw new IndexOutOfBoundsException(
+        String.format(
+            "Position %s was out of bounds for ranges %s.",
+            index, nonOverlappingRangesToNumElementsPerPosition));
+  }
+
+  /** Stores values or metadata about values. */
+  public static class MetaOr<T, MetaT> {

Review comment:
       Nit: Can you rename this class? Maybe something like "ValueOrMetadata"?
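
   As an illustration of the `index` to `(position, sub-position)` mapping that `computePositionForIndex` performs in the hunk above, here is a minimal stand-alone sketch. `Range` and `positionForIndex` are hypothetical stand-ins written for this example (they replace `OffsetRange` and `KV`), and the sketch assumes the map iterates its non-overlapping ranges in ascending order.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-alone sketch; not the Beam implementation. */
class IndexToPositionSketch {
  /** Half-open range [from, to), standing in for Beam's OffsetRange. */
  static final class Range {
    final long from;
    final long to;

    Range(long from, long to) {
      this.from = from;
      this.to = to;
    }
  }

  /**
   * Maps a list index to {position, subPosition}. Each map value is the number of elements
   * stored at every position of its range.
   */
  static long[] positionForIndex(Map<Range, Integer> rangesToCount, int index) {
    if (index < 0) {
      throw new IndexOutOfBoundsException("index " + index);
    }
    long remaining = index;
    for (Map.Entry<Range, Integer> entry : rangesToCount.entrySet()) {
      long perPosition = entry.getValue().longValue();
      long width = Math.subtractExact(entry.getKey().to, entry.getKey().from);
      long elementsInRange = Math.multiplyExact(width, perPosition);
      if (elementsInRange <= remaining) {
        // The index lies beyond this range; skip all of its elements.
        remaining -= elementsInRange;
        continue;
      }
      return new long[] {entry.getKey().from + remaining / perPosition, remaining % perPosition};
    }
    throw new IndexOutOfBoundsException("index " + index);
  }

  public static void main(String[] args) {
    // Same shape as testRangesWithAtMostOneOverlap: [0, 4) x1, [4, 6) x2, [6, 8) x1.
    Map<Range, Integer> ranges = new LinkedHashMap<>();
    ranges.put(new Range(0, 4), 1);
    ranges.put(new Range(4, 6), 2);
    ranges.put(new Range(6, 8), 1);
    long[] result = positionForIndex(ranges, 5);
    System.out.println(result[0] + ", " + result[1]); // prints "4, 1"
  }
}
```

   With these ranges the list has 4 + 4 + 2 = 10 elements. Index 5 skips the four elements of [0, 4), leaving offset 1 into [4, 6), where two elements share each position, so it resolves to position 4, sub-position 1.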

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
##########
@@ -496,6 +622,22 @@ public void processElement(@Element T element, OutputReceiver<KV<Void, T>> r) {
     }
   }
 
+  /** An identity {@link PTransform}. */
+  private static class IdentityTransform<T> extends PTransform<PCollection<T>, PCollection<T>> {

Review comment:
       Is this used anywhere?

##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionViewsTest.java
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.values;
+
+import static org.apache.beam.sdk.values.PCollectionViews.computeOverlappingRanges;
+import static org.apache.beam.sdk.values.PCollectionViews.computePositionForIndex;
+import static org.apache.beam.sdk.values.PCollectionViews.computeTotalNumElements;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThrows;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeSet;
+import java.util.stream.IntStream;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ListMultimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link PCollectionViews}. */
+@RunWith(JUnit4.class)
+public class PCollectionViewsTest {
+  @Test
+  public void testEmpty() {
+    Iterable<OffsetRange> ranges = Collections.emptyList();
+
+    Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition =
+        computeOverlappingRanges(ranges);
+    assertEquals(nonOverlappingRangesToNumElementsPerPosition, Collections.emptyMap());
+    assertEquals(0, computeTotalNumElements(nonOverlappingRangesToNumElementsPerPosition));
+    assertThrows(
+        IndexOutOfBoundsException.class,
+        () -> computePositionForIndex(nonOverlappingRangesToNumElementsPerPosition, 0));
+  }
+
+  @Test
+  public void testNoOverlapping() {
+    Iterable<OffsetRange> ranges = Arrays.asList(range(0, 2), range(4, 6));
+
+    Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition =
+        computeOverlappingRanges(ranges);
+    assertEquals(
+        ImmutableMap.of(range(0, 2), 1, range(4, 6), 1),
+        nonOverlappingRangesToNumElementsPerPosition);
+    assertNonEmptyRangesAndPositions(ranges, nonOverlappingRangesToNumElementsPerPosition);
+  }
+
+  @Test
+  public void testOnlyTouchingRanges() {
+    Iterable<OffsetRange> ranges = Arrays.asList(range(0, 4), range(4, 8), range(8, 12));
+
+    Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition =
+        computeOverlappingRanges(ranges);
+    assertEquals(
+        ImmutableMap.of(range(0, 4), 1, range(4, 8), 1, range(8, 12), 1),
+        nonOverlappingRangesToNumElementsPerPosition);
+    assertNonEmptyRangesAndPositions(ranges, nonOverlappingRangesToNumElementsPerPosition);
+  }
+
+  @Test
+  public void testRangesWithAtMostOneOverlap() {
+    Iterable<OffsetRange> ranges = Arrays.asList(range(0, 6), range(4, 10), range(8, 12));
+
+    Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition =
+        computeOverlappingRanges(ranges);
+    assertEquals(
+        ImmutableMap.builder()
+            .put(range(0, 4), 1)
+            .put(range(4, 6), 2)
+            .put(range(6, 8), 1)
+            .put(range(8, 10), 2)
+            .put(range(10, 12), 1)
+            .build(),
+        nonOverlappingRangesToNumElementsPerPosition);
+    assertNonEmptyRangesAndPositions(ranges, nonOverlappingRangesToNumElementsPerPosition);
+  }
+
+  @Test
+  public void testOverlappingFroms() {
+    Iterable<OffsetRange> ranges = Arrays.asList(range(0, 4), range(0, 8), range(0, 12));
+
+    Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition =
+        computeOverlappingRanges(ranges);
+    assertEquals(
+        ImmutableMap.builder().put(range(0, 4), 3).put(range(4, 8), 2).put(range(8, 12), 1).build(),
+        nonOverlappingRangesToNumElementsPerPosition);
+    assertNonEmptyRangesAndPositions(ranges, nonOverlappingRangesToNumElementsPerPosition);
+  }
+
+  @Test
+  public void testOverlappingTos() {
+    Iterable<OffsetRange> ranges = Arrays.asList(range(0, 12), range(4, 12), range(8, 12));
+
+    Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition =
+        computeOverlappingRanges(ranges);
+    assertEquals(
+        ImmutableMap.builder().put(range(0, 4), 1).put(range(4, 8), 2).put(range(8, 12), 3).build(),
+        nonOverlappingRangesToNumElementsPerPosition);
+    assertNonEmptyRangesAndPositions(ranges, nonOverlappingRangesToNumElementsPerPosition);
+  }
+
+  @Test
+  public void testOverlappingFromsAndTos() {
+    Iterable<OffsetRange> ranges = Arrays.asList(range(0, 4), range(0, 4), range(0, 4));
+
+    Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition =
+        computeOverlappingRanges(ranges);
+    assertEquals(
+        ImmutableMap.builder().put(range(0, 4), 3).build(),
+        nonOverlappingRangesToNumElementsPerPosition);
+    assertNonEmptyRangesAndPositions(ranges, nonOverlappingRangesToNumElementsPerPosition);
+  }
+
+  @Test
+  public void testMultipleOverlapsForTheSameRange() {
+    Iterable<OffsetRange> ranges =
+        Arrays.asList(
+            range(0, 4),
+            range(0, 8),
+            range(0, 12),
+            range(0, 12),
+            range(4, 12),
+            range(8, 12),
+            range(0, 4),
+            range(0, 8),
+            range(0, 12),
+            range(0, 12),
+            range(4, 12),
+            range(8, 12));
+
+    Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition =
+        computeOverlappingRanges(ranges);
+    assertEquals(
+        ImmutableMap.builder().put(range(0, 4), 8).put(range(4, 8), 8).put(range(8, 12), 8).build(),
+        nonOverlappingRangesToNumElementsPerPosition);
+    assertNonEmptyRangesAndPositions(ranges, nonOverlappingRangesToNumElementsPerPosition);
+  }
+
+  @Test
+  public void testIncreasingOverlaps() {
+    Iterable<OffsetRange> ranges =
+        Arrays.asList(range(0, 4), range(1, 5), range(2, 6), range(3, 7), range(4, 8), range(5, 9));
+
+    Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition =
+        computeOverlappingRanges(ranges);
+    assertEquals(
+        ImmutableMap.builder()
+            .put(range(0, 1), 1)
+            .put(range(1, 2), 2)
+            .put(range(2, 3), 3)
+            .put(range(3, 4), 4)
+            .put(range(4, 5), 4)
+            .put(range(5, 6), 4)
+            .put(range(6, 7), 3)
+            .put(range(7, 8), 2)
+            .put(range(8, 9), 1)
+            .build(),
+        nonOverlappingRangesToNumElementsPerPosition);
+    assertNonEmptyRangesAndPositions(ranges, nonOverlappingRangesToNumElementsPerPosition);
+  }
+
+  @Test
+  public void testNestedOverlaps() {
+    Iterable<OffsetRange> ranges =
+        Arrays.asList(range(0, 8), range(1, 7), range(2, 6), range(3, 5));
+
+    Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition =
+        computeOverlappingRanges(ranges);
+    assertEquals(
+        ImmutableMap.builder()
+            .put(range(0, 1), 1)
+            .put(range(1, 2), 2)
+            .put(range(2, 3), 3)
+            .put(range(3, 5), 4)
+            .put(range(5, 6), 3)
+            .put(range(6, 7), 2)
+            .put(range(7, 8), 1)
+            .build(),
+        nonOverlappingRangesToNumElementsPerPosition);
+    assertNonEmptyRangesAndPositions(ranges, nonOverlappingRangesToNumElementsPerPosition);
+  }
+
+  @Test
+  public void testRandomRanges() {
+    Random random = new Random(123892154890L);

Review comment:
       Where'd this number come from?







[GitHub] [beam] lukecwik commented on pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11821:
URL: https://github.com/apache/beam/pull/11821#issuecomment-639108631


   R: @Ardagan @amaliujia 





[GitHub] [beam] lukecwik commented on a change in pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11821:
URL: https://github.com/apache/beam/pull/11821#discussion_r440427252



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
##########
@@ -151,13 +279,103 @@
   }
 
   /**
-   * Implementation which is able to adapt a multimap materialization to a {@code T}.
+   * Implementation which is able to adapt an iterable materialization to a {@code T}.
    *
    * <p>For internal use only.
    *
    * <p>Instantiate via {@link PCollectionViews#singletonView}.
    */
   @Experimental(Kind.CORE_RUNNERS_ONLY)
+  private static class SingletonViewFn2<T> extends ViewFn<IterableView<T>, T> {

Review comment:
       The intent is to remove SingletonViewFn/MultimapViewFn/... and have SingletonViewFn2/MultimapViewFn2/... replace them once the JRH no longer exists.
   
   Added comments.







[GitHub] [beam] lukecwik commented on a change in pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11821:
URL: https://github.com/apache/beam/pull/11821#discussion_r440428475



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
##########
@@ -278,6 +528,398 @@ public IterableViewFn(TypeDescriptorSupplier<T> typeDescriptorSupplier) {
    * <p>Instantiate via {@link PCollectionViews#listView}.
    */
   @Experimental(Kind.CORE_RUNNERS_ONLY)
+  @VisibleForTesting
+  static class ListViewFn2<T> extends ViewFn<MultimapView<Long, MetaOr<T, OffsetRange>>, List<T>> {
+    private TypeDescriptorSupplier<T> typeDescriptorSupplier;
+
+    public ListViewFn2(TypeDescriptorSupplier<T> typeDescriptorSupplier) {
+      this.typeDescriptorSupplier = typeDescriptorSupplier;
+    }
+
+    @Override
+    public Materialization<MultimapView<Long, MetaOr<T, OffsetRange>>> getMaterialization() {
+      return Materializations.multimap();
+    }
+
+    @Override
+    public List<T> apply(MultimapView<Long, MetaOr<T, OffsetRange>> primitiveViewT) {
+      return Collections.unmodifiableList(new ListOverMultimapView<>(primitiveViewT));
+    }
+
+    @Override
+    public TypeDescriptor<List<T>> getTypeDescriptor() {
+      return TypeDescriptors.lists(typeDescriptorSupplier.get());
+    }
+
+    /**
+     * A {@link List} adapter over a {@link MultimapView}.
+     *
+     * <p>See {@link View.AsList} for a description of the materialized format and {@code index} to
+     * {@code (position, sub-position)} mapping details.
+     */
+    private static class ListOverMultimapView<T> extends AbstractList<T> implements RandomAccess {
+      private final MultimapView<Long, MetaOr<T, OffsetRange>> primitiveView;
+      /**
+       * A mapping from non-overlapping ranges to the number of elements at each position within
+       * that range. Ranges not specified in the mapping implicitly have 0 elements at those
+       * positions.
+       *
+       * <p>Used to quickly compute the {@code index} -> {@code (position, sub-position)} within the
+       * map.
+       */
+      private final Supplier<SortedMap<OffsetRange, Integer>>
+          nonOverlappingRangesToNumElementsPerPosition;
+
+      private final Supplier<Integer> size;
+
+      private ListOverMultimapView(MultimapView<Long, MetaOr<T, OffsetRange>> primitiveView) {
+        this.primitiveView = primitiveView;
+        this.nonOverlappingRangesToNumElementsPerPosition =
+            Suppliers.memoize(
+                () ->
+                    computeOverlappingRanges(
+                        Iterables.transform(
+                            primitiveView.get(Long.MIN_VALUE), (value) -> value.getMetadata())));
+        this.size =
+            Suppliers.memoize(
+                () -> computeTotalNumElements(nonOverlappingRangesToNumElementsPerPosition.get()));
+      }
+
+      @Override
+      public T get(int index) {
+        if (index < 0 || index >= size.get()) {
+          throw new IndexOutOfBoundsException();
+        }
+        KV<Long, Integer> position =
+            computePositionForIndex(nonOverlappingRangesToNumElementsPerPosition.get(), index);
+        return Iterables.get(primitiveView.get(position.getKey()), position.getValue()).get();
+      }
+
+      @Override
+      public int size() {
+        return size.get();
+      }
+
+      @Override
+      public Iterator<T> iterator() {
+        return listIterator();
+      }
+
+      @Override
+      public ListIterator<T> listIterator() {
+        return super.listIterator();
+      }
+
+      /** A {@link ListIterator} over {@link MultimapView} adapter. */
+      private class ListIteratorOverMultimapView implements ListIterator<T> {
+        private int position;
+
+        @Override
+        public boolean hasNext() {
+          return position < size();
+        }
+
+        @Override
+        public T next() {
+          if (!hasNext()) {
+            throw new NoSuchElementException();
+          }
+          T rval = get(position);
+          position += 1;
+          return rval;
+        }
+
+        @Override
+        public boolean hasPrevious() {
+          return position > 0;
+        }
+
+        @Override
+        public T previous() {
+          if (!hasPrevious()) {
+            throw new NoSuchElementException();
+          }
+          position -= 1;
+          return get(position);
+        }
+
+        @Override
+        public int nextIndex() {
+          return position;
+        }
+
+        @Override
+        public int previousIndex() {
+          return position - 1;
+        }
+
+        @Override
+        public void remove() {
+          throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void set(T e) {
+          throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void add(T e) {
+          throw new UnsupportedOperationException();
+        }
+      }
+    }
+  }
+
+  /**
+   * Compares {@link OffsetRange}s such that ranges are ordered by the smallest {@code from} and in
+   * case of a tie the smallest {@code to}.
+   */
+  @VisibleForTesting
+  static class OffsetRangeComparator implements Comparator<OffsetRange> {
+    private static final OffsetRangeComparator INSTANCE = new OffsetRangeComparator();
+
+    @Override
+    public int compare(OffsetRange o1, OffsetRange o2) {
+      int fromComparison = Longs.compare(o1.getFrom(), o2.getFrom());
+      if (fromComparison != 0) {
+        return fromComparison;
+      }
+      return Longs.compare(o1.getTo(), o2.getTo());
+    }
+  }
+
+  @VisibleForTesting
+  static SortedMap<OffsetRange, Integer> computeOverlappingRanges(Iterable<OffsetRange> ranges) {
+    ImmutableSortedMap.Builder<OffsetRange, Integer> rval =
+        ImmutableSortedMap.orderedBy(OffsetRangeComparator.INSTANCE);
+    List<OffsetRange> sortedRanges = Lists.newArrayList(ranges);
+    if (sortedRanges.isEmpty()) {
+      return rval.build();
+    }
+    Collections.sort(sortedRanges, OffsetRangeComparator.INSTANCE);
+
+    // Stores ranges in smallest 'from' and then smallest 'to' order
+    // e.g. [2, 7), [3, 4), [3, 5), [3, 5), [3, 6), [4, 0)
+    PriorityQueue<OffsetRange> rangesWithSameFrom =
+        new PriorityQueue<>(OffsetRangeComparator.INSTANCE);
+    Iterator<OffsetRange> iterator = sortedRanges.iterator();
+
+    // Stored in reverse sorted order so that when we iterate and re-add them back to
+    // overlappingRanges they are stored in sorted order from smallest to largest range.to
+    List<OffsetRange> rangesToProcess = new ArrayList<>();
+    while (iterator.hasNext()) {
+      OffsetRange current = iterator.next();
+      // Skip empty ranges
+      if (current.getFrom() == current.getTo()) {
+        continue;
+      }
+
+      // If the current range has a different 'from' than a prior range then we must produce
+      // ranges in [rangesWithSameFrom.from, current.from)
+      while (!rangesWithSameFrom.isEmpty()
+          && rangesWithSameFrom.peek().getFrom() != current.getFrom()) {
+        rangesToProcess.addAll(rangesWithSameFrom);
+        Collections.sort(rangesToProcess, OffsetRangeComparator.INSTANCE);
+        rangesWithSameFrom.clear();
+
+        int i = 0;
+        long lastTo = rangesToProcess.get(i).getFrom();
+        // Output all the ranges that are strictly less than current.from
+        // e.g. current.from := 7 for [3, 4), [3, 5), [3, 5), [3, 6) will produce
+        // [3, 4) := 4
+        // [4, 5) := 3
+        // [5, 6) := 1
+        for (; i < rangesToProcess.size(); ++i) {
+          if (rangesToProcess.get(i).getTo() > current.getFrom()) {
+            break;
+          }
+          // Output only the first of any subsequent duplicate ranges
+          if (i == 0 || rangesToProcess.get(i - 1).getTo() != rangesToProcess.get(i).getTo()) {
+            rval.put(
+                new OffsetRange(lastTo, rangesToProcess.get(i).getTo()),
+                rangesToProcess.size() - i);
+            lastTo = rangesToProcess.get(i).getTo();
+          }
+        }
+
+        // We exited the loop with 'to' > current.from, so we must add the range [lastTo,
+        // current.from) if it is non-empty
+        if (lastTo < current.getFrom() && i != rangesToProcess.size()) {
+          rval.put(new OffsetRange(lastTo, current.getFrom()), rangesToProcess.size() - i);
+        }
+
+        // The remaining ranges have a 'to' that is greater than 'current.from' and will overlap
+        // with current so add them back to rangesWithSameFrom with the updated 'from'
+        for (; i < rangesToProcess.size(); ++i) {
+          rangesWithSameFrom.add(
+              new OffsetRange(current.getFrom(), rangesToProcess.get(i).getTo()));
+        }
+
+        rangesToProcess.clear();
+      }
+      rangesWithSameFrom.add(current);
+    }
+
+    // Process the last chunk of overlapping ranges
+    while (!rangesWithSameFrom.isEmpty()) {
+      // This range always represents the range with the smallest 'to'
+      OffsetRange current = rangesWithSameFrom.remove();
+
+      rangesToProcess.addAll(rangesWithSameFrom);
+      Collections.sort(rangesToProcess, OffsetRangeComparator.INSTANCE);
+      rangesWithSameFrom.clear();
+
+      rval.put(current, rangesToProcess.size() + 1 /* include current */);
+
+      // Shorten all the remaining ranges such that they start with current.to
+      for (OffsetRange rangeWithDifferentFrom : rangesToProcess) {
+        // Skip any duplicates of current
+        if (rangeWithDifferentFrom.getTo() > current.getTo()) {
+          rangesWithSameFrom.add(new OffsetRange(current.getTo(), rangeWithDifferentFrom.getTo()));
+        }
+      }
+      rangesToProcess.clear();
+    }
+    return rval.build();
+  }
+
+  @VisibleForTesting
+  static int computeTotalNumElements(
+      Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition) {
+    long sum = 0;
+    for (Map.Entry<OffsetRange, Integer> range :
+        nonOverlappingRangesToNumElementsPerPosition.entrySet()) {
+      sum +=
+          Math.multiplyExact(
+              Math.subtractExact(range.getKey().getTo(), range.getKey().getFrom()),
+              range.getValue());
+    }
+    return Ints.checkedCast(sum);
+  }
+
+  @VisibleForTesting
+  static KV<Long, Integer> computePositionForIndex(
+      Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition, int index) {
+    if (index < 0) {
+      throw new IndexOutOfBoundsException(
+          String.format(
+              "Position %s was out of bounds for ranges %s.",
+              index, nonOverlappingRangesToNumElementsPerPosition));
+    }
+    for (Map.Entry<OffsetRange, Integer> range :
+        nonOverlappingRangesToNumElementsPerPosition.entrySet()) {
+      int numElementsInRange =
+          Ints.checkedCast(
+              Math.multiplyExact(
+                  Math.subtractExact(range.getKey().getTo(), range.getKey().getFrom()),
+                  range.getValue()));
+      if (numElementsInRange <= index) {
+        index -= numElementsInRange;
+        continue;
+      }
+      long position = range.getKey().getFrom() + index / range.getValue();
+      int subPosition = index % range.getValue();
+      return KV.of(position, subPosition);
+    }
+    throw new IndexOutOfBoundsException(
+        String.format(
+            "Position %s was out of bounds for ranges %s.",
+            index, nonOverlappingRangesToNumElementsPerPosition));
+  }
+
+  /** Stores values or metadata about values. */
+  public static class MetaOr<T, MetaT> {

Review comment:
       Done
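
The `index` to `(position, sub-position)` lookup in the diff above can be pictured with a small stand-alone sketch. It is only an illustration under stated assumptions: `IndexToPositionSketch`, `Range`, `totalNumElements` and `positionForIndex` are hypothetical names, the ranges are assumed to be sorted and non-overlapping already, and the real code works on `OffsetRange` and `KV` rather than on these plain types.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-alone illustration; none of these names exist in the Beam SDK. */
class IndexToPositionSketch {

  /** A half-open range [from, to) in which every position holds the same number of elements. */
  static final class Range {
    final long from;
    final long to;
    final int elementsPerPosition;

    Range(long from, long to, int elementsPerPosition) {
      this.from = from;
      this.to = to;
      this.elementsPerPosition = elementsPerPosition;
    }

    /** Number of list elements covered by this range. */
    long numElements() {
      return Math.multiplyExact(Math.subtractExact(to, from), (long) elementsPerPosition);
    }
  }

  /** Sums the element counts of all ranges, failing if the total does not fit in an int. */
  static int totalNumElements(List<Range> ranges) {
    long sum = 0;
    for (Range range : ranges) {
      sum = Math.addExact(sum, range.numElements());
    }
    return Math.toIntExact(sum);
  }

  /** Maps a flat list index to {position, subPosition}; ranges must be sorted and disjoint. */
  static long[] positionForIndex(List<Range> ranges, int index) {
    if (index < 0) {
      throw new IndexOutOfBoundsException("Index " + index + " is negative.");
    }
    long remaining = index;
    for (Range range : ranges) {
      long numElements = range.numElements();
      if (numElements <= remaining) {
        // The index lies beyond this range, so skip all of its elements.
        remaining -= numElements;
        continue;
      }
      return new long[] {
        range.from + remaining / range.elementsPerPosition,
        remaining % range.elementsPerPosition
      };
    }
    throw new IndexOutOfBoundsException("Index " + index + " is out of bounds.");
  }

  public static void main(String[] args) {
    // Same shape as the expected map in testRangesWithAtMostOneOverlap.
    List<Range> ranges = new ArrayList<>();
    ranges.add(new Range(0, 4, 1));
    ranges.add(new Range(4, 6, 2));
    ranges.add(new Range(6, 8, 1));
    ranges.add(new Range(8, 10, 2));
    ranges.add(new Range(10, 12, 1));
    long[] result = positionForIndex(ranges, 5);
    System.out.println(result[0] + ", " + result[1]); // prints "4, 1"
  }
}
```

Because each range stores a single per-position count, a lookup walks at most one entry per range rather than one entry per element.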




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11821:
URL: https://github.com/apache/beam/pull/11821#issuecomment-635620885






----------------------------------------------------------------



[GitHub] [beam] lukecwik commented on pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11821:
URL: https://github.com/apache/beam/pull/11821#issuecomment-637154715






----------------------------------------------------------------



[GitHub] [beam] lukecwik commented on pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11821:
URL: https://github.com/apache/beam/pull/11821#issuecomment-637174817


   This is ready for review.


----------------------------------------------------------------



[GitHub] [beam] ibzib commented on a change in pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #11821:
URL: https://github.com/apache/beam/pull/11821#discussion_r440456092



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
##########
@@ -157,7 +166,10 @@ private View() {}
    * PCollectionView} mapping each window to a {@link List} containing all of the elements in the
    * window.
    *
-   * <p>Unlike with {@link #asIterable}, the resulting list is required to fit in memory.
+   * <p>This view should only be used if random access and/or size of the PCollection is required.
+   * {@link #asIterable()} will perform significantly better for sequential access.

Review comment:
       Thanks for the explanation.




----------------------------------------------------------------



[GitHub] [beam] lukecwik commented on pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11821:
URL: https://github.com/apache/beam/pull/11821#issuecomment-637086679


   Run Python2_PVR_Flink PreCommit


----------------------------------------------------------------



[GitHub] [beam] lukecwik commented on pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11821:
URL: https://github.com/apache/beam/pull/11821#issuecomment-635644599


   Run Spark ValidatesRunner


----------------------------------------------------------------



[GitHub] [beam] lukecwik commented on pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11821:
URL: https://github.com/apache/beam/pull/11821#issuecomment-635503702


   Run Java Spark PortableValidatesRunner Batch


----------------------------------------------------------------



[GitHub] [beam] lukecwik commented on a change in pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11821:
URL: https://github.com/apache/beam/pull/11821#discussion_r440422483



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
##########
@@ -157,7 +166,10 @@ private View() {}
    * PCollectionView} mapping each window to a {@link List} containing all of the elements in the
    * window.
    *
-   * <p>Unlike with {@link #asIterable}, the resulting list is required to fit in memory.
+   * <p>This view should only be used if random access and/or size of the PCollection is required.
+   * {@link #asIterable()} will perform significantly better for sequential access.

Review comment:
       No. Most runners that support only an efficient multimap representation can still easily support an efficient iterable representation, since the mapping on the runner's side is trivial. The converse is not true.
   
   Also, the list side input format adds overhead that the iterable format does not have.
   
   Finally, the list side input exists for random access, and the current format is likely to assign only one element to each key, so sequential access will be much faster with the iterable.




----------------------------------------------------------------



[GitHub] [beam] lukecwik commented on pull request #11821: [WIP] [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11821:
URL: https://github.com/apache/beam/pull/11821#issuecomment-635451129


   > I got notified via email for a review but I can't find the review comment here. Do you still want this to be reviewed?
   
   I removed the comment once I noticed that this wasn't passing all the tests. I'm close now and just want to make sure that I didn't break Dataflow either.


----------------------------------------------------------------



[GitHub] [beam] lukecwik commented on a change in pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11821:
URL: https://github.com/apache/beam/pull/11821#discussion_r440421252



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
##########
@@ -1302,13 +1303,35 @@ private GloballyAsSingletonView(
 
     @Override
     public PCollectionView<OutputT> expand(PCollection<InputT> input) {
+      // TODO(BEAM-10097): Make this the default expansion for all portable runners.
+      if (hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
+          && (hasExperiment(input.getPipeline().getOptions(), "use_runner_v2")

Review comment:
       `use_runner_v2` is preferred, but the internal test framework within Google has not been fully migrated to use one over the other. Also, Googlers typically use `use_unified_worker`.
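
A minimal sketch of the kind of gate being discussed, assuming the truncated condition in the hunk accepts either experiment name alongside `beam_fn_api`. The class, the method names, and the plain `List<String>` of experiments are hypothetical stand-ins for illustration; the real code reads the experiments from the pipeline options.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-alone illustration of the experiment gate; not Beam's actual helper. */
class ExperimentGateSketch {

  /** Returns true if the named experiment is present in the list. */
  static boolean hasExperiment(List<String> experiments, String name) {
    return experiments != null && experiments.contains(name);
  }

  /**
   * Assumed shape of the condition: the portable API must be enabled together with either of
   * the two runner experiment names mentioned in the review comment.
   */
  static boolean usePrimitiveViewExpansion(List<String> experiments) {
    return hasExperiment(experiments, "beam_fn_api")
        && (hasExperiment(experiments, "use_runner_v2")
            || hasExperiment(experiments, "use_unified_worker"));
  }

  public static void main(String[] args) {
    System.out.println(
        usePrimitiveViewExpansion(Arrays.asList("beam_fn_api", "use_unified_worker")));
  }
}
```

Accepting both names keeps pipelines that still pass the older flag on the same code path as those that pass the preferred one.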

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
##########
@@ -71,8 +79,23 @@ public FlinkSideInputReader(
             tag.getId(), new SideInputInitializer<>(view));
     T result = sideInputs.get(window);
     if (result == null) {
-      ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn();
-      result = viewFn.apply(InMemoryMultimapSideInputView.empty());

Review comment:
       In this case the side input is valid, but Flink signals that it has no data for it by returning null. `viewFn.apply` expects a non-null value, which is why we pass in an empty view.
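
The null-to-empty-view fallback described here can be sketched in isolation. This is only an illustration under assumptions: `EmptyViewFallbackSketch` and `read` are hypothetical, a `Map<String, List<String>>` stands in for the primitive view, and a `Function` stands in for the `ViewFn`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-alone illustration; the types here only stand in for Beam's. */
class EmptyViewFallbackSketch {

  /**
   * Looks up the materialized side input for a window. When nothing is stored for the window,
   * the view function is applied to an empty primitive view instead of being handed null.
   */
  static <T> T read(
      Map<String, T> sideInputsByWindow,
      String window,
      Function<Map<String, List<String>>, T> viewFn) {
    T result = sideInputsByWindow.get(window);
    if (result == null) {
      result = viewFn.apply(Collections.emptyMap());
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, Integer> sizes = new HashMap<>();
    sizes.put("w1", 3);
    Function<Map<String, List<String>>, Integer> viewFn = view -> view.size();
    System.out.println(read(sizes, "w1", viewFn)); // stored value
    System.out.println(read(sizes, "w2", viewFn)); // value derived from the empty view
  }
}
```

The caller therefore always receives a well-formed view value, such as an empty list or map, for a window that has no data.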




----------------------------------------------------------------



[GitHub] [beam] lukecwik removed a comment on pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik removed a comment on pull request #11821:
URL: https://github.com/apache/beam/pull/11821#issuecomment-637086679






----------------------------------------------------------------



[GitHub] [beam] lukecwik commented on pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11821:
URL: https://github.com/apache/beam/pull/11821#issuecomment-635620991


   Run Java Flink PortableValidatesRunner Streaming


----------------------------------------------------------------



[GitHub] [beam] lukecwik commented on pull request #11821: [WIP] [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11821:
URL: https://github.com/apache/beam/pull/11821#issuecomment-635451337


   > > I got notified via email for a review but I can't find the review comment here. Do you still want this to be reviewed?
   > 
   > I removed the comment once I noticed that this wasn't passing all the tests. I'm close now and just want to make sure that I didn't break Dataflow either. I'll retag you soon.
   
   


----------------------------------------------------------------



[GitHub] [beam] ibzib commented on a change in pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #11821:
URL: https://github.com/apache/beam/pull/11821#discussion_r440455768



##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionViewsTest.java
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.values;
+
+import static org.apache.beam.sdk.values.PCollectionViews.computeOverlappingRanges;
+import static org.apache.beam.sdk.values.PCollectionViews.computePositionForIndex;
+import static org.apache.beam.sdk.values.PCollectionViews.computeTotalNumElements;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThrows;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeSet;
+import java.util.stream.IntStream;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ListMultimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link PCollectionViews}. */
+@RunWith(JUnit4.class)
+public class PCollectionViewsTest {
+  @Test
+  public void testEmpty() {
+    Iterable<OffsetRange> ranges = Collections.emptyList();
+
+    Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition =
+        computeOverlappingRanges(ranges);
+    assertEquals(nonOverlappingRangesToNumElementsPerPosition, Collections.emptyMap());
+    assertEquals(0, computeTotalNumElements(nonOverlappingRangesToNumElementsPerPosition));
+    assertThrows(
+        IndexOutOfBoundsException.class,
+        () -> computePositionForIndex(nonOverlappingRangesToNumElementsPerPosition, 0));
+  }
+
+  @Test
+  public void testNoOverlapping() {
+    Iterable<OffsetRange> ranges = Arrays.asList(range(0, 2), range(4, 6));
+
+    Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition =
+        computeOverlappingRanges(ranges);
+    assertEquals(
+        ImmutableMap.of(range(0, 2), 1, range(4, 6), 1),
+        nonOverlappingRangesToNumElementsPerPosition);
+    assertNonEmptyRangesAndPositions(ranges, nonOverlappingRangesToNumElementsPerPosition);
+  }
+
+  @Test
+  public void testOnlyTouchingRanges() {
+    Iterable<OffsetRange> ranges = Arrays.asList(range(0, 4), range(4, 8), range(8, 12));
+
+    Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition =
+        computeOverlappingRanges(ranges);
+    assertEquals(
+        ImmutableMap.of(range(0, 4), 1, range(4, 8), 1, range(8, 12), 1),
+        nonOverlappingRangesToNumElementsPerPosition);
+    assertNonEmptyRangesAndPositions(ranges, nonOverlappingRangesToNumElementsPerPosition);
+  }
+
+  @Test
+  public void testRangesWithAtMostOneOverlap() {
+    Iterable<OffsetRange> ranges = Arrays.asList(range(0, 6), range(4, 10), range(8, 12));
+
+    Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition =
+        computeOverlappingRanges(ranges);
+    assertEquals(
+        ImmutableMap.builder()
+            .put(range(0, 4), 1)
+            .put(range(4, 6), 2)
+            .put(range(6, 8), 1)
+            .put(range(8, 10), 2)
+            .put(range(10, 12), 1)
+            .build(),
+        nonOverlappingRangesToNumElementsPerPosition);
+    assertNonEmptyRangesAndPositions(ranges, nonOverlappingRangesToNumElementsPerPosition);
+  }
+
+  @Test
+  public void testOverlappingFroms() {
+    Iterable<OffsetRange> ranges = Arrays.asList(range(0, 4), range(0, 8), range(0, 12));
+
+    Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition =
+        computeOverlappingRanges(ranges);
+    assertEquals(
+        ImmutableMap.builder().put(range(0, 4), 3).put(range(4, 8), 2).put(range(8, 12), 1).build(),
+        nonOverlappingRangesToNumElementsPerPosition);
+    assertNonEmptyRangesAndPositions(ranges, nonOverlappingRangesToNumElementsPerPosition);
+  }
+
+  @Test
+  public void testOverlappingTos() {
+    Iterable<OffsetRange> ranges = Arrays.asList(range(0, 12), range(4, 12), range(8, 12));
+
+    Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition =
+        computeOverlappingRanges(ranges);
+    assertEquals(
+        ImmutableMap.builder().put(range(0, 4), 1).put(range(4, 8), 2).put(range(8, 12), 3).build(),
+        nonOverlappingRangesToNumElementsPerPosition);
+    assertNonEmptyRangesAndPositions(ranges, nonOverlappingRangesToNumElementsPerPosition);
+  }
+
+  @Test
+  public void testOverlappingFromsAndTos() {
+    Iterable<OffsetRange> ranges = Arrays.asList(range(0, 4), range(0, 4), range(0, 4));
+
+    Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition =
+        computeOverlappingRanges(ranges);
+    assertEquals(
+        ImmutableMap.builder().put(range(0, 4), 3).build(),
+        nonOverlappingRangesToNumElementsPerPosition);
+    assertNonEmptyRangesAndPositions(ranges, nonOverlappingRangesToNumElementsPerPosition);
+  }
+
+  @Test
+  public void testMultipleOverlapsForTheSameRange() {
+    Iterable<OffsetRange> ranges =
+        Arrays.asList(
+            range(0, 4),
+            range(0, 8),
+            range(0, 12),
+            range(0, 12),
+            range(4, 12),
+            range(8, 12),
+            range(0, 4),
+            range(0, 8),
+            range(0, 12),
+            range(0, 12),
+            range(4, 12),
+            range(8, 12));
+
+    Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition =
+        computeOverlappingRanges(ranges);
+    assertEquals(
+        ImmutableMap.builder().put(range(0, 4), 8).put(range(4, 8), 8).put(range(8, 12), 8).build(),
+        nonOverlappingRangesToNumElementsPerPosition);
+    assertNonEmptyRangesAndPositions(ranges, nonOverlappingRangesToNumElementsPerPosition);
+  }
+
+  @Test
+  public void testIncreasingOverlaps() {
+    Iterable<OffsetRange> ranges =
+        Arrays.asList(range(0, 4), range(1, 5), range(2, 6), range(3, 7), range(4, 8), range(5, 9));
+
+    Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition =
+        computeOverlappingRanges(ranges);
+    assertEquals(
+        ImmutableMap.builder()
+            .put(range(0, 1), 1)
+            .put(range(1, 2), 2)
+            .put(range(2, 3), 3)
+            .put(range(3, 4), 4)
+            .put(range(4, 5), 4)
+            .put(range(5, 6), 4)
+            .put(range(6, 7), 3)
+            .put(range(7, 8), 2)
+            .put(range(8, 9), 1)
+            .build(),
+        nonOverlappingRangesToNumElementsPerPosition);
+    assertNonEmptyRangesAndPositions(ranges, nonOverlappingRangesToNumElementsPerPosition);
+  }
+
+  @Test
+  public void testNestedOverlaps() {
+    Iterable<OffsetRange> ranges =
+        Arrays.asList(range(0, 8), range(1, 7), range(2, 6), range(3, 5));
+
+    Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition =
+        computeOverlappingRanges(ranges);
+    assertEquals(
+        ImmutableMap.builder()
+            .put(range(0, 1), 1)
+            .put(range(1, 2), 2)
+            .put(range(2, 3), 3)
+            .put(range(3, 5), 4)
+            .put(range(5, 6), 3)
+            .put(range(6, 7), 2)
+            .put(range(7, 8), 1)
+            .build(),
+        nonOverlappingRangesToNumElementsPerPosition);
+    assertNonEmptyRangesAndPositions(ranges, nonOverlappingRangesToNumElementsPerPosition);
+  }
+
+  @Test
+  public void testRandomRanges() {
+    Random random = new Random(123892154890L);

Review comment:
       Makes sense. Can you add a comment?




----------------------------------------------------------------



[GitHub] [beam] lukecwik commented on a change in pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11821:
URL: https://github.com/apache/beam/pull/11821#discussion_r440427252



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
##########
@@ -151,13 +279,103 @@
   }
 
   /**
-   * Implementation which is able to adapt a multimap materialization to a {@code T}.
+   * Implementation which is able to adapt an iterable materialization to a {@code T}.
    *
    * <p>For internal use only.
    *
    * <p>Instantiate via {@link PCollectionViews#singletonView}.
    */
   @Experimental(Kind.CORE_RUNNERS_ONLY)
+  private static class SingletonViewFn2<T> extends ViewFn<IterableView<T>, T> {

Review comment:
       The intent is to remove SingletonViewFn/MultimapViewFn/... and have SingletonViewFn2/MultimapViewFn2 replace them once the JRH no longer exists.







[GitHub] [beam] lukecwik commented on pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11821:
URL: https://github.com/apache/beam/pull/11821#issuecomment-640687823









[GitHub] [beam] lukecwik commented on pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11821:
URL: https://github.com/apache/beam/pull/11821#issuecomment-635503425


   Run Flink ValidatesRunner





[GitHub] [beam] ibzib commented on pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
ibzib commented on pull request #11821:
URL: https://github.com/apache/beam/pull/11821#issuecomment-642957883


   R: @ibzib 





[GitHub] [beam] lukecwik commented on a change in pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11821:
URL: https://github.com/apache/beam/pull/11821#discussion_r440426082



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
##########
@@ -219,6 +231,16 @@ private View() {}
    * <p>Public only so a {@link PipelineRunner} may override its behavior.
    *
    * <p>See {@link View#asList()}.
+   *
+   * <p>The materialized format uses {@link Materializations#MULTIMAP_MATERIALIZATION_URN multimap}

Review comment:
       Done







[GitHub] [beam] mxm commented on pull request #11821: [WIP] [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11821:
URL: https://github.com/apache/beam/pull/11821#issuecomment-635201854


   I was notified by email about a review request, but I can't find the review comment here. Do you still want this to be reviewed?





[GitHub] [beam] lukecwik commented on pull request #11821: [WIP] [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11821:
URL: https://github.com/apache/beam/pull/11821#issuecomment-635062102


   Run Java Spark PortableValidatesRunner Batch





[GitHub] [beam] lukecwik commented on pull request #11821: [WIP] [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11821:
URL: https://github.com/apache/beam/pull/11821#issuecomment-635061994









[GitHub] [beam] lukecwik commented on a change in pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11821:
URL: https://github.com/apache/beam/pull/11821#discussion_r440427252



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
##########
@@ -151,13 +279,103 @@
   }
 
   /**
-   * Implementation which is able to adapt a multimap materialization to a {@code T}.
+   * Implementation which is able to adapt an iterable materialization to a {@code T}.
    *
    * <p>For internal use only.
    *
    * <p>Instantiate via {@link PCollectionViews#singletonView}.
    */
   @Experimental(Kind.CORE_RUNNERS_ONLY)
+  private static class SingletonViewFn2<T> extends ViewFn<IterableView<T>, T> {

Review comment:
       The intent is to remove SingletonViewFn/MultimapViewFn/... and have SingletonViewFn2/MultimapViewFn2 replace them once the JRH no longer exists.
   
   Renamed them in the meantime.
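
As a rough illustration of what "adapt an iterable materialization to a `T`" can mean, the sketch below reduces an `Iterable<T>` to its single element, with an optional default for the empty case. It is not Beam's `ViewFn` and not the code under review: `SingletonFromIterableSketch`, the default-value handling, and the exception types are all assumptions made for this example.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Function;

// Hypothetical sketch; the class name and error handling are assumptions, not Beam's API.
final class SingletonFromIterableSketch<T> implements Function<Iterable<T>, T> {
  private final boolean hasDefault;
  private final T defaultValue;

  SingletonFromIterableSketch(boolean hasDefault, T defaultValue) {
    this.hasDefault = hasDefault;
    this.defaultValue = defaultValue;
  }

  /** Returns the only element of {@code contents}, or the default if it is empty. */
  @Override
  public T apply(Iterable<T> contents) {
    Iterator<T> it = contents.iterator();
    if (!it.hasNext()) {
      if (hasDefault) {
        return defaultValue;
      }
      throw new NoSuchElementException("Empty input accessed as a singleton without a default");
    }
    T value = it.next();
    if (it.hasNext()) {
      // A singleton view is only well defined for at most one element.
      throw new IllegalArgumentException("More than one element accessed as a singleton");
    }
    return value;
  }

  public static void main(String[] args) {
    SingletonFromIterableSketch<String> fn = new SingletonFromIterableSketch<>(true, "fallback");
    System.out.println(fn.apply(Arrays.asList("only")));
  }
}
```

Because the function consumes the iterable directly, no intermediate key or multimap lookup is needed, which matches the direction described in the quoted Javadoc change.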







[GitHub] [beam] lukecwik commented on pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11821:
URL: https://github.com/apache/beam/pull/11821#issuecomment-635621052









[GitHub] [beam] lukecwik commented on pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11821:
URL: https://github.com/apache/beam/pull/11821#issuecomment-637174452


   The existing Spark failure is due to BEAM-10024.





[GitHub] [beam] lukecwik commented on pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11821:
URL: https://github.com/apache/beam/pull/11821#issuecomment-635722845


   Run Spark ValidatesRunner





[GitHub] [beam] lukecwik commented on pull request #11821: [WIP] [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11821:
URL: https://github.com/apache/beam/pull/11821#issuecomment-635061728









[GitHub] [beam] lukecwik commented on pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11821:
URL: https://github.com/apache/beam/pull/11821#issuecomment-635503475









[GitHub] [beam] lukecwik merged pull request #11821: [BEAM-10097, BEAM-5982, BEAM-3080] Use primitive views directly instead of transforming KV> to the view type via a naive mapping.

Posted by GitBox <gi...@apache.org>.
lukecwik merged pull request #11821:
URL: https://github.com/apache/beam/pull/11821


   

