You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vb...@apache.org on 2020/05/19 02:27:32 UTC

[incubator-hudi] branch master updated: [HUDI-858] Allow multiple operations to be executed within a single commit (#1633)

This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e6f3bf1  [HUDI-858] Allow multiple operations to be executed within a single commit (#1633)
e6f3bf1 is described below

commit e6f3bf10cf2c62a1008b82765abdcd33cfd64c67
Author: Balaji Varadarajan <va...@uber.com>
AuthorDate: Mon May 18 19:27:24 2020 -0700

    [HUDI-858] Allow multiple operations to be executed within a single commit (#1633)
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  | 26 ++++++++++++++-
 .../action/commit/BaseCommitActionExecutor.java    |  3 +-
 .../hudi/table/action/commit/BulkInsertHelper.java |  3 +-
 .../TestHoodieClientOnCopyOnWriteStorage.java      | 38 ++++++++++++++++++++++
 .../table/timeline/HoodieActiveTimeline.java       | 20 ++++++++++--
 5 files changed, 84 insertions(+), 6 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 0467657..11931c1 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -99,6 +99,20 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   private static final String MAX_CONSISTENCY_CHECKS_PROP = "hoodie.consistency.check.max_checks";
   private static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7;
 
+  /**
+   * HUDI-858: Some users have been directly using RDD APIs and rely on a behavior in 0.4.x that allowed
+   * multiple write operations (upsert/bulk-insert/...) to be executed within a single commit.
+   *
+   * Given the Hudi commit protocol, these are generally unsafe operations and users need to handle failure scenarios. It
+   * only works with COW tables. Hudi 0.5.x stopped supporting this behavior.
+   *
+   * Given the importance of supporting such cases for users migrating to 0.5.x, we are proposing a safety flag
+   * (disabled by default) that re-enables this old behavior.
+   */
+  private static final String ALLOW_MULTI_WRITE_ON_SAME_INSTANT =
+      "_.hoodie.allow.multi.write.on.same.instant";
+  private static final String DEFAULT_ALLOW_MULTI_WRITE_ON_SAME_INSTANT = "false";
+
   private ConsistencyGuardConfig consistencyGuardConfig;
 
   // Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled
@@ -194,6 +208,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return Boolean.parseBoolean(props.getProperty(COMBINE_BEFORE_DELETE_PROP));
   }
 
+  public boolean shouldAllowMultiWriteOnSameInstant() {
+    return Boolean.parseBoolean(props.getProperty(ALLOW_MULTI_WRITE_ON_SAME_INSTANT));
+  }
+
   public String getWriteStatusClassName() {
     return props.getProperty(HOODIE_WRITE_STATUS_CLASS_PROP);
   }
@@ -723,6 +741,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder withAllowMultiWriteOnSameInstant(boolean allow) {
+      props.setProperty(ALLOW_MULTI_WRITE_ON_SAME_INSTANT, String.valueOf(allow));
+      return this;
+    }
+
     public HoodieWriteConfig build() {
       // Check for mandatory properties
       setDefaultOnCondition(props, !props.containsKey(INSERT_PARALLELISM), INSERT_PARALLELISM, DEFAULT_PARALLELISM);
@@ -738,6 +761,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
           DEFAULT_COMBINE_BEFORE_UPSERT);
       setDefaultOnCondition(props, !props.containsKey(COMBINE_BEFORE_DELETE_PROP), COMBINE_BEFORE_DELETE_PROP,
           DEFAULT_COMBINE_BEFORE_DELETE);
+      setDefaultOnCondition(props, !props.containsKey(ALLOW_MULTI_WRITE_ON_SAME_INSTANT),
+          ALLOW_MULTI_WRITE_ON_SAME_INSTANT, DEFAULT_ALLOW_MULTI_WRITE_ON_SAME_INSTANT);
       setDefaultOnCondition(props, !props.containsKey(WRITE_STATUS_STORAGE_LEVEL), WRITE_STATUS_STORAGE_LEVEL,
           DEFAULT_WRITE_STATUS_STORAGE_LEVEL);
       setDefaultOnCondition(props, !props.containsKey(HOODIE_AUTO_COMMIT_PROP), HOODIE_AUTO_COMMIT_PROP,
@@ -778,7 +803,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
       // Ensure Layout Version is good
       new TimelineLayoutVersion(Integer.parseInt(layoutVersion));
 
-
       // Build WriteConfig at the end
       HoodieWriteConfig config = new HoodieWriteConfig(props);
       Objects.requireNonNull(config.getBasePath());
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index a846de8..0717fd2 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -136,7 +136,8 @@ public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload<T>>
       String commitActionType = table.getMetaClient().getCommitActionType();
       HoodieInstant requested = new HoodieInstant(State.REQUESTED, commitActionType, instantTime);
       activeTimeline.transitionRequestedToInflight(requested,
-          Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+          Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)),
+          config.shouldAllowMultiWriteOnSameInstant()); // HUDI-858 opt-in flag, disabled by default
     } catch (IOException io) {
       throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", io);
     }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java
