You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2022/05/24 12:17:20 UTC

[hudi] branch master updated: [HUDI-4132] Fixing determining target table schema for delta sync with empty batch (#5648)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 10363c1412 [HUDI-4132] Fixing determining target table schema for delta sync with empty batch (#5648)
10363c1412 is described below

commit 10363c1412b8f9b5b16b3e2e075b895e0cc9a293
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Tue May 24 08:17:15 2022 -0400

    [HUDI-4132] Fixing determining target table schema for delta sync with empty batch (#5648)
---
 .../org/apache/hudi/utilities/deltastreamer/DeltaSync.java    | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index a1a804b9ed..a4a7e10abc 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -840,8 +840,15 @@ public class DeltaSync implements Serializable {
             && SchemaCompatibility.checkReaderWriterCompatibility(InputBatch.NULL_SCHEMA, targetSchema).getType() == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE) {
           // target schema is null. fetch schema from commit metadata and use it
           HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(new Configuration(fs.getConf())).setBasePath(cfg.targetBasePath).setPayloadClassName(cfg.payloadClassName).build();
-          TableSchemaResolver schemaResolver = new TableSchemaResolver(meta);
-          newWriteSchema = schemaResolver.getTableAvroSchema(false);
+          int totalCompleted = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants();
+          if (totalCompleted > 0) {
+            try {
+              TableSchemaResolver schemaResolver = new TableSchemaResolver(meta);
+              newWriteSchema = schemaResolver.getTableAvroSchema(false);
+            } catch (IllegalArgumentException e) {
+              LOG.warn("Could not fetch schema from table. Falling back to using target schema from schema provider");
+            }
+          }
         }
       }
       return newWriteSchema;