Posted to commits@hudi.apache.org by fo...@apache.org on 2023/01/04 03:32:16 UTC

[hudi] 06/45: [MINOR] fix Invalid value for YearOfEra

This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit c1ceb628e576dd50f9c3bdf1ab830dcf61f70296
Author: XuQianJin-Stars <fo...@apache.com>
AuthorDate: Sun Oct 23 17:35:58 2022 +0800

    [MINOR] fix Invalid value for YearOfEra
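
    Context: instant times such as INIT_INSTANT_TS ("00000000000000"),
    METADATA_BOOTSTRAP_INSTANT_TS and FULL_BOOTSTRAP_INSTANT_TS in
    HoodieTimeline are reserved placeholders, not real timestamps. Feeding
    one of them to a java.time parser built from the seconds-granularity
    pattern "yyyyMMddHHmmss" (the assumed value of
    SECS_INSTANT_TIMESTAMP_FORMAT) throws DateTimeParseException with the
    message "Invalid value for YearOfEra", because year 0 is outside the
    valid year-of-era range. A minimal standalone sketch of the failure
    (the class name is illustrative, not part of this patch):

        import java.time.LocalDateTime;
        import java.time.format.DateTimeFormatter;

        public class YearOfEraRepro {
          public static void main(String[] args) {
            DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
            // A real instant time parses cleanly.
            System.out.println(LocalDateTime.parse("20221023173558", dtf));
            // A reserved instant carries year 0000, which is not a valid
            // YearOfEra value, so this call throws DateTimeParseException.
            System.out.println(LocalDateTime.parse("00000000000000", dtf));
          }
        }

    The change below guards metric emission with an explicit format check
    (checkDateTime) before calling parseDateFromInstantTime, and raises a
    HoodieCommitException if a supposedly well-formed instant still fails
    to parse.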
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  | 28 ++++++++++++++--------
 .../apache/hudi/client/SparkRDDWriteClient.java    | 28 +++++++++++++++++-----
 .../table/timeline/HoodieActiveTimeline.java       | 18 ++++++++++++++
 3 files changed, 58 insertions(+), 16 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index d9f260e633..ff500a617e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -104,6 +104,7 @@ import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.text.ParseException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -115,6 +116,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY;
+import static org.apache.hudi.common.model.TableServiceType.CLEAN;
 
 /**
  * Abstract Write Client providing functionality for performing commit, index updates and rollback
@@ -306,14 +308,20 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
   protected abstract HoodieTable<T, I, K, O> createTable(HoodieWriteConfig config, Configuration hadoopConf);
 
   void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String actionType) {
-    if (writeTimer != null) {
-      long durationInMs = metrics.getDurationInMs(writeTimer.stop());
-      // instantTime could be a non-standard value, so use `parseDateFromInstantTimeSafely`
-      // e.g. INIT_INSTANT_TS, METADATA_BOOTSTRAP_INSTANT_TS and FULL_BOOTSTRAP_INSTANT_TS in HoodieTimeline
-      HoodieActiveTimeline.parseDateFromInstantTimeSafely(instantTime).ifPresent(parsedInstant ->
-          metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, actionType)
-      );
-      writeTimer = null;
+    try {
+      if (writeTimer != null) {
+        long durationInMs = metrics.getDurationInMs(writeTimer.stop());
+        long commitEpochTimeInMs = 0;
+        if (HoodieActiveTimeline.checkDateTime(instantTime)) {
+          commitEpochTimeInMs = HoodieActiveTimeline.parseDateFromInstantTime(instantTime).getTime();
+        }
+        metrics.updateCommitMetrics(commitEpochTimeInMs, durationInMs,
+            metadata, actionType);
+        writeTimer = null;
+      }
+    } catch (ParseException e) {
+      throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime
+          + ". Instant time is not in a valid format", e);
     }
   }
 
@@ -862,7 +870,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
       LOG.info("Cleaner started");
       // proceed only if multiple clean schedules are enabled or if there are no pending cleans.
       if (scheduleInline) {
-        scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
+        scheduleTableServiceInternal(cleanInstantTime, Option.empty(), CLEAN);
         table.getMetaClient().reloadActiveTimeline();
       }
     }
@@ -1286,7 +1294,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
    * @param extraMetadata Extra Metadata to be stored
    */
   protected boolean scheduleCleaningAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
-    return scheduleTableService(instantTime, extraMetadata, TableServiceType.CLEAN).isPresent();
+    return scheduleTableService(instantTime, extraMetadata, CLEAN).isPresent();
   }
 
   /**
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 7110e26bb0..32c4a0a06d 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -45,6 +45,7 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieClusteringException;
+import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieWriteConflictException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.SparkHoodieIndexFactory;
@@ -68,6 +69,7 @@ import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
 import java.nio.charset.StandardCharsets;
+import java.text.ParseException;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -324,6 +326,13 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
-      HoodieActiveTimeline.parseDateFromInstantTimeSafely(compactionCommitTime).ifPresent(parsedInstant ->
-          metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION)
-      );
+      try {
+        long commitEpochTimeInMs = 0;
+        if (HoodieActiveTimeline.checkDateTime(compactionCommitTime)) {
+          commitEpochTimeInMs = HoodieActiveTimeline.parseDateFromInstantTime(compactionCommitTime).getTime();
+        }
+        metrics.updateCommitMetrics(commitEpochTimeInMs, durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
+      } catch (ParseException e) {
+        throw new HoodieCommitException("Commit time is not in a valid format. Failed to commit compaction "
+            + config.getBasePath() + " at time " + compactionCommitTime, e);
+      }
     }
     LOG.info("Compacted successfully on commit " + compactionCommitTime);
   }
@@ -406,6 +418,13 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
-      HoodieActiveTimeline.parseDateFromInstantTimeSafely(clusteringCommitTime).ifPresent(parsedInstant ->
-          metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION)
-      );
+      try {
+        long commitEpochTimeInMs = 0;
+        if (HoodieActiveTimeline.checkDateTime(clusteringCommitTime)) {
+          commitEpochTimeInMs = HoodieActiveTimeline.parseDateFromInstantTime(clusteringCommitTime).getTime();
+        }
+        metrics.updateCommitMetrics(commitEpochTimeInMs, durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
+      } catch (ParseException e) {
+        throw new HoodieCommitException("Commit time is not in a valid format. Failed to commit clustering "
+            + config.getBasePath() + " at time " + clusteringCommitTime, e);
+      }
     }
     LOG.info("Clustering successfully on commit " + clusteringCommitTime);
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index 2b27d3ab5e..414e92e58b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -39,6 +39,8 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.Serializable;
 import java.text.ParseException;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
@@ -49,6 +51,8 @@ import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.SECS_INSTANT_TIMESTAMP_FORMAT;
+
 /**
  * Represents the Active Timeline for the Hoodie table. Instants for the last 12 hours (configurable) is in the
  * ActiveTimeline and the rest are Archived. ActiveTimeline is a special timeline that allows for creation of instants
@@ -125,6 +129,20 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
     return parsedDate;
   }
 
+  /**
+   * Check whether the given instantTime matches SECS_INSTANT_TIMESTAMP_FORMAT.
+   */
+  public static boolean checkDateTime(String instantTime) {
+    DateTimeFormatter dtf = DateTimeFormatter.ofPattern(SECS_INSTANT_TIMESTAMP_FORMAT);
+    try {
+      // Reserved instants such as "00000000000000" carry year 0 and fail to parse.
+      LocalDateTime.parse(instantTime, dtf);
+      return true;
+    } catch (Exception e) {
+      return false;
+    }
+  }
+
   /**
    * Format the Date to a String representing the timestamp of a Hoodie Instant.
    */