You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/07/08 23:38:40 UTC

[GitHub] [hudi] xushiyan commented on a diff in pull request #5855: [HUDI-4249] Fixing in-memory `HoodieData` implementation to operate lazily

xushiyan commented on code in PR #5855:
URL: https://github.com/apache/hudi/pull/5855#discussion_r917164576


##########
hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java:
##########
@@ -107,6 +108,19 @@ public static <K, V> HashMap<K, V> combine(Map<K, V> one, Map<K, V> another) {
     return combined;
   }
 
+  /**
+   * Combines provided {@link Map}s into one, returning new instance of {@link HashMap}.
+   *
+   * NOTE: That values associated with overlapping keys from the second map, will override
+   *       values from the first one
+   */
+  public static <K, V> HashMap<K, V> combine(Map<K, V> one, Map<K, V> another, BiFunction<V, V, V> merge) {

Review Comment:
   UT cover this?



##########
hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java:
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.data;
+
+import org.apache.hudi.common.function.SerializableBiFunction;
+import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper;
+import static org.apache.hudi.common.function.FunctionWrapper.throwingMapWrapper;
+
+/**
+ * In-memory implementation of {@link HoodiePairData} holding internally a {@link Stream} of {@link Pair}s.
+ *
+ * NOTE: This is an in-memory counterpart for {@code HoodieJavaPairRDD}, and it strives to provide
+ *       similar semantic as RDD container -- all intermediate (non-terminal, not de-referencing
+ *       the stream like "collect", "groupBy", etc) operations are executed *lazily*.
+ *       This allows to make sure that compute/memory churn is minimal since only necessary
+ *       computations will ultimately be performed.
+ *
+ * @param <K> type of the key in the pair
+ * @param <V> type of the value in the pair
+ */
+public class HoodieListPairData<K, V> extends HoodiePairData<K, V> {

Review Comment:
   add UT for this?



##########
hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java:
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.data;
+
+import org.apache.hudi.common.function.SerializableBiFunction;
+import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper;
+import static org.apache.hudi.common.function.FunctionWrapper.throwingMapWrapper;
+
+/**
+ * In-memory implementation of {@link HoodiePairData} holding internally a {@link Stream} of {@link Pair}s.
+ *
+ * NOTE: This is an in-memory counterpart for {@code HoodieJavaPairRDD}, and it strives to provide
+ *       similar semantic as RDD container -- all intermediate (non-terminal, not de-referencing
+ *       the stream like "collect", "groupBy", etc) operations are executed *lazily*.
+ *       This allows to make sure that compute/memory churn is minimal since only necessary
+ *       computations will ultimately be performed.
+ *
+ * @param <K> type of the key in the pair
+ * @param <V> type of the value in the pair
+ */
+public class HoodieListPairData<K, V> extends HoodiePairData<K, V> {
+
+  private final Stream<Pair<K, V>> dataStream;
+
+  public HoodieListPairData(List<Pair<K, V>> data) {
+    this.dataStream = data.stream().parallel();
+  }
+
+  HoodieListPairData(Stream<Pair<K, V>> dataStream) {
+    this.dataStream = dataStream;
+  }
+
+  @Override
+  public List<Pair<K, V>> get() {
+    return dataStream.collect(Collectors.toList());
+  }
+
+  @Override
+  public void persist(String cacheConfig) {
+    // no-op
+  }
+
+  @Override
+  public void unpersist() {
+    // no-op
+  }
+
+  @Override
+  public HoodieData<K> keys() {
+    return new HoodieListData<>(dataStream.map(Pair::getKey));
+  }
+
+  @Override
+  public HoodieData<V> values() {
+    return new HoodieListData<>(dataStream.map(Pair::getValue));
+  }
+
+  @Override
+  public long count() {
+    return dataStream.count();
+  }
+
+  @Override
+  public Map<K, Long> countByKey() {
+    return dataStream.collect(Collectors.groupingBy(Pair::getKey, Collectors.counting()));
+  }
+
+  @Override
+  public HoodiePairData<K, Iterable<V>> groupByKey() {
+    Collector<Pair<K, V>, ?, List<V>> mappingCollector = Collectors.mapping(Pair::getValue, Collectors.toList());
+    Collector<Pair<K, V>, ?, Map<K, List<V>>> groupingCollector =
+        Collectors.groupingBy(Pair::getKey, mappingCollector);
+
+    Map<K, List<V>> groupedByKey = dataStream.collect(groupingCollector);
+    return new HoodieListPairData<>(groupedByKey.entrySet().stream()
+        .map(e -> Pair.of(e.getKey(), e.getValue())));
+  }
+
+  @Override
+  public HoodiePairData<K, V> reduceByKey(SerializableBiFunction<V, V, V> combiner, int parallelism) {
+    Map<K, java.util.Optional<V>> reducedMap = dataStream.collect(
+        Collectors.groupingBy(
+            Pair::getKey,
+            HashMap::new,
+            Collectors.mapping(Pair::getValue, Collectors.reducing(combiner::apply))));

Review Comment:
   like this style over declaring the local vars in groupByKey() (var types are verbose and barely telling anything). better to align on the style.



##########
hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java:
##########
@@ -21,108 +21,163 @@
 
 import org.apache.hudi.common.function.SerializableFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.util.collection.Pair;
 
 import java.io.Serializable;
 import java.util.Iterator;
 import java.util.List;
 
 /**
- * An abstraction for a data collection of objects in type T to store the reference
- * and do transformation.
+ * An interface abstracting a container holding a collection of objects of type {@code T}
+ * allowing to perform common transformation on it.
  *
- * @param <T> type of object.
+ * This abstraction provides common API implemented by
+ * <ol>
+ *   <li>In-memory implementation ({@code HoodieListData}, {@code HoodieListPairData}), where all objects
+ *   are held in-memory by the executing process</li>
+ *   <li>RDD-based implementation ({@code HoodieJavaRDD}, etc)</li>, where underlying collection is held
+ *   by an RDD allowing to execute transformations using Spark engine on the cluster
+ * </ol>
+ *
+ * All implementations provide for consistent semantic, where
+ * <ul>
+ *   <li>All non-terminal* operations are executed lazily (for ex, {@code map}, {@code filter}, etc)</li>
+ *   <li>All terminal operations are executed eagerly, executing all previously accumulated transformations.
+ *   Note that, collection could not be re-used after invoking terminal operation on it.</li>
+ * </ul>
+ *
+ * @param <T> type of object
  */
 public abstract class HoodieData<T> implements Serializable {
-  /**
-   * @return the collection of objects.
-   */
-  public abstract Object get();
 
   /**
-   * Caches the data.
-   *
-   * @param cacheConfig config value for caching.
+   * Persists the data w/ provided {@code level} (if applicable)
    */
-  public abstract void persist(String cacheConfig);
+  public abstract void persist(String level);
 
   /**
-   * Removes the cached data.
+   * Un-persists the data (if previously persisted)
    */
   public abstract void unpersist();
 
   /**
-   * @return whether the collection is empty.
+   * Returns whether the collection is empty.
    */
   public abstract boolean isEmpty();
 
   /**
-   * @return the number of objects.
+   * Returns number of objects held in the collection
+   *
+   * NOTE: This is a terminal operation
    */
   public abstract long count();
 
   /**
-   * @param func serializable map function.
-   * @param <O>  output object type.
-   * @return {@link HoodieData<O>} containing the result. Actual execution may be deferred.
+   * Maps every element in the collection using provided mapping {@code func}.
+   *
+   * This is an intermediate operation
+   *
+   * @param func serializable map function
+   * @param <O>  output object type
+   * @return {@link HoodieData<O>} holding mapped elements
    */
   public abstract <O> HoodieData<O> map(SerializableFunction<T, O> func);
 
   /**
-   * @param func                  serializable map function by taking a partition of objects
-   *                              and generating an iterator.
-   * @param preservesPartitioning whether to preserve partitions in the result.
-   * @param <O>                   output object type.
-   * @return {@link HoodieData<O>} containing the result. Actual execution may be deferred.
-   */
-  public abstract <O> HoodieData<O> mapPartitions(
-      SerializableFunction<Iterator<T>, Iterator<O>> func, boolean preservesPartitioning);
-
-  /**
-   * @param func                  serializable map function by taking a partition of objects
-   *                              and generating an iterator.
-   * @param <O>                   output object type.
-   * @return {@link HoodieData<O>} containing the result. Actual execution may be deferred.
+   * Maps every element in the collection's partition (if applicable) by applying provided
+   * mapping {@code func} to every collection's partition
+   *
+   * This is an intermediate operation
+   *
+   * @param func                  serializable map function accepting {@link Iterator} of a single
+   *                              partition's elements and returning a new {@link Iterator} mapping
+   *                              every element of the partition into a new one
+   * @param preservesPartitioning whether to preserve partitioning in the resulting collection
+   * @param <O>                   output object type
+   * @return {@link HoodieData<O>} holding mapped elements
    */
-  public abstract <O> HoodieData<O> mapPartitions(
-      SerializableFunction<Iterator<T>, Iterator<O>> func);
+  public abstract <O> HoodieData<O> mapPartitions(SerializableFunction<Iterator<T>,
+      Iterator<O>> func, boolean preservesPartitioning);
 
   /**
-   * @param func serializable flatmap function.
-   * @param <O>  output object type.
-   * @return {@link HoodieData<O>} containing the result. Actual execution may be deferred.
+   * Maps every element in the collection into a collection of the new elements (provided by
+   * {@link Iterator}) using provided mapping {@code func}, subsequently flattening the result
+   * (by concatenating) into a single collection
+   *
+   * This is an intermediate operation
+   *
+   * @param func serializable function mapping every element {@link T} into {@code Iterator<O>}
+   * @param <O>  output object type
+   * @return {@link HoodieData<O>} holding mapped elements
    */
   public abstract <O> HoodieData<O> flatMap(SerializableFunction<T, Iterator<O>> func);
 
   /**
-   * @param mapToPairFunc serializable map function to generate a pair.
-   * @param <K>           key type of the pair.
-   * @param <V>           value type of the pair.
-   * @return {@link HoodiePairData<K, V>} containing the result. Actual execution may be deferred.
+   * Maps every element in the collection using provided mapping {@code func} into a {@link Pair<K, V>}
+   * of elements {@code K} and {@code V}
+   * <p>
+   * This is an intermediate operation
+   *
+   * @param func serializable map function
+   * @param <K>  key type of the pair
+   * @param <V>  value type of the pair
+   * @return {@link HoodiePairData<K, V>} holding mapped elements
    */
-  public abstract <K, V> HoodiePairData<K, V> mapToPair(SerializablePairFunction<T, K, V> mapToPairFunc);
+  public abstract <K, V> HoodiePairData<K, V> mapToPair(SerializablePairFunction<T, K, V> func);
 
   /**
-   * @return distinct objects in {@link HoodieData}.
+   * Returns new {@link HoodieData} collection holding only distinct objects of the original one
+   *
+   * This is a stateful intermediate operation
    */
   public abstract HoodieData<T> distinct();
 
+  /**
+   * Returns new {@link HoodieData} collection holding only distinct objects of the original one
+   *
+   * This is a stateful intermediate operation
+   */
   public abstract HoodieData<T> distinct(int parallelism);
 
-  public abstract <O> HoodieData<T> distinctWithKey(SerializableFunction<T, O> keyGetter, int parallelism);
-
+  /**
+   * Returns new instance of {@link HoodieData} collection only containing elements matching provided
+   * {@code filterFunc} (ie ones it returns true on)
+   *
+   * @param filterFunc filtering func either accepting or rejecting the elements
+   * @return {@link HoodieData<T>} holding filtered elements
+   */
   public abstract HoodieData<T> filter(SerializableFunction<T, Boolean> filterFunc);
 
   /**
-   * Unions this {@link HoodieData} with other {@link HoodieData}.
-   * @param other {@link HoodieData} of interest.
-   * @return the union of two as as instance of {@link HoodieData}.
+   * Unions {@link HoodieData} with another instance of {@link HoodieData}.
+   * Note that, it's only able to union same underlying collection implementations.
+   *
+   * This is a stateful intermediate operation
+   *
+   * @param other {@link HoodieData} collection
+   * @return {@link HoodieData<T>} holding superset of elements of this and {@code other} collections
    */
   public abstract HoodieData<T> union(HoodieData<T> other);
 
   /**
-   * @return collected results in {@link List<T>}.
+   * Collects results of the underlying collection into a {@link List<T>}
+   *
+   * This is a terminal operation
    */
   public abstract List<T> collectAsList();
 
+  /**
+   * Re-partitions underlying collection (if applicable) making sure new {@link HoodieData} has
+   * exactly {@code parallelism} partitions
+   *
+   * @param parallelism target number of partitions in the underlying collection
+   * @return {@link HoodieData<T>} holding re-partitioned collection
+   */
   public abstract HoodieData<T> repartition(int parallelism);
+
+  public <O> HoodieData<T> distinctWithKey(SerializableFunction<T, O> keyGetter, int parallelism) {

Review Comment:
   javadoc this too?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org