Posted to commits@hudi.apache.org by da...@apache.org on 2021/10/30 00:37:43 UTC

[hudi] branch master updated: [HUDI-2654] Schedules the compaction from earliest for flink (#3891)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 92a3c45  [HUDI-2654] Schedules the compaction from earliest for flink (#3891)
92a3c45 is described below

commit 92a3c458bde7ca4d2bb72f4dbe486073f6a5ec4f
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Sat Oct 30 08:37:30 2021 +0800

    [HUDI-2654] Schedules the compaction from earliest for flink (#3891)
---
 .../apache/hudi/configuration/FlinkOptions.java    |  6 ++
 .../hudi/sink/bootstrap/BootstrapOperator.java     | 14 +---
 .../hudi/sink/compact/CompactionPlanOperator.java  | 29 ++++----
 .../java/org/apache/hudi/util/CompactionUtil.java  | 19 +++++-
 .../java/org/apache/hudi/util/FlinkTables.java     | 77 ++++++++++++++++++++++
 .../org/apache/hudi/utils/TestCompactionUtil.java  | 75 ++++++++++++++++-----
 6 files changed, 178 insertions(+), 42 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 6f8c1ff..65a95ed 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -492,6 +492,12 @@ public class FlinkOptions extends HoodieConfig {
       .defaultValue(3600) // default 1 hour
       .withDescription("Max delta seconds time needed to trigger compaction, default 1 hour");
 
+  public static final ConfigOption<Integer> COMPACTION_TIMEOUT_SECONDS = ConfigOptions
+      .key("compaction.timeout.seconds")
+      .intType()
+      .defaultValue(1200) // default 20 minutes
+      .withDescription("Max timeout time in seconds for online compaction to rollback, default 20 minutes");
+
   public static final ConfigOption<Integer> COMPACTION_MAX_MEMORY = ConfigOptions
       .key("compaction.max_memory")
       .intType()
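
For reference, a minimal sketch of how a job could override the new option (the
option key and default come from the hunk above; the surrounding setup is
illustrative, not part of the patch):

    // Tighten the rollback timeout for inflight online compactions to 10 minutes.
    // Configuration is org.apache.flink.configuration.Configuration.
    Configuration conf = new Configuration();
    conf.setInteger(FlinkOptions.COMPACTION_TIMEOUT_SECONDS, 600);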
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
index 81ab836..76e6695 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
@@ -18,9 +18,6 @@
 
 package org.apache.hudi.sink.bootstrap;
 
-import org.apache.hudi.client.FlinkTaskContextSupplier;
-import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieKey;
@@ -37,9 +34,9 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.format.FormatUtils;
+import org.apache.hudi.util.FlinkTables;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.avro.Schema;
@@ -119,7 +116,7 @@ public class BootstrapOperator<I, O extends HoodieRecord>
 
     this.hadoopConf = StreamerUtil.getHadoopConf();
     this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, true);
-    this.hoodieTable = getTable();
+    this.hoodieTable = FlinkTables.createTable(writeConfig, hadoopConf, getRuntimeContext());
 
     preLoadIndexRecords();
   }
@@ -146,13 +143,6 @@ public class BootstrapOperator<I, O extends HoodieRecord>
     output.collect((StreamRecord<O>) element);
   }
 
-  private HoodieFlinkTable getTable() {
-    HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
-        new SerializableConfiguration(this.hadoopConf),
-        new FlinkTaskContextSupplier(getRuntimeContext()));
-    return HoodieFlinkTable.create(this.writeConfig, context);
-  }
-
   /**
    * Loads all the indices of the given partition path into the backup state.
    *
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
index 945d428..9c0549a 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.sink.compact;
 
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
-import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.common.model.CompactionOperation;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -27,7 +26,7 @@ import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.util.CompactionUtil;
-import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.util.FlinkTables;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
@@ -55,13 +54,9 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
   private final Configuration conf;
 
   /**
-   * Write Client.
-   */
-  private transient HoodieFlinkWriteClient writeClient;
-
-  /**
    * Meta Client.
    */
+  @SuppressWarnings("rawtypes")
   private transient HoodieFlinkTable table;
 
   public CompactionPlanOperator(Configuration conf) {
@@ -71,8 +66,11 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
   @Override
   public void open() throws Exception {
     super.open();
-    this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
-    this.table = writeClient.getHoodieTable();
+    this.table = FlinkTables.createTable(conf, getRuntimeContext());
+    // When starting up, roll back the first inflight compaction instant if one exists;
+    // that instant is the next one the scheduling task would pick, because compaction
+    // instants are scheduled from the earliest (FIFO sequence).
+    CompactionUtil.rollbackEarliestCompaction(this.table);
   }
 
   @Override
@@ -84,6 +82,11 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
   public void notifyCheckpointComplete(long checkpointId) {
     try {
       table.getMetaClient().reloadActiveTimeline();
+      // There is no good way to infer whether the compaction task for an instant crashed
+      // or is still running, so a configured timeout threshold controls the rollback:
+      // {@code FlinkOptions.COMPACTION_TIMEOUT_SECONDS}. When the threshold is hit but an
+      // instant is still in pending (inflight) state, assume it has already failed and
+      // roll it back.
       CompactionUtil.rollbackCompaction(table, conf);
       scheduleCompaction(table, checkpointId);
     } catch (Throwable throwable) {
@@ -94,15 +97,15 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
 
   private void scheduleCompaction(HoodieFlinkTable<?> table, long checkpointId) throws IOException {
     // the first instant takes the highest priority.
-    Option<HoodieInstant> lastRequested = table.getActiveTimeline().filterPendingCompactionTimeline()
-        .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).lastInstant();
-    if (!lastRequested.isPresent()) {
+    Option<HoodieInstant> firstRequested = table.getActiveTimeline().filterPendingCompactionTimeline()
+        .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).firstInstant();
+    if (!firstRequested.isPresent()) {
       // do nothing.
       LOG.info("No compaction plan for checkpoint " + checkpointId);
       return;
     }
 
-    String compactionInstantTime = lastRequested.get().getTimestamp();
+    String compactionInstantTime = firstRequested.get().getTimestamp();
 
     // generate compaction plan
     // should support configurable commit metadata
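
The scheduling switch above is the heart of the patch: the operator now consumes
pending compaction plans in FIFO order. A condensed sketch of the new selection,
using the same timeline calls as the diff:

    // Pick the earliest REQUESTED compaction instant rather than the latest,
    // so plans execute in the order they were scheduled (FIFO). An empty
    // Option means there is nothing to compact for this checkpoint.
    Option<HoodieInstant> firstRequested = table.getActiveTimeline()
        .filterPendingCompactionTimeline()
        .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED)
        .firstInstant();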
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
index 89ffef3..e064a05 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
@@ -112,20 +112,35 @@ public class CompactionUtil {
 
   public static void rollbackCompaction(HoodieFlinkTable<?> table, Configuration conf) {
     String curInstantTime = HoodieActiveTimeline.createNewInstantTime();
-    int deltaSeconds = conf.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS);
+    int deltaSeconds = conf.getInteger(FlinkOptions.COMPACTION_TIMEOUT_SECONDS);
     HoodieTimeline inflightCompactionTimeline = table.getActiveTimeline()
         .filterPendingCompactionTimeline()
         .filter(instant ->
             instant.getState() == HoodieInstant.State.INFLIGHT
                 && StreamerUtil.instantTimeDiffSeconds(curInstantTime, instant.getTimestamp()) >= deltaSeconds);
     inflightCompactionTimeline.getInstants().forEach(inflightInstant -> {
-      LOG.info("Rollback the pending compaction instant: " + inflightInstant);
+      LOG.info("Rollback the inflight compaction instant: " + inflightInstant + " for timeout(" + deltaSeconds + "s)");
       table.rollbackInflightCompaction(inflightInstant);
       table.getMetaClient().reloadActiveTimeline();
     });
   }
 
   /**
+   * Rolls back the earliest inflight compaction instant, if one exists.
+   */
+  public static void rollbackEarliestCompaction(HoodieFlinkTable<?> table) {
+    Option<HoodieInstant> earliestInflight = table.getActiveTimeline()
+        .filterPendingCompactionTimeline()
+        .filter(instant ->
+            instant.getState() == HoodieInstant.State.INFLIGHT).firstInstant();
+    if (earliestInflight.isPresent()) {
+      LOG.info("Rollback the inflight compaction instant: " + earliestInflight.get() + " for failover");
+      table.rollbackInflightCompaction(earliestInflight.get());
+      table.getMetaClient().reloadActiveTimeline();
+    }
+  }
+
+  /**
    * Returns whether the execution sequence is LIFO.
    */
   public static boolean isLIFO(String seq) {
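
There is no callback when a compaction task dies, so rollbackCompaction above falls
back to elapsed time. A self-contained sketch of that comparison (assumption: instant
times use the classic yyyyMMddHHmmss format; the two instant values are hypothetical):

    import java.time.Duration;
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;

    DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
    String curInstantTime = "20211030083730"; // a fresh instant standing in for "now"
    String inflightTime = "20211030080000";   // the inflight compaction's instant time
    long diffSeconds = Duration.between(
        LocalDateTime.parse(inflightTime, fmt),
        LocalDateTime.parse(curInstantTime, fmt)).getSeconds();
    // Compare against FlinkOptions.COMPACTION_TIMEOUT_SECONDS (default 1200):
    boolean timedOut = diffSeconds >= 1200;   // true here: ~37 minutes have elapsed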
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/FlinkTables.java b/hudi-flink/src/main/java/org/apache/hudi/util/FlinkTables.java
new file mode 100644
index 0000000..6918a06
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/FlinkTables.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieFlinkTable;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+
+import static org.apache.hudi.util.StreamerUtil.getHadoopConf;
+import static org.apache.hudi.util.StreamerUtil.getHoodieClientConfig;
+
+/**
+ * Utilities for {@link org.apache.hudi.table.HoodieFlinkTable}.
+ */
+public class FlinkTables {
+  private FlinkTables() {
+  }
+
+  /**
+   * Creates the hoodie flink table.
+   *
+   * <p>This is expected to be used on the client side.
+   */
+  public static HoodieFlinkTable<?> createTable(Configuration conf, RuntimeContext runtimeContext) {
+    HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
+        new SerializableConfiguration(getHadoopConf()),
+        new FlinkTaskContextSupplier(runtimeContext));
+    HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true);
+    return HoodieFlinkTable.create(writeConfig, context);
+  }
+
+  /**
+   * Creates the hoodie flink table.
+   *
+   * <p>This is expected to be used on the client side.
+   */
+  public static HoodieFlinkTable<?> createTable(
+      HoodieWriteConfig writeConfig,
+      org.apache.hadoop.conf.Configuration hadoopConf,
+      RuntimeContext runtimeContext) {
+    HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
+        new SerializableConfiguration(hadoopConf),
+        new FlinkTaskContextSupplier(runtimeContext));
+    return HoodieFlinkTable.create(writeConfig, context);
+  }
+
+  /**
+   * Creates the hoodie flink table.
+   *
+   * <p>This is expected to be used on the driver side.
+   */
+  public static HoodieFlinkTable<?> createTable(Configuration conf) {
+    HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(conf, true, false);
+    return HoodieFlinkTable.create(writeConfig, HoodieFlinkEngineContext.DEFAULT);
+  }
+}
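
A short usage sketch for the new utility (the table path value is hypothetical;
FlinkOptions.PATH is the existing table-path option in hudi-flink):

    // Driver side: build the table from the Flink job configuration alone.
    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.PATH, "/tmp/hudi/t1");
    HoodieFlinkTable<?> table = FlinkTables.createTable(conf);

    // Task side (e.g. in an operator's open()): pass the runtime context so the
    // engine context carries the task's attempt information.
    HoodieFlinkTable<?> taskTable = FlinkTables.createTable(conf, getRuntimeContext());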
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
index 473a33e..8b93707 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
@@ -20,7 +20,6 @@ package org.apache.hudi.utils;
 
 import org.apache.hudi.avro.model.HoodieCompactionOperation;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
-import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -30,38 +29,90 @@ import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.util.CompactionUtil;
+import org.apache.hudi.util.FlinkTables;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.configuration.Configuration;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * Test cases for {@link org.apache.hudi.util.CompactionUtil}.
  */
 public class TestCompactionUtil {
 
+  private HoodieFlinkTable<?> table;
+  private HoodieTableMetaClient metaClient;
+  private Configuration conf;
+
   @TempDir
   File tempFile;
 
+  @BeforeEach
+  void beforeEach() throws IOException {
+    this.conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    StreamerUtil.initTableIfNotExists(conf);
+    this.table = FlinkTables.createTable(conf);
+    this.metaClient = table.getMetaClient();
+  }
+
   @Test
-  void rollbackCompaction() throws IOException {
-    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
-    conf.setInteger(FlinkOptions.COMPACTION_DELTA_SECONDS, 0);
+  void rollbackCompaction() {
+    conf.setInteger(FlinkOptions.COMPACTION_TIMEOUT_SECONDS, 0);
+    List<String> oriInstants = IntStream.range(0, 3)
+        .mapToObj(i -> generateCompactionPlan()).collect(Collectors.toList());
+    List<HoodieInstant> instants = metaClient.getActiveTimeline()
+        .filterPendingCompactionTimeline()
+        .filter(instant -> instant.getState() == HoodieInstant.State.INFLIGHT)
+        .getInstants()
+        .collect(Collectors.toList());
+    assertThat("all the instants should be in pending state", instants.size(), is(3));
+    CompactionUtil.rollbackCompaction(table, conf);
+    boolean allRolledBack = metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants()
+        .allMatch(instant -> instant.getState() == HoodieInstant.State.REQUESTED);
+    assertTrue(allRolledBack, "all the instants should be rolled back");
+    List<String> actualInstants = metaClient.getActiveTimeline()
+        .filterPendingCompactionTimeline().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+    assertThat(actualInstants, is(oriInstants));
+  }
 
-    StreamerUtil.initTableIfNotExists(conf);
+  @Test
+  void rollbackEarliestCompaction() {
+    List<String> oriInstants = IntStream.range(0, 3)
+        .mapToObj(i -> generateCompactionPlan()).collect(Collectors.toList());
+    List<HoodieInstant> instants = metaClient.getActiveTimeline()
+        .filterPendingCompactionTimeline()
+        .filter(instant -> instant.getState() == HoodieInstant.State.INFLIGHT)
+        .getInstants()
+        .collect(Collectors.toList());
+    assertThat("all the instants should be in pending state", instants.size(), is(3));
+    CompactionUtil.rollbackEarliestCompaction(table);
+    long requestedCnt = metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants()
+        .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).count();
+    assertThat("Only the first instant expects to be rolled back", requestedCnt, is(1L));
 
-    HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf);
-    HoodieFlinkTable table = writeClient.getHoodieTable();
-    HoodieTableMetaClient metaClient = table.getMetaClient();
+    String instantTime = metaClient.getActiveTimeline()
+        .filterPendingCompactionTimeline().filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED)
+        .firstInstant().get().getTimestamp();
+    assertThat(instantTime, is(oriInstants.get(0)));
+  }
 
+  /**
+   * Generates a compaction plan on the timeline and returns its instant time.
+   */
+  private String generateCompactionPlan() {
     HoodieCompactionOperation operation = new HoodieCompactionOperation();
     HoodieCompactionPlan plan = new HoodieCompactionPlan(Collections.singletonList(operation), Collections.emptyMap(), 1);
     String instantTime = HoodieActiveTimeline.createNewInstantTime();
@@ -75,13 +126,7 @@ public class TestCompactionUtil {
       throw new HoodieIOException("Exception scheduling compaction", ioe);
     }
     metaClient.reloadActiveTimeline();
-    HoodieInstant instant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().orElse(null);
-    assertThat(instant.getTimestamp(), is(instantTime));
-
-    CompactionUtil.rollbackCompaction(table, conf);
-    HoodieInstant rollbackInstant = table.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().get();
-    assertThat(rollbackInstant.getState(), is(HoodieInstant.State.REQUESTED));
-    assertThat(rollbackInstant.getTimestamp(), is(instantTime));
+    return instantTime;
   }
 }