Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2023/01/09 01:01:05 UTC

[GitHub] [hudi] yihua commented on a diff in pull request #6133: [HUDI-1575] Early Conflict Detection For Multi-writer

yihua commented on code in PR #6133:
URL: https://github.com/apache/hudi/pull/6133#discussion_r1063209840


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -549,6 +553,50 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("When table is upgraded from pre 0.12 to 0.12, we check for \"default\" partition and fail if found one. "
           + "Users are expected to rewrite the data in those partitions. Enabling this config will bypass this validation");
 
+  // Pluggable strategies to use when early conflict detection
+  public static final ConfigProperty<String> EARLY_CONFLICT_DETECTION_STRATEGY_CLASS_NAME = ConfigProperty
+      .key(CONCURRENCY_PREFIX + "early.conflict.detection.strategy")
+      .noDefaultValue()
+      .sinceVersion("0.13.0")
+      .withInferFunction(cfg -> {
+        MarkerType markerType = MarkerType.valueOf(cfg.getString(MARKERS_TYPE).toUpperCase());
+        switch (markerType) {
+          case DIRECT:
+            return Option.of(SimpleDirectMarkerBasedEarlyConflictDetectionStrategy.class.getName());
+          case TIMELINE_SERVER_BASED:
+          default:
+            return Option.of(AsyncTimelineMarkerEarlyConflictDetectionStrategy.class.getName());
+        }
+      })
+      .withDocumentation("Early conflict detection class name, this should be subclass of "
+          + "org.apache.hudi.common.conflict.detection.HoodieEarlyConflictDetectionStrategy");
+
+  public static final ConfigProperty<Boolean> EARLY_CONFLICT_DETECTION_ENABLE = ConfigProperty
+      .key(CONCURRENCY_PREFIX + "early.conflict.detection.enable")
+      .defaultValue(false)
+      .sinceVersion("0.13.0")
+      .withDocumentation("Enable early conflict detection based on markers. It will try to detect writing conflict before create markers and fast fail"
+          + " which will release cluster resources as soon as possible.");
+
+  public static final ConfigProperty<Long> MARKER_CONFLICT_CHECKER_BATCH_INTERVAL = ConfigProperty

Review Comment:
   Add unit for the time configs



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java:
##########
@@ -148,13 +185,30 @@ protected Option<Path> create(String partitionPath, String dataFileName, IOType
     } catch (IOException e) {
       throw new HoodieRemoteException("Failed to create marker file " + partitionPath + "/" + markerFileName, e);
     }
-    LOG.info("[timeline-server-based] Created marker file " + partitionPath + "/" + markerFileName
-        + " in " + timer.endTimer() + " ms");
-    if (success) {
-      return Option.of(new Path(FSUtils.getPartitionPath(markerDirPath, partitionPath), markerFileName));
+    return success;
+  }
+
+  /**
+   * init create marker related config maps.
+   * @param partitionPath
+   * @param markerFileName
+   * @return
+   */
+  private Map<String, String> initConfigMap(String partitionPath, String markerFileName, boolean initEarlyConflictConfigs) {

Review Comment:
   nit: `initEarlyConflictConfigs` -> `initConflictDetectionConfigs`



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -549,6 +553,50 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("When table is upgraded from pre 0.12 to 0.12, we check for \"default\" partition and fail if found one. "
           + "Users are expected to rewrite the data in those partitions. Enabling this config will bypass this validation");
 
+  // Pluggable strategies to use when early conflict detection
+  public static final ConfigProperty<String> EARLY_CONFLICT_DETECTION_STRATEGY_CLASS_NAME = ConfigProperty
+      .key(CONCURRENCY_PREFIX + "early.conflict.detection.strategy")
+      .noDefaultValue()
+      .sinceVersion("0.13.0")
+      .withInferFunction(cfg -> {
+        MarkerType markerType = MarkerType.valueOf(cfg.getString(MARKERS_TYPE).toUpperCase());
+        switch (markerType) {
+          case DIRECT:
+            return Option.of(SimpleDirectMarkerBasedEarlyConflictDetectionStrategy.class.getName());
+          case TIMELINE_SERVER_BASED:
+          default:
+            return Option.of(AsyncTimelineMarkerEarlyConflictDetectionStrategy.class.getName());
+        }
+      })
+      .withDocumentation("Early conflict detection class name, this should be subclass of "
+          + "org.apache.hudi.common.conflict.detection.HoodieEarlyConflictDetectionStrategy");
+
+  public static final ConfigProperty<Boolean> EARLY_CONFLICT_DETECTION_ENABLE = ConfigProperty
+      .key(CONCURRENCY_PREFIX + "early.conflict.detection.enable")
+      .defaultValue(false)
+      .sinceVersion("0.13.0")
+      .withDocumentation("Enable early conflict detection based on markers. It will try to detect writing conflict before create markers and fast fail"
+          + " which will release cluster resources as soon as possible.");
+
+  public static final ConfigProperty<Long> MARKER_CONFLICT_CHECKER_BATCH_INTERVAL = ConfigProperty

Review Comment:
   naming: `checker` -> `detector` in all places.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java:
##########
@@ -43,23 +47,52 @@
 public class LockManager implements Serializable, AutoCloseable {
 
   private static final Logger LOG = LogManager.getLogger(LockManager.class);
-  private final HoodieWriteConfig writeConfig;
-  private final LockConfiguration lockConfiguration;
-  private final SerializableConfiguration hadoopConf;
-  private final int maxRetries;
-  private final long maxWaitTimeInMs;
+  private HoodieWriteConfig writeConfig;
+  private LockConfiguration lockConfiguration;
+  private SerializableConfiguration hadoopConf;
+  private int maxRetries;
+  private long maxWaitTimeInMs;

Review Comment:
   These should still be final.
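
One way to keep the fields `final` while still supporting a second constructor is to have it delegate with `this(...)` instead of calling a shared `init` method. The following is a minimal sketch under stated assumptions, not Hudi code: the class name `FinalFieldsLockManagerSketch`, the property keys, and the default values are all hypothetical and only illustrate the pattern.

```java
import java.util.Properties;

// Hypothetical stand-in for a lock manager; names, keys, and defaults are illustrative only.
final class FinalFieldsLockManagerSketch {
  static final String LOCK_KEY = "lock.key";          // assumed key, not a Hudi config
  static final String RETRIES_KEY = "lock.retries";   // assumed key, not a Hudi config
  static final String WAIT_MS_KEY = "lock.wait.ms";   // assumed key, not a Hudi config

  private final Properties lockProps;
  private final int maxRetries;
  private final long maxWaitTimeInMs;

  // Primary constructor: the only place where the final fields are assigned.
  FinalFieldsLockManagerSketch(Properties lockProps) {
    this.lockProps = lockProps;
    this.maxRetries = Integer.parseInt(lockProps.getProperty(RETRIES_KEY, "3"));
    this.maxWaitTimeInMs = Long.parseLong(lockProps.getProperty(WAIT_MS_KEY, "5000"));
  }

  // File-level variant: builds the adjusted properties first, then delegates.
  FinalFieldsLockManagerSketch(Properties writeProps, String partitionPath, String fileId) {
    this(withLockKey(writeProps, partitionPath + "/" + fileId));
  }

  // Copies the caller's properties so the override does not leak back to the caller.
  private static Properties withLockKey(Properties base, String key) {
    Properties copy = new Properties();
    copy.putAll(base);
    copy.setProperty(LOCK_KEY, key);
    return copy;
  }

  String getLockKey() {
    return lockProps.getProperty(LOCK_KEY);
  }

  int getMaxRetries() {
    return maxRetries;
  }

  long getMaxWaitTimeInMs() {
    return maxWaitTimeInMs;
  }
}
```

Because a static helper may be called inside the `this(...)` argument list, the property rewriting happens before any field is assigned, so no field needs to lose its `final` modifier.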



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java:
##########
@@ -52,16 +58,46 @@ public WriteMarkers(String basePath, String markerFolderPath, String instantTime
     this.instantTime = instantTime;
   }
 
+  public Option<Path> create(String partitionPath, String dataFileName, IOType type) {
+    return create(partitionPath, dataFileName, type, Option.empty());
+  }
+
   /**
    * Creates a marker without checking if the marker already exists.
    *
    * @param partitionPath partition path in the table
    * @param dataFileName data file name
    * @param type  write IO type
+   * @param handler could be empty
    * @return the marker path
    */
-  public Option<Path> create(String partitionPath, String dataFileName, IOType type) {
-    return create(partitionPath, dataFileName, type, false);
+  public Option<Path> create(String partitionPath, String dataFileName, IOType type, Option<HoodieWriteHandle> handler) {
+    boolean checkIfExists = false;
+
+    if (handler.isPresent()
+        && handler.get().getConfig().getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()
+        && handler.get().getConfig().isEarlyConflictDetectionEnable()) {
+
+      HoodieTableMetaClient metaClient = handler.get().getHoodieTableMetaClient();
+      HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+
+      HoodieTimeline pendingCompactionTimeline = activeTimeline.filterPendingCompactionTimeline();
+      HoodieTimeline pendingReplaceTimeline = activeTimeline.filterPendingReplaceTimeline();
+      // TODO if current is compact or clustering then create marker directly without early conflict detection.
+      // Need to support early conflict detection between table service and common writers.
+      if (pendingCompactionTimeline.containsInstant(instantTime) || pendingReplaceTimeline.containsInstant(instantTime)) {

Review Comment:
   Sounds good.  @zhangyue19921010 could you add the details of this implementation decision to the RFC?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java:
##########
@@ -86,7 +86,17 @@ public void startServer() throws IOException {
           .enableMarkerRequests(true)
           .markerBatchNumThreads(writeConfig.getMarkersTimelineServerBasedBatchNumThreads())
           .markerBatchIntervalMs(writeConfig.getMarkersTimelineServerBasedBatchIntervalMs())
-          .markerParallelism(writeConfig.getMarkersDeleteParallelism());
+          .markerParallelism(writeConfig.getMarkersDeleteParallelism())
+      ;

Review Comment:
   nit: the semicolon should be on the same line as the last builder call.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java:
##########
@@ -155,6 +160,20 @@ protected Option<Path> create(String partitionPath, String dataFileName, IOType
     return create(getMarkerPath(partitionPath, dataFileName, type), checkIfExists);
   }
 
+  @Override
+  public Option<Path> createWithEarlyConflictDetection(String partitionPath, String dataFileName, IOType type, boolean checkIfExists, Set<HoodieInstant> completedCommitInstants,

Review Comment:
   Could `completedCommitInstants` be derived from `activeTimeline`?  If so, there is no need to pass in `completedCommitInstants`.



##########
hudi-common/src/main/java/org/apache/hudi/common/conflict/detection/HoodieTimelineServerBasedEarlyConflictDetectionStrategy.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.conflict.detection;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+
+import java.util.Set;
+
+public abstract class HoodieTimelineServerBasedEarlyConflictDetectionStrategy implements HoodieEarlyConflictDetectionStrategy {

Review Comment:
   Implementation-wise, `HoodieEarlyConflictDetectionStrategy` is used by the write client.  I suggest that the timeline-server-related logic, i.e., the `fresh` method containing the scheduling of the marker checker, be moved to the timeline server.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java:
##########
@@ -52,16 +58,47 @@ public WriteMarkers(String basePath, String markerFolderPath, String instantTime
     this.instantTime = instantTime;
   }
 
+  public Option<Path> create(String partitionPath, String dataFileName, IOType type) {
+    return create(partitionPath, dataFileName, type, Option.empty());
+  }
+
   /**
    * Creates a marker without checking if the marker already exists.
    *
    * @param partitionPath partition path in the table
    * @param dataFileName data file name
    * @param type  write IO type
+   * @param handler could be empty
    * @return the marker path
    */
-  public Option<Path> create(String partitionPath, String dataFileName, IOType type) {
-    return create(partitionPath, dataFileName, type, false);
+  public Option<Path> create(String partitionPath, String dataFileName, IOType type, Option<HoodieWriteHandle> handler) {

Review Comment:
   nit: `HoodieWriteHandle` instance should not be directly passed in here.  Instead, only pass in what's needed, i.e., relevant write config, the timeline, and the file ID, to avoid exposure of `HoodieWriteHandle`.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java:
##########
@@ -62,6 +63,33 @@ public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) {
     metrics = new HoodieLockMetrics(writeConfig);
   }
 
+  /**
+   * Try to have a lock at partitionPath + fileID level for different write handler.
+   * @param writeConfig
+   * @param fs
+   * @param partitionPath
+   * @param fileId
+   */
+  public LockManager(HoodieWriteConfig writeConfig, FileSystem fs, String partitionPath, String fileId) {
+    this.writeConfig = writeConfig;
+    this.hadoopConf = new SerializableConfiguration(fs.getConf());
+    TypedProperties props = refreshLockConfig(writeConfig, partitionPath + "/" + fileId);
+    this.lockConfiguration = new LockConfiguration(props);
+    maxRetries = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,
+        Integer.parseInt(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue()));
+    maxWaitTimeInMs = lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
+        Long.parseLong(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.defaultValue()));
+  }
+
+  /**
+   * rebuild lock related configs, only support ZK related lock for now.
+   */
+  private TypedProperties refreshLockConfig(HoodieWriteConfig writeConfig, String key) {
+    TypedProperties props = new TypedProperties(writeConfig.getProps());
+    props.setProperty(LockConfiguration.ZK_LOCK_KEY_PROP_KEY, key);

Review Comment:
   Sounds good.  Let's use `LOCK_PROVIDER_CLASS_NAME` instead of `ZK_BASE_PATH_PROP_KEY` to check whether a ZK-based lock is configured.
   
   @zhangyue19921010 could you file a JIRA ticket in addition to the TODO, since this requires more work?
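
The provider-based check could look roughly like the sketch below. It is written against plain `java.util.Properties` so that it stands alone; the key `hoodie.write.lock.provider`, the ZooKeeper provider class name, and the base-path key used in the usage note are assumed to match Hudi's lock configs, and the class `ZkLockCheckSketch` is hypothetical.

```java
import java.util.Properties;

// Hypothetical helper; not part of the PR.
final class ZkLockCheckSketch {
  // Assumed to mirror the key behind LOCK_PROVIDER_CLASS_NAME.
  static final String LOCK_PROVIDER_KEY = "hoodie.write.lock.provider";
  // Assumed fully qualified name of the ZooKeeper-based lock provider.
  static final String ZK_PROVIDER_CLASS =
      "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider";

  // True only when the configured provider is the ZooKeeper one,
  // regardless of which other ZK-related properties happen to be set.
  static boolean isZkLockConfigured(Properties props) {
    return ZK_PROVIDER_CLASS.equals(props.getProperty(LOCK_PROVIDER_KEY));
  }
}
```

With this check, a config that sets only a ZK base path (for example `hoodie.write.lock.zookeeper.base_path`) but names a different provider is not treated as ZK-based.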



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java:
##########
@@ -57,6 +62,14 @@ public void beginTransaction(Option<HoodieInstant> newTxnOwnerInstant,
     }
   }
 
+  public void beginTransaction(String partitionPath, String fileId) {

Review Comment:
   This can be simplified to take no arguments, because the constructor already takes the partition path and file ID.  The transaction manager for the direct markers should be implemented in a separate class to isolate the logic.



##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java:
##########
@@ -124,6 +124,29 @@ public static class Config implements Serializable {
     @Parameter(names = {"--marker-parallelism", "-mdp"}, description = "Parallelism to use for reading and deleting marker files")
     public int markerParallelism = 100;
 
+    @Parameter(names = {"--early-conflict-detection-strategy"}, description = "Early conflict detection class name, this should be subclass of "
+        + "org.apache.hudi.common.conflict.detection.HoodieEarlyConflictDetectionStrategy")
+    public String earlyConflictDetectStrategy = "org.apache.hudi.timeline.service.handlers.marker.AsyncTimelineMarkerEarlyConflictDetectionStrategy";
+
+    @Parameter(names = {"--early-conflict-detection-check-commit-conflict"}, description = "Enable check commit conflict or not during early conflict detect")
+    public Boolean checkCommitConflict = false;
+
+    @Parameter(names = {"--early-conflict-detection-enable"}, description = "Enable early conflict detection based on markers. It will try to detect writing conflict "
+        + "before create markers and fast fail which will release cluster resources as soon as possible.")
+    public Boolean earlyConflictDetectionEnable = false;
+
+    @Parameter(names = {"--early-conflict-async-checker-batch-interval"}, description = "Used for timeline based marker AsyncTimelineMarkerConflictResolutionStrategy. "

Review Comment:
   nit: rename the timeline server configs to be aligned with `HoodieWriteConfig`



##########
hudi-common/src/main/java/org/apache/hudi/common/conflict/detection/HoodieDirectMarkerBasedEarlyConflictDetectionStrategy.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.conflict.detection;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.MarkerUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public abstract class HoodieDirectMarkerBasedEarlyConflictDetectionStrategy implements HoodieEarlyConflictDetectionStrategy {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieDirectMarkerBasedEarlyConflictDetectionStrategy.class);
+  protected final String basePath;
+  protected final FileSystem fs;
+  protected final String partitionPath;
+  protected final String fileId;
+  protected final String instantTime;
+  protected final HoodieActiveTimeline activeTimeline;
+  protected final HoodieConfig config;
+  protected Set<HoodieInstant> completedCommitInstants;
+  protected final Boolean checkCommitConflict;
+  protected final Long maxAllowableHeartbeatIntervalInMs;
+
+  public HoodieDirectMarkerBasedEarlyConflictDetectionStrategy(String basePath, HoodieWrapperFileSystem fs, String partitionPath, String fileId, String instantTime,
+                                                               HoodieActiveTimeline activeTimeline, HoodieConfig config, Boolean checkCommitConflict, Long maxAllowableHeartbeatIntervalInMs,
+                                                               HashSet<HoodieInstant> completedCommitInstants) {
+    this.basePath = basePath;
+    this.fs = fs;
+    this.partitionPath = partitionPath;
+    this.fileId = fileId;
+    this.instantTime = instantTime;
+    this.completedCommitInstants = completedCommitInstants;
+    this.activeTimeline = activeTimeline;
+    this.config = config;
+    this.checkCommitConflict = checkCommitConflict;
+    this.maxAllowableHeartbeatIntervalInMs = maxAllowableHeartbeatIntervalInMs;
+  }
+
+  /**
+   * We need to do list operation here.
+   * In order to reduce the list pressure as much as possible, first we build path prefix in advance:  '$base_path/.temp/instant_time/partition_path',
+   * and only list these specific partition_paths we need instead of list all the '$base_path/.temp/'
+   * @param basePath
+   * @param partitionPath
+   * @param fileId 162b13d7-9530-48cf-88a4-02241817ae0c-0_1-74-100_003.parquet
+   * @return true if current fileID is already existed under .temp/instant_time/partition_path/..
+   * @throws IOException
+   */
+  public boolean checkMarkerConflict(HoodieActiveTimeline activeTimeline, String basePath, String partitionPath, String fileId,

Review Comment:
   Similarly, the arguments should be simplified since the constructor takes most of the information already.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java:
##########
@@ -43,23 +47,52 @@
 public class LockManager implements Serializable, AutoCloseable {
 
   private static final Logger LOG = LogManager.getLogger(LockManager.class);
-  private final HoodieWriteConfig writeConfig;
-  private final LockConfiguration lockConfiguration;
-  private final SerializableConfiguration hadoopConf;
-  private final int maxRetries;
-  private final long maxWaitTimeInMs;
+  private HoodieWriteConfig writeConfig;
+  private LockConfiguration lockConfiguration;
+  private SerializableConfiguration hadoopConf;
+  private int maxRetries;
+  private long maxWaitTimeInMs;
   private transient HoodieLockMetrics metrics;
   private volatile LockProvider lockProvider;
 
   public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) {
+    init(writeConfig, fs.getConf(), writeConfig.getProps());
+  }
+
+  /**
+   * Try to have a lock at partitionPath + fileID level for different write handler.
+   * @param writeConfig
+   * @param fs
+   * @param partitionPath
+   * @param fileId
+   */
+  public LockManager(HoodieWriteConfig writeConfig, FileSystem fs, String partitionPath, String fileId) {
+    TypedProperties props = refreshLockConfig(writeConfig, partitionPath + "/" + fileId);
+    init(writeConfig, fs.getConf(), props);
+  }
+
+  private void init(HoodieWriteConfig writeConfig, Configuration conf, TypedProperties lockProps) {
+    this.lockConfiguration = new LockConfiguration(lockProps);
     this.writeConfig = writeConfig;
-    this.hadoopConf = new SerializableConfiguration(fs.getConf());
-    this.lockConfiguration = new LockConfiguration(writeConfig.getProps());
-    maxRetries = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,
+    this.hadoopConf = new SerializableConfiguration(conf);
+    this.maxRetries = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,
         Integer.parseInt(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue()));
-    maxWaitTimeInMs = lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
+    this.maxWaitTimeInMs = lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
         Long.parseLong(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.defaultValue()));
-    metrics = new HoodieLockMetrics(writeConfig);
+    this.metrics = new HoodieLockMetrics(writeConfig);
+  }
+
+  /**
+   * rebuild lock related configs, only support ZK related lock for now.
+   */
+  private TypedProperties refreshLockConfig(HoodieWriteConfig writeConfig, String key) {

Review Comment:
   It is better to move this logic of changing lock configs to the transaction manager for the direct markers, as the lock manager itself should not be aware of any higher-layer logic.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/SimpleDirectMarkerBasedEarlyConflictDetectionStrategy.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.marker;
+
+import org.apache.hudi.common.conflict.detection.HoodieDirectMarkerBasedEarlyConflictDetectionStrategy;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.MarkerUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieEarlyConflictDetectionException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ConcurrentModificationException;
+import java.util.HashSet;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This strategy is used for direct marker writers, trying to do early conflict detection.
+ * It will use fileSystem api like list and exist directly to check if there is any marker file conflict.
+ */
+public class SimpleDirectMarkerBasedEarlyConflictDetectionStrategy extends HoodieDirectMarkerBasedEarlyConflictDetectionStrategy {
+
+  private static final Logger LOG = LogManager.getLogger(SimpleDirectMarkerBasedEarlyConflictDetectionStrategy.class);
+
+  public SimpleDirectMarkerBasedEarlyConflictDetectionStrategy(String basePath, HoodieWrapperFileSystem fs, String partitionPath, String fileId, String instantTime,

Review Comment:
   nit: the full write config is not needed.  We should simplify the list of arguments passed in here.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/SimpleDirectMarkerBasedEarlyConflictDetectionStrategy.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.marker;
+
+import org.apache.hudi.common.conflict.detection.HoodieDirectMarkerBasedEarlyConflictDetectionStrategy;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.util.MarkerUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieEarlyConflictDetectionException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ConcurrentModificationException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This strategy is used for direct marker writers, trying to do early conflict detection.
+ * It will use fileSystem api like list and exist directly to check if there is any marker file conflict.
+ */
+public class SimpleDirectMarkerBasedEarlyConflictDetectionStrategy extends HoodieDirectMarkerBasedEarlyConflictDetectionStrategy {
+
+  private static final Logger LOG = LogManager.getLogger(SimpleDirectMarkerBasedEarlyConflictDetectionStrategy.class);
+
+  public SimpleDirectMarkerBasedEarlyConflictDetectionStrategy(String basePath, HoodieWrapperFileSystem fs, String partitionPath, String fileId, String instantTime,
+                                                               HoodieActiveTimeline activeTimeline, HoodieWriteConfig config, Boolean checkCommitConflict) {
+    super(basePath, fs, partitionPath, fileId, instantTime, activeTimeline, config, checkCommitConflict);
+  }
+
+  @Override
+  public boolean hasMarkerConflict() {
+    try {
+      return checkMarkerConflict(basePath, partitionPath, fileId, fs, instantTime)
+          || (checkCommitConflict && MarkerUtils.hasCommitConflict(Stream.of(fileId).collect(Collectors.toSet()), basePath, completedCommitInstants));
+    } catch (IOException e) {
+      LOG.warn("Exception occurs during create marker file in eager conflict detection mode.");
+      throw new HoodieIOException("Exception occurs during create marker file in eager conflict detection mode.", e);
+    }
+  }
+
+  @Override
+  public void resolveMarkerConflict(String basePath, String partitionPath, String dataFileName) {
+    throw new HoodieEarlyConflictDetectionException(new ConcurrentModificationException("Early conflict detected but cannot resolve conflicts for overlapping writes"));

Review Comment:
   As @zhangyue19921010 explained, this is unnecessary because compaction and clustering instants do not go through the early conflict detection logic.



##########
hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java:
##########
@@ -208,10 +219,116 @@ public static Set<String> readMarkersFromFile(Path markersFilePath, Serializable
       fsDataInputStream = fs.open(markersFilePath);
       markers = new HashSet<>(FileIOUtils.readAsUTFStringLines(fsDataInputStream));
     } catch (IOException e) {
-      throw new HoodieIOException("Failed to read MARKERS file " + markersFilePath, e);
+      if (ignoreException) {
+        LOG.warn("IOException occurs during read MARKERS file, ", e);
+      } else {
+        throw new HoodieIOException("Failed to read MARKERS file " + markersFilePath, e);
+      }
     } finally {
       closeQuietly(fsDataInputStream);
     }
     return markers;
   }
+
+  public static List<Path> getAllMarkerDir(Path tempPath, FileSystem fs) throws IOException {
+    return Arrays.stream(fs.listStatus(tempPath)).map(FileStatus::getPath).collect(Collectors.toList());
+  }
+
+  public static boolean hasCommitConflict(HoodieActiveTimeline activeTimeline, Set<String> currentFileIDs, Set<HoodieInstant> completedCommitInstants) {
+
+    Set<HoodieInstant> currentInstants = activeTimeline.reload().getCommitsTimeline().filterCompletedInstants().getInstants().collect(Collectors.toSet());
+
+    currentInstants.removeAll(completedCommitInstants);
+    Set<String> missingFileIDs = currentInstants.stream().flatMap(instant -> {
+      try {
+        return HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class)
+            .getFileIdAndRelativePaths().keySet().stream();
+      } catch (Exception e) {
+        return Stream.empty();
+      }
+    }).collect(Collectors.toSet());
+    currentFileIDs.retainAll(missingFileIDs);
+    return !currentFileIDs.isEmpty();
+  }
+
+  /**
+   * Get Candidate Instant to do conflict checking:
+   * 1. Skip current writer related instant(currentInstantTime)
+   * 2. Skip all instants after currentInstantTime
+   * 3. Skip dead writers related instants based on heart-beat
+   * 4. Skip pending compaction instant (For now we don' do early conflict check with compact action)
+   *      Because we don't want to let pending compaction block common writer.
+   * @param instants
+   * @return
+   */
+  public static List<String> getCandidateInstants(HoodieActiveTimeline activeTimeline, List<Path> instants, String currentInstantTime,
+                                                  long maxAllowableHeartbeatIntervalInMs, FileSystem fs, String basePath) {
+
+    HoodieActiveTimeline reloadActive = activeTimeline.reload();
+
+    return instants.stream().map(Path::toString).filter(instantPath -> {
+      String instantTime = markerDirToInstantTime(instantPath);
+      return instantTime.compareToIgnoreCase(currentInstantTime) < 0
+          && !reloadActive.filterPendingCompactionTimeline().containsInstant(instantTime)
+          && !reloadActive.filterPendingReplaceTimeline().containsInstant(instantTime);
+    }).filter(instantPath -> {
+      try {
+        return !isHeartbeatExpired(markerDirToInstantTime(instantPath), maxAllowableHeartbeatIntervalInMs, fs, basePath);
+      } catch (IOException e) {
+        return false;
+      }
+    }).collect(Collectors.toList());
+  }
+
+  /**
+   * Get fileID from full marker path, for example:
+   * 20210623/0/20210825/932a86d9-5c1d-44c7-ac99-cb88b8ef8478-0_85-15-1390_20220620181735781.parquet.marker.MERGE
+   *    ==> get 20210623/0/20210825/932a86d9-5c1d-44c7-ac99-cb88b8ef8478-0
+   * @param marker
+   * @return
+   */
+  public static String makerToPartitionAndFileID(String marker) {
+    String[] ele = marker.split("_");
+    return ele[0];
+  }
+
+  /**
+   * Get instantTime from full marker path, for example:
+   * /var/folders/t3/th1dw75d0yz2x2k2qt6ys9zh0000gp/T/junit6502909693741900820/dataset/.hoodie/.temp/003
+   *    ==> 003
+   * @param marker
+   * @return
+   */
+  public static String markerDirToInstantTime(String marker) {
+    String[] ele = marker.split("/");
+    return ele[ele.length - 1];
+  }
+
+  /**
+   * Use modification time as last heart beat time
+   * @param fs
+   * @param basePath
+   * @param instantTime
+   * @return
+   * @throws IOException
+   */
+  public static Long getLastHeartbeatTime(FileSystem fs, String basePath, String instantTime) throws IOException {
+    Path heartbeatFilePath = new Path(HoodieTableMetaClient.getHeartbeatFolderPath(basePath) + Path.SEPARATOR + instantTime);
+    if (fs.exists(heartbeatFilePath)) {
+      return fs.getFileStatus(heartbeatFilePath).getModificationTime();
+    } else {
+      // NOTE : This can happen when a writer is upgraded to use lazy cleaning and the last write had failed
+      return 0L;
+    }
+  }
+
+  public static boolean isHeartbeatExpired(String instantTime, long maxAllowableHeartbeatIntervalInMs, FileSystem fs, String basePath) throws IOException {
+    Long currentTime = System.currentTimeMillis();
+    Long lastHeartbeatTime = getLastHeartbeatTime(fs, basePath, instantTime);
+    if (currentTime - lastHeartbeatTime > maxAllowableHeartbeatIntervalInMs) {
+      LOG.warn("Heartbeat expired, for instant: " + instantTime);
+      return true;
+    }
+    return false;
+  }

Review Comment:
   Given the limitation, it now makes sense to move `HeartbeatUtils` to `hudi-common`, so that the common utils can be shared instead of duplicating code.
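
As a rough illustration of what a shared helper could expose, the expiry rule from the diff above can be isolated into a pure function that takes the timestamps as arguments, which makes it usable from both the write client and the marker utilities. This is only a sketch: the class `HeartbeatExpirySketch` and its signature are hypothetical and are not the actual `HeartbeatUtils` API.

```java
// Hypothetical shared helper; the name and signature are assumptions for illustration.
final class HeartbeatExpirySketch {
  private HeartbeatExpirySketch() {
  }

  // Same rule as isHeartbeatExpired in the diff: expired when the gap since the
  // last heartbeat is strictly greater than the allowed interval.
  static boolean isHeartbeatExpired(long lastHeartbeatTimeMs, long currentTimeMs,
                                    long maxAllowableHeartbeatIntervalInMs) {
    return currentTimeMs - lastHeartbeatTimeMs > maxAllowableHeartbeatIntervalInMs;
  }
}
```

A caller would pass the heartbeat file's modification time (or `0L` when the file is missing, as in the diff) together with `System.currentTimeMillis()`.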


