Posted to commits@hudi.apache.org by vb...@apache.org on 2020/07/23 04:22:18 UTC
[hudi] branch master updated: [HUDI-1029] In inline compaction mode,
previously failed compactions need to be retried before new compactions
(#1857)
This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a8bd76c [HUDI-1029] In inline compaction mode, previously failed compactions need to be retried before new compactions (#1857)
a8bd76c is described below
commit a8bd76c299b13af93c9eed92309ecb3a86a6734c
Author: vinoth chandar <vi...@users.noreply.github.com>
AuthorDate: Wed Jul 22 21:22:06 2020 -0700
[HUDI-1029] In inline compaction mode, previously failed compactions need to be retried before new compactions (#1857)
- Prevents failed compactions from causing issues with future commits
---
.../org/apache/hudi/client/HoodieWriteClient.java | 9 +
.../compact/ScheduleCompactionActionExecutor.java | 2 +-
.../table/action/compact/CompactionTestBase.java | 243 +++++++++++++++++++++
.../table/action/compact/TestAsyncCompaction.java | 213 +-----------------
.../table/action/compact/TestInlineCompaction.java | 116 ++++++++++
5 files changed, 372 insertions(+), 211 deletions(-)
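For readers skimming the diff: the behavioral change is that, when inline compaction is enabled, commit() now re-attempts any pending (previously failed) compactions before scheduling a new one. A minimal sketch of a write config that exercises this path — it mirrors the builder calls used in the tests below; the path, table name, and schema constant are illustrative placeholders, not part of this commit:

    // A sketch, not the committed code; path and schema are placeholders.
    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi-trip-table")
        .withSchema(TRIP_EXAMPLE_SCHEMA)              // from HoodieTestDataGenerator
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withInlineCompaction(true)               // compaction runs inline, post-commit
            .withMaxNumDeltaCommitsBeforeCompaction(3)
            .build())
        .build();
    // With this change, each inline-compaction commit first retries any
    // compaction instants still pending on the timeline.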
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index 30dfecb..2486d91 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -341,6 +341,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
// Do an inline compaction if enabled
if (config.isInlineCompaction()) {
+ runAnyPendingCompactions(table);
metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true");
inlineCompact(extraMetadata);
} else {
@@ -355,6 +356,14 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
}
}
+ private void runAnyPendingCompactions(HoodieTable<?> table) {
+ table.getActiveTimeline().getCommitsAndCompactionTimeline().filterPendingCompactionTimeline().getInstants()
+ .forEach(instant -> {
+ LOG.info("Running previously failed inflight compaction at instant " + instant);
+ compact(instant.getTimestamp(), true);
+ });
+ }
+
/**
* Handle auto clean during commit.
* @param instantTime
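The hook added above acts on exactly the instants that filterPendingCompactionTimeline() returns. A hedged sketch of inspecting the same timeline state from outside the client, using only APIs that already appear in this diff (hadoopConf, cfg, and LOG are assumed to be in scope):

    // Sketch: list compaction instants still pending on a table's timeline.
    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
    metaClient.getActiveTimeline().getCommitsAndCompactionTimeline()
        .filterPendingCompactionTimeline().getInstants()
        .forEach(instant -> LOG.info("Pending compaction instant: " + instant));

Each such instant is what runAnyPendingCompactions() passes to compact(instant.getTimestamp(), true).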
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
index 174c64e..6d88e5c 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
@@ -65,7 +65,7 @@ public class ScheduleCompactionActionExecutor extends BaseActionExecutor<Option<
int deltaCommitsSinceLastCompaction = table.getActiveTimeline().getDeltaCommitTimeline()
.findInstantsAfter(deltaCommitsSinceTs, Integer.MAX_VALUE).countInstants();
if (config.getInlineCompactDeltaCommitMax() > deltaCommitsSinceLastCompaction) {
- LOG.info("Not running compaction as only " + deltaCommitsSinceLastCompaction
+ LOG.info("Not scheduling compaction as only " + deltaCommitsSinceLastCompaction
+ " delta commits was found since last compaction " + deltaCommitsSinceTs + ". Waiting for "
+ config.getInlineCompactDeltaCommitMax());
return new HoodieCompactionPlan();
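The reworded log line belongs to the guard that decides whether a new compaction gets scheduled at all; the distinction matters now that execution of already-scheduled compactions happens separately. Paraphrased from the surrounding executor code (not a verbatim excerpt), the guard is:

    // Paraphrase: skip scheduling until enough delta commits have
    // accumulated since the last compaction.
    if (config.getInlineCompactDeltaCommitMax() > deltaCommitsSinceLastCompaction) {
      return new HoodieCompactionPlan();   // empty plan: nothing scheduled yet
    }

With withMaxNumDeltaCommitsBeforeCompaction(3), for example, the first two delta commits after a compaction yield empty plans and the third triggers scheduling.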
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
new file mode 100644
index 0000000..c171255
--- /dev/null
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hudi.avro.model.HoodieCompactionOperation;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieStorageConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+import org.apache.hudi.testutils.HoodieClientTestUtils;
+import org.apache.hudi.testutils.HoodieTestDataGenerator;
+import org.apache.spark.api.java.JavaRDD;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.apache.hudi.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class CompactionTestBase extends HoodieClientTestBase {
+
+ protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit) {
+ return HoodieWriteConfig.newBuilder().withPath(basePath)
+ .withSchema(TRIP_EXAMPLE_SCHEMA)
+ .withParallelism(2, 2)
+ .withAutoCommit(autoCommit).withAssumeDatePartitioning(true)
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
+ .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
+ .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024 * 1024).build())
+ .forTable("test-trip-table")
+ .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
+ .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+ .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
+ }
+
+ /**
+ * HELPER METHODS FOR TESTING.
+ **/
+ protected void validateDeltaCommit(String latestDeltaCommit, final Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> fgIdToCompactionOperation,
+ HoodieWriteConfig cfg) {
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ HoodieTable table = getHoodieTable(metaClient, cfg);
+ List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table);
+ fileSliceList.forEach(fileSlice -> {
+ Pair<String, HoodieCompactionOperation> opPair = fgIdToCompactionOperation.get(fileSlice.getFileGroupId());
+ if (opPair != null) {
+ assertEquals(fileSlice.getBaseInstantTime(), opPair.getKey(), "Expect baseInstant to match compaction Instant");
+ assertTrue(fileSlice.getLogFiles().count() > 0,
+ "Expect atleast one log file to be present where the latest delta commit was written");
+ assertFalse(fileSlice.getBaseFile().isPresent(), "Expect no data-file to be present");
+ } else {
+ assertTrue(fileSlice.getBaseInstantTime().compareTo(latestDeltaCommit) <= 0,
+ "Expect baseInstant to be less than or equal to latestDeltaCommit");
+ }
+ });
+ }
+
+ protected List<HoodieRecord> runNextDeltaCommits(HoodieWriteClient client, final HoodieReadClient readClient, List<String> deltaInstants,
+ List<HoodieRecord> records, HoodieWriteConfig cfg, boolean insertFirst, List<String> expPendingCompactionInstants)
+ throws Exception {
+
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ List<Pair<String, HoodieCompactionPlan>> pendingCompactions = readClient.getPendingCompactions();
+ List<String> gotPendingCompactionInstants =
+ pendingCompactions.stream().map(pc -> pc.getKey()).sorted().collect(Collectors.toList());
+ assertEquals(expPendingCompactionInstants, gotPendingCompactionInstants);
+
+ Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> fgIdToCompactionOperation =
+ CompactionUtils.getAllPendingCompactionOperations(metaClient);
+
+ if (insertFirst) {
+ // Use first instant for inserting records
+ String firstInstant = deltaInstants.get(0);
+ deltaInstants = deltaInstants.subList(1, deltaInstants.size());
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ client.startCommitWithTime(firstInstant);
+ JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, firstInstant);
+ List<WriteStatus> statusList = statuses.collect();
+
+ if (!cfg.shouldAutoCommit()) {
+ client.commit(firstInstant, statuses);
+ }
+ assertNoWriteErrors(statusList);
+ metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
+ List<HoodieBaseFile> dataFilesToRead = getCurrentLatestDataFiles(hoodieTable, cfg);
+ assertTrue(dataFilesToRead.stream().findAny().isPresent(),
+ "should list the parquet files we wrote in the delta commit");
+ validateDeltaCommit(firstInstant, fgIdToCompactionOperation, cfg);
+ }
+
+ int numRecords = records.size();
+ for (String instantTime : deltaInstants) {
+ records = dataGen.generateUpdates(instantTime, numRecords);
+ metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ createNextDeltaCommit(instantTime, records, client, metaClient, cfg, false);
+ validateDeltaCommit(instantTime, fgIdToCompactionOperation, cfg);
+ }
+ return records;
+ }
+
+ protected void moveCompactionFromRequestedToInflight(String compactionInstantTime, HoodieWriteConfig cfg) {
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ HoodieInstant compactionInstant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
+ metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(compactionInstant);
+ HoodieInstant instant = metaClient.getActiveTimeline().reload().filterPendingCompactionTimeline().getInstants()
+ .filter(in -> in.getTimestamp().equals(compactionInstantTime)).findAny().get();
+ assertTrue(instant.isInflight(), "Instant must be marked inflight");
+ }
+
+ protected void scheduleCompaction(String compactionInstantTime, HoodieWriteClient client, HoodieWriteConfig cfg) {
+ client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ HoodieInstant instant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().get();
+ assertEquals(compactionInstantTime, instant.getTimestamp(), "Last compaction instant must be the one set");
+ }
+
+ protected void scheduleAndExecuteCompaction(String compactionInstantTime, HoodieWriteClient client, HoodieTable table,
+ HoodieWriteConfig cfg, int expectedNumRecs, boolean hasDeltaCommitAfterPendingCompaction) throws IOException {
+ scheduleCompaction(compactionInstantTime, client, cfg);
+ executeCompaction(compactionInstantTime, client, table, cfg, expectedNumRecs, hasDeltaCommitAfterPendingCompaction);
+ }
+
+ protected void executeCompaction(String compactionInstantTime, HoodieWriteClient client, HoodieTable table,
+ HoodieWriteConfig cfg, int expectedNumRecs, boolean hasDeltaCommitAfterPendingCompaction) throws IOException {
+
+ client.compact(compactionInstantTime);
+ List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table);
+ assertTrue(fileSliceList.stream().findAny().isPresent(), "Ensure latest file-slices are not empty");
+ assertFalse(fileSliceList.stream()
+ .anyMatch(fs -> !fs.getBaseInstantTime().equals(compactionInstantTime)),
+ "Verify all file-slices have base-instant same as compaction instant");
+ assertFalse(fileSliceList.stream().anyMatch(fs -> !fs.getBaseFile().isPresent()),
+ "Verify all file-slices have data-files");
+
+ if (hasDeltaCommitAfterPendingCompaction) {
+ assertFalse(fileSliceList.stream().anyMatch(fs -> fs.getLogFiles().count() == 0),
+ "Verify all file-slices have atleast one log-file");
+ } else {
+ assertFalse(fileSliceList.stream().anyMatch(fs -> fs.getLogFiles().count() > 0),
+ "Verify all file-slices have no log-files");
+ }
+
+ // verify that there is a commit
+ table = getHoodieTable(new HoodieTableMetaClient(hadoopConf, cfg.getBasePath(), true), cfg);
+ HoodieTimeline timeline = table.getMetaClient().getCommitTimeline().filterCompletedInstants();
+ String latestCompactionCommitTime = timeline.lastInstant().get().getTimestamp();
+ assertEquals(latestCompactionCommitTime, compactionInstantTime,
+ "Expect compaction instant time to be the latest commit time");
+ assertEquals(expectedNumRecs,
+ HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, "000").count(),
+ "Must contain expected records");
+
+ }
+
+ protected List<WriteStatus> createNextDeltaCommit(String instantTime, List<HoodieRecord> records, HoodieWriteClient client,
+ HoodieTableMetaClient metaClient, HoodieWriteConfig cfg, boolean skipCommit) {
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+ client.startCommitWithTime(instantTime);
+
+ JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, instantTime);
+ List<WriteStatus> statusList = statuses.collect();
+ assertNoWriteErrors(statusList);
+ if (!cfg.shouldAutoCommit() && !skipCommit) {
+ client.commit(instantTime, statuses);
+ }
+
+ Option<HoodieInstant> deltaCommit =
+ metaClient.getActiveTimeline().reload().getDeltaCommitTimeline().filterCompletedInstants().lastInstant();
+ if (skipCommit && !cfg.shouldAutoCommit()) {
+ assertTrue(deltaCommit.get().getTimestamp().compareTo(instantTime) < 0,
+ "Delta commit should not be latest instant");
+ } else {
+ assertTrue(deltaCommit.isPresent());
+ assertEquals(instantTime, deltaCommit.get().getTimestamp(), "Delta commit should be latest instant");
+ }
+ return statusList;
+ }
+
+ protected List<HoodieBaseFile> getCurrentLatestDataFiles(HoodieTable table, HoodieWriteConfig cfg) throws IOException {
+ FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(table.getMetaClient().getFs(), cfg.getBasePath());
+ HoodieTableFileSystemView view =
+ getHoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allFiles);
+ return view.getLatestBaseFiles().collect(Collectors.toList());
+ }
+
+ protected List<FileSlice> getCurrentLatestFileSlices(HoodieTable table) {
+ HoodieTableFileSystemView view = new HoodieTableFileSystemView(table.getMetaClient(),
+ table.getMetaClient().getActiveTimeline().reload().getCommitsAndCompactionTimeline());
+ return Arrays.stream(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS)
+ .flatMap(view::getLatestFileSlices).collect(Collectors.toList());
+ }
+
+ protected HoodieTableType getTableType() {
+ return HoodieTableType.MERGE_ON_READ;
+ }
+}
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
index bf37a88..81840b9 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
@@ -18,77 +18,36 @@
package org.apache.hudi.table.action.compact;
-import org.apache.hudi.avro.model.HoodieCompactionOperation;
-import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.HoodieWriteClient;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
-import org.apache.hudi.common.table.view.FileSystemViewStorageType;
-import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-import org.apache.hudi.common.testutils.HoodieTestUtils;
-import org.apache.hudi.common.util.CompactionUtils;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieCompactionConfig;
-import org.apache.hudi.config.HoodieIndexConfig;
-import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.testutils.HoodieClientTestBase;
-import org.apache.hudi.testutils.HoodieClientTestUtils;
-import org.apache.hudi.testutils.HoodieTestDataGenerator;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.Test;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
-import static org.apache.hudi.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Test Cases for Async Compaction and Ingestion interaction.
*/
-public class TestAsyncCompaction extends HoodieClientTestBase {
+public class TestAsyncCompaction extends CompactionTestBase {
private HoodieWriteConfig getConfig(Boolean autoCommit) {
return getConfigBuilder(autoCommit).build();
}
- private HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit) {
- return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
- .withAutoCommit(autoCommit).withAssumeDatePartitioning(true)
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
- .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
- .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024 * 1024).build())
- .forTable("test-trip-table")
- .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
- .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
- .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
- }
-
@Test
public void testRollbackForInflightCompaction() throws Exception {
// Rollback inflight compaction
@@ -372,170 +331,4 @@ public class TestAsyncCompaction extends HoodieClientTestBase {
executeCompaction(compactionInstantTime, client, hoodieTable, cfg, numRecs, true);
}
}
-
- /**
- * HELPER METHODS FOR TESTING.
- **/
-
- private void validateDeltaCommit(String latestDeltaCommit,
- final Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> fgIdToCompactionOperation,
- HoodieWriteConfig cfg) {
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
- HoodieTable table = getHoodieTable(metaClient, cfg);
- List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table);
- fileSliceList.forEach(fileSlice -> {
- Pair<String, HoodieCompactionOperation> opPair = fgIdToCompactionOperation.get(fileSlice.getFileGroupId());
- if (opPair != null) {
- assertEquals(fileSlice.getBaseInstantTime(), opPair.getKey(), "Expect baseInstant to match compaction Instant");
- assertTrue(fileSlice.getLogFiles().count() > 0,
- "Expect atleast one log file to be present where the latest delta commit was written");
- assertFalse(fileSlice.getBaseFile().isPresent(), "Expect no data-file to be present");
- } else {
- assertTrue(fileSlice.getBaseInstantTime().compareTo(latestDeltaCommit) <= 0,
- "Expect baseInstant to be less than or equal to latestDeltaCommit");
- }
- });
- }
-
- private List<HoodieRecord> runNextDeltaCommits(HoodieWriteClient client, final HoodieReadClient readClient, List<String> deltaInstants,
- List<HoodieRecord> records, HoodieWriteConfig cfg, boolean insertFirst, List<String> expPendingCompactionInstants)
- throws Exception {
-
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
- List<Pair<String, HoodieCompactionPlan>> pendingCompactions = readClient.getPendingCompactions();
- List<String> gotPendingCompactionInstants =
- pendingCompactions.stream().map(pc -> pc.getKey()).sorted().collect(Collectors.toList());
- assertEquals(expPendingCompactionInstants, gotPendingCompactionInstants);
-
- Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> fgIdToCompactionOperation =
- CompactionUtils.getAllPendingCompactionOperations(metaClient);
-
- if (insertFirst) {
- // Use first instant for inserting records
- String firstInstant = deltaInstants.get(0);
- deltaInstants = deltaInstants.subList(1, deltaInstants.size());
- JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
- client.startCommitWithTime(firstInstant);
- JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, firstInstant);
- List<WriteStatus> statusList = statuses.collect();
-
- if (!cfg.shouldAutoCommit()) {
- client.commit(firstInstant, statuses);
- }
- assertNoWriteErrors(statusList);
- metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
- HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
- List<HoodieBaseFile> dataFilesToRead = getCurrentLatestDataFiles(hoodieTable, cfg);
- assertTrue(dataFilesToRead.stream().findAny().isPresent(),
- "should list the parquet files we wrote in the delta commit");
- validateDeltaCommit(firstInstant, fgIdToCompactionOperation, cfg);
- }
-
- int numRecords = records.size();
- for (String instantTime : deltaInstants) {
- records = dataGen.generateUpdates(instantTime, numRecords);
- metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
- createNextDeltaCommit(instantTime, records, client, metaClient, cfg, false);
- validateDeltaCommit(instantTime, fgIdToCompactionOperation, cfg);
- }
- return records;
- }
-
- private void moveCompactionFromRequestedToInflight(String compactionInstantTime, HoodieWriteConfig cfg) {
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
- HoodieInstant compactionInstant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
- metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(compactionInstant);
- HoodieInstant instant = metaClient.getActiveTimeline().reload().filterPendingCompactionTimeline().getInstants()
- .filter(in -> in.getTimestamp().equals(compactionInstantTime)).findAny().get();
- assertTrue(instant.isInflight(), "Instant must be marked inflight");
- }
-
- private void scheduleCompaction(String compactionInstantTime, HoodieWriteClient client, HoodieWriteConfig cfg)
- throws IOException {
- client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
- HoodieInstant instant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().get();
- assertEquals(compactionInstantTime, instant.getTimestamp(), "Last compaction instant must be the one set");
- }
-
- private void scheduleAndExecuteCompaction(String compactionInstantTime, HoodieWriteClient client, HoodieTable table,
- HoodieWriteConfig cfg, int expectedNumRecs, boolean hasDeltaCommitAfterPendingCompaction) throws IOException {
- scheduleCompaction(compactionInstantTime, client, cfg);
- executeCompaction(compactionInstantTime, client, table, cfg, expectedNumRecs, hasDeltaCommitAfterPendingCompaction);
- }
-
- private void executeCompaction(String compactionInstantTime, HoodieWriteClient client, HoodieTable table,
- HoodieWriteConfig cfg, int expectedNumRecs, boolean hasDeltaCommitAfterPendingCompaction) throws IOException {
-
- client.compact(compactionInstantTime);
- List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table);
- assertTrue(fileSliceList.stream().findAny().isPresent(), "Ensure latest file-slices are not empty");
- assertFalse(fileSliceList.stream()
- .anyMatch(fs -> !fs.getBaseInstantTime().equals(compactionInstantTime)),
- "Verify all file-slices have base-instant same as compaction instant");
- assertFalse(fileSliceList.stream().anyMatch(fs -> !fs.getBaseFile().isPresent()),
- "Verify all file-slices have data-files");
-
- if (hasDeltaCommitAfterPendingCompaction) {
- assertFalse(fileSliceList.stream().anyMatch(fs -> fs.getLogFiles().count() == 0),
- "Verify all file-slices have atleast one log-file");
- } else {
- assertFalse(fileSliceList.stream().anyMatch(fs -> fs.getLogFiles().count() > 0),
- "Verify all file-slices have no log-files");
- }
-
- // verify that there is a commit
- table = getHoodieTable(new HoodieTableMetaClient(hadoopConf, cfg.getBasePath(), true), cfg);
- HoodieTimeline timeline = table.getMetaClient().getCommitTimeline().filterCompletedInstants();
- String latestCompactionCommitTime = timeline.lastInstant().get().getTimestamp();
- assertEquals(latestCompactionCommitTime, compactionInstantTime,
- "Expect compaction instant time to be the latest commit time");
- assertEquals(expectedNumRecs,
- HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, "000").count(),
- "Must contain expected records");
-
- }
-
- private List<WriteStatus> createNextDeltaCommit(String instantTime, List<HoodieRecord> records,
- HoodieWriteClient client, HoodieTableMetaClient metaClient, HoodieWriteConfig cfg, boolean skipCommit) {
- JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
-
- client.startCommitWithTime(instantTime);
-
- JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, instantTime);
- List<WriteStatus> statusList = statuses.collect();
- assertNoWriteErrors(statusList);
- if (!cfg.shouldAutoCommit() && !skipCommit) {
- client.commit(instantTime, statuses);
- }
-
- Option<HoodieInstant> deltaCommit =
- metaClient.getActiveTimeline().reload().getDeltaCommitTimeline().filterCompletedInstants().lastInstant();
- if (skipCommit && !cfg.shouldAutoCommit()) {
- assertTrue(deltaCommit.get().getTimestamp().compareTo(instantTime) < 0,
- "Delta commit should not be latest instant");
- } else {
- assertTrue(deltaCommit.isPresent());
- assertEquals(instantTime, deltaCommit.get().getTimestamp(), "Delta commit should be latest instant");
- }
- return statusList;
- }
-
- private List<HoodieBaseFile> getCurrentLatestDataFiles(HoodieTable table, HoodieWriteConfig cfg) throws IOException {
- FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(table.getMetaClient().getFs(), cfg.getBasePath());
- HoodieTableFileSystemView view =
- getHoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allFiles);
- return view.getLatestBaseFiles().collect(Collectors.toList());
- }
-
- private List<FileSlice> getCurrentLatestFileSlices(HoodieTable table) {
- HoodieTableFileSystemView view = new HoodieTableFileSystemView(table.getMetaClient(),
- table.getMetaClient().getActiveTimeline().reload().getCommitsAndCompactionTimeline());
- return Arrays.stream(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS)
- .flatMap(view::getLatestFileSlices).collect(Collectors.toList());
- }
-
- protected HoodieTableType getTableType() {
- return HoodieTableType.MERGE_ON_READ;
- }
}
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
new file mode 100644
index 0000000..4cbb461
--- /dev/null
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact;
+
+import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestInlineCompaction extends CompactionTestBase {
+
+ private HoodieWriteConfig getConfigForInlineCompaction(int maxDeltaCommits) {
+ return getConfigBuilder(false)
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withInlineCompaction(true).withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommits).build())
+ .build();
+ }
+
+ @Test
+ public void testCompactionIsNotScheduledEarly() throws Exception {
+ // Given: make two commits
+ HoodieWriteConfig cfg = getConfigForInlineCompaction(3);
+ try (HoodieWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
+ List<HoodieRecord> records = dataGen.generateInserts("000", 100);
+ HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ runNextDeltaCommits(writeClient, readClient, Arrays.asList("000", "001"), records, cfg, true, new ArrayList<>());
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+
+ // Then: ensure no compaction is executed, since there are only 2 delta commits
+ assertEquals(2, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+ }
+ }
+
+ @Test
+ public void testSuccessfulCompaction() throws Exception {
+ // Given: make three commits
+ HoodieWriteConfig cfg = getConfigForInlineCompaction(3);
+ List<String> instants = IntStream.range(0, 2).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
+
+ try (HoodieWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
+ List<HoodieRecord> records = dataGen.generateInserts(instants.get(0), 100);
+ HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
+
+ // Third delta commit, which will trigger compaction
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ String finalInstant = HoodieActiveTimeline.createNewInstantTime();
+ createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 100), writeClient, metaClient, cfg, false);
+
+ // Then: ensure the file slices are compacted as per policy
+ metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+ assertEquals(HoodieTimeline.COMMIT_ACTION, metaClient.getActiveTimeline().lastInstant().get().getAction());
+ }
+ }
+
+ @Test
+ public void testCompactionRetryOnFailure() throws Exception {
+ // Given: two commits, with a scheduled compaction left failed/in-flight
+ HoodieWriteConfig cfg = getConfigBuilder(false)
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
+ .build();
+ List<String> instants = CollectionUtils.createImmutableList("000", "001");
+ try (HoodieWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
+ List<HoodieRecord> records = dataGen.generateInserts(instants.get(0), 100);
+ HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
+ // Schedule compaction 002, make it in-flight (simulates inline compaction failing)
+ scheduleCompaction("002", writeClient, cfg);
+ moveCompactionFromRequestedToInflight("002", cfg);
+ }
+
+ // When: a third commit happens
+ HoodieWriteConfig inlineCfg = getConfigForInlineCompaction(2);
+ try (HoodieWriteClient<?> writeClient = getHoodieWriteClient(inlineCfg)) {
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ createNextDeltaCommit("003", dataGen.generateUpdates("003", 100), writeClient, metaClient, inlineCfg, false);
+ }
+
+ // Then: the new delta commit completes, and the previously failed compaction is retried
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+ assertEquals("002", metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
+ }
+}