You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2020/05/28 00:19:16 UTC
[hudi] 33/40: [HUDI-858] Allow multiple operations to be executed
within a single commit (#1633)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch release-0.5.3
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 555ff1aea4d7cc5947e890866b2a384b5409fb3a
Author: Balaji Varadarajan <va...@uber.com>
AuthorDate: Mon May 18 19:27:24 2020 -0700
[HUDI-858] Allow multiple operations to be executed within a single commit (#1633)
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 26 +++++++++++++++-
.../TestHoodieClientOnCopyOnWriteStorage.java | 36 +++++++++++++++++++++-
.../table/timeline/HoodieActiveTimeline.java | 20 ++++++++++--
3 files changed, 77 insertions(+), 5 deletions(-)
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index f88d96a..24984db 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -96,6 +96,20 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private static final String MAX_CONSISTENCY_CHECKS_PROP = "hoodie.consistency.check.max_checks";
private static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7;
+ /**
+ * HUDI-858: Some users who call the RDD APIs directly relied on a 0.4.x behavior that allowed
+ * multiple write operations (upsert/bulk-insert/...) to be executed within a single commit.
+ *
+ * Given Hudi's commit protocol, these are generally unsafe operations and users need to handle failure scenarios
+ * themselves. This only works with COW tables. Hudi 0.5.x stopped supporting this behavior.
+ *
+ * Given the importance of supporting such cases for users migrating to 0.5.x, this safety flag
+ * (disabled by default) re-enables the old behavior.
+ */
+ private static final String ALLOW_MULTI_WRITE_ON_SAME_INSTANT =
+ "_.hoodie.allow.multi.write.on.same.instant";
+ private static final String DEFAULT_ALLOW_MULTI_WRITE_ON_SAME_INSTANT = "false";
+
private ConsistencyGuardConfig consistencyGuardConfig;
// Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled
@@ -187,6 +201,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return StorageLevel.fromString(props.getProperty(WRITE_STATUS_STORAGE_LEVEL));
}
+ /**
+ * Returns whether multiple write operations may be executed under the same commit instant, as configured by
+ * {@code ALLOW_MULTI_WRITE_ON_SAME_INSTANT}. {@link Boolean#parseBoolean} yields {@code false} when the
+ * property is absent or is anything other than "true" (case-insensitive).
+ */
+ public boolean shouldAllowMultiWriteOnSameInstant() {
+ return Boolean.parseBoolean(props.getProperty(ALLOW_MULTI_WRITE_ON_SAME_INSTANT));
+ }
+
/**
 * Returns the raw value of {@code HOODIE_WRITE_STATUS_CLASS_PROP}.
 * NOTE(review): presumably the fully-qualified WriteStatus implementation class name; confirm.
 */
public String getWriteStatusClassName() {
return props.getProperty(HOODIE_WRITE_STATUS_CLASS_PROP);
}
@@ -706,6 +724,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return this;
}
+ /**
+ * Sets {@code ALLOW_MULTI_WRITE_ON_SAME_INSTANT}, the opt-in (HUDI-858) flag that permits several write
+ * operations to be executed within a single commit instant.
+ *
+ * @param allow whether multiple writes on the same instant are permitted
+ * @return this builder, for chaining
+ */
+ public Builder withAllowMultiWriteOnSameInstant(boolean allow) {
+ props.setProperty(ALLOW_MULTI_WRITE_ON_SAME_INSTANT, String.valueOf(allow));
+ return this;
+ }
+
public HoodieWriteConfig build() {
// Check for mandatory properties
setDefaultOnCondition(props, !props.containsKey(INSERT_PARALLELISM), INSERT_PARALLELISM, DEFAULT_PARALLELISM);
@@ -721,6 +744,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
DEFAULT_COMBINE_BEFORE_UPSERT);
setDefaultOnCondition(props, !props.containsKey(COMBINE_BEFORE_DELETE_PROP), COMBINE_BEFORE_DELETE_PROP,
DEFAULT_COMBINE_BEFORE_DELETE);
+ setDefaultOnCondition(props, !props.containsKey(ALLOW_MULTI_WRITE_ON_SAME_INSTANT),
+ ALLOW_MULTI_WRITE_ON_SAME_INSTANT, DEFAULT_ALLOW_MULTI_WRITE_ON_SAME_INSTANT);
setDefaultOnCondition(props, !props.containsKey(WRITE_STATUS_STORAGE_LEVEL), WRITE_STATUS_STORAGE_LEVEL,
DEFAULT_WRITE_STATUS_STORAGE_LEVEL);
setDefaultOnCondition(props, !props.containsKey(HOODIE_AUTO_COMMIT_PROP), HOODIE_AUTO_COMMIT_PROP,
@@ -760,7 +785,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
// Ensure Layout Version is good
new TimelineLayoutVersion(Integer.parseInt(layoutVersion));
-
// Build WriteConfig at the end
HoodieWriteConfig config = new HoodieWriteConfig(props);
Objects.requireNonNull(config.getBasePath());
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index c7da7a7..bba8ecd 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -20,8 +20,8 @@ package org.apache.hudi.client;
import org.apache.hudi.common.HoodieClientTestUtils;
import org.apache.hudi.common.HoodieTestDataGenerator;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRollingStat;
@@ -981,6 +981,40 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
return Pair.of(markerFilePath, result);
}
+ /**
+ * HUDI-858: with {@code withAllowMultiWriteOnSameInstant(true)} and auto-commit disabled, a bulk-insert and an
+ * upsert issued under the same instant can be committed together. Afterwards the table holds the first batch
+ * plus the newly inserted batch (the upserts only update existing keys).
+ */
+ @Test
+ public void testMultiOperationsPerCommit() throws IOException {
+ HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false)
+ .withAllowMultiWriteOnSameInstant(true)
+ .build();
+ HoodieWriteClient client = getHoodieWriteClient(cfg);
+ String firstInstantTime = "0000";
+ client.startCommitWithTime(firstInstantTime);
+ int numRecords = 200;
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(dataGen.generateInserts(firstInstantTime, numRecords), 1);
+ JavaRDD<WriteStatus> result = client.bulkInsert(writeRecords, firstInstantTime);
+ assertTrue("Commit should succeed", client.commit(firstInstantTime, result));
+ assertTrue("After explicit commit, commit file should be created", HoodieTestUtils.doesCommitExist(basePath, firstInstantTime));
+
+ // Check the entire dataset has all records still
+ String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+ for (int i = 0; i < fullPartitionPaths.length; i++) {
+ fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+ }
+ assertEquals("Must contain " + numRecords + " records", numRecords,
+ HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
+
+ String nextInstantTime = "0001";
+ client.startCommitWithTime(nextInstantTime);
+ JavaRDD<HoodieRecord> updateRecords = jsc.parallelize(dataGen.generateUpdates(nextInstantTime, numRecords), 1);
+ JavaRDD<HoodieRecord> insertRecords = jsc.parallelize(dataGen.generateInserts(nextInstantTime, numRecords), 1);
+ JavaRDD<WriteStatus> inserts = client.bulkInsert(insertRecords, nextInstantTime);
+ JavaRDD<WriteStatus> upserts = client.upsert(updateRecords, nextInstantTime);
+ assertTrue("Commit should succeed", client.commit(nextInstantTime, inserts.union(upserts)));
+ // Verify the second (multi-operation) instant was committed; the first instant was already checked above.
+ assertTrue("After explicit commit, commit file should be created", HoodieTestUtils.doesCommitExist(basePath, nextInstantTime));
+ int totalRecords = 2 * numRecords;
+ assertEquals("Must contain " + totalRecords + " records", totalRecords, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
+ }
+
/**
* Build Hoodie Write Config for small data file sizes.
*/
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index 389314f..ab16428 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -307,11 +307,16 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
}
+ /**
+ * Transitions {@code fromInstant} to {@code toInstant}, writing {@code data}, with redundant transitions
+ * disallowed. Delegates to the four-argument overload with {@code allowRedundantTransitions = false}.
+ */
private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option<byte[]> data) {
+ transitionState(fromInstant, toInstant, data, false);
+ }
+
+ private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option<byte[]> data,
+ boolean allowRedundantTransitions) {
ValidationUtils.checkArgument(fromInstant.getTimestamp().equals(toInstant.getTimestamp()));
try {
if (metaClient.getTimelineLayoutVersion().isNullVersion()) {
// Re-create the .inflight file by opening a new file and write the commit metadata in
- createFileInMetaPath(fromInstant.getFileName(), data, false);
+ createFileInMetaPath(fromInstant.getFileName(), data, allowRedundantTransitions);
Path fromInstantPath = new Path(metaClient.getMetaPath(), fromInstant.getFileName());
Path toInstantPath = new Path(metaClient.getMetaPath(), toInstant.getFileName());
boolean success = metaClient.getFs().rename(fromInstantPath, toInstantPath);
@@ -324,7 +329,11 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
ValidationUtils.checkArgument(metaClient.getFs().exists(new Path(metaClient.getMetaPath(),
fromInstant.getFileName())));
// Use Write Once to create Target File
- createImmutableFileInPath(new Path(metaClient.getMetaPath(), toInstant.getFileName()), data);
+ if (allowRedundantTransitions) {
+ createFileInPath(new Path(metaClient.getMetaPath(), toInstant.getFileName()), data);
+ } else {
+ createImmutableFileInPath(new Path(metaClient.getMetaPath(), toInstant.getFileName()), data);
+ }
LOG.info("Create new file for toInstant ?" + new Path(metaClient.getMetaPath(), toInstant.getFileName()));
}
} catch (IOException e) {
@@ -367,9 +376,14 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
}
+ /**
+ * Transitions a requested instant to inflight with redundant transitions disallowed. Equivalent to calling the
+ * three-argument overload with {@code allowRedundantTransitions = false}.
+ */
public void transitionRequestedToInflight(HoodieInstant requested, Option<byte[]> content) {
+ transitionRequestedToInflight(requested, content, false);
+ }
+
+ /**
+ * Transitions a requested instant to an INFLIGHT instant with the same action and timestamp, writing
+ * {@code content}.
+ *
+ * @param requested the instant to transition; {@code requested.isRequested()} is checked with
+ *     {@code ValidationUtils.checkArgument}
+ * @param content optional bytes written during the transition
+ * @param allowRedundantTransitions when true, the target inflight file is written with {@code createFileInPath}
+ *     instead of the write-once {@code createImmutableFileInPath}. On null-layout-version timelines the flag is
+ *     forwarded to {@code createFileInMetaPath}. NOTE(review): presumably this lets an instant that is already
+ *     inflight be transitioned again (HUDI-858 multi-write on one instant); confirm.
+ */
+ public void transitionRequestedToInflight(HoodieInstant requested, Option<byte[]> content,
+ boolean allowRedundantTransitions) {
HoodieInstant inflight = new HoodieInstant(State.INFLIGHT, requested.getAction(), requested.getTimestamp());
ValidationUtils.checkArgument(requested.isRequested(), "Instant " + requested + " in wrong state");
- transitionState(requested, inflight, content);
+ transitionState(requested, inflight, content, allowRedundantTransitions);
}
public void saveToCompactionRequested(HoodieInstant instant, Option<byte[]> content) {