index 4755664..782b9aa 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java
@@ -73,7 +73,8 @@ public class BulkInsertHelper<T extends HoodieRecordPayload<T>> {
         IntStream.range(0, parallelism).mapToObj(i -> FSUtils.createNewFileIdPfx()).collect(Collectors.toList());
 
     table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(State.REQUESTED,
-        table.getMetaClient().getCommitActionType(), instantTime), Option.empty());
+        table.getMetaClient().getCommitActionType(), instantTime), Option.empty(),
+        config.shouldAllowMultiWriteOnSameInstant()); // HUDI-858 opt-in flag, disabled by default
 
     JavaRDD<WriteStatus> writeStatusRDD = repartitionedRecords
         .mapPartitionsWithIndex(new BulkInsertMapFunction<T>(instantTime, config, table, fileIDPrefixes), true)
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index 223c39d..b50c4cb 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -988,6 +988,44 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
     return Pair.of(markerFilePath, result);
   }
 
+  @Test
+  public void testMultiOperationsPerCommit() throws IOException {
+    HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false)
+        .withAllowMultiWriteOnSameInstant(true)
+        .build();
+    HoodieWriteClient client = getHoodieWriteClient(cfg);
+    String firstInstantTime = "0000";
+    client.startCommitWithTime(firstInstantTime);
+    int numRecords = 200;
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(dataGen.generateInserts(firstInstantTime, numRecords), 1);
+    JavaRDD<WriteStatus> result = client.bulkInsert(writeRecords, firstInstantTime);
+    assertTrue(client.commit(firstInstantTime, result), "Commit should succeed");
+    assertTrue(HoodieTestUtils.doesCommitExist(basePath, firstInstantTime),
+        "After explicit commit, commit file should be created");
+
+    // Verify the dataset contains every record written by the first commit
+    String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+    for (int i = 0; i < fullPartitionPaths.length; i++) {
+      fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+    }
+    assertEquals(numRecords,
+        HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
+        "Must contain " + numRecords + " records");
+
+    // Second commit: a bulk-insert and an upsert both executed under the same instant
+    String nextInstantTime = "0001";
+    client.startCommitWithTime(nextInstantTime);
+    JavaRDD<HoodieRecord> updateRecords = jsc.parallelize(dataGen.generateUpdates(nextInstantTime, numRecords), 1);
+    JavaRDD<HoodieRecord> insertRecords = jsc.parallelize(dataGen.generateInserts(nextInstantTime, numRecords), 1);
+    JavaRDD<WriteStatus> inserts = client.bulkInsert(insertRecords, nextInstantTime);
+    JavaRDD<WriteStatus> upserts = client.upsert(updateRecords, nextInstantTime);
+    assertTrue(client.commit(nextInstantTime, inserts.union(upserts)), "Commit should succeed");
+    assertTrue(HoodieTestUtils.doesCommitExist(basePath, nextInstantTime),
+        "After explicit commit, commit file should be created");
+    int totalRecords = 2 * numRecords; // expected: only the new inserts add rows, the updates do not
+    assertEquals(totalRecords, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
+        "Must contain " + totalRecords + " records");
+  }
+
   /**
    * Build Hoodie Write Config for small data file sizes.
    */
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index c17309d..36e2b3d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -305,11 +305,16 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
   }
 
   private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option<byte[]> data) {
+    transitionState(fromInstant, toInstant, data, false);
+  }
+
+  private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option<byte[]> data,
+       boolean allowRedundantTransitions) {
     ValidationUtils.checkArgument(fromInstant.getTimestamp().equals(toInstant.getTimestamp()));
     try {
       if (metaClient.getTimelineLayoutVersion().isNullVersion()) {
         // Re-create the .inflight file by opening a new file and write the commit metadata in
-        createFileInMetaPath(fromInstant.getFileName(), data, false);
+        createFileInMetaPath(fromInstant.getFileName(), data, allowRedundantTransitions); // NOTE(review): rename below may fail if toInstant already exists
         Path fromInstantPath = new Path(metaClient.getMetaPath(), fromInstant.getFileName());
         Path toInstantPath = new Path(metaClient.getMetaPath(), toInstant.getFileName());
         boolean success = metaClient.getFs().rename(fromInstantPath, toInstantPath);
@@ -322,7 +327,11 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
         ValidationUtils.checkArgument(metaClient.getFs().exists(new Path(metaClient.getMetaPath(),
             fromInstant.getFileName())));
         // Use Write Once to create Target File
-        createImmutableFileInPath(new Path(metaClient.getMetaPath(), toInstant.getFileName()), data);
+        if (allowRedundantTransitions) {
+          createFileInPath(new Path(metaClient.getMetaPath(), toInstant.getFileName()), data); // not write-once: target may already exist
+        } else {
+          createImmutableFileInPath(new Path(metaClient.getMetaPath(), toInstant.getFileName()), data);
+        }
         LOG.info("Create new file for toInstant ?" + new Path(metaClient.getMetaPath(), toInstant.getFileName()));
       }
     } catch (IOException e) {
@@ -365,9 +374,14 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
   }
 
   public void transitionRequestedToInflight(HoodieInstant requested, Option<byte[]> content) {
+    transitionRequestedToInflight(requested, content, false);
+  }
+
+  public void transitionRequestedToInflight(HoodieInstant requested, Option<byte[]> content,
+      boolean allowRedundantTransitions) {
     HoodieInstant inflight = new HoodieInstant(State.INFLIGHT, requested.getAction(), requested.getTimestamp());
     ValidationUtils.checkArgument(requested.isRequested(), "Instant " + requested + " in wrong state");
-    transitionState(requested, inflight, content);
+    transitionState(requested, inflight, content, allowRedundantTransitions);
   }
 
   public void saveToCompactionRequested(HoodieInstant instant, Option<byte[]> content) {