You are viewing a plain-text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2019/05/21 20:07:38 UTC

[incubator-hudi] branch master updated: Remove redundant string from file comp rdd

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3fd2fd6  Remove redundant string from file comp rdd
3fd2fd6 is described below

commit 3fd2fd6e9dbf8c03bcc742b883fa2a0598ad99be
Author: David Muto (pseudomuto) <da...@gmail.com>
AuthorDate: Fri May 10 10:10:51 2019 -0400

    Remove redundant string from file comp rdd
---
 .../bloom/BucketizedBloomCheckPartitioner.java     | 11 ++--
 .../uber/hoodie/index/bloom/HoodieBloomIndex.java  | 63 +++++++++++-----------
 .../index/bloom/HoodieBloomIndexCheckFunction.java | 16 +++---
 .../hoodie/index/bloom/HoodieGlobalBloomIndex.java | 21 ++++----
 .../hoodie/index/bloom/TestHoodieBloomIndex.java   |  4 +-
 .../index/bloom/TestHoodieGlobalBloomIndex.java    | 32 +++++------
 6 files changed, 75 insertions(+), 72 deletions(-)

diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/BucketizedBloomCheckPartitioner.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/BucketizedBloomCheckPartitioner.java
index 5bc47e2..8ce72c3 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/BucketizedBloomCheckPartitioner.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/BucketizedBloomCheckPartitioner.java
@@ -19,12 +19,14 @@ package com.uber.hoodie.index.bloom;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.hash.Hashing;
+import com.uber.hoodie.common.util.collection.Pair;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.Partitioner;
@@ -139,11 +141,10 @@ public class BucketizedBloomCheckPartitioner extends Partitioner {
 
   @Override
   public int getPartition(Object key) {
-    String[] parts = ((String) key).split("#");
-    String fileName = parts[0];
-    final long hashOfKey = Hashing.md5().hashString(parts[1], StandardCharsets.UTF_8).asLong();
-    List<Integer> candidatePartitions = fileGroupToPartitions.get(fileName);
-    int idx = (int) Math.floorMod(hashOfKey, candidatePartitions.size());
+    final Pair<String, String> parts = (Pair<String, String>) key;
+    final long hashOfKey = Hashing.md5().hashString(parts.getRight(), StandardCharsets.UTF_8).asLong();
+    final List<Integer> candidatePartitions = fileGroupToPartitions.get(parts.getLeft());
+    final int idx = (int) Math.floorMod(hashOfKey, candidatePartitions.size());
     assert idx >= 0;
     return candidatePartitions.get(idx);
   }
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java
index b5ef59e..8dc15c1 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java
@@ -34,6 +34,7 @@ import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.timeline.HoodieInstant;
 import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.common.util.ParquetUtils;
+import com.uber.hoodie.common.util.collection.Pair;
 import com.uber.hoodie.config.HoodieWriteConfig;
 import com.uber.hoodie.exception.MetadataNotFoundException;
 import com.uber.hoodie.index.HoodieIndex;
@@ -42,9 +43,12 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.apache.spark.Partitioner;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -171,7 +175,7 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
       // we will just try exploding the input and then count to determine comparisons
       // FIX(vc): Only do sampling here and extrapolate?
       fileToComparisons = explodeRecordRDDWithFileComparisons(partitionToFileInfo,
-          partitionRecordKeyPairRDD).mapToPair(t -> t._2()).countByKey();
+          partitionRecordKeyPairRDD).mapToPair(t -> t).countByKey();
     } else {
       fileToComparisons = new HashMap<>();
       partitionToFileInfo.entrySet().stream().forEach(e -> {
@@ -290,8 +294,6 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
     return true;
   }
 
-
-
   /**
    * For each incoming record, produce N output records, 1 each for each file against which the record's key needs to be
    * checked. For datasets, where the keys have a definite insert order (e.g: timestamp as prefix), the number of files
@@ -301,24 +303,21 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
    * recordKey ranges in the index info.
    */
   @VisibleForTesting
-  JavaPairRDD<String, Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
+  JavaRDD<Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
       final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
       JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
     IndexFileFilter indexFileFilter = config.useBloomIndexTreebasedFilter()
         ? new IntervalTreeBasedIndexFileFilter(partitionToFileIndexInfo)
         : new ListBasedIndexFileFilter(partitionToFileIndexInfo);
+
     return partitionRecordKeyPairRDD.map(partitionRecordKeyPair -> {
       String recordKey = partitionRecordKeyPair._2();
       String partitionPath = partitionRecordKeyPair._1();
-      List<Tuple2<String, Tuple2<String, HoodieKey>>> recordComparisons = new ArrayList<>();
-      indexFileFilter.getMatchingFiles(partitionPath, recordKey).forEach(matchingFile -> {
-        recordComparisons.add(
-            new Tuple2<>(String.format("%s#%s", matchingFile, recordKey),
-                new Tuple2<>(matchingFile,
-                    new HoodieKey(recordKey, partitionPath))));
-      });
-      return recordComparisons;
-    }).flatMapToPair(List::iterator);
+
+      return indexFileFilter.getMatchingFiles(partitionPath, recordKey).stream()
+          .map(matchingFile -> new Tuple2<>(matchingFile, new HoodieKey(recordKey, partitionPath)))
+          .collect(Collectors.toList());
+    }).flatMap(List::iterator);
   }
 
   /**
@@ -332,28 +331,32 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
       final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
       JavaPairRDD<String, String> partitionRecordKeyPairRDD, int shuffleParallelism, HoodieTableMetaClient metaClient,
       Map<String, Long> fileGroupToComparisons) {
-    JavaPairRDD<String, Tuple2<String, HoodieKey>> fileSortedTripletRDD =
+    JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
         explodeRecordRDDWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyPairRDD);
+
     if (config.useBloomIndexBucketizedChecking()) {
-      BucketizedBloomCheckPartitioner partitioner = new BucketizedBloomCheckPartitioner(shuffleParallelism,
-          fileGroupToComparisons, config.getBloomIndexKeysPerBucket());
-      fileSortedTripletRDD = fileSortedTripletRDD.repartitionAndSortWithinPartitions(partitioner);
+      Partitioner partitioner = new BucketizedBloomCheckPartitioner(
+          shuffleParallelism,
+          fileGroupToComparisons,
+          config.getBloomIndexKeysPerBucket()
+      );
+
+      fileComparisonsRDD = fileComparisonsRDD
+          .mapToPair(t -> new Tuple2<>(Pair.of(t._1, t._2.getRecordKey()), t))
+          .repartitionAndSortWithinPartitions(partitioner)
+          .map(Tuple2::_2);
     } else {
-      // sort further based on filename, such that all checking for the file can happen within
-      // a single partition, on-the-fly
-      fileSortedTripletRDD = fileSortedTripletRDD.sortByKey(true, shuffleParallelism);
+      fileComparisonsRDD = fileComparisonsRDD.sortBy(Tuple2::_1, true, shuffleParallelism);
     }
-    return fileSortedTripletRDD.mapPartitionsWithIndex(
-        new HoodieBloomIndexCheckFunction(metaClient, config.getBasePath()), true)
+
+    return fileComparisonsRDD
+        .mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(metaClient, config.getBasePath()), true)
         .flatMap(List::iterator)
-        .filter(lookupResult -> lookupResult.getMatchingRecordKeys().size() > 0)
-        .flatMapToPair(lookupResult -> {
-          List<Tuple2<String, String>> vals = new ArrayList<>();
-          for (String recordKey : lookupResult.getMatchingRecordKeys()) {
-            vals.add(new Tuple2<>(recordKey, lookupResult.getFileName()));
-          }
-          return vals.iterator();
-        });
+        .filter(lr -> lr.getMatchingRecordKeys().size() > 0)
+        .flatMapToPair(lookupResult -> lookupResult.getMatchingRecordKeys().stream()
+            .map(recordKey -> new Tuple2<>(recordKey, lookupResult.getFileName()))
+            .collect(Collectors.toList())
+            .iterator());
   }
 
   /**
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java
index a313d2d..4595658 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java
@@ -43,7 +43,7 @@ import scala.Tuple2;
  * actual files
  */
 public class HoodieBloomIndexCheckFunction implements
