You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by fo...@apache.org on 2023/01/04 03:32:16 UTC
[hudi] 06/45: [MINOR] fix Invalid value for YearOfEra
This is an automated email from the ASF dual-hosted git repository.
forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit c1ceb628e576dd50f9c3bdf1ab830dcf61f70296
Author: XuQianJin-Stars <fo...@apache.com>
AuthorDate: Sun Oct 23 17:35:58 2022 +0800
[MINOR] fix Invalid value for YearOfEra
---
.../apache/hudi/client/BaseHoodieWriteClient.java | 28 ++++++++++++++--------
.../apache/hudi/client/SparkRDDWriteClient.java | 22 +++++++++++++++++
.../table/timeline/HoodieActiveTimeline.java | 18 ++++++++++++++
3 files changed, 58 insertions(+), 10 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index d9f260e633..ff500a617e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -104,6 +104,7 @@ import org.apache.log4j.Logger;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.text.ParseException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -115,6 +116,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY;
+import static org.apache.hudi.common.model.TableServiceType.CLEAN;
/**
* Abstract Write Client providing functionality for performing commit, index updates and rollback
@@ -306,14 +308,20 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
protected abstract HoodieTable<T, I, K, O> createTable(HoodieWriteConfig config, Configuration hadoopConf);
void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String actionType) {
- if (writeTimer != null) {
- long durationInMs = metrics.getDurationInMs(writeTimer.stop());
- // instantTime could be a non-standard value, so use `parseDateFromInstantTimeSafely`
- // e.g. INIT_INSTANT_TS, METADATA_BOOTSTRAP_INSTANT_TS and FULL_BOOTSTRAP_INSTANT_TS in HoodieTimeline
- HoodieActiveTimeline.parseDateFromInstantTimeSafely(instantTime).ifPresent(parsedInstant ->
- metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, actionType)
- );
- writeTimer = null;
+ try {
+ if (writeTimer != null) {
+ long durationInMs = metrics.getDurationInMs(writeTimer.stop());
+ long commitEpochTimeInMs = 0;
+ if (HoodieActiveTimeline.checkDateTime(instantTime)) {
+ commitEpochTimeInMs = HoodieActiveTimeline.parseDateFromInstantTime(instantTime).getTime();
+ }
+ metrics.updateCommitMetrics(commitEpochTimeInMs, durationInMs,
+ metadata, actionType);
+ writeTimer = null;
+ }
+ } catch (ParseException e) {
+ throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime
+ + ". Instant time is not of valid format", e);
}
}
@@ -862,7 +870,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
LOG.info("Cleaner started");
// proceed only if multiple clean schedules are enabled or if there are no pending cleans.
if (scheduleInline) {
- scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
+ scheduleTableServiceInternal(cleanInstantTime, Option.empty(), CLEAN);
table.getMetaClient().reloadActiveTimeline();
}
}
@@ -1286,7 +1294,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
* @param extraMetadata Extra Metadata to be stored
*/
protected boolean scheduleCleaningAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
- return scheduleTableService(instantTime, extraMetadata, TableServiceType.CLEAN).isPresent();
+ return scheduleTableService(instantTime, extraMetadata, CLEAN).isPresent();
}
/**
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 7110e26bb0..32c4a0a06d 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -45,6 +45,7 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieClusteringException;
+import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.SparkHoodieIndexFactory;
@@ -68,6 +69,7 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.nio.charset.StandardCharsets;
+import java.text.ParseException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -324,6 +326,16 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
HoodieActiveTimeline.parseDateFromInstantTimeSafely(compactionCommitTime).ifPresent(parsedInstant ->
metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION)
);
+ try {
+ long commitEpochTimeInMs = 0;
+ if (HoodieActiveTimeline.checkDateTime(compactionCommitTime)) {
+ commitEpochTimeInMs = HoodieActiveTimeline.parseDateFromInstantTime(compactionCommitTime).getTime();
+ }
+ metrics.updateCommitMetrics(commitEpochTimeInMs, durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
+ } catch (ParseException e) {
+ throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "
+ + config.getBasePath() + " at time " + compactionCommitTime, e);
+ }
}
LOG.info("Compacted successfully on commit " + compactionCommitTime);
}
@@ -406,6 +418,16 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
HoodieActiveTimeline.parseDateFromInstantTimeSafely(clusteringCommitTime).ifPresent(parsedInstant ->
metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION)
);
+ try {
+ long commitEpochTimeInMs = 0;
+ if (HoodieActiveTimeline.checkDateTime(clusteringCommitTime)) {
+ commitEpochTimeInMs = HoodieActiveTimeline.parseDateFromInstantTime(clusteringCommitTime).getTime();
+ }
+ metrics.updateCommitMetrics(commitEpochTimeInMs, durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
+ } catch (ParseException e) {
+ throw new HoodieCommitException("Commit time is not of valid format. Failed to commit clustering "
+ + config.getBasePath() + " at time " + clusteringCommitTime, e);
+ }
}
LOG.info("Clustering successfully on commit " + clusteringCommitTime);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index 2b27d3ab5e..414e92e58b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -39,6 +39,8 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.text.ParseException;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
@@ -49,6 +51,8 @@ import java.util.Set;
import java.util.function.Function;
import java.util.stream.Stream;
+import static org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.SECS_INSTANT_TIMESTAMP_FORMAT;
+
/**
* Represents the Active Timeline for the Hoodie table. Instants for the last 12 hours (configurable) is in the
* ActiveTimeline and the rest are Archived. ActiveTimeline is a special timeline that allows for creation of instants
@@ -125,6 +129,20 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
return parsedDate;
}
+ /**
+ * Check if the instantTime is in SECS_INSTANT_TIMESTAMP_FORMAT format.
+ */
+ public static boolean checkDateTime(String instantTime) {
+ DateTimeFormatter dtf = DateTimeFormatter.ofPattern(SECS_INSTANT_TIMESTAMP_FORMAT);
+ boolean flag = true;
+ try {
+ LocalDateTime.parse(instantTime, dtf);
+ } catch (Exception e) {
+ flag = false;
+ }
+ return flag;
+ }
+
/**
* Format the Date to a String representing the timestamp of a Hoodie Instant.
*/