You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2023/01/03 07:15:50 UTC

[hudi] branch master updated: [MINOR] Filter out empty GCS objects in events table. (#7592)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 870b05e1fcd [MINOR] Filter out empty GCS objects in events table. (#7592)
870b05e1fcd is described below

commit 870b05e1fcd11685d97560dd6707479d059a44aa
Author: Jainendra Tarun <ja...@gmail.com>
AuthorDate: Tue Jan 3 12:45:38 2023 +0530

    [MINOR] Filter out empty GCS objects in events table. (#7592)
    
    Co-authored-by: jainendra tarun <ja...@onehouse.ai>
---
 .../sources/helpers/gcs/MetadataMessage.java       | 30 ++++++++++++++++++++++
 1 file changed, 30 insertions(+)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/MetadataMessage.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/MetadataMessage.java
index e42ed7fe6d2..01b32fbc39a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/MetadataMessage.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/MetadataMessage.java
@@ -18,7 +18,14 @@
 
 package org.apache.hudi.utilities.sources.helpers.gcs;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.pubsub.v1.PubsubMessage;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+
 import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
 import static org.apache.hudi.utilities.sources.helpers.gcs.MessageValidity.ProcessingDecision.DO_SKIP;
 
@@ -29,6 +36,8 @@ import static org.apache.hudi.utilities.sources.helpers.gcs.MessageValidity.Proc
  */
 public class MetadataMessage {
 
+  private static final Logger LOG = LogManager.getLogger(MetadataMessage.class);
+
   // The CSPN message to wrap
   private final PubsubMessage message;
 
@@ -37,6 +46,7 @@ public class MetadataMessage {
   private static final String ATTR_EVENT_TYPE = "eventType";
   private static final String ATTR_OBJECT_ID = "objectId";
   private static final String ATTR_OVERWROTE_GENERATION = "overwroteGeneration";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
   public MetadataMessage(PubsubMessage message) {
     this.message = message;
@@ -63,6 +73,14 @@ public class MetadataMessage {
       );
     }
 
+    try {
+      if (isEmptyFile()) {
+        return new MessageValidity(DO_SKIP, "Object " + getObjectId() + " is empty.");
+      }
+    } catch (IOException e) {
+      LOG.error("Exception while extracting the size for object " + getObjectId(), e);
+    }
+
     return MessageValidity.DEFAULT_VALID_MESSAGE;
   }
 
@@ -82,6 +100,12 @@ public class MetadataMessage {
     return EVENT_NAME_OBJECT_FINALIZE.equals(getEventType());
   }
 
+  /**
+   * Checks whether the object described by this message is empty, based on the
+   * "size" field of the message's JSON data.
+   *
+   * @return true if the reported size is zero or negative, false otherwise.
+   * @throws IOException if the message data cannot be read, or if the "size" value
+   *                     is not a valid long.
+   */
+  private boolean isEmptyFile() throws IOException {
+    String sizeValue = getDataField("size");
+    try {
+      return Long.parseLong(sizeValue) <= 0;
+    } catch (NumberFormatException e) {
+      // NumberFormatException is unchecked and would bypass callers that only handle
+      // IOException; surface it as an IOException so a malformed size is handled the
+      // same way as an unreadable payload.
+      throw new IOException("Invalid size value '" + sizeValue + "' for object " + getObjectId(), e);
+    }
+  }
+
   public String getEventType() {
     return getAttr(ATTR_EVENT_TYPE);
   }
@@ -98,4 +122,10 @@ public class MetadataMessage {
     return message.getAttributesMap().get(attrName);
   }
 
+  /**
+   * Reads a top-level field from the message's JSON data and returns it as text.
+   *
+   * @param fieldName name of the top-level JSON field to read.
+   * @return the textual value of the field.
+   * @throws IOException if the data cannot be parsed as JSON, or if the field is
+   *                     absent or has a JSON null value.
+   */
+  private String getDataField(String fieldName) throws IOException {
+    // toStringUtf8() is defined elsewhere in this class; presumably it returns the
+    // message payload decoded as UTF-8 - confirm against its definition.
+    JsonNode root = OBJECT_MAPPER.readValue(toStringUtf8(), JsonNode.class);
+    JsonNode fieldNode = root == null ? null : root.get(fieldName);
+    // JsonNode.get() returns null for an absent field; fail with a descriptive checked
+    // exception instead of a NullPointerException on asText().
+    if (fieldNode == null || fieldNode.isNull()) {
+      throw new IOException("Field '" + fieldName + "' is missing or null in the message data.");
+    }
+    return fieldNode.asText();
+  }
+
 }