-    Function2<Integer, Iterator<Tuple2<String, Tuple2<String, HoodieKey>>>,
+    Function2<Integer, Iterator<Tuple2<String, HoodieKey>>,
         Iterator<List<KeyLookupResult>>> {
 
   private static Logger logger = LogManager.getLogger(HoodieBloomIndexCheckFunction.class);
@@ -84,13 +84,13 @@ public class HoodieBloomIndexCheckFunction implements
 
   @Override
   public Iterator<List<KeyLookupResult>> call(Integer partition,
-      Iterator<Tuple2<String, Tuple2<String, HoodieKey>>> fileParitionRecordKeyTripletItr)
+      Iterator<Tuple2<String, HoodieKey>> fileParitionRecordKeyTripletItr)
       throws Exception {
     return new LazyKeyCheckIterator(fileParitionRecordKeyTripletItr);
   }
 
   class LazyKeyCheckIterator extends
-      LazyIterableIterator<Tuple2<String, Tuple2<String, HoodieKey>>, List<KeyLookupResult>> {
+      LazyIterableIterator<Tuple2<String, HoodieKey>, List<KeyLookupResult>> {
 
     private List<String> candidateRecordKeys;
 
@@ -103,7 +103,7 @@ public class HoodieBloomIndexCheckFunction implements
     private long totalKeysChecked;
 
     LazyKeyCheckIterator(
-        Iterator<Tuple2<String, Tuple2<String, HoodieKey>>> filePartitionRecordKeyTripletItr) {
+        Iterator<Tuple2<String, HoodieKey>> filePartitionRecordKeyTripletItr) {
       super(filePartitionRecordKeyTripletItr);
       currentFile = null;
       candidateRecordKeys = new ArrayList<>();
@@ -162,10 +162,10 @@ public class HoodieBloomIndexCheckFunction implements
       try {
         // process one file in each go.
         while (inputItr.hasNext()) {
-          Tuple2<String, Tuple2<String, HoodieKey>> currentTuple = inputItr.next();
-          String fileName = currentTuple._2._1;
-          String partitionPath = currentTuple._2._2.getPartitionPath();
-          String recordKey = currentTuple._2._2.getRecordKey();
+          Tuple2<String, HoodieKey> currentTuple = inputItr.next();
+          String fileName = currentTuple._1;
+          String partitionPath = currentTuple._2.getPartitionPath();
+          String recordKey = currentTuple._2.getRecordKey();
 
           // lazily init state
           if (currentFile == null) {
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java
index 05a5aa7..7bf85cb 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java
@@ -27,12 +27,14 @@ import com.uber.hoodie.config.HoodieWriteConfig;
 import com.uber.hoodie.exception.HoodieIOException;
 import com.uber.hoodie.table.HoodieTable;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.stream.Collectors;
+
 import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import scala.Tuple2;
 
@@ -76,7 +78,7 @@ public class HoodieGlobalBloomIndex<T extends HoodieRecordPayload> extends Hoodi
 
   @Override
   @VisibleForTesting
-  JavaPairRDD<String, Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
+  JavaRDD<Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
       final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
       JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
     Map<String, String> indexToPartitionMap = new HashMap<>();
@@ -87,17 +89,14 @@ public class HoodieGlobalBloomIndex<T extends HoodieRecordPayload> extends Hoodi
     IndexFileFilter indexFileFilter = config.getBloomIndexPruneByRanges()
         ? new IntervalTreeBasedGlobalIndexFileFilter(partitionToFileIndexInfo)
         : new ListBasedGlobalIndexFileFilter(partitionToFileIndexInfo);
+
     return partitionRecordKeyPairRDD.map(partitionRecordKeyPair -> {
       String recordKey = partitionRecordKeyPair._2();
       String partitionPath = partitionRecordKeyPair._1();
-      List<Tuple2<String, Tuple2<String, HoodieKey>>> recordComparisons = new ArrayList<>();
-      indexFileFilter.getMatchingFiles(partitionPath, recordKey).forEach(matchingFile -> {
-        recordComparisons.add(
-            new Tuple2<>(String.format("%s#%s", matchingFile, recordKey),
-                new Tuple2<>(matchingFile,
-                    new HoodieKey(recordKey, indexToPartitionMap.get(matchingFile)))));
-      });
-      return recordComparisons;
-    }).flatMapToPair(List::iterator);
+
+      return indexFileFilter.getMatchingFiles(partitionPath, recordKey).stream()
+          .map(file -> new Tuple2<>(file, new HoodieKey(recordKey, indexToPartitionMap.get(file))))
+          .collect(Collectors.toList());
+    }).flatMap(List::iterator);
   }
 }
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java
index 664c146..277c9a3 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java
@@ -259,12 +259,12 @@ public class TestHoodieBloomIndex {
         new Tuple2<>("2017/10/22", "003"), new Tuple2<>("2017/10/22", "002"), new Tuple2<>("2017/10/22", "005"),
         new Tuple2<>("2017/10/22", "004"))).mapToPair(t -> t);
 
-    List<Tuple2<String, Tuple2<String, HoodieKey>>> comparisonKeyList = index.explodeRecordRDDWithFileComparisons(
+    List<Tuple2<String, HoodieKey>> comparisonKeyList = index.explodeRecordRDDWithFileComparisons(
         partitionToFileIndexInfo, partitionRecordKeyPairRDD).collect();
 
     assertEquals(10, comparisonKeyList.size());
     Map<String, List<String>> recordKeyToFileComps = comparisonKeyList.stream().collect(Collectors.groupingBy(
-        t -> t._2()._2().getRecordKey(), Collectors.mapping(t -> t._2()._1().split("#")[0], Collectors.toList())));
+        t -> t._2.getRecordKey(), Collectors.mapping(t -> t._1, Collectors.toList())));
 
     assertEquals(4, recordKeyToFileComps.size());
     assertEquals(new HashSet<>(Arrays.asList("f1", "f3", "f4")), new HashSet<>(recordKeyToFileComps.get("002")));
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieGlobalBloomIndex.java b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieGlobalBloomIndex.java
index 0228c2b..b220231 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieGlobalBloomIndex.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieGlobalBloomIndex.java
@@ -190,25 +190,25 @@ public class TestHoodieGlobalBloomIndex {
         new Tuple2<>("2017/10/21", "003"), new Tuple2<>("2017/10/22", "002"), new Tuple2<>("2017/10/22", "005"),
         new Tuple2<>("2017/10/23", "004"))).mapToPair(t -> t);
 
