Posted to commits@hudi.apache.org by da...@apache.org on 2021/10/30 00:37:43 UTC
[hudi] branch master updated: [HUDI-2654] Schedules the compaction from earliest for flink (#3891)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 92a3c45 [HUDI-2654] Schedules the compaction from earliest for flink (#3891)
92a3c45 is described below
commit 92a3c458bde7ca4d2bb72f4dbe486073f6a5ec4f
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Sat Oct 30 08:37:30 2021 +0800
[HUDI-2654] Schedules the compaction from earliest for flink (#3891)
---
.../apache/hudi/configuration/FlinkOptions.java | 6 ++
.../hudi/sink/bootstrap/BootstrapOperator.java | 14 +---
.../hudi/sink/compact/CompactionPlanOperator.java | 29 ++++----
.../java/org/apache/hudi/util/CompactionUtil.java | 19 +++++-
.../java/org/apache/hudi/util/FlinkTables.java | 77 ++++++++++++++++++++++
.../org/apache/hudi/utils/TestCompactionUtil.java | 75 ++++++++++++++++-----
6 files changed, 178 insertions(+), 42 deletions(-)
diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 6f8c1ff..65a95ed 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -492,6 +492,12 @@ public class FlinkOptions extends HoodieConfig {
.defaultValue(3600) // default 1 hour
.withDescription("Max delta seconds time needed to trigger compaction, default 1 hour");
+ public static final ConfigOption<Integer> COMPACTION_TIMEOUT_SECONDS = ConfigOptions
+ .key("compaction.timeout.seconds")
+ .intType()
+ .defaultValue(1200) // default 20 minutes
+ .withDescription("Max timeout time in seconds for online compaction to rollback, default 20 minutes");
+
public static final ConfigOption<Integer> COMPACTION_MAX_MEMORY = ConfigOptions
.key("compaction.max_memory")
.intType()
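
For reference, a minimal sketch of tuning the new option, assuming a typical Flink job setup (the option key comes from the diff above; the concrete values are illustrative):

    // Java API: lower the rollback timeout from the default 20 minutes to 10 minutes.
    Configuration conf = new Configuration();
    conf.setInteger(FlinkOptions.COMPACTION_TIMEOUT_SECONDS, 600);

    // Equivalent table option in a Hudi Flink SQL WITH clause:
    // 'compaction.timeout.seconds' = '600'
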
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
index 81ab836..76e6695 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
@@ -18,9 +18,6 @@
package org.apache.hudi.sink.bootstrap;
-import org.apache.hudi.client.FlinkTaskContextSupplier;
-import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieKey;
@@ -37,9 +34,9 @@ import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.format.FormatUtils;
+import org.apache.hudi.util.FlinkTables;
import org.apache.hudi.util.StreamerUtil;
import org.apache.avro.Schema;
@@ -119,7 +116,7 @@ public class BootstrapOperator<I, O extends HoodieRecord>
this.hadoopConf = StreamerUtil.getHadoopConf();
this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, true);
- this.hoodieTable = getTable();
+ this.hoodieTable = FlinkTables.createTable(writeConfig, hadoopConf, getRuntimeContext());
preLoadIndexRecords();
}
@@ -146,13 +143,6 @@ public class BootstrapOperator<I, O extends HoodieRecord>
output.collect((StreamRecord<O>) element);
}
- private HoodieFlinkTable getTable() {
- HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
- new SerializableConfiguration(this.hadoopConf),
- new FlinkTaskContextSupplier(getRuntimeContext()));
- return HoodieFlinkTable.create(this.writeConfig, context);
- }
-
/**
* Loads all the indices of the given partition path into the backup state.
*
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
index 945d428..9c0549a 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
@@ -19,7 +19,6 @@
package org.apache.hudi.sink.compact;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
-import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -27,7 +26,7 @@ import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.CompactionUtil;
-import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.util.FlinkTables;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
@@ -55,13 +54,9 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
private final Configuration conf;
/**
- * Write Client.
- */
- private transient HoodieFlinkWriteClient writeClient;
-
- /**
* Meta Client.
*/
+ @SuppressWarnings("rawtypes")
private transient HoodieFlinkTable table;
public CompactionPlanOperator(Configuration conf) {
@@ -71,8 +66,11 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
@Override
public void open() throws Exception {
super.open();
- this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
- this.table = writeClient.getHoodieTable();
+ this.table = FlinkTables.createTable(conf, getRuntimeContext());
+ // When starting up, roll back the first inflight compaction instant if one exists;
+ // that instant is the next one to be picked by the scheduling task because the
+ // compaction instants are scheduled from the earliest (FIFO sequence).
+ CompactionUtil.rollbackEarliestCompaction(this.table);
}
@Override
@@ -84,6 +82,11 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
public void notifyCheckpointComplete(long checkpointId) {
try {
table.getMetaClient().reloadActiveTimeline();
+ // There is no good way to infer whether the compaction task for an instant crashed
+ // or is still running, so we use a configured timeout threshold to control the rollback:
+ // {@code FlinkOptions.COMPACTION_TIMEOUT_SECONDS};
+ // when the threshold is reached but an instant is still in pending (inflight) state,
+ // assume it has already failed and just roll it back.
CompactionUtil.rollbackCompaction(table, conf);
scheduleCompaction(table, checkpointId);
} catch (Throwable throwable) {
@@ -94,15 +97,15 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
private void scheduleCompaction(HoodieFlinkTable<?> table, long checkpointId) throws IOException {
// the earliest instant takes the highest priority.
- Option<HoodieInstant> lastRequested = table.getActiveTimeline().filterPendingCompactionTimeline()
- .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).lastInstant();
- if (!lastRequested.isPresent()) {
+ Option<HoodieInstant> firstRequested = table.getActiveTimeline().filterPendingCompactionTimeline()
+ .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).firstInstant();
+ if (!firstRequested.isPresent()) {
// do nothing.
LOG.info("No compaction plan for checkpoint " + checkpointId);
return;
}
- String compactionInstantTime = lastRequested.get().getTimestamp();
+ String compactionInstantTime = firstRequested.get().getTimestamp();
// generate compaction plan
// should support configurable commit metadata
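
The net effect of the lastInstant() -> firstInstant() switch: with pending compaction instants t1 < t2 < t3 on the timeline, the operator now plans t1 first, so plans run in the order they were scheduled (FIFO) instead of newest-first. A minimal sketch of the selection, assuming a pendingTimeline handle like the one in the diff:

    // FIFO: take the earliest REQUESTED compaction instant rather than the latest.
    Option<HoodieInstant> firstRequested = pendingTimeline
        .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED)
        .firstInstant();
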
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
index 89ffef3..e064a05 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
@@ -112,20 +112,35 @@ public class CompactionUtil {
public static void rollbackCompaction(HoodieFlinkTable<?> table, Configuration conf) {
String curInstantTime = HoodieActiveTimeline.createNewInstantTime();
- int deltaSeconds = conf.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS);
+ int deltaSeconds = conf.getInteger(FlinkOptions.COMPACTION_TIMEOUT_SECONDS);
HoodieTimeline inflightCompactionTimeline = table.getActiveTimeline()
.filterPendingCompactionTimeline()
.filter(instant ->
instant.getState() == HoodieInstant.State.INFLIGHT
&& StreamerUtil.instantTimeDiffSeconds(curInstantTime, instant.getTimestamp()) >= deltaSeconds);
inflightCompactionTimeline.getInstants().forEach(inflightInstant -> {
- LOG.info("Rollback the pending compaction instant: " + inflightInstant);
+ LOG.info("Rollback the inflight compaction instant: " + inflightInstant + " for timeout(" + deltaSeconds + "s)");
table.rollbackInflightCompaction(inflightInstant);
table.getMetaClient().reloadActiveTimeline();
});
}
/**
+ * Rolls back the earliest inflight compaction if one exists.
+ */
+ public static void rollbackEarliestCompaction(HoodieFlinkTable<?> table) {
+ Option<HoodieInstant> earliestInflight = table.getActiveTimeline()
+ .filterPendingCompactionTimeline()
+ .filter(instant ->
+ instant.getState() == HoodieInstant.State.INFLIGHT).firstInstant();
+ if (earliestInflight.isPresent()) {
+ LOG.info("Rollback the inflight compaction instant: " + earliestInflight.get() + " for failover");
+ table.rollbackInflightCompaction(earliestInflight.get());
+ table.getMetaClient().reloadActiveTimeline();
+ }
+ }
+
+ /**
* Returns whether the execution sequence is LIFO.
*/
public static boolean isLIFO(String seq) {
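
A sketch of how the timeout check in rollbackCompaction plays out, assuming Hudi's yyyyMMddHHmmss instant-time format and the signatures used in the diff (the timestamps are illustrative):

    // An inflight instant created 30 minutes ago, checked against the 20-minute default:
    long diffSeconds = StreamerUtil.instantTimeDiffSeconds("20211030090000", "20211030083000"); // 1800
    boolean timedOut = diffSeconds >= conf.getInteger(FlinkOptions.COMPACTION_TIMEOUT_SECONDS);  // 1800 >= 1200 -> rolled back
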
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/FlinkTables.java b/hudi-flink/src/main/java/org/apache/hudi/util/FlinkTables.java
new file mode 100644
index 0000000..6918a06
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/FlinkTables.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieFlinkTable;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+
+import static org.apache.hudi.util.StreamerUtil.getHadoopConf;
+import static org.apache.hudi.util.StreamerUtil.getHoodieClientConfig;
+
+/**
+ * Utilities for {@link org.apache.hudi.table.HoodieFlinkTable}.
+ */
+public class FlinkTables {
+ private FlinkTables() {
+ }
+
+ /**
+ * Creates the hoodie flink table.
+ *
+ * <p>This is expected to be used by the client.
+ */
+ public static HoodieFlinkTable<?> createTable(Configuration conf, RuntimeContext runtimeContext) {
+ HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
+ new SerializableConfiguration(getHadoopConf()),
+ new FlinkTaskContextSupplier(runtimeContext));
+ HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true);
+ return HoodieFlinkTable.create(writeConfig, context);
+ }
+
+ /**
+ * Creates the hoodie flink table.
+ *
+ * <p>This is expected to be used by the client.
+ */
+ public static HoodieFlinkTable<?> createTable(
+ HoodieWriteConfig writeConfig,
+ org.apache.hadoop.conf.Configuration hadoopConf,
+ RuntimeContext runtimeContext) {
+ HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
+ new SerializableConfiguration(hadoopConf),
+ new FlinkTaskContextSupplier(runtimeContext));
+ return HoodieFlinkTable.create(writeConfig, context);
+ }
+
+ /**
+ * Creates the hoodie flink table.
+ *
+ * <p>This is expected to be used by the driver.
+ */
+ public static HoodieFlinkTable<?> createTable(Configuration conf) {
+ HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(conf, true, false);
+ return HoodieFlinkTable.create(writeConfig, HoodieFlinkEngineContext.DEFAULT);
+ }
+}
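
A hypothetical usage sketch of the new factory, contrasting the task-side and driver-side variants (conf and the runtime context are assumed to come from the surrounding operator):

    // On a task: the RuntimeContext supplies task-level info to the engine context.
    HoodieFlinkTable<?> taskTable = FlinkTables.createTable(conf, getRuntimeContext());

    // On the driver: no RuntimeContext is available, so the default engine context is used.
    HoodieFlinkTable<?> driverTable = FlinkTables.createTable(conf);
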
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
index 473a33e..8b93707 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
@@ -20,7 +20,6 @@ package org.apache.hudi.utils;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
-import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -30,38 +29,90 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.CompactionUtil;
+import org.apache.hudi.util.FlinkTables;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.configuration.Configuration;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Test cases for {@link org.apache.hudi.util.CompactionUtil}.
*/
public class TestCompactionUtil {
+ private HoodieFlinkTable<?> table;
+ private HoodieTableMetaClient metaClient;
+ private Configuration conf;
+
@TempDir
File tempFile;
+ @BeforeEach
+ void beforeEach() throws IOException {
+ this.conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ StreamerUtil.initTableIfNotExists(conf);
+ this.table = FlinkTables.createTable(conf);
+ this.metaClient = table.getMetaClient();
+ }
+
@Test
- void rollbackCompaction() throws IOException {
- Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
- conf.setInteger(FlinkOptions.COMPACTION_DELTA_SECONDS, 0);
+ void rollbackCompaction() {
+ conf.setInteger(FlinkOptions.COMPACTION_TIMEOUT_SECONDS, 0);
+ List<String> oriInstants = IntStream.range(0, 3)
+ .mapToObj(i -> generateCompactionPlan()).collect(Collectors.toList());
+ List<HoodieInstant> instants = metaClient.getActiveTimeline()
+ .filterPendingCompactionTimeline()
+ .filter(instant -> instant.getState() == HoodieInstant.State.INFLIGHT)
+ .getInstants()
+ .collect(Collectors.toList());
+ assertThat("all the instants should be in pending state", instants.size(), is(3));
+ CompactionUtil.rollbackCompaction(table, conf);
+ boolean allRolledBack = metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants()
+ .allMatch(instant -> instant.getState() == HoodieInstant.State.REQUESTED);
+ assertTrue(allRolledBack, "all the instants should be rolled back");
+ List<String> actualInstants = metaClient.getActiveTimeline()
+ .filterPendingCompactionTimeline().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+ assertThat(actualInstants, is(oriInstants));
+ }
- StreamerUtil.initTableIfNotExists(conf);
+ @Test
+ void rollbackEarliestCompaction() {
+ List<String> oriInstants = IntStream.range(0, 3)
+ .mapToObj(i -> generateCompactionPlan()).collect(Collectors.toList());
+ List<HoodieInstant> instants = metaClient.getActiveTimeline()
+ .filterPendingCompactionTimeline()
+ .filter(instant -> instant.getState() == HoodieInstant.State.INFLIGHT)
+ .getInstants()
+ .collect(Collectors.toList());
+ assertThat("all the instants should be in pending state", instants.size(), is(3));
+ CompactionUtil.rollbackEarliestCompaction(table);
+ long requestedCnt = metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants()
+ .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).count();
+ assertThat("Only the first instant expects to be rolled back", requestedCnt, is(1L));
- HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf);
- HoodieFlinkTable table = writeClient.getHoodieTable();
- HoodieTableMetaClient metaClient = table.getMetaClient();
+ String instantTime = metaClient.getActiveTimeline()
+ .filterPendingCompactionTimeline().filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED)
+ .firstInstant().get().getTimestamp();
+ assertThat(instantTime, is(oriInstants.get(0)));
+ }
+ /**
+ * Generates a compaction plan on the timeline and returns its instant time.
+ */
+ private String generateCompactionPlan() {
HoodieCompactionOperation operation = new HoodieCompactionOperation();
HoodieCompactionPlan plan = new HoodieCompactionPlan(Collections.singletonList(operation), Collections.emptyMap(), 1);
String instantTime = HoodieActiveTimeline.createNewInstantTime();
@@ -75,13 +126,7 @@ public class TestCompactionUtil {
throw new HoodieIOException("Exception scheduling compaction", ioe);
}
metaClient.reloadActiveTimeline();
- HoodieInstant instant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().orElse(null);
- assertThat(instant.getTimestamp(), is(instantTime));
-
- CompactionUtil.rollbackCompaction(table, conf);
- HoodieInstant rollbackInstant = table.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().get();
- assertThat(rollbackInstant.getState(), is(HoodieInstant.State.REQUESTED));
- assertThat(rollbackInstant.getTimestamp(), is(instantTime));
+ return instantTime;
}
}