You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2020/06/28 09:05:00 UTC
[hudi] branch master updated: [HUDI-855] Run Cleaner async with writing (#1577)
This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 8919be6 [HUDI-855] Run Cleaner async with writing (#1577)
8919be6 is described below
commit 8919be6a5d8038db7265bfd7459d72fbd545f133
Author: Balaji Varadarajan <va...@uber.com>
AuthorDate: Sun Jun 28 02:04:50 2020 -0700
[HUDI-855] Run Cleaner async with writing (#1577)
- Cleaner can now run concurrently with write operation
- Configs to turn on/off
Co-authored-by: Vinoth Chandar <vi...@apache.org>
---
.../hudi/cli/commands/TestCleansCommand.java | 28 +++---
.../apache/hudi/cli/integ/ITTestCleansCommand.java | 106 ---------------------
.../HoodieTestCommitMetadataGenerator.java | 20 +++-
.../apache/hudi/async/AbstractAsyncService.java | 22 +++--
.../apache/hudi/client/AsyncCleanerService.java | 85 +++++++++++++++++
.../org/apache/hudi/client/HoodieWriteClient.java | 50 +++++++---
.../apache/hudi/config/HoodieCompactionConfig.java | 10 ++
.../org/apache/hudi/config/HoodieWriteConfig.java | 4 +
.../table/action/clean/CleanActionExecutor.java | 5 +-
.../java/org/apache/hudi/table/TestCleaner.java | 50 +++++-----
.../deltastreamer/HoodieDeltaStreamer.java | 5 +-
11 files changed, 207 insertions(+), 178 deletions(-)
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCleansCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCleansCommand.java
index 69aa5b3..c14cf0b 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCleansCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCleansCommand.java
@@ -26,7 +26,6 @@ import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
-import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -36,6 +35,8 @@ import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.testutils.HoodieTestDataGenerator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.shell.core.CommandResult;
@@ -43,11 +44,10 @@ import org.springframework.shell.core.CommandResult;
import java.io.File;
import java.io.IOException;
import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.UUID;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -77,6 +77,10 @@ public class TestCleansCommand extends AbstractShellIntegrationTest {
Configuration conf = HoodieCLI.conf;
metaClient = HoodieCLI.getTableMetaClient();
+ String fileId1 = UUID.randomUUID().toString();
+ String fileId2 = UUID.randomUUID().toString();
+ HoodieTestDataGenerator.writePartitionMetadata(fs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, tablePath);
+
// Create four commits
for (int i = 100; i < 104; i++) {
String timestamp = String.valueOf(i);
@@ -86,7 +90,8 @@ public class TestCleansCommand extends AbstractShellIntegrationTest {
// Inflight Compaction
HoodieTestCommitMetadataGenerator.createCompactionAuxiliaryMetadata(tablePath,
new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, timestamp), conf);
- HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, timestamp, conf);
+ HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, timestamp, conf, fileId1, fileId2,
+ Option.empty(), Option.empty());
}
metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -103,9 +108,6 @@ public class TestCleansCommand extends AbstractShellIntegrationTest {
assertNotNull(propsFilePath, "Not found properties file");
// First, run clean
- Files.createFile(Paths.get(tablePath,
- HoodieTestCommitMetadataGenerator.DEFAULT_FIRST_PARTITION_PATH,
- HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
SparkMain.clean(jsc, HoodieCLI.basePath, propsFilePath.getPath(), new ArrayList<>());
assertEquals(1, metaClient.getActiveTimeline().reload().getCleanerTimeline().getInstants().count(),
"Loaded 1 clean and the count should match");
@@ -125,7 +127,7 @@ public class TestCleansCommand extends AbstractShellIntegrationTest {
// EarliestCommandRetained should be 102, since hoodie.cleaner.commits.retained=2
// Total Time Taken need read from metadata
- rows.add(new Comparable[] {clean.getTimestamp(), "102", "0", getLatestCleanTimeTakenInMillis().toString()});
+ rows.add(new Comparable[] {clean.getTimestamp(), "102", "2", getLatestCleanTimeTakenInMillis().toString()});
String expected = HoodiePrintHelper.print(header, new HashMap<>(), "", false, -1, false, rows);
expected = removeNonWordAndStripSpace(expected);
@@ -142,12 +144,6 @@ public class TestCleansCommand extends AbstractShellIntegrationTest {
assertNotNull(propsFilePath, "Not found properties file");
// First, run clean with two partition
- Files.createFile(Paths.get(tablePath,
- HoodieTestCommitMetadataGenerator.DEFAULT_FIRST_PARTITION_PATH,
- HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
- Files.createFile(Paths.get(tablePath,
- HoodieTestCommitMetadataGenerator.DEFAULT_SECOND_PARTITION_PATH,
- HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
SparkMain.clean(jsc, HoodieCLI.basePath, propsFilePath.toString(), new ArrayList<>());
assertEquals(1, metaClient.getActiveTimeline().reload().getCleanerTimeline().getInstants().count(),
"Loaded 1 clean and the count should match");
@@ -165,9 +161,11 @@ public class TestCleansCommand extends AbstractShellIntegrationTest {
// There should be two partition path
List<Comparable[]> rows = new ArrayList<>();
rows.add(new Comparable[] {HoodieTestCommitMetadataGenerator.DEFAULT_SECOND_PARTITION_PATH,
+ HoodieCleaningPolicy.KEEP_LATEST_COMMITS, "1", "0"});
+ rows.add(new Comparable[] {HoodieTestCommitMetadataGenerator.DEFAULT_THIRD_PARTITION_PATH,
HoodieCleaningPolicy.KEEP_LATEST_COMMITS, "0", "0"});
rows.add(new Comparable[] {HoodieTestCommitMetadataGenerator.DEFAULT_FIRST_PARTITION_PATH,
- HoodieCleaningPolicy.KEEP_LATEST_COMMITS, "0", "0"});
+ HoodieCleaningPolicy.KEEP_LATEST_COMMITS, "1", "0"});
String expected = HoodiePrintHelper.print(header, new HashMap<>(), "", false, -1, false, rows);
expected = removeNonWordAndStripSpace(expected);
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCleansCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCleansCommand.java
deleted file mode 100644
index 1f6f6c7..0000000
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCleansCommand.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.cli.integ;
-
-import org.apache.hudi.cli.HoodieCLI;
-import org.apache.hudi.cli.commands.TableCommand;
-import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
-import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
-import org.apache.hudi.common.model.HoodiePartitionMetadata;
-import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
-
-import org.apache.hadoop.conf.Configuration;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.springframework.shell.core.CommandResult;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class ITTestCleansCommand extends AbstractShellIntegrationTest {
-
- private String tablePath;
- private URL propsFilePath;
-
- @BeforeEach
- public void init() throws IOException {
- HoodieCLI.conf = jsc.hadoopConfiguration();
-
- String tableName = "test_table";
- tablePath = basePath + File.separator + tableName;
- propsFilePath = this.getClass().getClassLoader().getResource("clean.properties");
-
- // Create table and connect
- new TableCommand().createTable(
- tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(),
- "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
-
- Configuration conf = HoodieCLI.conf;
-
- metaClient = HoodieCLI.getTableMetaClient();
- // Create four commits
- for (int i = 100; i < 104; i++) {
- String timestamp = String.valueOf(i);
- // Requested Compaction
- HoodieTestCommitMetadataGenerator.createCompactionAuxiliaryMetadata(tablePath,
- new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, timestamp), conf);
- // Inflight Compaction
- HoodieTestCommitMetadataGenerator.createCompactionAuxiliaryMetadata(tablePath,
- new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, timestamp), conf);
- HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, timestamp, conf);
- }
- }
-
- /**
- * Test case for cleans run.
- */
- @Test
- public void testRunClean() throws IOException {
- // First, there should none of clean instant.
- assertEquals(0, metaClient.getActiveTimeline().reload().getCleanerTimeline().getInstants().count());
-
- // Check properties file exists.
- assertNotNull(propsFilePath, "Not found properties file");
-
- // Create partition metadata
- Files.createFile(Paths.get(tablePath,
- HoodieTestCommitMetadataGenerator.DEFAULT_FIRST_PARTITION_PATH,
- HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
- Files.createFile(Paths.get(tablePath,
- HoodieTestCommitMetadataGenerator.DEFAULT_SECOND_PARTITION_PATH,
- HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
-
- CommandResult cr = getShell().executeCommand("cleans run --sparkMaster local --propsFilePath " + propsFilePath.toString());
- assertTrue(cr.isSuccess());
-
- // After run clean, there should have 1 clean instant
- assertEquals(1, metaClient.getActiveTimeline().reload().getCleanerTimeline().getInstants().count(),
- "Loaded 1 clean and the count should match");
- }
-}
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java b/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java
index bdf623e..94904c5 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java
@@ -18,6 +18,7 @@
package org.apache.hudi.cli.testutils;
+import java.util.UUID;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
@@ -67,6 +68,12 @@ public class HoodieTestCommitMetadataGenerator extends HoodieTestDataGenerator {
public static void createCommitFileWithMetadata(String basePath, String commitTime, Configuration configuration,
Option<Integer> writes, Option<Integer> updates) {
+ createCommitFileWithMetadata(basePath, commitTime, configuration, UUID.randomUUID().toString(),
+ UUID.randomUUID().toString(), writes, updates);
+ }
+
+ public static void createCommitFileWithMetadata(String basePath, String commitTime, Configuration configuration,
+ String fileId1, String fileId2, Option<Integer> writes, Option<Integer> updates) {
Arrays.asList(HoodieTimeline.makeCommitFileName(commitTime), HoodieTimeline.makeInflightCommitFileName(commitTime),
HoodieTimeline.makeRequestedCommitFileName(commitTime))
.forEach(f -> {
@@ -77,7 +84,8 @@ public class HoodieTestCommitMetadataGenerator extends HoodieTestDataGenerator {
FileSystem fs = FSUtils.getFs(basePath, configuration);
os = fs.create(commitFile, true);
// Generate commitMetadata
- HoodieCommitMetadata commitMetadata = generateCommitMetadata(basePath, commitTime, writes, updates);
+ HoodieCommitMetadata commitMetadata =
+ generateCommitMetadata(basePath, commitTime, fileId1, fileId2, writes, updates);
// Write empty commit metadata
os.writeBytes(new String(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
} catch (IOException ioe) {
@@ -103,8 +111,14 @@ public class HoodieTestCommitMetadataGenerator extends HoodieTestDataGenerator {
public static HoodieCommitMetadata generateCommitMetadata(String basePath, String commitTime,
Option<Integer> writes, Option<Integer> updates) throws IOException {
- String file1P0C0 = HoodieTestUtils.createNewDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, commitTime);
- String file1P1C0 = HoodieTestUtils.createNewDataFile(basePath, DEFAULT_SECOND_PARTITION_PATH, commitTime);
+ return generateCommitMetadata(basePath, commitTime, UUID.randomUUID().toString(), UUID.randomUUID().toString(),
+ writes, updates);
+ }
+
+ public static HoodieCommitMetadata generateCommitMetadata(String basePath, String commitTime, String fileId1,
+ String fileId2, Option<Integer> writes, Option<Integer> updates) throws IOException {
+ String file1P0C0 = HoodieTestUtils.createDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, commitTime, fileId1);
+ String file1P1C0 = HoodieTestUtils.createDataFile(basePath, DEFAULT_SECOND_PARTITION_PATH, commitTime, fileId2);
return generateCommitMetadata(new HashMap<String, List<String>>() {
{
put(DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0));
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/AbstractDeltaStreamerService.java b/hudi-client/src/main/java/org/apache/hudi/async/AbstractAsyncService.java
similarity index 88%
rename from hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/AbstractDeltaStreamerService.java
rename to hudi-client/src/main/java/org/apache/hudi/async/AbstractAsyncService.java
index 8fe1a71..7ac236d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/AbstractDeltaStreamerService.java
+++ b/hudi-client/src/main/java/org/apache/hudi/async/AbstractAsyncService.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hudi.utilities.deltastreamer;
+package org.apache.hudi.async;
import org.apache.hudi.common.util.collection.Pair;
@@ -32,11 +32,11 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Function;
/**
- * Base Class for running delta-sync/compaction in separate thread and controlling their life-cycle.
+ * Base Class for running clean/delta-sync/compaction in separate thread and controlling their life-cycle.
*/
-public abstract class AbstractDeltaStreamerService implements Serializable {
+public abstract class AbstractAsyncService implements Serializable {
- private static final Logger LOG = LogManager.getLogger(AbstractDeltaStreamerService.class);
+ private static final Logger LOG = LogManager.getLogger(AbstractAsyncService.class);
// Flag to track if the service is started.
private boolean started;
@@ -49,15 +49,15 @@ public abstract class AbstractDeltaStreamerService implements Serializable {
// Future tracking delta-sync/compaction
private transient CompletableFuture future;
- AbstractDeltaStreamerService() {
+ protected AbstractAsyncService() {
shutdownRequested = false;
}
- boolean isShutdownRequested() {
+ protected boolean isShutdownRequested() {
return shutdownRequested;
}
- boolean isShutdown() {
+ protected boolean isShutdown() {
return shutdown;
}
@@ -67,7 +67,7 @@ public abstract class AbstractDeltaStreamerService implements Serializable {
* @throws ExecutionException
* @throws InterruptedException
*/
- void waitForShutdown() throws ExecutionException, InterruptedException {
+ public void waitForShutdown() throws ExecutionException, InterruptedException {
try {
future.get();
} catch (ExecutionException ex) {
@@ -82,7 +82,7 @@ public abstract class AbstractDeltaStreamerService implements Serializable {
*
* @param force Forcefully shutdown
*/
- void shutdown(boolean force) {
+ public void shutdown(boolean force) {
if (!shutdownRequested || force) {
shutdownRequested = true;
if (executor != null) {
@@ -145,7 +145,9 @@ public abstract class AbstractDeltaStreamerService implements Serializable {
} finally {
// Mark as shutdown
shutdown = true;
- onShutdownCallback.apply(error);
+ if (null != onShutdownCallback) {
+ onShutdownCallback.apply(error);
+ }
}
});
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AsyncCleanerService.java b/hudi-client/src/main/java/org/apache/hudi/client/AsyncCleanerService.java
new file mode 100644
index 0000000..6367e79
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/client/AsyncCleanerService.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.async.AbstractAsyncService;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Clean service running concurrently with write operation.
+ */
+class AsyncCleanerService extends AbstractAsyncService {
+
+ private static final Logger LOG = LogManager.getLogger(AsyncCleanerService.class);
+
+ private final HoodieWriteClient<?> writeClient;
+ private final String cleanInstantTime;
+ private final transient ExecutorService executor = Executors.newSingleThreadExecutor();
+
+ protected AsyncCleanerService(HoodieWriteClient<?> writeClient, String cleanInstantTime) {
+ this.writeClient = writeClient;
+ this.cleanInstantTime = cleanInstantTime;
+ }
+
+ @Override
+ protected Pair<CompletableFuture, ExecutorService> startService() {
+ return Pair.of(CompletableFuture.supplyAsync(() -> {
+ writeClient.clean(cleanInstantTime);
+ return true;
+ }), executor);
+ }
+
+ public static AsyncCleanerService startAsyncCleaningIfEnabled(HoodieWriteClient writeClient,
+ String instantTime) {
+ AsyncCleanerService asyncCleanerService = null;
+ if (writeClient.getConfig().isAutoClean() && writeClient.getConfig().isAsyncClean()) {
+ LOG.info("Auto cleaning is enabled. Running cleaner async to write operation");
+ asyncCleanerService = new AsyncCleanerService(writeClient, instantTime);
+ asyncCleanerService.start(null);
+ } else {
+ LOG.info("Auto cleaning is not enabled. Not running cleaner now");
+ }
+ return asyncCleanerService;
+ }
+
+ public static void waitForCompletion(AsyncCleanerService asyncCleanerService) {
+ if (asyncCleanerService != null) {
+ LOG.info("Waiting for async cleaner to finish");
+ try {
+ asyncCleanerService.waitForShutdown();
+ } catch (Exception e) {
+ throw new HoodieException("Error waiting for async cleaning to finish", e);
+ }
+ }
+ }
+
+ public static void forceShutdown(AsyncCleanerService asyncCleanerService) {
+ if (asyncCleanerService != null) {
+ LOG.info("Shutting down async cleaner");
+ asyncCleanerService.shutdown(true);
+ }
+ }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index 8f562ea..a6d9d0d 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -80,6 +80,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
private final boolean rollbackPending;
private final transient HoodieMetrics metrics;
private transient Timer.Context compactionTimer;
+ private transient AsyncCleanerService asyncCleanerService;
/**
* Create a write client, without cleaning up failed/inflight commits.
@@ -95,28 +96,28 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* Create a write client, with new hudi index.
*
* @param jsc Java Spark Context
- * @param clientConfig instance of HoodieWriteConfig
+ * @param writeConfig instance of HoodieWriteConfig
* @param rollbackPending whether need to cleanup pending commits
*/
- public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending) {
- this(jsc, clientConfig, rollbackPending, HoodieIndex.createIndex(clientConfig));
+ public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig writeConfig, boolean rollbackPending) {
+ this(jsc, writeConfig, rollbackPending, HoodieIndex.createIndex(writeConfig));
}
- public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending, HoodieIndex index) {
- this(jsc, clientConfig, rollbackPending, index, Option.empty());
+ public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig writeConfig, boolean rollbackPending, HoodieIndex index) {
+ this(jsc, writeConfig, rollbackPending, index, Option.empty());
}
/**
* Create a write client, allows to specify all parameters.
*
* @param jsc Java Spark Context
- * @param clientConfig instance of HoodieWriteConfig
+ * @param writeConfig instance of HoodieWriteConfig
* @param rollbackPending whether need to cleanup pending commits
* @param timelineService Timeline Service that runs as part of write client.
*/
- public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending,
+ public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig writeConfig, boolean rollbackPending,
HoodieIndex index, Option<EmbeddedTimelineService> timelineService) {
- super(jsc, index, clientConfig, timelineService);
+ super(jsc, index, writeConfig, timelineService);
this.metrics = new HoodieMetrics(config, config.getTableName());
this.rollbackPending = rollbackPending;
}
@@ -158,6 +159,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT);
table.validateUpsertSchema();
setOperationType(WriteOperationType.UPSERT);
+ this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
HoodieWriteMetadata result = table.upsert(jsc, instantTime, records);
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
@@ -178,6 +180,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED);
table.validateUpsertSchema();
setOperationType(WriteOperationType.UPSERT_PREPPED);
+ this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
HoodieWriteMetadata result = table.upsertPrepped(jsc,instantTime, preppedRecords);
return postWrite(result, instantTime, table);
}
@@ -196,6 +199,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT);
table.validateInsertSchema();
setOperationType(WriteOperationType.INSERT);
+ this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
HoodieWriteMetadata result = table.insert(jsc,instantTime, records);
return postWrite(result, instantTime, table);
}
@@ -215,6 +219,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT_PREPPED);
table.validateInsertSchema();
setOperationType(WriteOperationType.INSERT_PREPPED);
+ this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
HoodieWriteMetadata result = table.insertPrepped(jsc,instantTime, preppedRecords);
return postWrite(result, instantTime, table);
}
@@ -254,6 +259,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT);
table.validateInsertSchema();
setOperationType(WriteOperationType.BULK_INSERT);
+ this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
HoodieWriteMetadata result = table.bulkInsert(jsc,instantTime, records, bulkInsertPartitioner);
return postWrite(result, instantTime, table);
}
@@ -279,6 +285,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT_PREPPED);
table.validateInsertSchema();
setOperationType(WriteOperationType.BULK_INSERT_PREPPED);
+ this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
HoodieWriteMetadata result = table.bulkInsertPrepped(jsc,instantTime, preppedRecords, bulkInsertPartitioner);
return postWrite(result, instantTime, table);
}
@@ -338,15 +345,27 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
// We cannot have unbounded commit files. Archive commits if we have to archive
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, createMetaClient(true));
archiveLog.archiveIfRequired(hadoopConf);
- if (config.isAutoClean()) {
- // Call clean to cleanup if there is anything to cleanup after the commit,
+ autoCleanOnCommit(instantTime);
+ } catch (IOException ioe) {
+ throw new HoodieIOException(ioe.getMessage(), ioe);
+ }
+ }
+
+ /**
+ * Handle auto clean during commit.
+ * @param instantTime
+ */
+ private void autoCleanOnCommit(String instantTime) {
+ if (config.isAutoClean()) {
+ // Call clean to cleanup if there is anything to cleanup after the commit,
+ if (config.isAsyncClean()) {
+ LOG.info("Cleaner has been spawned already. Waiting for it to finish");
+ AsyncCleanerService.waitForCompletion(asyncCleanerService);
+ LOG.info("Cleaner has finished");
+ } else {
LOG.info("Auto cleaning is enabled. Running cleaner now");
clean(instantTime);
- } else {
- LOG.info("Auto cleaning is not enabled. Not running cleaner now");
}
- } catch (IOException ioe) {
- throw new HoodieIOException(ioe.getMessage(), ioe);
}
}
@@ -477,7 +496,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
*/
@Override
public void close() {
- // Stop timeline-server if running
+ AsyncCleanerService.forceShutdown(asyncCleanerService);
+ asyncCleanerService = null;
super.close();
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index 4ec0485..1b993b1 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -40,6 +40,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
public static final String CLEANER_POLICY_PROP = "hoodie.cleaner.policy";
public static final String AUTO_CLEAN_PROP = "hoodie.clean.automatic";
+ public static final String ASYNC_CLEAN_PROP = "hoodie.clean.async";
+
// Turn on inline compaction - after fw delta commits a inline compaction will be run
public static final String INLINE_COMPACT_PROP = "hoodie.compact.inline";
// Run a compaction every N delta commits
@@ -101,6 +103,7 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
public static final String DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED = "false";
private static final String DEFAULT_CLEANER_POLICY = HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name();
private static final String DEFAULT_AUTO_CLEAN = "true";
+ private static final String DEFAULT_ASYNC_CLEAN = "false";
private static final String DEFAULT_INLINE_COMPACT = "false";
private static final String DEFAULT_INCREMENTAL_CLEANER = "true";
private static final String DEFAULT_INLINE_COMPACT_NUM_DELTA_COMMITS = "5";
@@ -143,6 +146,11 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
return this;
}
+ public Builder withAsyncClean(Boolean asyncClean) {
+ props.setProperty(ASYNC_CLEAN_PROP, String.valueOf(asyncClean));
+ return this;
+ }
+
public Builder withIncrementalCleaningMode(Boolean incrementalCleaningMode) {
props.setProperty(CLEANER_INCREMENTAL_MODE, String.valueOf(incrementalCleaningMode));
return this;
@@ -247,6 +255,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
public HoodieCompactionConfig build() {
HoodieCompactionConfig config = new HoodieCompactionConfig(props);
setDefaultOnCondition(props, !props.containsKey(AUTO_CLEAN_PROP), AUTO_CLEAN_PROP, DEFAULT_AUTO_CLEAN);
+ setDefaultOnCondition(props, !props.containsKey(ASYNC_CLEAN_PROP), ASYNC_CLEAN_PROP,
+ DEFAULT_ASYNC_CLEAN);
setDefaultOnCondition(props, !props.containsKey(CLEANER_INCREMENTAL_MODE), CLEANER_INCREMENTAL_MODE,
DEFAULT_INCREMENTAL_CLEANER);
setDefaultOnCondition(props, !props.containsKey(INLINE_COMPACT_PROP), INLINE_COMPACT_PROP,
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 083d780..3b822f0 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -296,6 +296,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Boolean.parseBoolean(props.getProperty(HoodieCompactionConfig.AUTO_CLEAN_PROP));
}
+ public boolean isAsyncClean() {
+ return Boolean.parseBoolean(props.getProperty(HoodieCompactionConfig.ASYNC_CLEAN_PROP));
+ }
+
public boolean incrementalCleanerModeEnabled() {
return Boolean.parseBoolean(props.getProperty(HoodieCompactionConfig.CLEANER_INCREMENTAL_MODE));
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
index b3caa44..57feebc 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
@@ -185,8 +185,9 @@ public class CleanActionExecutor extends BaseActionExecutor<HoodieCleanMetadata>
Option<HoodieCleanerPlan> requestClean(String startCleanTime) {
final HoodieCleanerPlan cleanerPlan = requestClean(jsc);
if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null)
- && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) {
-
+ && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()
+ && cleanerPlan.getFilesToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0) {
+ // Only create cleaner plan which does some work
final HoodieInstant cleanInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, startCleanTime);
// Save to both aux and timeline folder
try {
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 82f911d..541f84f 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -51,6 +51,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.HoodieTestDataGenerator;
@@ -83,6 +84,7 @@ import scala.Tuple3;
import static org.apache.hudi.common.testutils.HoodieTestUtils.DEFAULT_PARTITION_PATHS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
@@ -128,15 +130,8 @@ public class TestCleaner extends HoodieClientTestBase {
HoodieTable table = HoodieTable.create(metaClient, client.getConfig(), hadoopConf);
assertFalse(table.getCompletedCommitsTimeline().empty());
- if (cleaningPolicy.equals(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)) {
- // We no longer write empty cleaner plans when there are not enough commits present
- assertTrue(table.getCompletedCleanTimeline().empty());
- } else {
- String instantTime = table.getCompletedCommitsTimeline().getInstants().findFirst().get().getTimestamp();
- assertFalse(table.getCompletedCleanTimeline().empty());
- assertEquals(instantTime, table.getCompletedCleanTimeline().getInstants().findFirst().get().getTimestamp(),
- "The clean instant should be the same as the commit instant");
- }
+ // We no longer write empty cleaner plans when there is nothing to be cleaned.
+ assertTrue(table.getCompletedCleanTimeline().empty());
HoodieIndex index = HoodieIndex.createIndex(cfg);
List<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table).collect();
@@ -439,17 +434,32 @@ public class TestCleaner extends HoodieClientTestBase {
if (simulateRetryFailure) {
HoodieInstant completedCleanInstant = new HoodieInstant(State.COMPLETED, HoodieTimeline.CLEAN_ACTION, cleanInstantTs);
+ HoodieCleanMetadata metadata = CleanerUtils.getCleanerMetadata(metaClient, completedCleanInstant);
+ metadata.getPartitionMetadata().values().forEach(p -> {
+ String dirPath = metaClient.getBasePath() + "/" + p.getPartitionPath();
+ p.getSuccessDeleteFiles().forEach(p2 -> {
+ try {
+ metaClient.getFs().create(new Path(dirPath, p2), true);
+ } catch (IOException e) {
+ throw new HoodieIOException(e.getMessage(), e);
+ }
+ });
+ });
metaClient.reloadActiveTimeline().revertToInflight(completedCleanInstant);
- HoodieCleanMetadata cleanMetadata2 = writeClient.clean(getNextInstant());
+ HoodieCleanMetadata newCleanMetadata = writeClient.clean(getNextInstant());
+ // No new clean metadata would be created. Only the previous one will be retried
+ assertNull(newCleanMetadata);
+ HoodieCleanMetadata cleanMetadata2 = CleanerUtils.getCleanerMetadata(metaClient, completedCleanInstant);
assertEquals(cleanMetadata1.getEarliestCommitToRetain(), cleanMetadata2.getEarliestCommitToRetain());
- assertEquals(new Integer(0), cleanMetadata2.getTotalFilesDeleted());
+ assertEquals(cleanMetadata1.getTotalFilesDeleted(), cleanMetadata2.getTotalFilesDeleted());
assertEquals(cleanMetadata1.getPartitionMetadata().keySet(), cleanMetadata2.getPartitionMetadata().keySet());
final HoodieCleanMetadata retriedCleanMetadata = CleanerUtils.getCleanerMetadata(HoodieTableMetaClient.reload(metaClient), completedCleanInstant);
cleanMetadata1.getPartitionMetadata().keySet().forEach(k -> {
HoodieCleanPartitionMetadata p1 = cleanMetadata1.getPartitionMetadata().get(k);
HoodieCleanPartitionMetadata p2 = retriedCleanMetadata.getPartitionMetadata().get(k);
assertEquals(p1.getDeletePathPatterns(), p2.getDeletePathPatterns());
- assertEquals(p1.getSuccessDeleteFiles(), p2.getFailedDeleteFiles());
+ assertEquals(p1.getSuccessDeleteFiles(), p2.getSuccessDeleteFiles());
+ assertEquals(p1.getFailedDeleteFiles(), p2.getFailedDeleteFiles());
assertEquals(p1.getPartitionPath(), p2.getPartitionPath());
assertEquals(k, p1.getPartitionPath());
});
@@ -487,12 +497,7 @@ public class TestCleaner extends HoodieClientTestBase {
metaClient = HoodieTableMetaClient.reload(metaClient);
List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config);
- assertEquals(0,
- getCleanStat(hoodieCleanStatsOne, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles()
- .size(), "Must not clean any files");
- assertEquals(0,
- getCleanStat(hoodieCleanStatsOne, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles()
- .size(), "Must not clean any files");
+ assertEquals(0, hoodieCleanStatsOne.size(), "Must not clean any files");
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
file1P0C0));
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000",
@@ -548,9 +553,7 @@ public class TestCleaner extends HoodieClientTestBase {
// No cleaning on partially written file, with no commit.
HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file3P0C2); // update
List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config);
- assertEquals(0,
- getCleanStat(hoodieCleanStatsFour, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles()
- .size(), "Must not clean any files");
+ assertEquals(0, hoodieCleanStatsFour.size(), "Must not clean any files");
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002",
file3P0C2));
}
@@ -819,11 +822,8 @@ public class TestCleaner extends HoodieClientTestBase {
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config, simulateFailureRetry);
- assertEquals(0,
- getCleanStat(hoodieCleanStatsThree, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
- .getSuccessDeleteFiles().size(),
+ assertEquals(0, hoodieCleanStatsThree.size(),
"Must not clean any file. We have to keep 1 version before the latest commit time to keep");
-
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
file1P0C0));
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index a3d81fa..ccd5c49 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -19,6 +19,7 @@
package org.apache.hudi.utilities.deltastreamer;
import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.async.AbstractAsyncService;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType;
@@ -326,7 +327,7 @@ public class HoodieDeltaStreamer implements Serializable {
/**
* Syncs data either in single-run or in continuous mode.
*/
- public static class DeltaSyncService extends AbstractDeltaStreamerService {
+ public static class DeltaSyncService extends AbstractAsyncService {
private static final long serialVersionUID = 1L;
/**
@@ -532,7 +533,7 @@ public class HoodieDeltaStreamer implements Serializable {
/**
* Async Compactor Service that runs in separate thread. Currently, only one compactor is allowed to run at any time.
*/
- public static class AsyncCompactService extends AbstractDeltaStreamerService {
+ public static class AsyncCompactService extends AbstractAsyncService {
private static final long serialVersionUID = 1L;
private final int maxConcurrentCompaction;