You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yi...@apache.org on 2022/06/30 18:07:49 UTC

[hudi] branch master updated: [HUDI-3634] Could read empty or partial HoodieCommitMetaData in downstream if using HDFS (#5048)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 62a0c962ac [HUDI-3634] Could read empty or partial HoodieCommitMetaData in downstream if using HDFS (#5048)
62a0c962ac is described below

commit 62a0c962aceae5d1c803d48c943c2155ec3ef5f1
Author: RexAn <bo...@gmail.com>
AuthorDate: Fri Jul 1 02:07:40 2022 +0800

    [HUDI-3634] Could read empty or partial HoodieCommitMetaData in downstream if using HDFS (#5048)
    
    On HDFS, create immutable files by first writing the content to <file>.tmp and then renaming it to the target file name, so readers never observe an empty or partially written file.
---
 .../hudi/common/fs/HoodieWrapperFileSystem.java    | 64 ++++++++++++++++++++++
 .../table/timeline/HoodieActiveTimeline.java       | 31 +----------
 .../table/timeline/TestHoodieActiveTimeline.java   | 44 +++++++++++++++
 3 files changed, 110 insertions(+), 29 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
index a79d1571af..bceea8e367 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
@@ -21,6 +21,7 @@ package org.apache.hudi.common.fs;
 import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 
@@ -60,6 +61,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeoutException;
 
+import static org.apache.hudi.common.fs.StorageSchemes.HDFS;
+
 /**
  * HoodieWrapperFileSystem wraps the default file system. It holds state about the open streams in the file system to
  * support getting the written size to each of the open streams.
@@ -68,6 +71,8 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   public static final String HOODIE_SCHEME_PREFIX = "hoodie-";
 
+  private static final String TMP_PATH_POSTFIX = ".tmp";
+
   protected enum MetricName {
     create, rename, delete, listStatus, mkdirs, getFileStatus, globStatus, listFiles, read, write
   }
@@ -986,6 +991,65 @@ public class HoodieWrapperFileSystem extends FileSystem {
         file.toString() + " does not have a open stream. Cannot get the bytes written on the stream");
   }
 
+  /**
+   * Returns whether {@link #createImmutableFileInPath} should first write non-empty content to a
+   * temp file and then rename it into place. This is enabled only for HDFS (see HUDI-3634:
+   * downstream readers could observe empty or partial commit metadata on HDFS).
+   *
+   * <p>The scheme is taken from {@link FileSystem#getUri()} rather than
+   * {@link FileSystem#getScheme()}, because the base {@code getScheme()} implementation throws
+   * {@link UnsupportedOperationException} for file systems that do not override it.
+   *
+   * @return true if the wrapped file system is HDFS
+   */
+  protected boolean needCreateTempFile() {
+    return HDFS.getScheme().equals(fileSystem.getUri().getScheme());
+  }
+
+  /**
+   * Creates a new file with overwrite set to false. This ensures files are created only once
+   * and never rewritten.
+   *
+   * <p>When {@code content} is present and {@link #needCreateTempFile()} returns true, the
+   * content is first written to {@code <name>.tmp} in the same directory as {@code fullPath}.
+   * The temp file is then renamed to {@code fullPath}, so readers never observe an empty or
+   * partially written target file. If writing or closing the temp file fails, the temp file is
+   * deleted (best effort) and is NOT renamed into place.
+   *
+   * @param fullPath File Path
+   * @param content Content to be stored
+   * @throws HoodieIOException if the file cannot be created, written, closed or renamed
+   */
+  public void createImmutableFileInPath(Path fullPath, Option<byte[]> content)
+      throws HoodieIOException {
+    // An empty file is complete as soon as it is created, so only non-empty content needs the
+    // write-to-temp-then-rename path.
+    boolean needTempFile = content.isPresent() && needCreateTempFile();
+    Path writePath = needTempFile
+        ? new Path(fullPath.getParent(), fullPath.getName() + TMP_PATH_POSTFIX)
+        : fullPath;
+
+    FSDataOutputStream fsout;
+    try {
+      fsout = fileSystem.create(writePath, false);
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to create file " + writePath, e);
+    }
+
+    // try-with-resources guarantees the stream is closed before the rename below, and a
+    // failure while writing or closing aborts before the rename is ever attempted.
+    try (FSDataOutputStream out = fsout) {
+      if (content.isPresent()) {
+        out.write(content.get());
+      }
+    } catch (IOException e) {
+      if (needTempFile) {
+        // This temp file was created above. Remove it so the partial content is never promoted
+        // and a retry is not blocked by create(..., overwrite = false).
+        try {
+          fileSystem.delete(writePath, false);
+        } catch (IOException deleteError) {
+          e.addSuppressed(deleteError);
+        }
+      }
+      throw new HoodieIOException("Failed to write or close file " + writePath, e);
+    }
+
+    if (needTempFile) {
+      boolean renamed;
+      try {
+        renamed = fileSystem.rename(writePath, fullPath);
+      } catch (IOException e) {
+        throw new HoodieIOException("Failed to rename " + writePath + " to the target " + fullPath, e);
+      }
+      // FileSystem#rename reports many failures by returning false instead of throwing.
+      if (!renamed) {
+        throw new HoodieIOException("Failed to rename " + writePath + " to the target " + fullPath,
+            new IOException("FileSystem#rename returned false"));
+      }
+    }
+  }
+
+  /**
+   * Returns the underlying {@link FileSystem} that this wrapper delegates to.
+   *
+   * @return the wrapped file system instance
+   */
   public FileSystem getFileSystem() {
     return fileSystem;
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index 6e7f6a2430..b3dbe422b1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -30,7 +30,6 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
 
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
@@ -539,7 +538,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
         if (allowRedundantTransitions) {
           FileIOUtils.createFileInPath(metaClient.getFs(), getInstantFileNamePath(toInstant.getFileName()), data);
         } else {
-          createImmutableFileInPath(getInstantFileNamePath(toInstant.getFileName()), data);
+          metaClient.getFs().createImmutableFileInPath(getInstantFileNamePath(toInstant.getFileName()), data);
         }
         LOG.info("Create new file for toInstant ?" + getInstantFileNamePath(toInstant.getFileName()));
       }
@@ -706,33 +705,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
     if (allowOverwrite || metaClient.getTimelineLayoutVersion().isNullVersion()) {
       FileIOUtils.createFileInPath(metaClient.getFs(), fullPath, content);
     } else {
-      createImmutableFileInPath(fullPath, content);
-    }
-  }
-
-  /**
-   * Creates a new file in timeline with overwrite set to false. This ensures
-   * files are created only once and never rewritten
-   * @param fullPath File Path
-   * @param content Content to be stored
-   */
-  private void createImmutableFileInPath(Path fullPath, Option<byte[]> content) {
-    FSDataOutputStream fsout = null;
-    try {
-      fsout = metaClient.getFs().create(fullPath, false);
-      if (content.isPresent()) {
-        fsout.write(content.get());
-      }
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to create file " + fullPath, e);
-    } finally {
-      try {
-        if (null != fsout) {
-          fsout.close();
-        }
-      } catch (IOException e) {
-        throw new HoodieIOException("Failed to close file " + fullPath, e);
-      }
+      metaClient.getFs().createImmutableFileInPath(fullPath, content);
     }
   }
 
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
index 767a16f0c0..5692337471 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.common.table.timeline;
 
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.fs.NoOpConsistencyGuard;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
@@ -32,6 +34,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -45,6 +48,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -199,6 +203,25 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
     assertTrue(activeCommitTimeline.isBeforeTimelineStarts("00"));
   }
 
