Posted to commits@hudi.apache.org by vi...@apache.org on 2019/05/21 20:07:38 UTC
[incubator-hudi] branch master updated: Remove redundant string from file comp rdd
This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 3fd2fd6 Remove redundant string from file comp rdd
3fd2fd6 is described below
commit 3fd2fd6e9dbf8c03bcc742b883fa2a0598ad99be
Author: David Muto (pseudomuto) <da...@gmail.com>
AuthorDate: Fri May 10 10:10:51 2019 -0400
Remove redundant string from file comp rdd
---
.../bloom/BucketizedBloomCheckPartitioner.java | 11 ++--
.../uber/hoodie/index/bloom/HoodieBloomIndex.java | 63 +++++++++++-----------
.../index/bloom/HoodieBloomIndexCheckFunction.java | 16 +++---
.../hoodie/index/bloom/HoodieGlobalBloomIndex.java | 21 ++++----
.../hoodie/index/bloom/TestHoodieBloomIndex.java | 4 +-
.../index/bloom/TestHoodieGlobalBloomIndex.java | 32 +++++------
6 files changed, 75 insertions(+), 72 deletions(-)
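
For readers skimming the diff: the "redundant string" in the title is the composite shuffle key "file#recordKey", which was formatted on one side of the shuffle and re-split on the other. A minimal before/after sketch, using the types and names that appear in the diff below:

    // Before: file name and record key fused into one string, re-parsed later.
    String oldKey = String.format("%s#%s", matchingFile, recordKey);  // e.g. "f1#003"
    String fileName = oldKey.split("#")[0];                           // re-split in the partitioner

    // After: structured data end to end; nothing to format or split.
    Tuple2<String, HoodieKey> comparison =
        new Tuple2<>(matchingFile, new HoodieKey(recordKey, partitionPath));
    Pair<String, String> shuffleKey = Pair.of(matchingFile, recordKey); // built only when bucketizing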
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/BucketizedBloomCheckPartitioner.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/BucketizedBloomCheckPartitioner.java
index 5bc47e2..8ce72c3 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/BucketizedBloomCheckPartitioner.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/BucketizedBloomCheckPartitioner.java
@@ -19,12 +19,14 @@ package com.uber.hoodie.index.bloom;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.hash.Hashing;
+import com.uber.hoodie.common.util.collection.Pair;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
@@ -139,11 +141,10 @@ public class BucketizedBloomCheckPartitioner extends Partitioner {
@Override
public int getPartition(Object key) {
- String[] parts = ((String) key).split("#");
- String fileName = parts[0];
- final long hashOfKey = Hashing.md5().hashString(parts[1], StandardCharsets.UTF_8).asLong();
- List<Integer> candidatePartitions = fileGroupToPartitions.get(fileName);
- int idx = (int) Math.floorMod(hashOfKey, candidatePartitions.size());
+ final Pair<String, String> parts = (Pair<String, String>) key;
+ final long hashOfKey = Hashing.md5().hashString(parts.getRight(), StandardCharsets.UTF_8).asLong();
+ final List<Integer> candidatePartitions = fileGroupToPartitions.get(parts.getLeft());
+ final int idx = (int) Math.floorMod(hashOfKey, candidatePartitions.size());
assert idx >= 0;
return candidatePartitions.get(idx);
}
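
The partitioner still hashes only the record key and then picks among the partitions assigned to that file group. Math.floorMod is what makes the assert hold: it keeps the index non-negative even when the MD5-derived long is negative. A small illustration with assumed values (not from the commit):

    long hashOfKey = -7L;                          // md5().hashString(...).asLong() can be negative
    int idx = (int) Math.floorMod(hashOfKey, 4L);  // == 1; plain % would yield -3
    assert idx >= 0;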
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java
index b5ef59e..8dc15c1 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java
@@ -34,6 +34,7 @@ import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.ParquetUtils;
+import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.MetadataNotFoundException;
import com.uber.hoodie.index.HoodieIndex;
@@ -42,9 +43,12 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -171,7 +175,7 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
// we will just try exploding the input and then count to determine comparisons
// FIX(vc): Only do sampling here and extrapolate?
fileToComparisons = explodeRecordRDDWithFileComparisons(partitionToFileInfo,
- partitionRecordKeyPairRDD).mapToPair(t -> t._2()).countByKey();
+ partitionRecordKeyPairRDD).mapToPair(t -> t).countByKey();
} else {
fileToComparisons = new HashMap<>();
partitionToFileInfo.entrySet().stream().forEach(e -> {
@@ -290,8 +294,6 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
return true;
}
-
-
/**
* For each incoming record, produce N output records, 1 each for each file against which the record's key needs to be
* checked. For datasets, where the keys have a definite insert order (e.g: timestamp as prefix), the number of files
@@ -301,24 +303,21 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
* recordKey ranges in the index info.
*/
@VisibleForTesting
- JavaPairRDD<String, Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
+ JavaRDD<Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
IndexFileFilter indexFileFilter = config.useBloomIndexTreebasedFilter()
? new IntervalTreeBasedIndexFileFilter(partitionToFileIndexInfo)
: new ListBasedIndexFileFilter(partitionToFileIndexInfo);
+
return partitionRecordKeyPairRDD.map(partitionRecordKeyPair -> {
String recordKey = partitionRecordKeyPair._2();
String partitionPath = partitionRecordKeyPair._1();
- List<Tuple2<String, Tuple2<String, HoodieKey>>> recordComparisons = new ArrayList<>();
- indexFileFilter.getMatchingFiles(partitionPath, recordKey).forEach(matchingFile -> {
- recordComparisons.add(
- new Tuple2<>(String.format("%s#%s", matchingFile, recordKey),
- new Tuple2<>(matchingFile,
- new HoodieKey(recordKey, partitionPath))));
- });
- return recordComparisons;
- }).flatMapToPair(List::iterator);
+
+ return indexFileFilter.getMatchingFiles(partitionPath, recordKey).stream()
+ .map(matchingFile -> new Tuple2<>(matchingFile, new HoodieKey(recordKey, partitionPath)))
+ .collect(Collectors.toList());
+ }).flatMap(List::iterator);
}
/**
@@ -332,28 +331,32 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
JavaPairRDD<String, String> partitionRecordKeyPairRDD, int shuffleParallelism, HoodieTableMetaClient metaClient,
Map<String, Long> fileGroupToComparisons) {
- JavaPairRDD<String, Tuple2<String, HoodieKey>> fileSortedTripletRDD =
+ JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
explodeRecordRDDWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyPairRDD);
+
if (config.useBloomIndexBucketizedChecking()) {
- BucketizedBloomCheckPartitioner partitioner = new BucketizedBloomCheckPartitioner(shuffleParallelism,
- fileGroupToComparisons, config.getBloomIndexKeysPerBucket());
- fileSortedTripletRDD = fileSortedTripletRDD.repartitionAndSortWithinPartitions(partitioner);
+ Partitioner partitioner = new BucketizedBloomCheckPartitioner(
+ shuffleParallelism,
+ fileGroupToComparisons,
+ config.getBloomIndexKeysPerBucket()
+ );
+
+ fileComparisonsRDD = fileComparisonsRDD
+ .mapToPair(t -> new Tuple2<>(Pair.of(t._1, t._2.getRecordKey()), t))
+ .repartitionAndSortWithinPartitions(partitioner)
+ .map(Tuple2::_2);
} else {
- // sort further based on filename, such that all checking for the file can happen within
- // a single partition, on-the-fly
- fileSortedTripletRDD = fileSortedTripletRDD.sortByKey(true, shuffleParallelism);
+ fileComparisonsRDD = fileComparisonsRDD.sortBy(Tuple2::_1, true, shuffleParallelism);
}
- return fileSortedTripletRDD.mapPartitionsWithIndex(
- new HoodieBloomIndexCheckFunction(metaClient, config.getBasePath()), true)
+
+ return fileComparisonsRDD
+ .mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(metaClient, config.getBasePath()), true)
.flatMap(List::iterator)
- .filter(lookupResult -> lookupResult.getMatchingRecordKeys().size() > 0)
- .flatMapToPair(lookupResult -> {
- List<Tuple2<String, String>> vals = new ArrayList<>();
- for (String recordKey : lookupResult.getMatchingRecordKeys()) {
- vals.add(new Tuple2<>(recordKey, lookupResult.getFileName()));
- }
- return vals.iterator();
- });
+ .filter(lr -> lr.getMatchingRecordKeys().size() > 0)
+ .flatMapToPair(lookupResult -> lookupResult.getMatchingRecordKeys().stream()
+ .map(recordKey -> new Tuple2<>(recordKey, lookupResult.getFileName()))
+ .collect(Collectors.toList())
+ .iterator());
}
/**
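
This hunk is the heart of the refactor: the shuffle key now exists only on the bucketized-checking path, built immediately before repartitionAndSortWithinPartitions and dropped immediately after, instead of a "file#recordKey" string being materialized for every exploded record on both paths. Condensed, the bucketized flow is (same types as the diff; this assumes Hudi's Pair is Comparable, which the sort-within-partitions step requires):

    JavaRDD<Tuple2<String, HoodieKey>> rdd = ...; // from explodeRecordRDDWithFileComparisons
    rdd = rdd
        .mapToPair(t -> new Tuple2<>(Pair.of(t._1, t._2.getRecordKey()), t)) // key only for the shuffle
        .repartitionAndSortWithinPartitions(partitioner)
        .map(Tuple2::_2);                                                    // drop the key again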
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java
index a313d2d..4595658 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java
@@ -43,7 +43,7 @@ import scala.Tuple2;
* actual files
*/
public class HoodieBloomIndexCheckFunction implements
- Function2<Integer, Iterator<Tuple2<String, Tuple2<String, HoodieKey>>>,
+ Function2<Integer, Iterator<Tuple2<String, HoodieKey>>,
Iterator<List<KeyLookupResult>>> {
private static Logger logger = LogManager.getLogger(HoodieBloomIndexCheckFunction.class);
@@ -84,13 +84,13 @@ public class HoodieBloomIndexCheckFunction implements
@Override
public Iterator<List<KeyLookupResult>> call(Integer partition,
- Iterator<Tuple2<String, Tuple2<String, HoodieKey>>> fileParitionRecordKeyTripletItr)
+ Iterator<Tuple2<String, HoodieKey>> fileParitionRecordKeyTripletItr)
throws Exception {
return new LazyKeyCheckIterator(fileParitionRecordKeyTripletItr);
}
class LazyKeyCheckIterator extends
- LazyIterableIterator<Tuple2<String, Tuple2<String, HoodieKey>>, List<KeyLookupResult>> {
+ LazyIterableIterator<Tuple2<String, HoodieKey>, List<KeyLookupResult>> {
private List<String> candidateRecordKeys;
@@ -103,7 +103,7 @@ public class HoodieBloomIndexCheckFunction implements
private long totalKeysChecked;
LazyKeyCheckIterator(
- Iterator<Tuple2<String, Tuple2<String, HoodieKey>>> filePartitionRecordKeyTripletItr) {
+ Iterator<Tuple2<String, HoodieKey>> filePartitionRecordKeyTripletItr) {
super(filePartitionRecordKeyTripletItr);
currentFile = null;
candidateRecordKeys = new ArrayList<>();
@@ -162,10 +162,10 @@ public class HoodieBloomIndexCheckFunction implements
try {
// process one file in each go.
while (inputItr.hasNext()) {
- Tuple2<String, Tuple2<String, HoodieKey>> currentTuple = inputItr.next();
- String fileName = currentTuple._2._1;
- String partitionPath = currentTuple._2._2.getPartitionPath();
- String recordKey = currentTuple._2._2.getRecordKey();
+ Tuple2<String, HoodieKey> currentTuple = inputItr.next();
+ String fileName = currentTuple._1;
+ String partitionPath = currentTuple._2.getPartitionPath();
+ String recordKey = currentTuple._2.getRecordKey();
// lazily init state
if (currentFile == null) {
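
Downstream, the change to the check function is mechanical: its iterator consumes the flat tuple directly rather than unwrapping a nested one. Before and after, from the hunk above:

    // Before (nested tuple):
    String fileName = currentTuple._2._1;
    String recordKey = currentTuple._2._2.getRecordKey();

    // After (flat tuple):
    String fileName = currentTuple._1;
    String recordKey = currentTuple._2.getRecordKey();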
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java
index 05a5aa7..7bf85cb 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java
@@ -27,12 +27,14 @@ import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.table.HoodieTable;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.stream.Collectors;
+
import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
@@ -76,7 +78,7 @@ public class HoodieGlobalBloomIndex<T extends HoodieRecordPayload> extends Hoodi
@Override
@VisibleForTesting
- JavaPairRDD<String, Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
+ JavaRDD<Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
Map<String, String> indexToPartitionMap = new HashMap<>();
@@ -87,17 +89,14 @@ public class HoodieGlobalBloomIndex<T extends HoodieRecordPayload> extends Hoodi
IndexFileFilter indexFileFilter = config.getBloomIndexPruneByRanges()
? new IntervalTreeBasedGlobalIndexFileFilter(partitionToFileIndexInfo)
: new ListBasedGlobalIndexFileFilter(partitionToFileIndexInfo);
+
return partitionRecordKeyPairRDD.map(partitionRecordKeyPair -> {
String recordKey = partitionRecordKeyPair._2();
String partitionPath = partitionRecordKeyPair._1();
- List<Tuple2<String, Tuple2<String, HoodieKey>>> recordComparisons = new ArrayList<>();
- indexFileFilter.getMatchingFiles(partitionPath, recordKey).forEach(matchingFile -> {
- recordComparisons.add(
- new Tuple2<>(String.format("%s#%s", matchingFile, recordKey),
- new Tuple2<>(matchingFile,
- new HoodieKey(recordKey, indexToPartitionMap.get(matchingFile)))));
- });
- return recordComparisons;
- }).flatMapToPair(List::iterator);
+
+ return indexFileFilter.getMatchingFiles(partitionPath, recordKey).stream()
+ .map(file -> new Tuple2<>(file, new HoodieKey(recordKey, indexToPartitionMap.get(file))))
+ .collect(Collectors.toList());
+ }).flatMap(List::iterator);
}
}
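
One subtlety the global-index variant preserves through the refactor: the emitted HoodieKey takes its partition path from the matched file's own partition (via indexToPartitionMap), not from the incoming record, since a global index can match a record against files in other partitions. A small illustration built from the test expectations below:

    Map<String, String> indexToPartitionMap = new HashMap<>();
    indexToPartitionMap.put("f1", "2017/10/22");
    indexToPartitionMap.put("f4", "2017/10/23");
    // An incoming record ("2017/10/21", "003") that matches f4 is emitted as
    // (f4, new HoodieKey("003", indexToPartitionMap.get("f4"))), i.e. partition 2017/10/23.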
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java
index 664c146..277c9a3 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java
@@ -259,12 +259,12 @@ public class TestHoodieBloomIndex {
new Tuple2<>("2017/10/22", "003"), new Tuple2<>("2017/10/22", "002"), new Tuple2<>("2017/10/22", "005"),
new Tuple2<>("2017/10/22", "004"))).mapToPair(t -> t);
- List<Tuple2<String, Tuple2<String, HoodieKey>>> comparisonKeyList = index.explodeRecordRDDWithFileComparisons(
+ List<Tuple2<String, HoodieKey>> comparisonKeyList = index.explodeRecordRDDWithFileComparisons(
partitionToFileIndexInfo, partitionRecordKeyPairRDD).collect();
assertEquals(10, comparisonKeyList.size());
Map<String, List<String>> recordKeyToFileComps = comparisonKeyList.stream().collect(Collectors.groupingBy(
- t -> t._2()._2().getRecordKey(), Collectors.mapping(t -> t._2()._1().split("#")[0], Collectors.toList())));
+ t -> t._2.getRecordKey(), Collectors.mapping(t -> t._1, Collectors.toList())));
assertEquals(4, recordKeyToFileComps.size());
assertEquals(new HashSet<>(Arrays.asList("f1", "f3", "f4")), new HashSet<>(recordKeyToFileComps.get("002")));
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieGlobalBloomIndex.java b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieGlobalBloomIndex.java
index 0228c2b..b220231 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieGlobalBloomIndex.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieGlobalBloomIndex.java
@@ -190,25 +190,25 @@ public class TestHoodieGlobalBloomIndex {
new Tuple2<>("2017/10/21", "003"), new Tuple2<>("2017/10/22", "002"), new Tuple2<>("2017/10/22", "005"),
new Tuple2<>("2017/10/23", "004"))).mapToPair(t -> t);
- List<Tuple2<String, Tuple2<String, HoodieKey>>> comparisonKeyList = index.explodeRecordRDDWithFileComparisons(
- partitionToFileIndexInfo, partitionRecordKeyPairRDD).collect();
-
- /* epecting:
- f4#003, f4, HoodieKey { recordKey=003 partitionPath=2017/10/23}
- f1#003, f1, HoodieKey { recordKey=003 partitionPath=2017/10/22}
- f3#003, f3, HoodieKey { recordKey=003 partitionPath=2017/10/22}
- f4#002, f4, HoodieKey { recordKey=002 partitionPath=2017/10/23}
- f1#002, f1, HoodieKey { recordKey=002 partitionPath=2017/10/22}
- f3#002, f3, HoodieKey { recordKey=002 partitionPath=2017/10/22}
- f4#005, f4, HoodieKey { recordKey=005 partitionPath=2017/10/23}
- f1#005, f1, HoodieKey { recordKey=005 partitionPath=2017/10/22}
- f4#004, f4, HoodieKey { recordKey=004 partitionPath=2017/10/23}
- f1#004, f1, HoodieKey { recordKey=004 partitionPath=2017/10/22}
+ List<Tuple2<String, HoodieKey>> comparisonKeyList =
+ index.explodeRecordRDDWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyPairRDD).collect();
+
+ /* expecting:
+ f4, HoodieKey { recordKey=003 partitionPath=2017/10/23}
+ f1, HoodieKey { recordKey=003 partitionPath=2017/10/22}
+ f3, HoodieKey { recordKey=003 partitionPath=2017/10/22}
+ f4, HoodieKey { recordKey=002 partitionPath=2017/10/23}
+ f1, HoodieKey { recordKey=002 partitionPath=2017/10/22}
+ f3, HoodieKey { recordKey=002 partitionPath=2017/10/22}
+ f4, HoodieKey { recordKey=005 partitionPath=2017/10/23}
+ f1, HoodieKey { recordKey=005 partitionPath=2017/10/22}
+ f4, HoodieKey { recordKey=004 partitionPath=2017/10/23}
+ f1, HoodieKey { recordKey=004 partitionPath=2017/10/22}
*/
assertEquals(10, comparisonKeyList.size());
- Map<String, List<String>> recordKeyToFileComps = comparisonKeyList.stream().collect(Collectors.groupingBy(
- t -> t._2()._2().getRecordKey(), Collectors.mapping(t -> t._2()._1().split("#")[0], Collectors.toList())));
+ Map<String, List<String>> recordKeyToFileComps = comparisonKeyList.stream()
+ .collect(Collectors.groupingBy(t -> t._2.getRecordKey(), Collectors.mapping(Tuple2::_1, Collectors.toList())));
assertEquals(4, recordKeyToFileComps.size());
assertEquals(new HashSet<>(Arrays.asList("f4", "f1", "f3")), new HashSet<>(recordKeyToFileComps.get("002")));