You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2021/09/01 08:48:03 UTC

[hudi] branch master updated: [HUDI-2379] Include the pending compaction file groups for flink (#3567)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f66e1ce  [HUDI-2379] Include the pending compaction file groups for flink (#3567)
f66e1ce is described below

commit f66e1ce9bf56fb4478d8b14d791644cc817924ca
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Wed Sep 1 16:47:52 2021 +0800

    [HUDI-2379] Include the pending compaction file groups for flink (#3567)
    
    streaming reader
---
 .../hudi/source/StreamReadMonitoringFunction.java  | 61 +++++++++++++++++++---
 .../org/apache/hudi/table/HoodieTableSource.java   |  2 +-
 .../apache/hudi/table/HoodieDataSourceITCase.java  | 36 ++++++++++++-
 .../test/java/org/apache/hudi/utils/TestData.java  |  2 +-
 4 files changed, 90 insertions(+), 11 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index c7dcc0a..ae6b3e1 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.InstantRange;
+import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
@@ -52,11 +53,13 @@ import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
@@ -206,7 +209,7 @@ public class StreamReadMonitoringFunction
       return;
     }
     metaClient.reloadActiveTimeline();
-    HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
+    HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
     if (commitTimeline.empty()) {
       LOG.warn("No splits found for the table under path " + path);
       return;
@@ -228,8 +231,7 @@ public class StreamReadMonitoringFunction
             : InstantRange.getInstance(specifiedStart, instantToIssue.getTimestamp(), InstantRange.RangeType.CLOSE_CLOSE);
       } else {
         // first time consume and no start commit, consumes the latest incremental data set.
-        HoodieInstant latestCommitInstant = metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
-        instantRange = InstantRange.getInstance(latestCommitInstant.getTimestamp(), instantToIssue.getTimestamp(),
+        instantRange = InstantRange.getInstance(instantToIssue.getTimestamp(), instantToIssue.getTimestamp(),
             InstantRange.RangeType.CLOSE_CLOSE);
       }
     } else {
@@ -243,8 +245,13 @@ public class StreamReadMonitoringFunction
     // 4. use the file paths from #step 3 as the back-up of the filesystem view
 
     String tableName = conf.getString(FlinkOptions.TABLE_NAME);
-    List<HoodieCommitMetadata> metadataList = instants.stream()
+    List<HoodieCommitMetadata> activeMetadataList = instants.stream()
         .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
+    List<HoodieCommitMetadata> archivedMetadataList = getArchivedMetadata(instantRange, commitTimeline, tableName);
+    List<HoodieCommitMetadata> metadataList = archivedMetadataList.size() > 0
+        ? mergeList(activeMetadataList, archivedMetadataList)
+        : activeMetadataList;
+
     Set<String> writePartitions = getWritePartitionPaths(metadataList);
     // apply partition push down
     if (this.requiredPartitionPaths.size() > 0) {
@@ -326,6 +333,38 @@ public class StreamReadMonitoringFunction
   }
 
   /**
+   * Returns the archived metadata in case the reader consumes untimely or it wants
+   * to read from the earliest.
+   *
+   * <p>Note: should improve it with metadata table when the metadata table is stable enough.
+   *
+   * @param instantRange   The instant range to filter the timeline instants
+   * @param commitTimeline The commit timeline
+   * @param tableName      The table name
+   * @return the list of archived metadata, or empty if there is no need to read the archived timeline
+   */
+  private List<HoodieCommitMetadata> getArchivedMetadata(
+      InstantRange instantRange,
+      HoodieTimeline commitTimeline,
+      String tableName) {
+    if (instantRange == null || commitTimeline.isBeforeTimelineStarts(instantRange.getStartInstant())) {
+      // read the archived metadata if:
+      // 1. the start commit is 'earliest';
+      // 2. the start instant is archived.
+      HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline();
+      if (!metaClient.getArchivedTimeline().empty()) {
+        Stream<HoodieInstant> instantStream = archivedTimeline.getCommitsTimeline().filterCompletedInstants().getInstants();
+        if (instantRange != null) {
+          instantStream = instantStream.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, instantRange.getStartInstant()));
+        }
+        return instantStream
+            .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, archivedTimeline)).collect(Collectors.toList());
+      }
+    }
+    return Collections.emptyList();
+  }
+
+  /**
    * Returns the instants with a given issuedInstant to start from.
    *
    * @param commitTimeline The completed commits timeline
@@ -335,19 +374,19 @@ public class StreamReadMonitoringFunction
   private List<HoodieInstant> filterInstantsWithStart(
       HoodieTimeline commitTimeline,
       final String issuedInstant) {
+    HoodieTimeline completedTimeline = commitTimeline.filterCompletedInstants();
     if (issuedInstant != null) {
-      return commitTimeline.getInstants()
+      return completedTimeline.getInstants()
           .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, issuedInstant))
           .collect(Collectors.toList());
     } else if (this.conf.getOptional(FlinkOptions.READ_STREAMING_START_COMMIT).isPresent()
         && !this.conf.get(FlinkOptions.READ_STREAMING_START_COMMIT).equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST)) {
       String definedStartCommit = this.conf.get(FlinkOptions.READ_STREAMING_START_COMMIT);
-      return commitTimeline.getInstants()
+      return completedTimeline.getInstants()
           .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, definedStartCommit))
           .collect(Collectors.toList());
     } else {
-      return commitTimeline.getInstants()
-          .collect(Collectors.toList());
+      return completedTimeline.getInstants().collect(Collectors.toList());
     }
   }
 
@@ -363,4 +402,10 @@ public class StreamReadMonitoringFunction
         .flatMap(Collection::stream)
         .collect(Collectors.toSet());
   }
+
+  private static <T> List<T> mergeList(List<T> list1, List<T> list2) {
+    List<T> merged = new ArrayList<>(list1);
+    merged.addAll(list2);
+    return merged;
+  }
 }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index fc42394..52cb765 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -458,7 +458,7 @@ public class HoodieTableSource implements
   }
 
   private Schema inferSchemaFromDdl() {
-    Schema schema = AvroSchemaConverter.convertToSchema(this.schema.toSourceRowDataType().getLogicalType());
+    Schema schema = AvroSchemaConverter.convertToSchema(this.schema.toPhysicalRowDataType().getLogicalType());
     return HoodieAvroUtils.addMetadataFields(schema, conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED));
   }
 
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index 26e0be6..f8fc42a 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -319,7 +319,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
 
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
-        .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
+        .option(FlinkOptions.TABLE_TYPE, tableType.name())
         .option(FlinkOptions.READ_AS_STREAMING, "true")
         .option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, "2")
         .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
@@ -334,6 +334,40 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     assertRowsEquals(result, expected, true);
   }
 
+  @Test
+  void testStreamReadMorTableWithCompactionPlan() throws Exception {
+    String createSource = TestConfigurations.getFileSourceDDL("source");
+    streamTableEnv.executeSql(createSource);
+
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
+        .option(FlinkOptions.READ_AS_STREAMING, "true")
+        .option(FlinkOptions.READ_STREAMING_START_COMMIT, FlinkOptions.START_COMMIT_EARLIEST)
+        .option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, "2")
+        // close the async compaction
+        .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, false)
+        // generate compaction plan for each commit
+        .option(FlinkOptions.COMPACTION_DELTA_COMMITS, "1")
+        .withPartition(false)
+        .end();
+    streamTableEnv.executeSql(hoodieTableDDL);
+
+    streamTableEnv.executeSql("insert into t1 select * from source");
+
+    List<Row> result = execSelectSql(streamTableEnv, "select * from t1", 10);
+    final String expected = "["
+        + "+I[id1, Danny, 23, 1970-01-01T00:00:01, par1], "
+        + "+I[id2, Stephen, 33, 1970-01-01T00:00:02, par1], "
+        + "+I[id3, Julian, 53, 1970-01-01T00:00:03, par2], "
+        + "+I[id4, Fabian, 31, 1970-01-01T00:00:04, par2], "
+        + "+I[id5, Sophia, 18, 1970-01-01T00:00:05, par3], "
+        + "+I[id6, Emma, 20, 1970-01-01T00:00:06, par3], "
+        + "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4], "
+        + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]";
+    assertRowsEquals(result, expected);
+  }
+
   @ParameterizedTest
   @MethodSource("executionModeAndPartitioningParams")
   void testWriteAndRead(ExecMode execMode, boolean hiveStylePartitioning) {
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index b6b738a..3e0afc2 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -400,7 +400,7 @@ public class TestData {
     String rowsString = rows.stream()
         .sorted(Comparator.comparing(o -> toIdSafely(o.getField(0))))
         .collect(Collectors.toList()).toString();
-    assertThat(rowDataToString(expected), is(rowsString));
+    assertThat(rowsString, is(rowDataToString(expected)));
   }
 
   /**