+  /**
+   * Forces the temp-file-then-rename write path via {@link #shouldAllowTempCommit} and checks
+   * that the saved commit is visible as completed and carries the full content that was written.
+   */
+  @Test
+  public void testAllowTempCommit() {
+    shouldAllowTempCommit(true, hoodieMetaClient -> {
+      timeline = new HoodieActiveTimeline(hoodieMetaClient);
+
+      HoodieInstant instant1 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, "1");
+      timeline.createNewInstant(instant1);
+
+      byte[] data = "commit".getBytes(StandardCharsets.UTF_8);
+      timeline.saveAsComplete(new HoodieInstant(true, instant1.getAction(),
+          instant1.getTimestamp()), Option.of(data));
+
+      timeline = timeline.reload();
+
+      Option<HoodieInstant> lastCompleted = timeline.getContiguousCompletedWriteTimeline().lastInstant();
+      assertTrue(lastCompleted.isPresent());
+      assertEquals(instant1.getTimestamp(), lastCompleted.get().getTimestamp());
+      // The temp file exists so readers never see partial content: verify the written bytes
+      // actually landed in the final (renamed) instant file.
+      assertEquals("commit",
+          new String(timeline.getInstantDetails(lastCompleted.get()).get(), StandardCharsets.UTF_8));
+    });
+  }
+
   @Test
   public void testGetContiguousCompletedWriteTimeline() {
     // a mock timeline with holes
@@ -594,4 +617,25 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
     }
     return allInstants;
   }
+
+  /**
+   * Runs {@code fun} against {@code metaClient}. When {@code allowTempCommit} is true, the meta
+   * client's file system is temporarily swapped for a wrapper whose {@code needCreateTempFile()}
+   * always returns true, and the original file system is restored afterwards.
+   */
+  private void shouldAllowTempCommit(boolean allowTempCommit, Consumer<HoodieTableMetaClient> fun) {
+    if (!allowTempCommit) {
+      fun.accept(metaClient);
+      return;
+    }
+    HoodieWrapperFileSystem originalFs = metaClient.getFs();
+    HoodieWrapperFileSystem tempFileFs = new HoodieWrapperFileSystem(originalFs.getFileSystem(), new NoOpConsistencyGuard()) {
+      @Override
+      protected boolean needCreateTempFile() {
+        return true;
+      }
+    };
+    metaClient.setFs(tempFileFs);
+    try {
+      fun.accept(metaClient);
+    } finally {
+      // Always restore, so a failing assertion does not leak the overridden fs into other tests.
+      metaClient.setFs(originalFs);
+    }
+  }
+
 }