Posted to commits@hudi.apache.org by vi...@apache.org on 2020/01/04 17:05:57 UTC
[incubator-hudi] branch master updated: [MINOR] Optimize hudi-cli module (#1136)
This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a733f4e [MINOR] Optimize hudi-cli module (#1136)
a733f4e is described below
commit a733f4ef723865738d8541282c0c7234d64668db
Author: SteNicholas <pr...@163.com>
AuthorDate: Sun Jan 5 01:05:50 2020 +0800
[MINOR] Optimize hudi-cli module (#1136)
---
.../org/apache/hudi/cli/HoodiePrintHelper.java | 2 +-
.../src/main/java/org/apache/hudi/cli/Table.java | 36 ++++++---------
.../hudi/cli/commands/ArchivedCommitsCommand.java | 53 +++++++++-------------
.../apache/hudi/cli/commands/CleansCommand.java | 13 +++---
.../apache/hudi/cli/commands/CommitsCommand.java | 18 +++-----
.../hudi/cli/commands/CompactionCommand.java | 31 ++++++-------
.../apache/hudi/cli/commands/DatasetsCommand.java | 12 ++---
.../hudi/cli/commands/FileSystemViewCommand.java | 17 +++----
.../cli/commands/HDFSParquetImportCommand.java | 4 +-
.../hudi/cli/commands/HoodieLogFileCommand.java | 29 ++++++------
.../hudi/cli/commands/HoodieSyncCommand.java | 45 ++++++++----------
.../apache/hudi/cli/commands/RepairsCommand.java | 2 +-
.../apache/hudi/cli/commands/RollbacksCommand.java | 26 +++++------
.../hudi/cli/commands/SavepointsCommand.java | 4 +-
.../org/apache/hudi/cli/commands/SparkMain.java | 6 +--
.../org/apache/hudi/cli/commands/StatsCommand.java | 8 ++--
.../java/org/apache/hudi/cli/utils/SparkUtil.java | 12 ++---
17 files changed, 142 insertions(+), 176 deletions(-)
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java
index 5325432..53114ce 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java
@@ -84,7 +84,7 @@ public class HoodiePrintHelper {
buffer.getFieldNames().toArray(header);
String[][] rows =
- buffer.getRenderRows().stream().map(l -> l.stream().toArray(String[]::new)).toArray(String[][]::new);
+ buffer.getRenderRows().stream().map(l -> l.toArray(new String[l.size()])).toArray(String[][]::new);
return printTextTable(header, rows);
}
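For context on this hunk: both forms copy a List<String> into a String[], and the new one skips the per-row intermediate stream. A minimal, self-contained sketch (class name and data are hypothetical):

    import java.util.Arrays;
    import java.util.List;

    public class ToArrayDemo {
      public static void main(String[] args) {
        List<String> row = Arrays.asList("a", "b", "c");

        // Before: an intermediate stream is created just to copy the list.
        String[] viaStream = row.stream().toArray(String[]::new);

        // After: List.toArray copies directly into a pre-sized array.
        String[] direct = row.toArray(new String[row.size()]);

        System.out.println(Arrays.equals(viaStream, direct)); // true
      }
    }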
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/Table.java b/hudi-cli/src/main/java/org/apache/hudi/cli/Table.java
index bebc7fc..8158eef 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/Table.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/Table.java
@@ -22,7 +22,6 @@ import org.apache.hudi.common.util.Option;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -89,7 +88,7 @@ public class Table implements Iterable<List<String>> {
* @return
*/
public Table addAll(List<List<Comparable>> rows) {
- rows.forEach(r -> add(r));
+ rows.forEach(this::add);
return this;
}
@@ -120,16 +119,11 @@ public class Table implements Iterable<List<String>> {
*/
private List<List<Comparable>> orderRows() {
return orderingFieldNameOptional.map(orderingColumnName -> {
- return rawRows.stream().sorted(new Comparator<List<Comparable>>() {
- @Override
- public int compare(List<Comparable> row1, List<Comparable> row2) {
- Comparable fieldForRow1 = row1.get(rowHeader.indexOf(orderingColumnName));
- Comparable fieldForRow2 = row2.get(rowHeader.indexOf(orderingColumnName));
- int cmpRawResult = fieldForRow1.compareTo(fieldForRow2);
- return isDescendingOptional.map(isDescending -> {
- return isDescending ? -1 * cmpRawResult : cmpRawResult;
- }).orElse(cmpRawResult);
- }
+ return rawRows.stream().sorted((row1, row2) -> {
+ Comparable fieldForRow1 = row1.get(rowHeader.indexOf(orderingColumnName));
+ Comparable fieldForRow2 = row2.get(rowHeader.indexOf(orderingColumnName));
+ int cmpRawResult = fieldForRow1.compareTo(fieldForRow2);
+ return isDescendingOptional.map(isDescending -> isDescending ? -1 * cmpRawResult : cmpRawResult).orElse(cmpRawResult);
}).collect(Collectors.toList());
}).orElse(rawRows);
}
@@ -141,16 +135,14 @@ public class Table implements Iterable<List<String>> {
this.renderRows = new ArrayList<>();
final int limit = this.limitOptional.orElse(rawRows.size());
final List<List<Comparable>> orderedRows = orderRows();
- renderRows = orderedRows.stream().limit(limit).map(row -> {
- return IntStream.range(0, rowHeader.getNumFields()).mapToObj(idx -> {
- String fieldName = rowHeader.get(idx);
- if (fieldNameToConverterMap.containsKey(fieldName)) {
- return fieldNameToConverterMap.get(fieldName).apply(row.get(idx));
- }
- Object v = row.get(idx);
- return v == null ? "null" : v.toString();
- }).collect(Collectors.toList());
- }).collect(Collectors.toList());
+ renderRows = orderedRows.stream().limit(limit).map(row -> IntStream.range(0, rowHeader.getNumFields()).mapToObj(idx -> {
+ String fieldName = rowHeader.get(idx);
+ if (fieldNameToConverterMap.containsKey(fieldName)) {
+ return fieldNameToConverterMap.get(fieldName).apply(row.get(idx));
+ }
+ Object v = row.get(idx);
+ return v == null ? "null" : v.toString();
+ }).collect(Collectors.toList())).collect(Collectors.toList());
}
@Override
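The Table.java hunks above fold an anonymous Comparator into a lambda, which is why the java.util.Comparator import could be dropped. A toy equivalent of the transformation (list contents hypothetical):

    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;

    public class ComparatorDemo {
      public static void main(String[] args) {
        List<String> names = Arrays.asList("hudi", "cli", "table");

        // Before: anonymous inner class implementing Comparator.
        names.sort(new Comparator<String>() {
          @Override
          public int compare(String a, String b) {
            return a.compareTo(b);
          }
        });

        // After: equivalent lambda; the Comparator type is never named,
        // so the import becomes unnecessary.
        names.sort((a, b) -> a.compareTo(b));

        System.out.println(names); // [cli, hudi, table]
      }
    }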
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
index c8f1dc8..f455504 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
@@ -89,29 +89,27 @@ public class ArchivedCommitsCommand implements CommandMarker {
.deepCopy(HoodieCommitMetadata.SCHEMA$, r.get("hoodieCommitMetadata"));
final String instantTime = r.get("commitTime").toString();
final String action = r.get("actionType").toString();
- return metadata.getPartitionToWriteStats().values().stream().flatMap(hoodieWriteStats -> {
- return hoodieWriteStats.stream().map(hoodieWriteStat -> {
- List<Comparable> row = new ArrayList<>();
- row.add(action);
- row.add(instantTime);
- row.add(hoodieWriteStat.getPartitionPath());
- row.add(hoodieWriteStat.getFileId());
- row.add(hoodieWriteStat.getPrevCommit());
- row.add(hoodieWriteStat.getNumWrites());
- row.add(hoodieWriteStat.getNumInserts());
- row.add(hoodieWriteStat.getNumDeletes());
- row.add(hoodieWriteStat.getNumUpdateWrites());
- row.add(hoodieWriteStat.getTotalLogFiles());
- row.add(hoodieWriteStat.getTotalLogBlocks());
- row.add(hoodieWriteStat.getTotalCorruptLogBlock());
- row.add(hoodieWriteStat.getTotalRollbackBlocks());
- row.add(hoodieWriteStat.getTotalLogRecords());
- row.add(hoodieWriteStat.getTotalUpdatedRecordsCompacted());
- row.add(hoodieWriteStat.getTotalWriteBytes());
- row.add(hoodieWriteStat.getTotalWriteErrors());
- return row;
- });
- }).map(rowList -> rowList.toArray(new Comparable[0]));
+ return metadata.getPartitionToWriteStats().values().stream().flatMap(hoodieWriteStats -> hoodieWriteStats.stream().map(hoodieWriteStat -> {
+ List<Comparable> row = new ArrayList<>();
+ row.add(action);
+ row.add(instantTime);
+ row.add(hoodieWriteStat.getPartitionPath());
+ row.add(hoodieWriteStat.getFileId());
+ row.add(hoodieWriteStat.getPrevCommit());
+ row.add(hoodieWriteStat.getNumWrites());
+ row.add(hoodieWriteStat.getNumInserts());
+ row.add(hoodieWriteStat.getNumDeletes());
+ row.add(hoodieWriteStat.getNumUpdateWrites());
+ row.add(hoodieWriteStat.getTotalLogFiles());
+ row.add(hoodieWriteStat.getTotalLogBlocks());
+ row.add(hoodieWriteStat.getTotalCorruptLogBlock());
+ row.add(hoodieWriteStat.getTotalRollbackBlocks());
+ row.add(hoodieWriteStat.getTotalLogRecords());
+ row.add(hoodieWriteStat.getTotalUpdatedRecordsCompacted());
+ row.add(hoodieWriteStat.getTotalWriteBytes());
+ row.add(hoodieWriteStat.getTotalWriteErrors());
+ return row;
+ })).map(rowList -> rowList.toArray(new Comparable[0]));
}).collect(Collectors.toList());
allStats.addAll(readCommits);
reader.close();
@@ -183,14 +181,7 @@ public class ArchivedCommitsCommand implements CommandMarker {
}
break;
}
- case HoodieTimeline.COMMIT_ACTION: {
- commitDetails.add(record.get("commitTime"));
- commitDetails.add(record.get("actionType").toString());
- if (!skipMetadata) {
- commitDetails.add(record.get("hoodieCommitMetadata").toString());
- }
- break;
- }
+ case HoodieTimeline.COMMIT_ACTION:
case HoodieTimeline.DELTA_COMMIT_ACTION: {
commitDetails.add(record.get("commitTime"));
commitDetails.add(record.get("actionType").toString());
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java
index 857fb0d..7c30498 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java
@@ -66,12 +66,11 @@ public class CleansCommand implements CommandMarker {
HoodieTimeline timeline = activeTimeline.getCleanerTimeline().filterCompletedInstants();
List<HoodieInstant> cleans = timeline.getReverseOrderedInstants().collect(Collectors.toList());
List<Comparable[]> rows = new ArrayList<>();
- for (int i = 0; i < cleans.size(); i++) {
- HoodieInstant clean = cleans.get(i);
+ for (HoodieInstant clean : cleans) {
HoodieCleanMetadata cleanMetadata =
- AvroUtils.deserializeHoodieCleanMetadata(timeline.getInstantDetails(clean).get());
- rows.add(new Comparable[] {clean.getTimestamp(), cleanMetadata.getEarliestCommitToRetain(),
- cleanMetadata.getTotalFilesDeleted(), cleanMetadata.getTimeTakenInMillis()});
+ AvroUtils.deserializeHoodieCleanMetadata(timeline.getInstantDetails(clean).get());
+ rows.add(new Comparable[]{clean.getTimestamp(), cleanMetadata.getEarliestCommitToRetain(),
+ cleanMetadata.getTotalFilesDeleted(), cleanMetadata.getTimeTakenInMillis()});
}
TableHeader header =
@@ -110,8 +109,8 @@ public class CleansCommand implements CommandMarker {
String path = entry.getKey();
HoodieCleanPartitionMetadata stats = entry.getValue();
String policy = stats.getPolicy();
- Integer totalSuccessDeletedFiles = stats.getSuccessDeleteFiles().size();
- Integer totalFailedDeletedFiles = stats.getFailedDeleteFiles().size();
+ int totalSuccessDeletedFiles = stats.getSuccessDeleteFiles().size();
+ int totalFailedDeletedFiles = stats.getFailedDeleteFiles().size();
rows.add(new Comparable[] {path, policy, totalSuccessDeletedFiles, totalFailedDeletedFiles});
}
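Two small idioms recur in this file and several below: index loops that only fetch the element become enhanced for loops, and boxed Integer locals become primitive int to avoid needless boxing. A sketch (data hypothetical):

    import java.util.Arrays;
    import java.util.List;

    public class ForEachDemo {
      public static void main(String[] args) {
        List<String> cleans = Arrays.asList("c1", "c2", "c3");

        // Before: the index i is never used except to fetch the element.
        for (int i = 0; i < cleans.size(); i++) {
          System.out.println(cleans.get(i));
        }

        // After: the enhanced for loop expresses the same iteration directly.
        for (String clean : cleans) {
          System.out.println(clean);
        }

        // Primitive int avoids the boxing a boxed Integer local would incur.
        int totalDeleted = cleans.size();
        System.out.println(totalDeleted);
      }
    }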
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
index 089f6f4..c5b2faa 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
@@ -53,7 +53,7 @@ public class CommitsCommand implements CommandMarker {
@CliCommand(value = "commits show", help = "Show the commits")
public String showCommits(
- @CliOption(key = {"limit"}, mandatory = false, help = "Limit commits",
+ @CliOption(key = {"limit"}, help = "Limit commits",
unspecifiedDefaultValue = "-1") final Integer limit,
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@@ -65,8 +65,7 @@ public class CommitsCommand implements CommandMarker {
HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
List<HoodieInstant> commits = timeline.getReverseOrderedInstants().collect(Collectors.toList());
List<Comparable[]> rows = new ArrayList<>();
- for (int i = 0; i < commits.size(); i++) {
- HoodieInstant commit = commits.get(i);
+ for (HoodieInstant commit : commits) {
HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(), HoodieCommitMetadata.class);
rows.add(new Comparable[] {commit.getTimestamp(), commitMetadata.fetchTotalBytesWritten(),
@@ -76,9 +75,7 @@ public class CommitsCommand implements CommandMarker {
}
Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
- fieldNameToConverterMap.put("Total Bytes Written", entry -> {
- return NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString())));
- });
+ fieldNameToConverterMap.put("Total Bytes Written", entry -> NumericUtils.humanReadableByteCount((Double.parseDouble(entry.toString()))));
TableHeader header = new TableHeader().addTableHeaderField("CommitTime").addTableHeaderField("Total Bytes Written")
.addTableHeaderField("Total Files Added").addTableHeaderField("Total Files Updated")
@@ -95,7 +92,7 @@ public class CommitsCommand implements CommandMarker {
@CliCommand(value = "commit rollback", help = "Rollback a commit")
public String rollbackCommit(@CliOption(key = {"commit"}, help = "Commit to rollback") final String commitTime,
- @CliOption(key = {"sparkProperties"}, help = "Spark Properites File Path") final String sparkPropertiesPath)
+ @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath)
throws Exception {
HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
@@ -163,9 +160,7 @@ public class CommitsCommand implements CommandMarker {
}
Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
- fieldNameToConverterMap.put("Total Bytes Written", entry -> {
- return NumericUtils.humanReadableByteCount((Long.valueOf(entry.toString())));
- });
+ fieldNameToConverterMap.put("Total Bytes Written", entry -> NumericUtils.humanReadableByteCount((Long.parseLong(entry.toString()))));
TableHeader header = new TableHeader().addTableHeaderField("Partition Path")
.addTableHeaderField("Total Files Added").addTableHeaderField("Total Files Updated")
@@ -240,8 +235,7 @@ public class CommitsCommand implements CommandMarker {
}
@CliCommand(value = "commits sync", help = "Compare commits with another Hoodie dataset")
- public String syncCommits(@CliOption(key = {"path"}, help = "Path of the dataset to compare to") final String path)
- throws Exception {
+ public String syncCommits(@CliOption(key = {"path"}, help = "Path of the dataset to compare to") final String path) {
HoodieCLI.syncTableMetadata = new HoodieTableMetaClient(HoodieCLI.conf, path);
HoodieCLI.state = HoodieCLI.CLIState.SYNC;
return "Load sync state between " + HoodieCLI.getTableMetaClient().getTableConfig().getTableName() + " and "
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
index 6a188c1..3a518ee 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
@@ -38,11 +38,12 @@ import org.apache.hudi.common.util.AvroUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.func.OperationResult;
+import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hudi.utilities.UtilHelpers;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.launcher.SparkLauncher;
@@ -85,7 +86,7 @@ public class CompactionCommand implements CommandMarker {
public String compactionsAll(
@CliOption(key = {"includeExtraMetadata"}, help = "Include extra metadata",
unspecifiedDefaultValue = "false") final boolean includeExtraMetadata,
- @CliOption(key = {"limit"}, mandatory = false, help = "Limit commits",
+ @CliOption(key = {"limit"}, help = "Limit commits",
unspecifiedDefaultValue = "-1") final Integer limit,
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@@ -100,10 +101,9 @@ public class CompactionCommand implements CommandMarker {
List<HoodieInstant> instants = timeline.getReverseOrderedInstants().collect(Collectors.toList());
List<Comparable[]> rows = new ArrayList<>();
- for (int i = 0; i < instants.size(); i++) {
- HoodieInstant instant = instants.get(i);
+ for (HoodieInstant instant : instants) {
HoodieCompactionPlan compactionPlan = null;
- if (!instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)) {
+ if (!HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction())) {
try {
// This could be a completed compaction. Assume a compaction request file is present but skip if fails
compactionPlan = AvroUtils.deserializeCompactionPlan(
@@ -118,7 +118,7 @@ public class CompactionCommand implements CommandMarker {
}
if (null != compactionPlan) {
- HoodieInstant.State state = instant.getState();
+ State state = instant.getState();
if (committed.contains(instant.getTimestamp())) {
state = State.COMPLETED;
}
@@ -146,7 +146,7 @@ public class CompactionCommand implements CommandMarker {
public String compactionShow(
@CliOption(key = "instant", mandatory = true,
help = "Base path for the target hoodie dataset") final String compactionInstantTime,
- @CliOption(key = {"limit"}, mandatory = false, help = "Limit commits",
+ @CliOption(key = {"limit"}, help = "Limit commits",
unspecifiedDefaultValue = "-1") final Integer limit,
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@@ -212,8 +212,7 @@ public class CompactionCommand implements CommandMarker {
@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
help = "Spark executor memory") final String sparkMemory,
@CliOption(key = "retry", unspecifiedDefaultValue = "1", help = "Number of retries") final String retry,
- @CliOption(key = "compactionInstant", mandatory = false,
- help = "Base path for the target hoodie dataset") String compactionInstantTime,
+ @CliOption(key = "compactionInstant", help = "Base path for the target hoodie dataset") String compactionInstantTime,
@CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for compacting",
unspecifiedDefaultValue = "") final String propsFilePath,
@CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array",
@@ -286,7 +285,7 @@ public class CompactionCommand implements CommandMarker {
String outputPathStr = getTmpSerializerFile();
Path outputPath = new Path(outputPathStr);
- String output = null;
+ String output;
try {
String sparkPropertiesPath = Utils
.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
@@ -300,10 +299,10 @@ public class CompactionCommand implements CommandMarker {
return "Failed to validate compaction for " + compactionInstant;
}
List<ValidationOpResult> res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
- boolean valid = res.stream().map(r -> r.isSuccess()).reduce(Boolean::logicalAnd).orElse(true);
+ boolean valid = res.stream().map(OperationResult::isSuccess).reduce(Boolean::logicalAnd).orElse(true);
String message = "\n\n\t COMPACTION PLAN " + (valid ? "VALID" : "INVALID") + "\n\n";
List<Comparable[]> rows = new ArrayList<>();
- res.stream().forEach(r -> {
+ res.forEach(r -> {
Comparable[] row = new Comparable[] {r.getOperation().getFileId(), r.getOperation().getBaseInstantTime(),
r.getOperation().getDataFileName().isPresent() ? r.getOperation().getDataFileName().get() : "",
r.getOperation().getDeltaFileNames().size(), r.isSuccess(),
@@ -347,7 +346,7 @@ public class CompactionCommand implements CommandMarker {
String outputPathStr = getTmpSerializerFile();
Path outputPath = new Path(outputPathStr);
- String output = "";
+ String output;
try {
String sparkPropertiesPath = Utils
.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
@@ -391,7 +390,7 @@ public class CompactionCommand implements CommandMarker {
String outputPathStr = getTmpSerializerFile();
Path outputPath = new Path(outputPathStr);
- String output = "";
+ String output;
try {
String sparkPropertiesPath = Utils
.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
@@ -437,7 +436,7 @@ public class CompactionCommand implements CommandMarker {
String outputPathStr = getTmpSerializerFile();
Path outputPath = new Path(outputPathStr);
- String output = "";
+ String output;
try {
String sparkPropertiesPath = Utils
.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
@@ -476,7 +475,7 @@ public class CompactionCommand implements CommandMarker {
}
List<Comparable[]> rows = new ArrayList<>();
- res.stream().forEach(r -> {
+ res.forEach(r -> {
Comparable[] row =
new Comparable[] {r.getOperation().fileId, r.getOperation().srcPath, r.getOperation().destPath,
r.isExecuted(), r.isSuccess(), r.getException().isPresent() ? r.getException().get().getMessage() : ""};
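Two of the CompactionCommand cleanups in one sketch: a lambda that only forwards to a getter becomes a method reference, and res.stream().forEach(...) becomes res.forEach(...) since List inherits forEach from Iterable. The Result type below is a hypothetical stand-in for OperationResult:

    import java.util.Arrays;
    import java.util.List;

    public class MethodRefDemo {
      // Hypothetical stand-in for OperationResult.
      static class Result {
        private final boolean success;
        Result(boolean success) { this.success = success; }
        boolean isSuccess() { return success; }
      }

      public static void main(String[] args) {
        List<Result> res = Arrays.asList(new Result(true), new Result(false));

        // Lambda and method reference are interchangeable here.
        boolean allOkLambda = res.stream().map(r -> r.isSuccess())
            .reduce(Boolean::logicalAnd).orElse(true);
        boolean allOkRef = res.stream().map(Result::isSuccess)
            .reduce(Boolean::logicalAnd).orElse(true);
        System.out.println(allOkLambda == allOkRef); // true

        // Iterable.forEach makes the stream() hop unnecessary.
        res.forEach(r -> System.out.println(r.isSuccess()));
      }
    }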
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/DatasetsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/DatasetsCommand.java
index d5d1e82..302931e 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/DatasetsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/DatasetsCommand.java
@@ -49,14 +49,14 @@ public class DatasetsCommand implements CommandMarker {
@CliCommand(value = "connect", help = "Connect to a hoodie dataset")
public String connect(
@CliOption(key = {"path"}, mandatory = true, help = "Base Path of the dataset") final String path,
- @CliOption(key = {"layoutVersion"}, mandatory = false, help = "Timeline Layout version") Integer layoutVersion,
- @CliOption(key = {"eventuallyConsistent"}, mandatory = false, unspecifiedDefaultValue = "false",
+ @CliOption(key = {"layoutVersion"}, help = "Timeline Layout version") Integer layoutVersion,
+ @CliOption(key = {"eventuallyConsistent"}, unspecifiedDefaultValue = "false",
help = "Enable eventual consistency") final boolean eventuallyConsistent,
- @CliOption(key = {"initialCheckIntervalMs"}, mandatory = false, unspecifiedDefaultValue = "2000",
+ @CliOption(key = {"initialCheckIntervalMs"}, unspecifiedDefaultValue = "2000",
help = "Initial wait time for eventual consistency") final Integer initialConsistencyIntervalMs,
- @CliOption(key = {"maxCheckIntervalMs"}, mandatory = false, unspecifiedDefaultValue = "300000",
+ @CliOption(key = {"maxCheckIntervalMs"}, unspecifiedDefaultValue = "300000",
help = "Max wait time for eventual consistency") final Integer maxConsistencyIntervalMs,
- @CliOption(key = {"maxCheckIntervalMs"}, mandatory = false, unspecifiedDefaultValue = "7",
+ @CliOption(key = {"maxCheckIntervalMs"}, unspecifiedDefaultValue = "7",
help = "Max checks for eventual consistency") final Integer maxConsistencyChecks)
throws IOException {
HoodieCLI
@@ -118,7 +118,7 @@ public class DatasetsCommand implements CommandMarker {
/**
* Describes table properties.
*/
- @CliCommand(value = "desc", help = "Describle Hoodie Table properties")
+ @CliCommand(value = "desc", help = "Describe Hoodie Table properties")
public String descTable() {
HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
TableHeader header = new TableHeader().addTableHeaderField("Property").addTableHeaderField("Value");
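The mandatory = false removals here (and throughout the commit) look safe because, in the Spring Shell 1.x annotation hudi-cli imports, @CliOption#mandatory() defaults to false — an assumption worth checking against the exact Spring Shell version on the classpath. A before/after sketch:

    // Assumes Spring Shell 1.x, where @CliOption#mandatory() defaults to false.
    import org.springframework.shell.core.annotation.CliOption;

    public class CliOptionDemo {
      // Before: the default restated explicitly.
      public String before(
          @CliOption(key = {"limit"}, mandatory = false, unspecifiedDefaultValue = "-1") Integer limit) {
        return String.valueOf(limit);
      }

      // After: identical behavior, less noise.
      public String after(
          @CliOption(key = {"limit"}, unspecifiedDefaultValue = "-1") Integer limit) {
        return String.valueOf(limit);
      }
    }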
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java
index e94e16a..597bab3 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java
@@ -22,6 +22,7 @@ import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.HoodiePrintHelper;
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
@@ -90,13 +91,13 @@ public class FileSystemViewCommand implements CommandMarker {
row[idx++] = fs.getDataFile().isPresent() ? fs.getDataFile().get().getFileSize() : -1;
if (!readOptimizedOnly) {
row[idx++] = fs.getLogFiles().count();
- row[idx++] = fs.getLogFiles().mapToLong(lf -> lf.getFileSize()).sum();
+ row[idx++] = fs.getLogFiles().mapToLong(HoodieLogFile::getFileSize).sum();
row[idx++] = fs.getLogFiles().collect(Collectors.toList()).toString();
}
rows.add(row);
}));
Function<Object, String> converterFunction =
- entry -> NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString())));
+ entry -> NumericUtils.humanReadableByteCount((Double.parseDouble(entry.toString())));
Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
fieldNameToConverterMap.put("Total Delta File Size", converterFunction);
fieldNameToConverterMap.put("Data-File Size", converterFunction);
@@ -160,15 +161,15 @@ public class FileSystemViewCommand implements CommandMarker {
if (!readOptimizedOnly) {
row[idx++] = fs.getLogFiles().count();
- row[idx++] = fs.getLogFiles().mapToLong(lf -> lf.getFileSize()).sum();
+ row[idx++] = fs.getLogFiles().mapToLong(HoodieLogFile::getFileSize).sum();
long logFilesScheduledForCompactionTotalSize =
fs.getLogFiles().filter(lf -> lf.getBaseCommitTime().equals(fs.getBaseInstantTime()))
- .mapToLong(lf -> lf.getFileSize()).sum();
+ .mapToLong(HoodieLogFile::getFileSize).sum();
row[idx++] = logFilesScheduledForCompactionTotalSize;
long logFilesUnscheduledTotalSize =
fs.getLogFiles().filter(lf -> !lf.getBaseCommitTime().equals(fs.getBaseInstantTime()))
- .mapToLong(lf -> lf.getFileSize()).sum();
+ .mapToLong(HoodieLogFile::getFileSize).sum();
row[idx++] = logFilesUnscheduledTotalSize;
double logSelectedForCompactionToBaseRatio =
@@ -186,7 +187,7 @@ public class FileSystemViewCommand implements CommandMarker {
});
Function<Object, String> converterFunction =
- entry -> NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString())));
+ entry -> NumericUtils.humanReadableByteCount((Double.parseDouble(entry.toString())));
Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
fieldNameToConverterMap.put("Data-File Size", converterFunction);
if (!readOptimizedOnly) {
@@ -230,9 +231,9 @@ public class FileSystemViewCommand implements CommandMarker {
FileSystem fs = HoodieCLI.fs;
String globPath = String.format("%s/%s/*", client.getBasePath(), globRegex);
FileStatus[] statuses = fs.globStatus(new Path(globPath));
- Stream<HoodieInstant> instantsStream = null;
+ Stream<HoodieInstant> instantsStream;
- HoodieTimeline timeline = null;
+ HoodieTimeline timeline;
if (readOptimizedOnly) {
timeline = metaClient.getActiveTimeline().getCommitTimeline();
} else if (excludeCompaction) {
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java
index 4ba9d3d..4c81475 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java
@@ -23,8 +23,8 @@ import org.apache.hudi.cli.commands.SparkMain.SparkCommand;
import org.apache.hudi.cli.utils.InputStreamConsumer;
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.utilities.HDFSParquetImporter.FormatValidator;
-
import org.apache.hudi.utilities.UtilHelpers;
+
import org.apache.spark.launcher.SparkLauncher;
import org.apache.spark.util.Utils;
import org.springframework.shell.core.CommandMarker;
@@ -42,7 +42,7 @@ public class HDFSParquetImportCommand implements CommandMarker {
@CliCommand(value = "hdfsparquetimport", help = "Imports Parquet dataset to a hoodie dataset")
public String convert(
- @CliOption(key = "upsert", mandatory = false, unspecifiedDefaultValue = "false",
+ @CliOption(key = "upsert", unspecifiedDefaultValue = "false",
help = "Uses upsert API instead of the default insert API of WriteClient") boolean useUpsert,
@CliOption(key = "srcPath", mandatory = true, help = "Base path for the input dataset") final String srcPath,
@CliOption(key = "targetPath", mandatory = true,
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index f9bea47..8a50309 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -38,8 +38,10 @@ import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieMemoryConfig;
import org.apache.hudi.hive.util.SchemaUtil;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileStatus;
@@ -84,14 +86,13 @@ public class HoodieLogFileCommand implements CommandMarker {
.map(status -> status.getPath().toString()).collect(Collectors.toList());
Map<String, List<Tuple3<HoodieLogBlockType, Tuple2<Map<HeaderMetadataType, String>, Map<HeaderMetadataType, String>>, Integer>>> commitCountAndMetadata =
Maps.newHashMap();
- int totalEntries = 0;
int numCorruptBlocks = 0;
int dummyInstantTimeCount = 0;
for (String logFilePath : logFilePaths) {
FileStatus[] fsStatus = fs.listStatus(new Path(logFilePath));
Schema writerSchema = new AvroSchemaConverter()
- .convert(SchemaUtil.readSchemaFromLogFile(fs, new Path(logFilePath)));
+ .convert(Preconditions.checkNotNull(SchemaUtil.readSchemaFromLogFile(fs, new Path(logFilePath))));
Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(fsStatus[0].getPath()), writerSchema);
// read the avro blocks
@@ -124,14 +125,12 @@ public class HoodieLogFileCommand implements CommandMarker {
if (commitCountAndMetadata.containsKey(instantTime)) {
commitCountAndMetadata.get(instantTime).add(
new Tuple3<>(n.getBlockType(), new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount));
- totalEntries++;
} else {
List<Tuple3<HoodieLogBlockType, Tuple2<Map<HeaderMetadataType, String>, Map<HeaderMetadataType, String>>, Integer>> list =
new ArrayList<>();
list.add(
new Tuple3<>(n.getBlockType(), new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount));
commitCountAndMetadata.put(instantTime, list);
- totalEntries++;
}
}
reader.close();
@@ -141,7 +140,7 @@ public class HoodieLogFileCommand implements CommandMarker {
ObjectMapper objectMapper = new ObjectMapper();
for (Map.Entry<String, List<Tuple3<HoodieLogBlockType, Tuple2<Map<HeaderMetadataType, String>, Map<HeaderMetadataType, String>>, Integer>>> entry : commitCountAndMetadata
.entrySet()) {
- String instantTime = entry.getKey().toString();
+ String instantTime = entry.getKey();
for (Tuple3<HoodieLogBlockType, Tuple2<Map<HeaderMetadataType, String>, Map<HeaderMetadataType, String>>, Integer> tuple3 : entry
.getValue()) {
Comparable[] output = new Comparable[5];
@@ -163,11 +162,11 @@ public class HoodieLogFileCommand implements CommandMarker {
@CliCommand(value = "show logfile records", help = "Read records from log files")
public String showLogFileRecords(
- @CliOption(key = {"limit"}, mandatory = false, help = "Limit commits",
+ @CliOption(key = {"limit"}, help = "Limit commits",
unspecifiedDefaultValue = "10") final Integer limit,
@CliOption(key = "logFilePathPattern", mandatory = true,
help = "Fully qualified paths for the log files") final String logFilePathPattern,
- @CliOption(key = "mergeRecords", mandatory = false, help = "If the records in the log files should be merged",
+ @CliOption(key = "mergeRecords", help = "If the records in the log files should be merged",
unspecifiedDefaultValue = "false") final Boolean shouldMerge)
throws IOException {
@@ -182,7 +181,7 @@ public class HoodieLogFileCommand implements CommandMarker {
AvroSchemaConverter converter = new AvroSchemaConverter();
// get schema from last log file
Schema readerSchema =
- converter.convert(SchemaUtil.readSchemaFromLogFile(fs, new Path(logFilePaths.get(logFilePaths.size() - 1))));
+ converter.convert(Preconditions.checkNotNull(SchemaUtil.readSchemaFromLogFile(fs, new Path(logFilePaths.get(logFilePaths.size() - 1)))));
List<IndexedRecord> allRecords = new ArrayList<>();
@@ -191,11 +190,11 @@ public class HoodieLogFileCommand implements CommandMarker {
HoodieMergedLogRecordScanner scanner =
new HoodieMergedLogRecordScanner(fs, client.getBasePath(), logFilePaths, readerSchema,
client.getActiveTimeline().getCommitTimeline().lastInstant().get().getTimestamp(),
- Long.valueOf(HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES),
- Boolean.valueOf(HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED),
- Boolean.valueOf(HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED),
- Integer.valueOf(HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE),
- HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH);
+ HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES,
+ Boolean.parseBoolean(HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED),
+ Boolean.parseBoolean(HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED),
+ HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE,
+ HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH);
for (HoodieRecord<? extends HoodieRecordPayload> hoodieRecord : scanner) {
Option<IndexedRecord> record = hoodieRecord.getData().getInsertValue(readerSchema);
if (allRecords.size() < limit) {
@@ -205,7 +204,7 @@ public class HoodieLogFileCommand implements CommandMarker {
} else {
for (String logFile : logFilePaths) {
Schema writerSchema = new AvroSchemaConverter()
- .convert(SchemaUtil.readSchemaFromLogFile(client.getFs(), new Path(logFile)));
+ .convert(Preconditions.checkNotNull(SchemaUtil.readSchemaFromLogFile(client.getFs(), new Path(logFile))));
HoodieLogFormat.Reader reader =
HoodieLogFormat.newReader(fs, new HoodieLogFile(new Path(logFile)), writerSchema);
// read the avro blocks
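The Preconditions.checkNotNull wrappers added above work because checkNotNull returns its (non-null) argument, so it can guard a possibly-null call inline and fail fast rather than letting the null surface later. A minimal sketch with a hypothetical reader:

    import com.google.common.base.Preconditions;

    public class CheckNotNullDemo {
      // Hypothetical stand-in for a schema reader that may return null.
      static String readSchema(boolean present) {
        return present ? "{\"type\":\"record\"}" : null;
      }

      public static void main(String[] args) {
        // checkNotNull passes non-null values through unchanged...
        String schema = Preconditions.checkNotNull(readSchema(true));
        System.out.println(schema);

        // ...and throws NullPointerException immediately on null,
        // pinpointing the failure at the call site.
        try {
          Preconditions.checkNotNull(readSchema(false));
        } catch (NullPointerException e) {
          System.out.println("failed fast: " + e);
        }
      }
    }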
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieSyncCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieSyncCommand.java
index 152e21c..5346b98 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieSyncCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieSyncCommand.java
@@ -31,6 +31,7 @@ import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
import org.springframework.stereotype.Component;
+import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
@@ -50,7 +51,7 @@ public class HoodieSyncCommand implements CommandMarker {
help = "total number of recent partitions to validate") final int partitionCount,
@CliOption(key = {"hiveServerUrl"}, mandatory = true,
help = "hiveServerURL to connect to") final String hiveServerUrl,
- @CliOption(key = {"hiveUser"}, mandatory = false, unspecifiedDefaultValue = "",
+ @CliOption(key = {"hiveUser"}, unspecifiedDefaultValue = "",
help = "hive username to connect to") final String hiveUser,
@CliOption(key = {"hivePass"}, mandatory = true, unspecifiedDefaultValue = "",
help = "hive password to connect to") final String hivePass)
@@ -80,33 +81,27 @@ public class HoodieSyncCommand implements CommandMarker {
if (sourceLatestCommit != null
&& HoodieTimeline.compareTimestamps(targetLatestCommit, sourceLatestCommit, HoodieTimeline.GREATER)) {
// source is behind the target
- List<HoodieInstant> commitsToCatchup = targetTimeline.findInstantsAfter(sourceLatestCommit, Integer.MAX_VALUE)
- .getInstants().collect(Collectors.toList());
- if (commitsToCatchup.isEmpty()) {
- return "Count difference now is (count(" + target.getTableConfig().getTableName() + ") - count("
- + source.getTableConfig().getTableName() + ") == " + (targetCount - sourceCount);
- } else {
- long newInserts = CommitUtil.countNewRecords(target,
- commitsToCatchup.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()));
- return "Count difference now is (count(" + target.getTableConfig().getTableName() + ") - count("
- + source.getTableConfig().getTableName() + ") == " + (targetCount - sourceCount) + ". Catch up count is "
- + newInserts;
- }
+ return getString(target, targetTimeline, source, sourceCount, targetCount, sourceLatestCommit);
} else {
- List<HoodieInstant> commitsToCatchup = sourceTimeline.findInstantsAfter(targetLatestCommit, Integer.MAX_VALUE)
- .getInstants().collect(Collectors.toList());
- if (commitsToCatchup.isEmpty()) {
- return "Count difference now is (count(" + source.getTableConfig().getTableName() + ") - count("
- + target.getTableConfig().getTableName() + ") == " + (sourceCount - targetCount);
- } else {
- long newInserts = CommitUtil.countNewRecords(source,
- commitsToCatchup.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()));
- return "Count difference now is (count(" + source.getTableConfig().getTableName() + ") - count("
- + target.getTableConfig().getTableName() + ") == " + (sourceCount - targetCount) + ". Catch up count is "
- + newInserts;
- }
+ return getString(source, sourceTimeline, target, targetCount, sourceCount, targetLatestCommit);
}
}
+ private String getString(HoodieTableMetaClient target, HoodieTimeline targetTimeline, HoodieTableMetaClient source, long sourceCount, long targetCount, String sourceLatestCommit)
+ throws IOException {
+ List<HoodieInstant> commitsToCatchup = targetTimeline.findInstantsAfter(sourceLatestCommit, Integer.MAX_VALUE)
+ .getInstants().collect(Collectors.toList());
+ if (commitsToCatchup.isEmpty()) {
+ return "Count difference now is (count(" + target.getTableConfig().getTableName() + ") - count("
+ + source.getTableConfig().getTableName() + ") == " + (targetCount - sourceCount);
+ } else {
+ long newInserts = CommitUtil.countNewRecords(target,
+ commitsToCatchup.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()));
+ return "Count difference now is (count(" + target.getTableConfig().getTableName() + ") - count("
+ + source.getTableConfig().getTableName() + ") == " + (targetCount - sourceCount) + ". Catch up count is "
+ + newInserts;
+ }
+ }
+
}
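The HoodieSyncCommand change collapses two near-identical branches into one helper called with its arguments swapped per branch. The shape of the refactoring, reduced to a toy (all names hypothetical):

    public class ExtractHelperDemo {
      // Before: two branches that differ only in which side is "ahead".
      static String reportBefore(long a, long b, boolean aAhead) {
        if (aAhead) {
          return "diff == " + (a - b);
        } else {
          return "diff == " + (b - a);
        }
      }

      // After: one helper, called with the arguments swapped per branch.
      static String diff(long ahead, long behind) {
        return "diff == " + (ahead - behind);
      }

      static String reportAfter(long a, long b, boolean aAhead) {
        return aAhead ? diff(a, b) : diff(b, a);
      }

      public static void main(String[] args) {
        System.out.println(reportBefore(10, 4, true).equals(reportAfter(10, 4, true)));   // true
        System.out.println(reportBefore(10, 4, false).equals(reportAfter(10, 4, false))); // true
      }
    }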
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
index 4b4c283..37f66f6 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
@@ -49,7 +49,7 @@ public class RepairsCommand implements CommandMarker {
mandatory = true) final String duplicatedPartitionPath,
@CliOption(key = {"repairedOutputPath"}, help = "Location to place the repaired files",
mandatory = true) final String repairedOutputPath,
- @CliOption(key = {"sparkProperties"}, help = "Spark Properites File Path",
+ @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path",
mandatory = true) final String sparkPropertiesPath)
throws Exception {
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RollbacksCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RollbacksCommand.java
index a3eb6a7..4a122c6 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RollbacksCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RollbacksCommand.java
@@ -99,20 +99,18 @@ public class RollbacksCommand implements CommandMarker {
HoodieRollbackMetadata metadata = AvroUtils.deserializeAvroMetadata(
activeTimeline.getInstantDetails(new HoodieInstant(State.COMPLETED, ROLLBACK_ACTION, rollbackInstant)).get(),
HoodieRollbackMetadata.class);
- metadata.getPartitionMetadata().entrySet().forEach(e -> {
- Stream
- .concat(e.getValue().getSuccessDeleteFiles().stream().map(f -> Pair.of(f, true)),
- e.getValue().getFailedDeleteFiles().stream().map(f -> Pair.of(f, false)))
- .forEach(fileWithDeleteStatus -> {
- Comparable[] row = new Comparable[5];
- row[0] = metadata.getStartRollbackTime();
- row[1] = metadata.getCommitsRollback().toString();
- row[2] = e.getKey();
- row[3] = fileWithDeleteStatus.getLeft();
- row[4] = fileWithDeleteStatus.getRight();
- rows.add(row);
- });
- });
+ metadata.getPartitionMetadata().forEach((key, value) -> Stream
+ .concat(value.getSuccessDeleteFiles().stream().map(f -> Pair.of(f, true)),
+ value.getFailedDeleteFiles().stream().map(f -> Pair.of(f, false)))
+ .forEach(fileWithDeleteStatus -> {
+ Comparable[] row = new Comparable[5];
+ row[0] = metadata.getStartRollbackTime();
+ row[1] = metadata.getCommitsRollback().toString();
+ row[2] = key;
+ row[3] = fileWithDeleteStatus.getLeft();
+ row[4] = fileWithDeleteStatus.getRight();
+ rows.add(row);
+ }));
TableHeader header = new TableHeader().addTableHeaderField("Instant").addTableHeaderField("Rolledback Instants")
.addTableHeaderField("Partition").addTableHeaderField("Deleted File").addTableHeaderField("Succeeded");
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java
index 69a1584..d28ba27 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java
@@ -93,7 +93,7 @@ public class SavepointsCommand implements CommandMarker {
@CliCommand(value = "savepoint rollback", help = "Savepoint a commit")
public String rollbackToSavepoint(
@CliOption(key = {"savepoint"}, help = "Savepoint to rollback") final String commitTime,
- @CliOption(key = {"sparkProperties"}, help = "Spark Properites File Path") final String sparkPropertiesPath)
+ @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath)
throws Exception {
HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
if (metaClient.getActiveTimeline().getSavePointTimeline().filterCompletedInstants().empty()) {
@@ -122,7 +122,7 @@ public class SavepointsCommand implements CommandMarker {
}
@CliCommand(value = "savepoints refresh", help = "Refresh the savepoints")
- public String refreshMetaClient() throws IOException {
+ public String refreshMetaClient() {
HoodieCLI.refreshTableMetadata();
return "Metadata for table " + HoodieCLI.getTableMetaClient().getTableConfig().getTableName() + " refreshed.";
}
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index 8ff52fa..13d1c8b 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -124,19 +124,19 @@ public class SparkMain {
case COMPACT_REPAIR:
assert (args.length == 8);
doCompactRepair(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6],
- Boolean.valueOf(args[7]));
+ Boolean.parseBoolean(args[7]));
returnCode = 0;
break;
case COMPACT_UNSCHEDULE_FILE:
assert (args.length == 9);
doCompactUnscheduleFile(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6],
- Boolean.valueOf(args[7]), Boolean.valueOf(args[8]));
+ Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8]));
returnCode = 0;
break;
case COMPACT_UNSCHEDULE_PLAN:
assert (args.length == 9);
doCompactUnschedule(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6],
- Boolean.valueOf(args[7]), Boolean.valueOf(args[8]));
+ Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8]));
returnCode = 0;
break;
case CLEAN:
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java
index 7fc3b25..b05aee2 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java
@@ -73,7 +73,6 @@ public class StatsCommand implements CommandMarker {
HoodieTimeline timeline = activeTimeline.getCommitTimeline().filterCompletedInstants();
List<Comparable[]> rows = new ArrayList<>();
- int i = 0;
DecimalFormat df = new DecimalFormat("#.00");
for (HoodieInstant commitTime : timeline.getInstants().collect(Collectors.toList())) {
String waf = "0";
@@ -94,7 +93,7 @@ public class StatsCommand implements CommandMarker {
rows.add(new Comparable[] {"Total", totalRecordsUpserted, totalRecordsWritten, waf});
TableHeader header = new TableHeader().addTableHeaderField("CommitTime").addTableHeaderField("Total Upserted")
- .addTableHeaderField("Total Written").addTableHeaderField("Write Amplifiation Factor");
+ .addTableHeaderField("Total Written").addTableHeaderField("Write Amplification Factor");
return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, rows);
}
@@ -120,7 +119,7 @@ public class StatsCommand implements CommandMarker {
// max, min, #small files < 10MB, 50th, avg, 95th
Histogram globalHistogram = new Histogram(new UniformReservoir(MAX_FILES));
- HashMap<String, Histogram> commitHistoMap = new HashMap<String, Histogram>();
+ HashMap<String, Histogram> commitHistoMap = new HashMap<>();
for (FileStatus fileStatus : statuses) {
String commitTime = FSUtils.getCommitTime(fileStatus.getPath().getName());
long sz = fileStatus.getLen();
@@ -132,7 +131,6 @@ public class StatsCommand implements CommandMarker {
}
List<Comparable[]> rows = new ArrayList<>();
- int ind = 0;
for (String commitTime : commitHistoMap.keySet()) {
Snapshot s = commitHistoMap.get(commitTime).getSnapshot();
rows.add(printFileSizeHistogram(commitTime, s));
@@ -141,7 +139,7 @@ public class StatsCommand implements CommandMarker {
rows.add(printFileSizeHistogram("ALL", s));
Function<Object, String> converterFunction =
- entry -> NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString())));
+ entry -> NumericUtils.humanReadableByteCount((Double.parseDouble(entry.toString())));
Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
fieldNameToConverterMap.put("Min", converterFunction);
fieldNameToConverterMap.put("10th", converterFunction);
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
index 5b5a3f5..b71a979 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
@@ -24,7 +24,8 @@ import org.apache.hudi.cli.commands.SparkMain;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.StringUtils;
-import org.apache.log4j.Logger;
+import com.google.common.base.Preconditions;
+
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.launcher.SparkLauncher;
@@ -38,8 +39,7 @@ import java.util.Map;
*/
public class SparkUtil {
- private static final Logger LOG = Logger.getLogger(SparkUtil.class);
- public static final String DEFUALT_SPARK_MASTER = "yarn-client";
+ public static final String DEFAULT_SPARK_MASTER = "yarn-client";
/**
* TODO: Need to fix a bunch of hardcoded stuff here eg: history server, spark distro.
@@ -55,7 +55,7 @@ public class SparkUtil {
sparkLauncher.setPropertiesFile(propertiesFile);
}
File libDirectory = new File(new File(currentJar).getParent(), "lib");
- for (String library : libDirectory.list()) {
+ for (String library : Preconditions.checkNotNull(libDirectory.list())) {
sparkLauncher.addJar(new File(libDirectory, library).getAbsolutePath());
}
return sparkLauncher;
@@ -66,7 +66,7 @@ public class SparkUtil {
String defMasterFromEnv = sparkConf.getenv("SPARK_MASTER");
if ((null == defMasterFromEnv) || (defMasterFromEnv.isEmpty())) {
- sparkConf.setMaster(DEFUALT_SPARK_MASTER);
+ sparkConf.setMaster(DEFAULT_SPARK_MASTER);
} else {
sparkConf.setMaster(defMasterFromEnv);
}
@@ -82,7 +82,7 @@ public class SparkUtil {
sparkConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
- sparkConf = HoodieWriteClient.registerClasses(sparkConf);
+ HoodieWriteClient.registerClasses(sparkConf);
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
jsc.hadoopConfiguration().setBoolean("parquet.enable.summary-metadata", false);
FSUtils.prepareHadoopConf(jsc.hadoopConfiguration());