Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/03/10 01:35:51 UTC

[GitHub] [hudi] yihua commented on a change in pull request #4856: [HUDI-2439] Replace RDD with HoodieData in HoodieSparkTable and commit executors

yihua commented on a change in pull request #4856:
URL: https://github.com/apache/hudi/pull/4856#discussion_r823226342



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java
##########
@@ -48,69 +45,64 @@
  * @param <T>
  */
 @SuppressWarnings("checkstyle:LineLength")
-public class SparkDeleteHelper<T extends HoodieRecordPayload,R> extends
-    BaseDeleteHelper<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, R> {
-  private SparkDeleteHelper() {
+public class HoodieDeleteHelper<T extends HoodieRecordPayload, R> extends
+    BaseDeleteHelper<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> {
+  private HoodieDeleteHelper() {
   }
 
   private static class DeleteHelperHolder {
-    private static final SparkDeleteHelper SPARK_DELETE_HELPER = new SparkDeleteHelper();
+    private static final HoodieDeleteHelper HOODIE_DELETE_HELPER = new HoodieDeleteHelper<>();
   }
 
-  public static SparkDeleteHelper newInstance() {
-    return DeleteHelperHolder.SPARK_DELETE_HELPER;
+  public static HoodieDeleteHelper newInstance() {
+    return DeleteHelperHolder.HOODIE_DELETE_HELPER;
   }
 
   @Override
-  public JavaRDD<HoodieKey> deduplicateKeys(JavaRDD<HoodieKey> keys, HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, int parallelism) {
+  public HoodieData<HoodieKey> deduplicateKeys(HoodieData<HoodieKey> keys, HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table, int parallelism) {
     boolean isIndexingGlobal = table.getIndex().isGlobal();
     if (isIndexingGlobal) {
-      return keys.keyBy(HoodieKey::getRecordKey)
-          .reduceByKey((key1, key2) -> key1, parallelism)
-          .values();
+      return keys.distinctWithKey(HoodieKey::getRecordKey, parallelism);
     } else {
-      return keys.distinct(parallelism);
+      return keys.distinct();

Review comment:
       Should the parallelism still be passed in here?
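      A minimal sketch of what that could look like, assuming `HoodieData#distinct` has (or gains) an overload that accepts a parallelism hint (mirroring the removed `JavaRDD` call above):
   ```
   return keys.distinct(parallelism);
   ```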

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java
##########
@@ -48,69 +45,64 @@
  * @param <T>
  */
 @SuppressWarnings("checkstyle:LineLength")
-public class SparkDeleteHelper<T extends HoodieRecordPayload,R> extends
-    BaseDeleteHelper<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, R> {
-  private SparkDeleteHelper() {
+public class HoodieDeleteHelper<T extends HoodieRecordPayload, R> extends
+    BaseDeleteHelper<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> {
+  private HoodieDeleteHelper() {
   }
 
   private static class DeleteHelperHolder {
-    private static final SparkDeleteHelper SPARK_DELETE_HELPER = new SparkDeleteHelper();
+    private static final HoodieDeleteHelper HOODIE_DELETE_HELPER = new HoodieDeleteHelper<>();
   }
 
-  public static SparkDeleteHelper newInstance() {
-    return DeleteHelperHolder.SPARK_DELETE_HELPER;
+  public static HoodieDeleteHelper newInstance() {
+    return DeleteHelperHolder.HOODIE_DELETE_HELPER;
   }
 
   @Override
-  public JavaRDD<HoodieKey> deduplicateKeys(JavaRDD<HoodieKey> keys, HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, int parallelism) {
+  public HoodieData<HoodieKey> deduplicateKeys(HoodieData<HoodieKey> keys, HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table, int parallelism) {
     boolean isIndexingGlobal = table.getIndex().isGlobal();
     if (isIndexingGlobal) {
-      return keys.keyBy(HoodieKey::getRecordKey)
-          .reduceByKey((key1, key2) -> key1, parallelism)
-          .values();
+      return keys.distinctWithKey(HoodieKey::getRecordKey, parallelism);
     } else {
-      return keys.distinct(parallelism);
+      return keys.distinct();
     }
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute(String instantTime,
-                                                           JavaRDD<HoodieKey> keys,
-                                                           HoodieEngineContext context,
-                                                           HoodieWriteConfig config,
-                                                           HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
-                                                           BaseCommitActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, R> deleteExecutor) {
-    JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
-
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(String instantTime,
+                                                              HoodieData<HoodieKey> keys,
+                                                              HoodieEngineContext context,
+                                                              HoodieWriteConfig config,
+                                                              HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
+                                                              BaseCommitActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> deleteExecutor) {
     try {
-      HoodieWriteMetadata result = null;
-      JavaRDD<HoodieKey> dedupedKeys = keys;
+      HoodieData<HoodieKey> dedupedKeys = keys;
       final int parallelism = config.getDeleteShuffleParallelism();
       if (config.shouldCombineBeforeDelete()) {
         // De-dupe/merge if needed
         dedupedKeys = deduplicateKeys(keys, table, parallelism);
-      } else if (!keys.partitions().isEmpty()) {
+      } else if (keys.hasPartitions()) {

Review comment:
       Should this be replaced by `keys.isEmpty()`?
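      A minimal sketch of the alternative, with the negation kept so it matches the old `!keys.partitions().isEmpty()` semantics (whether `HoodieData#isEmpty` is available here should be confirmed):
   ```
   } else if (!keys.isEmpty()) {
   ```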

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java
##########
@@ -48,69 +45,64 @@
  * @param <T>
  */
 @SuppressWarnings("checkstyle:LineLength")
-public class SparkDeleteHelper<T extends HoodieRecordPayload,R> extends
-    BaseDeleteHelper<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, R> {
-  private SparkDeleteHelper() {
+public class HoodieDeleteHelper<T extends HoodieRecordPayload, R> extends

Review comment:
       Should `FlinkDeleteHelper` and `JavaDeleteHelper` be deleted and replaced by this class?  I'm also wondering whether `HoodieDeleteHelper` could be the root of the class hierarchy, i.e., whether `HoodieDeleteHelper` and `BaseDeleteHelper` could be combined.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
##########
@@ -19,60 +19,52 @@
 package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
 
-import org.apache.spark.api.java.JavaRDD;
+public class HoodieWriteHelper<T extends HoodieRecordPayload, R> extends BaseWriteHelper<T, HoodieData<HoodieRecord<T>>,

Review comment:
       Similar question here for the write helper, regarding the Flink and Java variants and `BaseWriteHelper`.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
##########
@@ -37,31 +38,29 @@
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.spark.api.java.JavaRDD;
 
 import java.io.IOException;
 import java.util.Iterator;
 
-public class SparkMergeHelper<T extends HoodieRecordPayload> extends BaseMergeHelper<T, JavaRDD<HoodieRecord<T>>,
-    JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
+public class HoodieMergeHelper<T extends HoodieRecordPayload> extends

Review comment:
       Similar question here for the merge helper, regarding the Flink and Java variants and `BaseMergeHelper`.

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
##########
@@ -94,7 +95,7 @@ public MultipleSparkJobExecutionStrategy(HoodieTable table, HoodieEngineContext
   public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final HoodieClusteringPlan clusteringPlan, final Schema schema, final String instantTime) {
     JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(getEngineContext());
     // execute clustering for each group async and collect WriteStatus
-    Stream<JavaRDD<WriteStatus>> writeStatusRDDStream = FutureUtils.allOf(
+    Stream<HoodieData<WriteStatus>> writeStatusRDDStream = FutureUtils.allOf(

Review comment:
       nit: rename the variable? It no longer holds RDDs.

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -224,44 +232,50 @@ public HoodieWriteResult insertOverwriteTable(JavaRDD<HoodieRecord<T>> records,
   }
 
   @Override
-  public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, String instantTime, Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
-    HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table =
+  public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, String instantTime, Option<BulkInsertPartitioner<?>> userDefinedBulkInsertPartitioner) {
+    HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table =
         initTable(WriteOperationType.BULK_INSERT, Option.ofNullable(instantTime));
     table.validateInsertSchema();
     preWrite(instantTime, WriteOperationType.BULK_INSERT, table.getMetaClient());
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.bulkInsert(context,instantTime, records, userDefinedBulkInsertPartitioner);
-    return postWrite(result, instantTime, table);
+    Option<BulkInsertPartitioner<?>> partitionerOption = Option.ofNullable(userDefinedBulkInsertPartitioner.orElse(null));
+    HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.bulkInsert(context,instantTime, HoodieJavaRDD.of(records), partitionerOption);
+    HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
+    return postWrite(resultRDD, instantTime, table);
   }
 
   @Override
-  public JavaRDD<WriteStatus> bulkInsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, String instantTime, Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner) {
-    HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table =
+  public JavaRDD<WriteStatus> bulkInsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, String instantTime, Option<BulkInsertPartitioner<?>> bulkInsertPartitioner) {
+    HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table =
         initTable(WriteOperationType.BULK_INSERT_PREPPED, Option.ofNullable(instantTime));
     table.validateInsertSchema();
     preWrite(instantTime, WriteOperationType.BULK_INSERT_PREPPED, table.getMetaClient());
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.bulkInsertPrepped(context,instantTime, preppedRecords, bulkInsertPartitioner);
-    return postWrite(result, instantTime, table);
+    Option<BulkInsertPartitioner<?>> partitionerOption = Option.ofNullable(bulkInsertPartitioner.orElse(null));

Review comment:
       Wondering if you have tested the user-defined bulk insert partitioner to verify that the type wildcard does not cause any issues along the bulk insert write path.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/data/HoodieMapPair.java
##########
@@ -110,6 +113,16 @@ public long count() {
         Collectors.toMap(Map.Entry::getKey, entry -> (long) entry.getValue().size()));
   }
 
+  @Override
+  public HoodiePairData<K, V> reduceByKey(SerializableBiFunction<V, V, V> func, int parallelism) {

Review comment:
       Add unit tests for this?
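      For example, a minimal test sketch along these lines -- the `HoodieMapPair.of(Map<K, List<V>>)` factory and the surrounding imports are assumptions to be checked against the actual API; only `reduceByKey` and `count` are taken from this diff:
   ```
   // Hypothetical sketch: HoodieMapPair.of(Map<K, List<V>>) is an assumed factory; adjust to the real API.
   @Test
   public void testReduceByKey() {
     Map<String, List<Integer>> input = new HashMap<>();
     input.put("a", Arrays.asList(1, 2, 3));
     input.put("b", Collections.singletonList(10));
     HoodiePairData<String, Integer> reduced = HoodieMapPair.of(input).reduceByKey(Integer::sum, 1);
     // Expect one entry per key after the reduction: a -> 6, b -> 10.
     assertEquals(2, reduced.count());
   }
   ```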

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java
##########
@@ -132,6 +134,26 @@ public long count() {
     return HoodieList.of(new ArrayList<>(new HashSet<>(listData)));
   }
 
+  @Override
+  public <O> HoodieData<T> distinctWithKey(SerializableFunction<T, O> keyGetter, int parallelism) {
+    Set<O> set = listData.stream().map(i -> throwingMapWrapper(keyGetter).apply(i)).collect(Collectors.toSet());
+    List<T> distinctList = new LinkedList<>();
+    listData.forEach(x -> {
+      if (set.contains(throwingMapWrapper(keyGetter).apply(x))) {
+        distinctList.add(x);
+      }
+    });
+    return HoodieList.of(distinctList);

Review comment:
       It seems that the result can still contain multiple objects with the same key, since every element whose key appears in the set gets added back. Shouldn't `distinct` give back only one object per key?
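      For reference, a minimal sketch of one possible fix that keeps the first element seen per key (it only uses names already present in the quoted method; the intended tie-breaking semantics may differ):
   ```
   Map<O, T> firstPerKey = new LinkedHashMap<>();
   listData.forEach(x -> firstPerKey.putIfAbsent(throwingMapWrapper(keyGetter).apply(x), x));
   return HoodieList.of(new ArrayList<>(firstPerKey.values()));
   ```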

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java
##########
@@ -41,21 +40,17 @@
 
   public SparkInsertOverwriteTableCommitActionExecutor(HoodieEngineContext context,
                                                        HoodieWriteConfig config, HoodieTable table,
-                                                       String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+                                                       String instantTime, HoodieData<HoodieRecord<T>> inputRecordsRDD) {
     super(context, config, table, instantTime, inputRecordsRDD, WriteOperationType.INSERT_OVERWRITE_TABLE);
   }
 
   @Override
-  protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
-    Map<String, List<String>> partitionToExistingFileIds = new HashMap<>();
+  protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
     List<String> partitionPaths = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), table.getMetaClient().getBasePath());
-    JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
-    if (partitionPaths != null && partitionPaths.size() > 0) {
-      context.setJobStatus(this.getClass().getSimpleName(), "Getting ExistingFileIds of all partitions");
-      JavaRDD<String> partitionPathRdd = jsc.parallelize(partitionPaths, partitionPaths.size());
-      partitionToExistingFileIds = partitionPathRdd.mapToPair(
-          partitionPath -> new Tuple2<>(partitionPath, getAllExistingFileIds(partitionPath))).collectAsMap();
+    if (partitionPaths == null || partitionPaths.isEmpty()) {
+      return Collections.emptyMap();
     }
-    return partitionToExistingFileIds;
+    return HoodieJavaPairRDD.getJavaPairRDD(context.parallelize(partitionPaths, partitionPaths.size()).mapToPair(

Review comment:
       Need to set the job status before returning?
   ```
   context.setJobStatus(this.getClass().getSimpleName(), "Getting ExistingFileIds of all partitions");
   ```

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
##########
@@ -363,7 +365,7 @@ public void completeCompaction(
       // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
       // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
       table.getMetadataWriter(compactionInstant.getTimestamp()).ifPresent(
-          w -> w.update(metadata, compactionInstant.getTimestamp(), table.isTableServiceAction(compactionInstant.getAction())));
+          w -> ((HoodieTableMetadataWriter) w).update(metadata, compactionInstant.getTimestamp(), table.isTableServiceAction(compactionInstant.getAction())));

Review comment:
       nit: seems that the type cast is not needed?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
##########
@@ -397,7 +398,7 @@ protected void rollbackFailedBootstrap() {
    * @return Collection of WriteStatus to inspect errors and counts
    */
   public abstract O bulkInsert(I records, final String instantTime,
-                               Option<BulkInsertPartitioner<I>> userDefinedBulkInsertPartitioner);
+                               Option<BulkInsertPartitioner<?>> userDefinedBulkInsertPartitioner);

Review comment:
       Not required in this PR, to limit the scope, but should `BulkInsertPartitioner` take in `HoodieData` as well?
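      For illustration, a hypothetical sketch of the type binding on the client API (not part of this PR; whether this works uniformly across engines is exactly the open question):
   ```
   // Hypothetical: bind the partitioner to HoodieData instead of using a wildcard.
   Option<BulkInsertPartitioner<HoodieData<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner;
   ```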

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestDeleteHelper.java
##########
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.commit;
-
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.data.HoodieJavaRDD;
-import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.action.HoodieWriteMetadata;
-
-import org.apache.spark.Partition;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-import java.util.Collections;
-import java.util.List;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-@ExtendWith(MockitoExtension.class)
-public class TestDeleteHelper {

Review comment:
       Should we still keep this set of tests?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org