Posted to commits@hudi.apache.org by le...@apache.org on 2021/06/16 14:37:42 UTC

[hudi] branch master updated: [HUDI-2008] Avoid the raw type usage in some classes under hudi-utilities module (#3076)

This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d519c74  [HUDI-2008] Avoid the raw type usage in some classes under hudi-utilities module (#3076)
d519c74 is described below

commit d519c746264c7210a604c8fb43efd084ae69ae41
Author: Wei <hs...@163.com>
AuthorDate: Wed Jun 16 22:37:29 2021 +0800

    [HUDI-2008] Avoid the raw type usage in some classes under hudi-utilities module (#3076)
---
 .../main/java/org/apache/hudi/utilities/HDFSParquetImporter.java   | 4 ++--
 .../main/java/org/apache/hudi/utilities/HoodieClusteringJob.java   | 7 ++++---
 .../src/main/java/org/apache/hudi/utilities/HoodieCompactor.java   | 5 +++--
 .../src/main/java/org/apache/hudi/utilities/UtilHelpers.java       | 7 ++++---
 4 files changed, 13 insertions(+), 10 deletions(-)
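
[Annotation, not part of the commit: in Java, using a generic class without
its type argument (a "raw type") turns off generic type checking at that use
site, so javac emits rawtypes/unchecked warnings and callers fall back on
casts. A minimal sketch of the pattern this commit removes, using plain JDK
collections rather than Hudi classes:]

    import java.util.ArrayList;
    import java.util.List;

    // Minimal sketch of the raw-type pattern, on JDK types (not Hudi
    // classes). A raw List compiles, but its element type is unknown to
    // the compiler, so reads need unchecked casts.
    public class RawTypeDemo {
      public static void main(String[] args) {
        List raw = new ArrayList();             // raw type: no type checking
        raw.add("status");
        String s = (String) raw.get(0);         // caller must cast

        List<String> typed = new ArrayList<>(); // parameterized: checked
        typed.add("status");
        String t = typed.get(0);                // no cast needed
        System.out.println(s + " / " + t);
      }
    }
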

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
index c9955bd..6d8769b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
@@ -144,7 +144,7 @@ public class HDFSParquetImporter implements Serializable {
       // Get schema.
       String schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile);
 
-      SparkRDDWriteClient client =
+      SparkRDDWriteClient<HoodieRecordPayload> client =
           UtilHelpers.createHoodieClient(jsc, cfg.targetPath, schemaStr, cfg.parallelism, Option.empty(), props);
 
       JavaRDD<HoodieRecord<HoodieRecordPayload>> hoodieRecords = buildHoodieRecordsForImport(jsc, schemaStr);
@@ -206,7 +206,7 @@ public class HDFSParquetImporter implements Serializable {
    * @param hoodieRecords Hoodie Records
    * @param <T> Type
    */
-  protected <T extends HoodieRecordPayload> JavaRDD<WriteStatus> load(SparkRDDWriteClient client, String instantTime,
+  protected <T extends HoodieRecordPayload> JavaRDD<WriteStatus> load(SparkRDDWriteClient<T> client, String instantTime,
                                                                       JavaRDD<HoodieRecord<T>> hoodieRecords) {
     switch (cfg.command.toLowerCase()) {
       case "upsert": {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
index 1e10eef..65cb404 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
@@ -27,6 +27,7 @@ import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.util.Option;
@@ -148,10 +149,10 @@ public class HoodieClusteringJob {
 
   private int doCluster(JavaSparkContext jsc) throws Exception {
     String schemaStr = getSchemaFromLatestInstant();
-    SparkRDDWriteClient client =
+    SparkRDDWriteClient<HoodieRecordPayload> client =
         UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props);
     JavaRDD<WriteStatus> writeResponse =
-        (JavaRDD<WriteStatus>) client.cluster(cfg.clusteringInstantTime, true).getWriteStatuses();
+        client.cluster(cfg.clusteringInstantTime, true).getWriteStatuses();
     return UtilHelpers.handleErrors(jsc, cfg.clusteringInstantTime, writeResponse);
   }
 
@@ -162,7 +163,7 @@ public class HoodieClusteringJob {
 
   private Option<String> doSchedule(JavaSparkContext jsc) throws Exception {
     String schemaStr = getSchemaFromLatestInstant();
-    SparkRDDWriteClient client =
+    SparkRDDWriteClient<HoodieRecordPayload> client =
         UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props);
     if (cfg.clusteringInstantTime != null) {
       client.scheduleClusteringAtInstant(cfg.clusteringInstantTime, Option.empty());
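
[Annotation: the cast removal in doCluster follows directly from the
parameterized client. Once the receiver's type argument is known, the write
statuses come back already typed, so the explicit (JavaRDD<WriteStatus>)
cast and its unchecked warning become dead weight. A stand-alone sketch with
a hypothetical Result type (not a Hudi class); the same reasoning applies to
the identical cast removed in HoodieCompactor.doCompact below:]

    import java.util.Collections;
    import java.util.List;

    // On a raw receiver, getStatuses() erases to a raw List and the caller
    // needs an unchecked cast; on Result<String> it is already typed.
    class Result<T> {
      List<T> getStatuses() { return Collections.emptyList(); }
    }

    public class CastRemovalDemo {
      public static void main(String[] args) {
        Result raw = new Result();                          // raw: erased
        List<String> a = (List<String>) raw.getStatuses();  // unchecked cast

        Result<String> typed = new Result<>();              // parameterized
        List<String> b = typed.getStatuses();               // no cast
        System.out.println(a.size() + b.size());
      }
    }
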
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
index c1493e6..1996fb8 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
@@ -22,6 +22,7 @@ import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.Option;
 
 import com.beust.jcommander.JCommander;
@@ -131,9 +132,9 @@ public class HoodieCompactor {
   private int doCompact(JavaSparkContext jsc) throws Exception {
     // Get schema.
     String schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile);
-    SparkRDDWriteClient client =
+    SparkRDDWriteClient<HoodieRecordPayload> client =
         UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props);
-    JavaRDD<WriteStatus> writeResponse = (JavaRDD<WriteStatus>) client.compact(cfg.compactionInstantTime);
+    JavaRDD<WriteStatus> writeResponse = client.compact(cfg.compactionInstantTime);
     return UtilHelpers.handleErrors(jsc, cfg.compactionInstantTime, writeResponse);
   }
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index e7a8b3c..79a495e 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -24,6 +24,7 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.DFSPropertiesConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
@@ -274,8 +275,8 @@ public class UtilHelpers {
    * @param schemaStr   Schema
    * @param parallelism Parallelism
    */
-  public static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath, String schemaStr,
-                                                             int parallelism, Option<String> compactionStrategyClass, TypedProperties properties) {
+  public static SparkRDDWriteClient<HoodieRecordPayload> createHoodieClient(JavaSparkContext jsc, String basePath, String schemaStr,
+                                                                            int parallelism, Option<String> compactionStrategyClass, TypedProperties properties) {
     HoodieCompactionConfig compactionConfig = compactionStrategyClass
         .map(strategy -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false)
             .withCompactionStrategy(ReflectionUtils.loadClass(strategy)).build())
@@ -288,7 +289,7 @@ public class UtilHelpers {
             .withSchema(schemaStr).combineInput(true, true).withCompactionConfig(compactionConfig)
             .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
             .withProps(properties).build();
-    return new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), config);
+    return new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jsc), config);
   }
 
   public static int handleErrors(JavaSparkContext jsc, String instantTime, JavaRDD<WriteStatus> writeResponse) {
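
[Annotation: the last hunk switches the factory to the diamond operator.
With new SparkRDDWriteClient<>(...), the compiler infers the constructor's
type argument from the method's declared return type, whereas the old raw
new SparkRDDWriteClient(...) produced an unchecked-conversion warning. The
same effect on plain JDK types:]

    import java.util.ArrayList;
    import java.util.List;

    // The diamond infers the constructor's type argument from the target
    // type; a raw constructor call only compiles with an
    // unchecked-conversion warning.
    public class DiamondDemo {
      public static void main(String[] args) {
        List<String> l1 = new ArrayList();   // raw: unchecked conversion
        List<String> l2 = new ArrayList<>(); // diamond: ArrayList<String>
        System.out.println(l1.size() + l2.size());
      }
    }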