Posted to commits@hudi.apache.org by co...@apache.org on 2022/02/14 02:38:28 UTC

[hudi] branch master updated: [HUDI-2413] fix Sql source's checkpoint issue (#3648)

This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 55777fe  [HUDI-2413] fix Sql source's checkpoint issue (#3648)
55777fe is described below

commit 55777fec0519c1aa26a4bd4415d545b658dfec29
Author: 冯健 <fe...@gmail.com>
AuthorDate: Mon Feb 14 10:37:48 2022 +0800

    [HUDI-2413] fix Sql source's checkpoint issue (#3648)
    
    * [HUDI-2413] fix Sql source's checkpoint
    
    * Fixing sql source checkpoint handling
    
    * Fixing docs
    
    Co-authored-by: jian.feng <fe...@gmial.com>
    Co-authored-by: sivabalan <n....@gmail.com>
---
 .../hudi/utilities/deltastreamer/DeltaSync.java    |  6 ++-
 .../deltastreamer/HoodieDeltaStreamer.java         |  4 ++
 .../apache/hudi/utilities/sources/SqlSource.java   |  2 +
 .../functional/HoodieDeltaStreamerTestBase.java    |  2 +
 .../functional/TestHoodieDeltaStreamer.java        | 45 ++++++++++++++++++++++
 .../hudi/utilities/sources/TestSqlSource.java      | 18 +++++++++
 6 files changed, 75 insertions(+), 2 deletions(-)
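
For context, a minimal sketch of how the new flag is meant to be used together with SqlSource. Class and field names below are taken from this patch and its tests; the properties file path and table paths are illustrative, and a JavaSparkContext `jsc` is assumed to exist:

    // Hypothetical driver snippet; assumes org.apache.hudi.utilities.deltastreamer.*
    // and org.apache.hudi.utilities.sources.SqlSource are on the classpath.
    HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
    cfg.targetBasePath = "/tmp/hudi/sql_source_table";
    cfg.targetTableName = "sql_source_table";
    cfg.sourceClassName = SqlSource.class.getName();
    cfg.propsFilePath = "/path/to/sql-source.properties";
    // SqlSource produces no checkpoint, so without this flag DeltaSync would
    // compare null to null and skip the commit as "no new data" (the bug fixed here).
    cfg.allowCommitOnNoCheckpointChange = true;
    new HoodieDeltaStreamer(cfg, jsc).sync();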

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 91f09da..c376243 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -460,7 +460,7 @@ public class DeltaSync implements Serializable {
       schemaProvider = dataAndCheckpoint.getSchemaProvider();
     }
 
-    if (Objects.equals(checkpointStr, resumeCheckpointStr.orElse(null))) {
+    if (!cfg.allowCommitOnNoCheckpointChange && Objects.equals(checkpointStr, resumeCheckpointStr.orElse(null))) {
       LOG.info("No new data, source checkpoint has not changed. Nothing to commit. Old checkpoint=("
           + resumeCheckpointStr + "). New Checkpoint=(" + checkpointStr + ")");
       String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));
@@ -596,7 +596,9 @@ public class DeltaSync implements Serializable {
     long metaSyncTimeMs = 0;
     if (!hasErrors || cfg.commitOnErrors) {
       HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
-      checkpointCommitMetadata.put(CHECKPOINT_KEY, checkpointStr);
+      if (checkpointStr != null) {
+        checkpointCommitMetadata.put(CHECKPOINT_KEY, checkpointStr);
+      }
       if (cfg.checkpoint != null) {
         checkpointCommitMetadata.put(CHECKPOINT_RESET_KEY, cfg.checkpoint);
       }
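
Taken together, the two DeltaSync changes above (1) make the "nothing to commit" short-circuit skippable via the new flag, and (2) stop writing a null checkpoint into the commit metadata. A minimal sketch of the resulting commit gate, with simplified names (a distillation, not the actual DeltaSync code):

    import java.util.Objects;

    // A commit proceeds when the override flag is set, or when the source's
    // checkpoint actually moved relative to the one resumed from the timeline.
    static boolean shouldCommit(boolean allowCommitOnNoCheckpointChange,
                                String newCheckpoint, String resumedCheckpoint) {
      return allowCommitOnNoCheckpointChange
          || !Objects.equals(newCheckpoint, resumedCheckpoint);
    }
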
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 8edd45d..65cf2c3 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -378,6 +378,10 @@ public class HoodieDeltaStreamer implements Serializable {
     @Parameter(names = {"--max-retry-count"}, description = "the max retry count if --retry-on-source-failures is enabled")
     public Integer maxRetryCount = 3;
 
+    @Parameter(names = {"--allow-commit-on-no-checkpoint-change"}, description = "allow commits even if checkpoint has not changed before and after fetch data"
+        + "from souce. This might be useful in sources like SqlSource where there is not checkpoint. And is not recommended to enable in continuous mode.")
+    public Boolean allowCommitOnNoCheckpointChange = false;
+
     @Parameter(names = {"--help", "-h"}, help = true)
     public Boolean help = false;
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SqlSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SqlSource.java
index d832e43..056e035 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SqlSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SqlSource.java
@@ -48,6 +48,8 @@ import java.util.Collections;
  * <p>To fetch and use the latest incremental checkpoint, you also need to set this hoodie_conf for deltastreamer jobs:
  *
  * <p>hoodie.write.meta.key.prefixes = 'deltastreamer.checkpoint.key'
+ *
+ * Also, users are expected to set --allow-commit-on-no-checkpoint-change when using this SqlSource.
  */
 public class SqlSource extends RowSource {
   private static final long serialVersionUID = 1L;
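
To make the Javadoc above concrete, a hedged sketch of the configuration a SqlSource job carries. The two keys are taken from this patch and the existing Javadoc; the query and table name are illustrative, and in practice these keys would live in the job's --props file rather than be set programmatically:

    // TypedProperties is the Hudi config holder also used by the new tests below
    // (org.apache.hudi.common.config.TypedProperties in this codebase).
    TypedProperties props = new TypedProperties();
    props.setProperty("hoodie.deltastreamer.source.sql.sql.query",
        "select * from my_source_table");
    // Needed to fetch and use the latest incremental checkpoint (per the Javadoc above).
    props.setProperty("hoodie.write.meta.key.prefixes", "deltastreamer.checkpoint.key");
    // The job itself should also pass --allow-commit-on-no-checkpoint-change.
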
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java
index b24faf7..9b7ee3b 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java
@@ -61,6 +61,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
   static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
   static final String PROPS_FILENAME_TEST_ORC = "test-orc-dfs-source.properties";
   static final String PROPS_FILENAME_TEST_JSON_KAFKA = "test-json-kafka-dfs-source.properties";
+  static final String PROPS_FILENAME_TEST_SQL_SOURCE = "test-sql-source-source.properties";
   static final String PROPS_FILENAME_TEST_MULTI_WRITER = "test-multi-writer.properties";
   static final String FIRST_PARQUET_FILE_NAME = "1.parquet";
   static final String FIRST_ORC_FILE_NAME = "1.orc";
@@ -71,6 +72,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
   static final int ORC_NUM_RECORDS = 5;
   static final int CSV_NUM_RECORDS = 3;
   static final int JSON_KAFKA_NUM_RECORDS = 5;
+  static final int SQL_SOURCE_NUM_RECORDS = 1000;
   String kafkaCheckpointType = "string";
   // Required fields
   static final String TGT_BASE_PATH_PARAM = "--target-base-path";
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 7fabcae..afd888e 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -64,6 +64,7 @@ import org.apache.hudi.utilities.sources.JdbcSource;
 import org.apache.hudi.utilities.sources.JsonKafkaSource;
 import org.apache.hudi.utilities.sources.ORCDFSSource;
 import org.apache.hudi.utilities.sources.ParquetDFSSource;
+import org.apache.hudi.utilities.sources.SqlSource;
 import org.apache.hudi.utilities.sources.TestDataSource;
 import org.apache.hudi.utilities.testutils.JdbcTestUtils;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
@@ -208,6 +209,14 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
                                                  List<String> transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass,
                                                  int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField,
                                                  String checkpoint) {
+      return makeConfig(basePath, op, sourceClassName, transformerClassNames, propsFilename, enableHiveSync, useSchemaProviderClass, sourceLimit, updatePayloadClass, payloadClassName,
+          tableType, sourceOrderingField, checkpoint, false);
+    }
+
+    static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, String sourceClassName,
+                                                 List<String> transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass,
+                                                 int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField,
+                                                 String checkpoint, boolean allowCommitOnNoCheckpointChange) {
       HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
       cfg.targetBasePath = basePath;
       cfg.targetTableName = "hoodie_trips";
@@ -226,6 +235,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
       if (useSchemaProviderClass) {
         cfg.schemaProviderClassName = defaultSchemaProviderClassName;
       }
+      cfg.allowCommitOnNoCheckpointChange = allowCommitOnNoCheckpointChange;
       return cfg;
     }
 
@@ -1701,6 +1711,41 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     testCsvDFSSource(false, '\t', true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
   }
 
+  private void prepareSqlSource() throws IOException {
+    String sourceRoot = dfsBasePath + "/sqlSourceFiles";
+    TypedProperties sqlSourceProps = new TypedProperties();
+    sqlSourceProps.setProperty("include", "base.properties");
+    sqlSourceProps.setProperty("hoodie.embed.timeline.server", "false");
+    sqlSourceProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
+    sqlSourceProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
+    sqlSourceProps.setProperty("hoodie.deltastreamer.source.sql.sql.query","select * from test_sql_table");
+
+    UtilitiesTestBase.Helpers.savePropsToDFS(sqlSourceProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_SQL_SOURCE);
+
+    // Data generation
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    generateSqlSourceTestTable(sourceRoot, "1", "1000", SQL_SOURCE_NUM_RECORDS, dataGenerator);
+  }
+
+  private void generateSqlSourceTestTable(String dfsRoot, String filename, String instantTime, int n, HoodieTestDataGenerator dataGenerator) throws IOException {
+    Path path = new Path(dfsRoot, filename);
+    Helpers.saveParquetToDFS(Helpers.toGenericRecords(dataGenerator.generateInserts(instantTime, n, false)), path);
+    sparkSession.read().parquet(dfsRoot).createOrReplaceTempView("test_sql_table");
+  }
+
+  @Test
+  public void testSqlSourceSource() throws Exception {
+    prepareSqlSource();
+    String tableBasePath = dfsBasePath + "/test_sql_source_table" + testNum++;
+    HoodieDeltaStreamer deltaStreamer =
+        new HoodieDeltaStreamer(TestHelpers.makeConfig(
+            tableBasePath, WriteOperationType.INSERT, SqlSource.class.getName(),
+            Collections.emptyList(), PROPS_FILENAME_TEST_SQL_SOURCE, false,
+            false, 1000, false, null, null, "timestamp", null, true), jsc);
+    deltaStreamer.sync();
+    TestHelpers.assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
+  }
+
   @Test
   public void testJdbcSourceIncrementalFetchInContinuousMode() {
     try (Connection connection = DriverManager.getConnection("jdbc:h2:mem:test_mem", "test", "jdbc")) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
index 9c3d558..e4ca518 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
@@ -40,6 +40,7 @@ import org.junit.jupiter.api.Test;
 import java.io.IOException;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 /**
@@ -137,6 +138,23 @@ public class TestSqlSource extends UtilitiesTestBase {
 
   /**
    * Runs the test scenario of reading data from the source in row format.
+   * Source has no records, so the checkpoint for the next batch should be null.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testSqlSourceCheckpoint() throws IOException {
+    props.setProperty(sqlSourceConfig, "select * from test_sql_table where 1=0");
+    sqlSource = new SqlSource(props, jsc, sparkSession, schemaProvider);
+    sourceFormatAdapter = new SourceFormatAdapter(sqlSource);
+
+    InputBatch<Dataset<Row>> fetch1AsRows =
+            sourceFormatAdapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
+    assertNull(fetch1AsRows.getCheckpointForNextBatch());
+  }
+
+  /**
+   * Runs the test scenario of reading data from the source in row format.
    * Source has more records than source limit.
    *
    * @throws IOException