Posted to commits@hudi.apache.org by "alexeykudinkin (via GitHub)" <gi...@apache.org> on 2023/02/14 21:50:11 UTC

[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #7914: [HUDI-5080] Unpersist only relevant RDDs instead of all

alexeykudinkin commented on code in PR #7914:
URL: https://github.com/apache/hudi/pull/7914#discussion_r1106414946


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java:
##########
@@ -58,6 +61,7 @@ public class HoodieSparkEngineContext extends HoodieEngineContext {
   private static final Logger LOG = LogManager.getLogger(HoodieSparkEngineContext.class);
   private final JavaSparkContext javaSparkContext;
   private final SQLContext sqlContext;
+  private final Map<Pair<String, String>, List<Integer>> cachedRddIds = new ConcurrentHashMap<>();

Review Comment:
   Let's add a comment elaborating on why the key is `(basePath, instantTime)`
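   A hedged sketch of what such a comment could look like, reduced to a standalone class. The class name and the use of `Map.entry` in place of Hudi's own `Pair` are stand-ins for illustration, not the actual Hudi code:

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   public class CachedRddRegistry {
       // Keyed by (basePath, instantTime): basePath alone is not enough because
       // several instants may be in flight concurrently on the same table, and
       // instantTime alone is not enough because one engine context can serve
       // multiple tables. The pair identifies exactly one write operation, so we
       // can later unpersist only the RDDs cached for that operation.
       private final Map<Map.Entry<String, String>, List<Integer>> cachedRddIds =
           new ConcurrentHashMap<>();

       public void put(String basePath, String instantTime, int rddId) {
           // Note: the list append itself is not thread-safe without extra locking;
           // that concern is orthogonal to the choice of key.
           cachedRddIds
               .computeIfAbsent(Map.entry(basePath, instantTime), k -> new ArrayList<>())
               .add(rddId);
       }

       public List<Integer> get(String basePath, String instantTime) {
           return cachedRddIds.getOrDefault(Map.entry(basePath, instantTime), List.of());
       }
   }
   ```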
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java:
##########
@@ -246,6 +246,7 @@ protected HoodieWriteMetadata<HoodieData<WriteStatus>> executeClustering(HoodieC
         .performClustering(clusteringPlan, schema, instantTime);
     HoodieData<WriteStatus> writeStatusList = writeMetadata.getWriteStatuses();
     HoodieData<WriteStatus> statuses = updateIndex(writeStatusList, writeMetadata);
+    context.putCachedDataIds(config.getBasePath(), instantTime, statuses.getId());

Review Comment:
   This is quite brittle -- it's far from obvious to callers that they need to record cached RDD ids somewhere.
   I'd suggest we instead modify `HoodieData.persist` to accept the engine context and perform this registration internally (so that we establish the invariant that any persisted RDD is always registered)
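   A minimal sketch of the suggested refactoring, with toy stand-in classes (`EngineContext`, `Data`) rather than the real Hudi types, to show how passing the context into `persist` makes registration automatic:

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class PersistInvariantSketch {
       // Stand-in for HoodieEngineContext: records ids of persisted datasets.
       static class EngineContext {
           final List<Integer> registered = new ArrayList<>();
           void putCachedDataIds(String basePath, String instantTime, int... ids) {
               for (int id : ids) {
                   registered.add(id);
               }
           }
       }

       // Stand-in for HoodieData: persist now requires the context, so a caller
       // cannot persist a dataset without it also being registered for cleanup.
       static class Data<T> {
           private final int id;
           Data(int id) { this.id = id; }
           int getId() { return id; }

           Data<T> persist(String storageLevel, EngineContext ctx,
                           String basePath, String instantTime) {
               // ...a Spark-backed implementation would call rdd.persist(storageLevel) here
               ctx.putCachedDataIds(basePath, instantTime, id);
               return this;
           }
       }
   }
   ```

   With this shape, call sites like `executeClustering` no longer need the explicit `context.putCachedDataIds(...)` line; persisting and registering can no longer drift apart.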



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java:
##########
@@ -180,4 +184,24 @@ public Option<String> getProperty(EngineProperty key) {
   public void setJobStatus(String activeModule, String activityDescription) {
     javaSparkContext.setJobGroup(activeModule, activityDescription);
   }
+
+  @Override
+  public void putCachedDataIds(String basePath, String instantTime, int... ids) {
+    Pair<String, String> key = Pair.of(basePath, instantTime);
+    cachedRddIds.putIfAbsent(key, new ArrayList<>());

Review Comment:
   Since we're appending to the ArrayList here, we need to guard it with a lock (and since we'd have to grab a lock anyway, we can just use a plain HashMap instead of ConcurrentHashMap)
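   To make the race concrete: `putIfAbsent` on a ConcurrentHashMap is atomic, but the subsequent `ArrayList.add` is not, so two threads registering ids for the same key can corrupt the list. A sketch of the locked-HashMap variant suggested above, with illustrative names rather than the actual Hudi code:

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   public class LockedRddIdRegistry {
       // Plain HashMap is fine here: every access goes through a synchronized
       // method, so the map and the per-key lists are guarded by the same lock.
       private final Map<Map.Entry<String, String>, List<Integer>> cachedRddIds =
           new HashMap<>();

       public synchronized void putCachedDataIds(String basePath, String instantTime,
                                                 int... ids) {
           List<Integer> list = cachedRddIds
               .computeIfAbsent(Map.entry(basePath, instantTime), k -> new ArrayList<>());
           for (int id : ids) {
               list.add(id);
           }
       }

       public synchronized List<Integer> getCachedDataIds(String basePath,
                                                          String instantTime) {
           // Return a defensive copy so callers never touch the guarded list.
           return new ArrayList<>(
               cachedRddIds.getOrDefault(Map.entry(basePath, instantTime), List.of()));
       }
   }
   ```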



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org