You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2022/05/24 12:17:20 UTC
[hudi] branch master updated: [HUDI-4132] Fixing determining target table schema for delta sync with empty batch (#5648)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 10363c1412 [HUDI-4132] Fixing determining target table schema for delta sync with empty batch (#5648)
10363c1412 is described below
commit 10363c1412b8f9b5b16b3e2e075b895e0cc9a293
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Tue May 24 08:17:15 2022 -0400
[HUDI-4132] Fixing determining target table schema for delta sync with empty batch (#5648)
---
.../org/apache/hudi/utilities/deltastreamer/DeltaSync.java | 11 +++++++++--
1 file changed, 9 insertions(+), 2 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index a1a804b9ed..a4a7e10abc 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -840,8 +840,15 @@ public class DeltaSync implements Serializable {
&& SchemaCompatibility.checkReaderWriterCompatibility(InputBatch.NULL_SCHEMA, targetSchema).getType() == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE) {
// target schema is null. fetch schema from commit metadata and use it
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(new Configuration(fs.getConf())).setBasePath(cfg.targetBasePath).setPayloadClassName(cfg.payloadClassName).build();
- TableSchemaResolver schemaResolver = new TableSchemaResolver(meta);
- newWriteSchema = schemaResolver.getTableAvroSchema(false);
+ int totalCompleted = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants();
+ if (totalCompleted > 0) {
+ try {
+ TableSchemaResolver schemaResolver = new TableSchemaResolver(meta);
+ newWriteSchema = schemaResolver.getTableAvroSchema(false);
+ } catch (IllegalArgumentException e) {
+ LOG.warn("Could not fetch schema from table. Falling back to using target schema from schema provider");
+ }
+ }
}
}
return newWriteSchema;