You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2023/01/03 07:15:50 UTC
[hudi] branch master updated: [MINOR] Filter out empty GCS objects in events table. (#7592)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 870b05e1fcd [MINOR] Filter out empty GCS objects in events table. (#7592)
870b05e1fcd is described below
commit 870b05e1fcd11685d97560dd6707479d059a44aa
Author: Jainendra Tarun <ja...@gmail.com>
AuthorDate: Tue Jan 3 12:45:38 2023 +0530
[MINOR] Filter out empty GCS objects in events table. (#7592)
Co-authored-by: jainendra tarun <ja...@onehouse.ai>
---
.../sources/helpers/gcs/MetadataMessage.java | 30 ++++++++++++++++++++++
1 file changed, 30 insertions(+)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/MetadataMessage.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/MetadataMessage.java
index e42ed7fe6d2..01b32fbc39a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/MetadataMessage.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/MetadataMessage.java
@@ -18,7 +18,14 @@
package org.apache.hudi.utilities.sources.helpers.gcs;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.pubsub.v1.PubsubMessage;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+
import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
import static org.apache.hudi.utilities.sources.helpers.gcs.MessageValidity.ProcessingDecision.DO_SKIP;
@@ -29,6 +36,8 @@ import static org.apache.hudi.utilities.sources.helpers.gcs.MessageValidity.Proc
*/
public class MetadataMessage {
+ private static final Logger LOG = LogManager.getLogger(MetadataMessage.class);
+
// The CSPN message to wrap
private final PubsubMessage message;
@@ -37,6 +46,7 @@ public class MetadataMessage {
private static final String ATTR_EVENT_TYPE = "eventType";
private static final String ATTR_OBJECT_ID = "objectId";
private static final String ATTR_OVERWROTE_GENERATION = "overwroteGeneration";
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public MetadataMessage(PubsubMessage message) {
this.message = message;
@@ -63,6 +73,14 @@ public class MetadataMessage {
);
}
+ try {
+ if (isEmptyFile()) {
+ return new MessageValidity(DO_SKIP, "Object " + getObjectId() + " is empty.");
+ }
+ } catch (IOException e) {
+ LOG.error("Exception while extracting the size for object " + getObjectId(), e);
+ }
+
return MessageValidity.DEFAULT_VALID_MESSAGE;
}
@@ -82,6 +100,12 @@ public class MetadataMessage {
return EVENT_NAME_OBJECT_FINALIZE.equals(getEventType());
}
+  /**
+   * Tells whether the object described by this message has no content, based on the
+   * "size" field of the JSON message payload.
+   *
+   * @return true when the reported size is zero or negative.
+   * @throws IOException if the payload cannot be read, or if the "size" value is not a valid long.
+   */
+  private boolean isEmptyFile() throws IOException {
+    String sizeValue = getDataField("size");
+    try {
+      return Long.parseLong(sizeValue) <= 0;
+    } catch (NumberFormatException e) {
+      // Surface a malformed size as IOException so the caller's IOException handler logs it
+      // instead of letting an unchecked exception escape message validation.
+      throw new IOException("Invalid size value '" + sizeValue + "' for object " + getObjectId(), e);
+    }
+  }
+ }
+
public String getEventType() {
return getAttr(ATTR_EVENT_TYPE);
}
@@ -98,4 +122,10 @@ public class MetadataMessage {
return message.getAttributesMap().get(attrName);
}
+  /**
+   * Reads a top-level field from the JSON document obtained via {@code toStringUtf8()}.
+   *
+   * @param fieldName name of the field to look up directly under the JSON root.
+   * @return the field's value rendered as text.
+   * @throws IOException if the payload is not parseable JSON, or the field is absent or JSON null.
+   */
+  private String getDataField(String fieldName) throws IOException {
+    JsonNode root = OBJECT_MAPPER.readValue(toStringUtf8(), JsonNode.class);
+    // JsonNode.get returns null for an absent field; guard it so callers see a checked
+    // IOException rather than a NullPointerException.
+    JsonNode fieldNode = root == null ? null : root.get(fieldName);
+    if (fieldNode == null || fieldNode.isNull()) {
+      throw new IOException("Field '" + fieldName + "' is missing from the message payload");
+    }
+    return fieldNode.asText();
+  }
+ }
+
}