-    List<Tuple2<String, Tuple2<String, HoodieKey>>> comparisonKeyList = index.explodeRecordRDDWithFileComparisons(
-        partitionToFileIndexInfo, partitionRecordKeyPairRDD).collect();
-
-    /* epecting:
-      f4#003, f4, HoodieKey { recordKey=003 partitionPath=2017/10/23}
-      f1#003, f1, HoodieKey { recordKey=003 partitionPath=2017/10/22}
-      f3#003, f3, HoodieKey { recordKey=003 partitionPath=2017/10/22}
-      f4#002, f4, HoodieKey { recordKey=002 partitionPath=2017/10/23}
-      f1#002, f1, HoodieKey { recordKey=002 partitionPath=2017/10/22}
-      f3#002, f3, HoodieKey { recordKey=002 partitionPath=2017/10/22}
-      f4#005, f4, HoodieKey { recordKey=005 partitionPath=2017/10/23}
-      f1#005, f1, HoodieKey { recordKey=005 partitionPath=2017/10/22}
-      f4#004, f4, HoodieKey { recordKey=004 partitionPath=2017/10/23}
-      f1#004, f1, HoodieKey { recordKey=004 partitionPath=2017/10/22}
+    List<Tuple2<String, HoodieKey>> comparisonKeyList =
+        index.explodeRecordRDDWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyPairRDD).collect();
+
+    /* expecting:
+      f4, HoodieKey { recordKey=003 partitionPath=2017/10/23}
+      f1, HoodieKey { recordKey=003 partitionPath=2017/10/22}
+      f3, HoodieKey { recordKey=003 partitionPath=2017/10/22}
+      f4, HoodieKey { recordKey=002 partitionPath=2017/10/23}
+      f1, HoodieKey { recordKey=002 partitionPath=2017/10/22}
+      f3, HoodieKey { recordKey=002 partitionPath=2017/10/22}
+      f4, HoodieKey { recordKey=005 partitionPath=2017/10/23}
+      f1, HoodieKey { recordKey=005 partitionPath=2017/10/22}
+      f4, HoodieKey { recordKey=004 partitionPath=2017/10/23}
+      f1, HoodieKey { recordKey=004 partitionPath=2017/10/22}
      */
     assertEquals(10, comparisonKeyList.size());
 
-    Map<String, List<String>> recordKeyToFileComps = comparisonKeyList.stream().collect(Collectors.groupingBy(
-        t -> t._2()._2().getRecordKey(), Collectors.mapping(t -> t._2()._1().split("#")[0], Collectors.toList())));
+    Map<String, List<String>> recordKeyToFileComps = comparisonKeyList.stream()
+        .collect(Collectors.groupingBy(t -> t._2.getRecordKey(), Collectors.mapping(Tuple2::_1, Collectors.toList())));
 
     assertEquals(4, recordKeyToFileComps.size());
     assertEquals(new HashSet<>(Arrays.asList("f4", "f1", "f3")), new HashSet<>(recordKeyToFileComps.get("002")));