You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by le...@apache.org on 2021/06/16 14:37:42 UTC
[hudi] branch master updated: [HUDI-2008] Avoid the raw type usage in some classes under hudi-utilities module (#3076)
This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d519c74 [HUDI-2008] Avoid the raw type usage in some classes under hudi-utilities module (#3076)
d519c74 is described below
commit d519c746264c7210a604c8fb43efd084ae69ae41
Author: Wei <hs...@163.com>
AuthorDate: Wed Jun 16 22:37:29 2021 +0800
[HUDI-2008] Avoid the raw type usage in some classes under hudi-utilities module (#3076)
---
.../main/java/org/apache/hudi/utilities/HDFSParquetImporter.java | 4 ++--
.../main/java/org/apache/hudi/utilities/HoodieClusteringJob.java | 7 ++++---
.../src/main/java/org/apache/hudi/utilities/HoodieCompactor.java | 5 +++--
.../src/main/java/org/apache/hudi/utilities/UtilHelpers.java | 7 ++++---
4 files changed, 13 insertions(+), 10 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
index c9955bd..6d8769b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
@@ -144,7 +144,7 @@ public class HDFSParquetImporter implements Serializable {
// Get schema.
String schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile);
- SparkRDDWriteClient client =
+ SparkRDDWriteClient<HoodieRecordPayload> client =
UtilHelpers.createHoodieClient(jsc, cfg.targetPath, schemaStr, cfg.parallelism, Option.empty(), props);
JavaRDD<HoodieRecord<HoodieRecordPayload>> hoodieRecords = buildHoodieRecordsForImport(jsc, schemaStr);
@@ -206,7 +206,7 @@ public class HDFSParquetImporter implements Serializable {
* @param hoodieRecords Hoodie Records
* @param <T> Type
*/
- protected <T extends HoodieRecordPayload> JavaRDD<WriteStatus> load(SparkRDDWriteClient client, String instantTime,
+ protected <T extends HoodieRecordPayload> JavaRDD<WriteStatus> load(SparkRDDWriteClient<T> client, String instantTime,
JavaRDD<HoodieRecord<T>> hoodieRecords) {
switch (cfg.command.toLowerCase()) {
case "upsert": {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
index 1e10eef..65cb404 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
@@ -27,6 +27,7 @@ import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.Option;
@@ -148,10 +149,10 @@ public class HoodieClusteringJob {
private int doCluster(JavaSparkContext jsc) throws Exception {
String schemaStr = getSchemaFromLatestInstant();
- SparkRDDWriteClient client =
+ SparkRDDWriteClient<HoodieRecordPayload> client =
UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props);
JavaRDD<WriteStatus> writeResponse =
- (JavaRDD<WriteStatus>) client.cluster(cfg.clusteringInstantTime, true).getWriteStatuses();
+ client.cluster(cfg.clusteringInstantTime, true).getWriteStatuses();
return UtilHelpers.handleErrors(jsc, cfg.clusteringInstantTime, writeResponse);
}
@@ -162,7 +163,7 @@ public class HoodieClusteringJob {
private Option<String> doSchedule(JavaSparkContext jsc) throws Exception {
String schemaStr = getSchemaFromLatestInstant();
- SparkRDDWriteClient client =
+ SparkRDDWriteClient<HoodieRecordPayload> client =
UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props);
if (cfg.clusteringInstantTime != null) {
client.scheduleClusteringAtInstant(cfg.clusteringInstantTime, Option.empty());
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
index c1493e6..1996fb8 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
@@ -22,6 +22,7 @@ import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import com.beust.jcommander.JCommander;
@@ -131,9 +132,9 @@ public class HoodieCompactor {
private int doCompact(JavaSparkContext jsc) throws Exception {
// Get schema.
String schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile);
- SparkRDDWriteClient client =
+ SparkRDDWriteClient<HoodieRecordPayload> client =
UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props);
- JavaRDD<WriteStatus> writeResponse = (JavaRDD<WriteStatus>) client.compact(cfg.compactionInstantTime);
+ JavaRDD<WriteStatus> writeResponse = client.compact(cfg.compactionInstantTime);
return UtilHelpers.handleErrors(jsc, cfg.compactionInstantTime, writeResponse);
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index e7a8b3c..79a495e 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -24,6 +24,7 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
@@ -274,8 +275,8 @@ public class UtilHelpers {
* @param schemaStr Schema
* @param parallelism Parallelism
*/
- public static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath, String schemaStr,
- int parallelism, Option<String> compactionStrategyClass, TypedProperties properties) {
+ public static SparkRDDWriteClient<HoodieRecordPayload> createHoodieClient(JavaSparkContext jsc, String basePath, String schemaStr,
+ int parallelism, Option<String> compactionStrategyClass, TypedProperties properties) {
HoodieCompactionConfig compactionConfig = compactionStrategyClass
.map(strategy -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false)
.withCompactionStrategy(ReflectionUtils.loadClass(strategy)).build())
@@ -288,7 +289,7 @@ public class UtilHelpers {
.withSchema(schemaStr).combineInput(true, true).withCompactionConfig(compactionConfig)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
.withProps(properties).build();
- return new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), config);
+ return new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jsc), config);
}
public static int handleErrors(JavaSparkContext jsc, String instantTime, JavaRDD<WriteStatus> writeResponse) {