Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/08/12 15:31:27 UTC

[GitHub] [hudi] nsivabalan commented on a change in pull request #3433: [HUDI-1897] Deltastreamer source for AWS S3

nsivabalan commented on a change in pull request #3433:
URL: https://github.com/apache/hudi/pull/3433#discussion_r687820245



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CloudObjectsMetaSource.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.CloudObjectsMetaSelector;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.Message;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This source provides capability to create the hoodie table for cloudObject Metadata (eg. s3
+ * events data). It will use the cloud queue for receiving the object key events. This can be useful
+ * for check cloud file activity over time and consuming this to create other hoodie table from

Review comment:
       minor: this can be useful "to" check cloud file activity over time and "create the hoodie cloud meta table."

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CloudObjectsMetaSource.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.CloudObjectsMetaSelector;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.Message;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This source provides capability to create the hoodie table for cloudObject Metadata (eg. s3

Review comment:
       To avoid confusion w/ the hoodie metadata table in general, let's call the table we create in this 2-stage pipeline the "hoodie cloud meta table". If you agree, can you please fix the terminology throughout the patch?

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
+import com.amazonaws.services.sqs.model.BatchResultErrorEntry;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
+import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
+import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.json.JSONObject;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Cloud Objects Selector Class. This class has methods for processing cloud objects. It currently
+ * supports only AWS S3 objects and AWS SQS queue.
+ */
+public class CloudObjectsSelector {
+  public static final List<String> ALLOWED_S3_EVENT_PREFIX =
+      Collections.singletonList("ObjectCreated");
+  public static volatile Logger log = LogManager.getLogger(CloudObjectsSelector.class);
+  public final String queueUrl;
+  public final int longPollWait;
+  public final int maxMessagesEachRequest;
+  public final int maxMessageEachBatch;
+  public final int visibilityTimeout;
+  public final TypedProperties props;
+  public final String fsName;
+  private final String regionName;
+
+  /**
+   * Cloud Objects Selector Class. {@link CloudObjectsSelector}
+   */
+  public CloudObjectsSelector(TypedProperties props) {
+    DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.QUEUE_URL_PROP, Config.QUEUE_REGION));
+    this.props = props;
+    this.queueUrl = props.getString(Config.QUEUE_URL_PROP);
+    this.regionName = props.getString(Config.QUEUE_REGION);
+    this.fsName = props.getString(Config.SOURCE_QUEUE_FS_PROP, "s3").toLowerCase();
+    this.longPollWait = props.getInteger(Config.QUEUE_LONGPOLLWAIT_PROP, 20);
+    this.maxMessageEachBatch = props.getInteger(Config.QUEUE_MAXMESSAGESEACHBATCH_PROP, 5);
+    this.visibilityTimeout = props.getInteger(Config.QUEUE_VISIBILITYTIMEOUT_PROP, 30);
+    this.maxMessagesEachRequest = 10;
+  }
+
+  /**
+   * Get SQS queue attributes.
+   *
+   * @param sqsClient AWSClient for sqsClient
+   * @param queueUrl  queue full url
+   * @return map of attributes needed
+   */
+  protected Map<String, String> getSqsQueueAttributes(AmazonSQS sqsClient, String queueUrl) {
+    GetQueueAttributesResult queueAttributesResult =
+        sqsClient.getQueueAttributes(
+            new GetQueueAttributesRequest(queueUrl)
+                .withAttributeNames("ApproximateNumberOfMessages"));
+    return queueAttributesResult.getAttributes();
+  }
+
+  /**
+   * Get the file attributes filePath, eventTime and size from JSONObject record.
+   *
+   * @param record of object event
+   * @return map of file attribute
+   */
+  protected Map<String, Object> getFileAttributesFromRecord(JSONObject record)
+      throws UnsupportedEncodingException {
+
+    Map<String, Object> fileRecord = new HashMap<>();
+    String eventTimeStr = record.getString("eventTime");
+    long eventTime =
+        Date.from(Instant.from(DateTimeFormatter.ISO_INSTANT.parse(eventTimeStr))).getTime();
+
+    JSONObject s3Object = record.getJSONObject("s3").getJSONObject("object");
+    String bucket =
+        URLDecoder.decode(
+            record.getJSONObject("s3").getJSONObject("bucket").getString("name"), "UTF-8");
+    String key = URLDecoder.decode(s3Object.getString("key"), "UTF-8");
+    String filePath = this.fsName + "://" + bucket + "/" + key;
+
+    fileRecord.put("eventTime", eventTime);
+    fileRecord.put("fileSize", s3Object.getLong("size"));
+    fileRecord.put("filePath", filePath);
+    return fileRecord;
+  }
+
+  /**
+   * Amazon SQS Client Builder.
+   */
+  public AmazonSQS createAmazonSqsClient() {
+    return AmazonSQSClientBuilder.standard().withRegion(Regions.fromName(regionName)).build();
+  }
+
+  /**
+   * List messages from queue.
+   */
+  protected List<Message> getMessagesToProcess(
+      AmazonSQS sqsClient,
+      String queueUrl,
+      ReceiveMessageRequest receiveMessageRequest,
+      int maxMessageEachBatch,
+      int maxMessagesEachRequest) {
+    List<Message> messagesToProcess = new ArrayList<>();
+
+    // Get count for available messages
+    Map<String, String> queueAttributesResult = getSqsQueueAttributes(sqsClient, queueUrl);
+    long approxMessagesAvailable =
+        Long.parseLong(queueAttributesResult.get("ApproximateNumberOfMessages"));
+    log.info("Approx. " + approxMessagesAvailable + " messages available in queue.");
+
+    for (int i = 0;
+         i < (int) Math.ceil((double) approxMessagesAvailable / maxMessagesEachRequest);
+         ++i) {
+      List<Message> messages = sqsClient.receiveMessage(receiveMessageRequest).getMessages();
+      log.debug("Messages size: " + messages.size());
+
+      for (Message message : messages) {
+        log.debug("message id: " + message.getMessageId());
+        messagesToProcess.add(message);
+      }
+      log.debug("total fetched messages size: " + messagesToProcess.size());
+      if (messages.isEmpty() || (messagesToProcess.size() >= maxMessageEachBatch)) {
+        break;
+      }
+    }
+    return messagesToProcess;
+  }
+
+  /**
+   * create partitions of list using specific batch size. we can't use third party API for this
+   * functionality, due to https://github.com/apache/hudi/blob/master/style/checkstyle.xml#L270
+   */
+  protected List<List<Message>> createListPartitions(List<Message> singleList, int eachBatchSize) {
+    List<List<Message>> listPartitions = new ArrayList<>();
+
+    if (singleList.size() == 0 || eachBatchSize < 1) {
+      return listPartitions;
+    }
+
+    for (int start = 0; start < singleList.size(); start += eachBatchSize) {
+      int end = Math.min(start + eachBatchSize, singleList.size());
+
+      if (start > end) {
+        throw new IndexOutOfBoundsException(
+            "Index " + start + " is out of the list range <0," + (singleList.size() - 1) + ">");
+      }
+      listPartitions.add(new ArrayList<>(singleList.subList(start, end)));
+    }
+    return listPartitions;
+  }
+
+  /**
+   * delete batch of messages from queue.
+   */
+  protected void deleteBatchOfMessages(
+      AmazonSQS sqs, String queueUrl, List<Message> messagesToBeDeleted) {
+    DeleteMessageBatchRequest deleteBatchReq =
+        new DeleteMessageBatchRequest().withQueueUrl(queueUrl);
+    List<DeleteMessageBatchRequestEntry> deleteEntries = deleteBatchReq.getEntries();
+
+    for (Message message : messagesToBeDeleted) {
+      deleteEntries.add(
+          new DeleteMessageBatchRequestEntry()
+              .withId(message.getMessageId())
+              .withReceiptHandle(message.getReceiptHandle()));
+    }
+    DeleteMessageBatchResult deleteResult = sqs.deleteMessageBatch(deleteBatchReq);
+    List<String> deleteFailures =
+        deleteResult.getFailed().stream()
+            .map(BatchResultErrorEntry::getId)
+            .collect(Collectors.toList());
+    System.out.println("Delete is" + deleteFailures.isEmpty() + "or ignoring it.");
+    if (!deleteFailures.isEmpty()) {
+      log.warn(
+          "Failed to delete "
+              + deleteFailures.size()
+              + " messages out of "
+              + deleteEntries.size()
+              + " from queue.");
+    } else {
+      log.info("Successfully deleted " + deleteEntries.size() + " messages from queue.");
+    }
+  }
+
+  /**
+   * Delete Queue Messages after hudi commit. This method will be invoked by source.onCommit.
+   */
+  public void onCommitDeleteProcessedMessages(

Review comment:
       minor: rename to "deleteProcessedMessages". Every caller already calls this from within onCommit(), so it's understandable.
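       Roughly (just a sketch of the rename; the body stays the same):

           public void deleteProcessedMessages(
               AmazonSQS sqs, String queueUrl, List<Message> processedMessages) {
             // same body as before
           }

           // caller, e.g. from the source's onCommit():
           pathSelector.deleteProcessedMessages(sqs, pathSelector.queueUrl, processedMessages);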

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsMetaSelector.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Cloud Objects Meta Selector Class. This class will provide the methods to process the messages
+ * from queue for CloudObjectsMetaSource.
+ */
+public class CloudObjectsMetaSelector extends CloudObjectsSelector {
+
+  /** Cloud Objects Meta Selector Class. {@link CloudObjectsSelector} */
+  public CloudObjectsMetaSelector(TypedProperties props) {
+    super(props);
+  }
+
+  /**
+   * Factory method for creating custom CloudObjectsMetaSelector. Default selector to use is {@link
+   * CloudObjectsMetaSelector}
+   */
+  public static CloudObjectsMetaSelector createSourceSelector(TypedProperties props) {
+    String sourceSelectorClass =
+        props.getString(
+            CloudObjectsMetaSelector.Config.SOURCE_INPUT_SELECTOR,
+            CloudObjectsMetaSelector.class.getName());
+    try {
+      CloudObjectsMetaSelector selector =
+          (CloudObjectsMetaSelector)
+              ReflectionUtils.loadClass(
+                  sourceSelectorClass, new Class<?>[] {TypedProperties.class}, props);
+
+      log.info("Using path selector " + selector.getClass().getName());
+      return selector;
+    } catch (Exception e) {
+      throw new HoodieException("Could not load source selector class " + sourceSelectorClass, e);
+    }
+  }
+
+  /**
+   * List messages from queue, filter out illegible events while doing so. It will also delete the
+   * ineligible messages from queue.
+   *
+   * @param processedMessages array of processed messages to add more messages
+   * @return the list of eligible records
+   */
+  protected List<Map<String, Object>> getEligibleEvents(
+      AmazonSQS sqs, List<Message> processedMessages) throws IOException {
+
+    List<Map<String, Object>> eligibleRecords = new ArrayList<>();
+    List<Message> ineligibleMessages = new ArrayList<>();
+
+    ReceiveMessageRequest receiveMessageRequest =
+        new ReceiveMessageRequest()
+            .withQueueUrl(this.queueUrl)
+            .withWaitTimeSeconds(this.longPollWait)
+            .withVisibilityTimeout(this.visibilityTimeout);
+    receiveMessageRequest.setMaxNumberOfMessages(this.maxMessagesEachRequest);
+
+    List<Message> messages =
+        getMessagesToProcess(
+            sqs,
+            this.queueUrl,
+            receiveMessageRequest,
+            this.maxMessageEachBatch,
+            this.maxMessagesEachRequest);
+
+    for (Message message : messages) {
+      boolean isMessageDelete = Boolean.TRUE;
+
+      JSONObject messageBody = new JSONObject(message.getBody());
+      Map<String, Object> messageMap;
+      ObjectMapper mapper = new ObjectMapper();
+
+      if (messageBody.has("Message")) {
+        // If this messages is from S3Event -> SNS -> SQS
+        messageMap =
+            (Map<String, Object>) mapper.readValue(messageBody.getString("Message"), Map.class);
+      } else {
+        // If this messages is from S3Event -> SQS
+        messageMap = (Map<String, Object>) mapper.readValue(messageBody.toString(), Map.class);
+      }
+      if (messageMap.containsKey("Records")) {
+        List<Map<String, Object>> records = (List<Map<String, Object>>) messageMap.get("Records");
+        for (Map<String, Object> record : records) {
+          String eventName = (String) record.get("eventName");
+
+          // filter only allowed s3 event types
+          if (ALLOWED_S3_EVENT_PREFIX.stream().anyMatch(eventName::startsWith)) {
+            eligibleRecords.add(record);
+            isMessageDelete = Boolean.FALSE;
+            processedMessages.add(message);
+
+          } else {
+            log.info("This S3 event " + eventName + " is not allowed, so ignoring it.");
+          }
+        }
+      } else {
+        log.info("Message is not expected format or it's s3:TestEvent");
+      }
+      if (isMessageDelete) {
+        ineligibleMessages.add(message);
+      }
+    }
+    if (!ineligibleMessages.isEmpty()) {
+      deleteBatchOfMessages(sqs, queueUrl, ineligibleMessages);
+    }
+
+    return eligibleRecords;
+  }
+
+  /**
+   * Get the list of events from queue.
+   *
+   * @param sparkContext JavaSparkContext to help parallelize certain operations
+   * @param lastCheckpointStr the last checkpoint time string, empty if first run
+   * @return the list of events
+   */
+  public Pair<List<String>, String> getNextEventsFromQueue(
+      AmazonSQS sqs,
+      JavaSparkContext sparkContext,

Review comment:
       Looks like jsc is not used.
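       i.e. the signature could probably shrink to something like (sketch only):

           public Pair<List<String>, String> getNextEventsFromQueue(
               AmazonSQS sqs, Option<String> lastCheckpointStr, List<Message> processedMessages) {
             // ... unchanged body, minus any use of the spark context
           }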

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsMetaSelector.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Cloud Objects Meta Selector Class. This class will provide the methods to process the messages
+ * from queue for CloudObjectsMetaSource.
+ */
+public class CloudObjectsMetaSelector extends CloudObjectsSelector {
+
+  /** Cloud Objects Meta Selector Class. {@link CloudObjectsSelector} */
+  public CloudObjectsMetaSelector(TypedProperties props) {
+    super(props);
+  }
+
+  /**
+   * Factory method for creating custom CloudObjectsMetaSelector. Default selector to use is {@link
+   * CloudObjectsMetaSelector}
+   */
+  public static CloudObjectsMetaSelector createSourceSelector(TypedProperties props) {

Review comment:
       Why is this modelled as a factory? I see it is instantiated only from within the MetaSource. So, why not directly call new CloudObjectsMetaSelector(props) from within the constructor of CloudObjectsMetaSource?
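       i.e. something like this in the source's constructor (sketch, assuming a pluggable selector is not needed):

           this.pathSelector = new CloudObjectsMetaSelector(props);
           this.sqs = this.pathSelector.createAmazonSqsClient();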

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
+import com.amazonaws.services.sqs.model.BatchResultErrorEntry;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
+import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
+import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.json.JSONObject;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Cloud Objects Selector Class. This class has methods for processing cloud objects. It currently
+ * supports only AWS S3 objects and AWS SQS queue.
+ */
+public class CloudObjectsSelector {
+  public static final List<String> ALLOWED_S3_EVENT_PREFIX =
+      Collections.singletonList("ObjectCreated");
+  public static volatile Logger log = LogManager.getLogger(CloudObjectsSelector.class);
+  public final String queueUrl;
+  public final int longPollWait;
+  public final int maxMessagesEachRequest;
+  public final int maxMessageEachBatch;
+  public final int visibilityTimeout;
+  public final TypedProperties props;
+  public final String fsName;
+  private final String regionName;
+
+  /**
+   * Cloud Objects Selector Class. {@link CloudObjectsSelector}
+   */
+  public CloudObjectsSelector(TypedProperties props) {
+    DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.QUEUE_URL_PROP, Config.QUEUE_REGION));
+    this.props = props;
+    this.queueUrl = props.getString(Config.QUEUE_URL_PROP);
+    this.regionName = props.getString(Config.QUEUE_REGION);
+    this.fsName = props.getString(Config.SOURCE_QUEUE_FS_PROP, "s3").toLowerCase();
+    this.longPollWait = props.getInteger(Config.QUEUE_LONGPOLLWAIT_PROP, 20);
+    this.maxMessageEachBatch = props.getInteger(Config.QUEUE_MAXMESSAGESEACHBATCH_PROP, 5);
+    this.visibilityTimeout = props.getInteger(Config.QUEUE_VISIBILITYTIMEOUT_PROP, 30);
+    this.maxMessagesEachRequest = 10;
+  }
+
+  /**
+   * Get SQS queue attributes.
+   *
+   * @param sqsClient AWSClient for sqsClient
+   * @param queueUrl  queue full url
+   * @return map of attributes needed
+   */
+  protected Map<String, String> getSqsQueueAttributes(AmazonSQS sqsClient, String queueUrl) {
+    GetQueueAttributesResult queueAttributesResult =
+        sqsClient.getQueueAttributes(
+            new GetQueueAttributesRequest(queueUrl)
+                .withAttributeNames("ApproximateNumberOfMessages"));
+    return queueAttributesResult.getAttributes();
+  }
+
+  /**
+   * Get the file attributes filePath, eventTime and size from JSONObject record.
+   *
+   * @param record of object event
+   * @return map of file attribute
+   */
+  protected Map<String, Object> getFileAttributesFromRecord(JSONObject record)
+      throws UnsupportedEncodingException {
+
+    Map<String, Object> fileRecord = new HashMap<>();
+    String eventTimeStr = record.getString("eventTime");
+    long eventTime =
+        Date.from(Instant.from(DateTimeFormatter.ISO_INSTANT.parse(eventTimeStr))).getTime();
+
+    JSONObject s3Object = record.getJSONObject("s3").getJSONObject("object");
+    String bucket =
+        URLDecoder.decode(
+            record.getJSONObject("s3").getJSONObject("bucket").getString("name"), "UTF-8");
+    String key = URLDecoder.decode(s3Object.getString("key"), "UTF-8");
+    String filePath = this.fsName + "://" + bucket + "/" + key;
+
+    fileRecord.put("eventTime", eventTime);
+    fileRecord.put("fileSize", s3Object.getLong("size"));
+    fileRecord.put("filePath", filePath);
+    return fileRecord;
+  }
+
+  /**
+   * Amazon SQS Client Builder.
+   */
+  public AmazonSQS createAmazonSqsClient() {
+    return AmazonSQSClientBuilder.standard().withRegion(Regions.fromName(regionName)).build();
+  }
+
+  /**
+   * List messages from queue.
+   */
+  protected List<Message> getMessagesToProcess(
+      AmazonSQS sqsClient,
+      String queueUrl,
+      ReceiveMessageRequest receiveMessageRequest,
+      int maxMessageEachBatch,
+      int maxMessagesEachRequest) {
+    List<Message> messagesToProcess = new ArrayList<>();
+
+    // Get count for available messages
+    Map<String, String> queueAttributesResult = getSqsQueueAttributes(sqsClient, queueUrl);
+    long approxMessagesAvailable =
+        Long.parseLong(queueAttributesResult.get("ApproximateNumberOfMessages"));
+    log.info("Approx. " + approxMessagesAvailable + " messages available in queue.");
+
+    for (int i = 0;
+         i < (int) Math.ceil((double) approxMessagesAvailable / maxMessagesEachRequest);
+         ++i) {
+      List<Message> messages = sqsClient.receiveMessage(receiveMessageRequest).getMessages();
+      log.debug("Messages size: " + messages.size());
+
+      for (Message message : messages) {
+        log.debug("message id: " + message.getMessageId());
+        messagesToProcess.add(message);
+      }
+      log.debug("total fetched messages size: " + messagesToProcess.size());
+      if (messages.isEmpty() || (messagesToProcess.size() >= maxMessageEachBatch)) {
+        break;
+      }
+    }
+    return messagesToProcess;
+  }
+
+  /**
+   * create partitions of list using specific batch size. we can't use third party API for this
+   * functionality, due to https://github.com/apache/hudi/blob/master/style/checkstyle.xml#L270
+   */
+  protected List<List<Message>> createListPartitions(List<Message> singleList, int eachBatchSize) {
+    List<List<Message>> listPartitions = new ArrayList<>();
+
+    if (singleList.size() == 0 || eachBatchSize < 1) {
+      return listPartitions;
+    }
+
+    for (int start = 0; start < singleList.size(); start += eachBatchSize) {
+      int end = Math.min(start + eachBatchSize, singleList.size());
+
+      if (start > end) {
+        throw new IndexOutOfBoundsException(
+            "Index " + start + " is out of the list range <0," + (singleList.size() - 1) + ">");
+      }
+      listPartitions.add(new ArrayList<>(singleList.subList(start, end)));
+    }
+    return listPartitions;
+  }
+
+  /**
+   * delete batch of messages from queue.
+   */
+  protected void deleteBatchOfMessages(
+      AmazonSQS sqs, String queueUrl, List<Message> messagesToBeDeleted) {
+    DeleteMessageBatchRequest deleteBatchReq =
+        new DeleteMessageBatchRequest().withQueueUrl(queueUrl);
+    List<DeleteMessageBatchRequestEntry> deleteEntries = deleteBatchReq.getEntries();
+
+    for (Message message : messagesToBeDeleted) {
+      deleteEntries.add(
+          new DeleteMessageBatchRequestEntry()
+              .withId(message.getMessageId())
+              .withReceiptHandle(message.getReceiptHandle()));
+    }
+    DeleteMessageBatchResult deleteResult = sqs.deleteMessageBatch(deleteBatchReq);
+    List<String> deleteFailures =
+        deleteResult.getFailed().stream()
+            .map(BatchResultErrorEntry::getId)
+            .collect(Collectors.toList());
+    System.out.println("Delete is" + deleteFailures.isEmpty() + "or ignoring it.");
+    if (!deleteFailures.isEmpty()) {
+      log.warn(
+          "Failed to delete "
+              + deleteFailures.size()
+              + " messages out of "
+              + deleteEntries.size()
+              + " from queue.");
+    } else {
+      log.info("Successfully deleted " + deleteEntries.size() + " messages from queue.");
+    }
+  }
+
+  /**
+   * Delete Queue Messages after hudi commit. This method will be invoked by source.onCommit.
+   */
+  public void onCommitDeleteProcessedMessages(
+      AmazonSQS sqs, String queueUrl, List<Message> processedMessages) {
+
+    if (!processedMessages.isEmpty()) {
+
+      // create batch for deletion, SES DeleteMessageBatchRequest only accept max 10 entries
+      List<List<Message>> deleteBatches = createListPartitions(processedMessages, 10);
+      for (List<Message> deleteBatch : deleteBatches) {
+        deleteBatchOfMessages(sqs, queueUrl, deleteBatch);
+      }
+    }
+  }
+
+  /**
+   * Configs supported.
+   */
+  public static class Config {
+    /**
+     * {@value #QUEUE_URL_PROP} is the queue url for cloud object events.
+     */
+    public static final String QUEUE_URL_PROP = "hoodie.deltastreamer.source.queue.url";

Review comment:
       We might need to fix the naming convention of all these configs:
   "hoodie.deltastreamer.cloud.source...." or something along those lines. Wdyt?

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsMetaSelector.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Cloud Objects Meta Selector Class. This class will provide the methods to process the messages
+ * from queue for CloudObjectsMetaSource.
+ */
+public class CloudObjectsMetaSelector extends CloudObjectsSelector {
+
+  /** Cloud Objects Meta Selector Class. {@link CloudObjectsSelector} */
+  public CloudObjectsMetaSelector(TypedProperties props) {
+    super(props);
+  }
+
+  /**
+   * Factory method for creating custom CloudObjectsMetaSelector. Default selector to use is {@link
+   * CloudObjectsMetaSelector}
+   */
+  public static CloudObjectsMetaSelector createSourceSelector(TypedProperties props) {
+    String sourceSelectorClass =
+        props.getString(
+            CloudObjectsMetaSelector.Config.SOURCE_INPUT_SELECTOR,
+            CloudObjectsMetaSelector.class.getName());
+    try {
+      CloudObjectsMetaSelector selector =
+          (CloudObjectsMetaSelector)
+              ReflectionUtils.loadClass(
+                  sourceSelectorClass, new Class<?>[] {TypedProperties.class}, props);
+
+      log.info("Using path selector " + selector.getClass().getName());
+      return selector;
+    } catch (Exception e) {
+      throw new HoodieException("Could not load source selector class " + sourceSelectorClass, e);
+    }
+  }
+
+  /**
+   * List messages from queue, filter out illegible events while doing so. It will also delete the
+   * ineligible messages from queue.
+   *
+   * @param processedMessages array of processed messages to add more messages
+   * @return the list of eligible records
+   */
+  protected List<Map<String, Object>> getEligibleEvents(
+      AmazonSQS sqs, List<Message> processedMessages) throws IOException {
+
+    List<Map<String, Object>> eligibleRecords = new ArrayList<>();
+    List<Message> ineligibleMessages = new ArrayList<>();
+
+    ReceiveMessageRequest receiveMessageRequest =
+        new ReceiveMessageRequest()
+            .withQueueUrl(this.queueUrl)
+            .withWaitTimeSeconds(this.longPollWait)
+            .withVisibilityTimeout(this.visibilityTimeout);
+    receiveMessageRequest.setMaxNumberOfMessages(this.maxMessagesEachRequest);
+
+    List<Message> messages =
+        getMessagesToProcess(
+            sqs,
+            this.queueUrl,
+            receiveMessageRequest,
+            this.maxMessageEachBatch,
+            this.maxMessagesEachRequest);
+
+    for (Message message : messages) {
+      boolean isMessageDelete = Boolean.TRUE;
+
+      JSONObject messageBody = new JSONObject(message.getBody());
+      Map<String, Object> messageMap;
+      ObjectMapper mapper = new ObjectMapper();
+
+      if (messageBody.has("Message")) {
+        // If this messages is from S3Event -> SNS -> SQS
+        messageMap =
+            (Map<String, Object>) mapper.readValue(messageBody.getString("Message"), Map.class);
+      } else {
+        // If this messages is from S3Event -> SQS
+        messageMap = (Map<String, Object>) mapper.readValue(messageBody.toString(), Map.class);
+      }
+      if (messageMap.containsKey("Records")) {
+        List<Map<String, Object>> records = (List<Map<String, Object>>) messageMap.get("Records");
+        for (Map<String, Object> record : records) {
+          String eventName = (String) record.get("eventName");
+
+          // filter only allowed s3 event types
+          if (ALLOWED_S3_EVENT_PREFIX.stream().anyMatch(eventName::startsWith)) {
+            eligibleRecords.add(record);
+            isMessageDelete = Boolean.FALSE;
+            processedMessages.add(message);
+
+          } else {
+            log.info("This S3 event " + eventName + " is not allowed, so ignoring it.");
+          }
+        }
+      } else {
+        log.info("Message is not expected format or it's s3:TestEvent");
+      }
+      if (isMessageDelete) {
+        ineligibleMessages.add(message);
+      }
+    }
+    if (!ineligibleMessages.isEmpty()) {
+      deleteBatchOfMessages(sqs, queueUrl, ineligibleMessages);
+    }
+
+    return eligibleRecords;
+  }
+
+  /**
+   * Get the list of events from queue.
+   *
+   * @param sparkContext JavaSparkContext to help parallelize certain operations
+   * @param lastCheckpointStr the last checkpoint time string, empty if first run
+   * @return the list of events
+   */
+  public Pair<List<String>, String> getNextEventsFromQueue(
+      AmazonSQS sqs,
+      JavaSparkContext sparkContext,
+      Option<String> lastCheckpointStr,
+      List<Message> processedMessages) {
+
+    processedMessages.clear();
+
+    log.info("Reading messages....");
+
+    try {
+      log.info("Start Checkpoint : " + lastCheckpointStr);
+
+      long lastCheckpointTime = lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE);
+
+      List<Map<String, Object>> eligibleEventRecords = getEligibleEvents(sqs, processedMessages);
+      log.info("eligible events size: " + eligibleEventRecords.size());
+
+      // sort all events by event time.
+      eligibleEventRecords.sort(
+          Comparator.comparingLong(

Review comment:
       May I know why we need this sorting? If Hoodie is going to do preCombine anyway, aren't we duplicating the effort here?

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsMetaSelector.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Cloud Objects Meta Selector Class. This class will provide the methods to process the messages
+ * from queue for CloudObjectsMetaSource.
+ */
+public class CloudObjectsMetaSelector extends CloudObjectsSelector {
+
+  /** Cloud Objects Meta Selector Class. {@link CloudObjectsSelector} */
+  public CloudObjectsMetaSelector(TypedProperties props) {
+    super(props);
+  }
+
+  /**
+   * Factory method for creating custom CloudObjectsMetaSelector. Default selector to use is {@link
+   * CloudObjectsMetaSelector}
+   */
+  public static CloudObjectsMetaSelector createSourceSelector(TypedProperties props) {
+    String sourceSelectorClass =
+        props.getString(
+            CloudObjectsMetaSelector.Config.SOURCE_INPUT_SELECTOR,
+            CloudObjectsMetaSelector.class.getName());
+    try {
+      CloudObjectsMetaSelector selector =
+          (CloudObjectsMetaSelector)
+              ReflectionUtils.loadClass(
+                  sourceSelectorClass, new Class<?>[] {TypedProperties.class}, props);
+
+      log.info("Using path selector " + selector.getClass().getName());
+      return selector;
+    } catch (Exception e) {
+      throw new HoodieException("Could not load source selector class " + sourceSelectorClass, e);
+    }
+  }
+
+  /**
+   * List messages from queue, filter out illegible events while doing so. It will also delete the
+   * ineligible messages from queue.
+   *
+   * @param processedMessages array of processed messages to add more messages
+   * @return the list of eligible records
+   */
+  protected List<Map<String, Object>> getEligibleEvents(
+      AmazonSQS sqs, List<Message> processedMessages) throws IOException {
+
+    List<Map<String, Object>> eligibleRecords = new ArrayList<>();
+    List<Message> ineligibleMessages = new ArrayList<>();
+
+    ReceiveMessageRequest receiveMessageRequest =
+        new ReceiveMessageRequest()
+            .withQueueUrl(this.queueUrl)
+            .withWaitTimeSeconds(this.longPollWait)
+            .withVisibilityTimeout(this.visibilityTimeout);
+    receiveMessageRequest.setMaxNumberOfMessages(this.maxMessagesEachRequest);
+
+    List<Message> messages =
+        getMessagesToProcess(
+            sqs,
+            this.queueUrl,
+            receiveMessageRequest,
+            this.maxMessageEachBatch,
+            this.maxMessagesEachRequest);
+
+    for (Message message : messages) {
+      boolean isMessageDelete = Boolean.TRUE;
+
+      JSONObject messageBody = new JSONObject(message.getBody());
+      Map<String, Object> messageMap;
+      ObjectMapper mapper = new ObjectMapper();
+
+      if (messageBody.has("Message")) {
+        // If this messages is from S3Event -> SNS -> SQS
+        messageMap =
+            (Map<String, Object>) mapper.readValue(messageBody.getString("Message"), Map.class);
+      } else {
+        // If this messages is from S3Event -> SQS
+        messageMap = (Map<String, Object>) mapper.readValue(messageBody.toString(), Map.class);
+      }
+      if (messageMap.containsKey("Records")) {
+        List<Map<String, Object>> records = (List<Map<String, Object>>) messageMap.get("Records");
+        for (Map<String, Object> record : records) {
+          String eventName = (String) record.get("eventName");
+
+          // filter only allowed s3 event types
+          if (ALLOWED_S3_EVENT_PREFIX.stream().anyMatch(eventName::startsWith)) {
+            eligibleRecords.add(record);
+            isMessageDelete = Boolean.FALSE;
+            processedMessages.add(message);
+
+          } else {
+            log.info("This S3 event " + eventName + " is not allowed, so ignoring it.");
+          }
+        }
+      } else {
+        log.info("Message is not expected format or it's s3:TestEvent");
+      }
+      if (isMessageDelete) {
+        ineligibleMessages.add(message);
+      }
+    }
+    if (!ineligibleMessages.isEmpty()) {
+      deleteBatchOfMessages(sqs, queueUrl, ineligibleMessages);
+    }
+
+    return eligibleRecords;
+  }
+
+  /**
+   * Get the list of events from queue.
+   *
+   * @param sparkContext JavaSparkContext to help parallelize certain operations
+   * @param lastCheckpointStr the last checkpoint time string, empty if first run
+   * @return the list of events
+   */
+  public Pair<List<String>, String> getNextEventsFromQueue(
+      AmazonSQS sqs,
+      JavaSparkContext sparkContext,
+      Option<String> lastCheckpointStr,
+      List<Message> processedMessages) {
+
+    processedMessages.clear();
+

Review comment:
       please avoid unnecessary line breaks.

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CloudObjectsMetaSource.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.CloudObjectsMetaSelector;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.Message;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This source provides capability to create the hoodie table for cloudObject Metadata (eg. s3
+ * events data). It will use the cloud queue for receiving the object key events. This can be useful
+ * for check cloud file activity over time and consuming this to create other hoodie table from
+ * cloud object data.
+ */
+public class CloudObjectsMetaSource extends RowSource {
+
+  private final CloudObjectsMetaSelector pathSelector;
+  private final List<Message> processedMessages = new ArrayList<>();
+  AmazonSQS sqs;
+
+  /**
+   * Cloud Objects Meta Source Class.
+   */
+  public CloudObjectsMetaSource(
+      TypedProperties props,
+      JavaSparkContext sparkContext,
+      SparkSession sparkSession,
+      SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider);
+    this.pathSelector = CloudObjectsMetaSelector.createSourceSelector(props);
+    this.sqs = this.pathSelector.createAmazonSqsClient();
+  }
+
+  @Override
+  public Pair<Option<Dataset<Row>>, String> fetchNextBatch(
+      Option<String> lastCkptStr, long sourceLimit) {
+
+    Pair<List<String>, String> selectPathsWithLatestSqsMessage =

Review comment:
       Can you add a javadoc here to explain what the components of this pair are?
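       Something like (sketch; wording as I understand the return value):

           /**
            * @return a pair of (left) the eligible cloud object event records, as JSON strings, to be
            *         ingested in this batch and (right) the new checkpoint string to persist, presumably
            *         the event time of the latest record processed.
            */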

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CloudObjectsMetaSource.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.CloudObjectsMetaSelector;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.Message;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This source provides capability to create the hoodie table for cloudObject Metadata (eg. s3
+ * events data). It will use the cloud queue for receiving the object key events. This can be useful
+ * for check cloud file activity over time and consuming this to create other hoodie table from
+ * cloud object data.
+ */
+public class CloudObjectsMetaSource extends RowSource {
+
+  private final CloudObjectsMetaSelector pathSelector;
+  private final List<Message> processedMessages = new ArrayList<>();
+  AmazonSQS sqs;
+
+  /**
+   * Cloud Objects Meta Source Class.
+   */
+  public CloudObjectsMetaSource(
+      TypedProperties props,
+      JavaSparkContext sparkContext,
+      SparkSession sparkSession,
+      SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider);
+    this.pathSelector = CloudObjectsMetaSelector.createSourceSelector(props);
+    this.sqs = this.pathSelector.createAmazonSqsClient();
+  }
+
+  @Override
+  public Pair<Option<Dataset<Row>>, String> fetchNextBatch(
+      Option<String> lastCkptStr, long sourceLimit) {
+
+    Pair<List<String>, String> selectPathsWithLatestSqsMessage =
+        pathSelector.getNextEventsFromQueue(sqs, sparkContext, lastCkptStr, processedMessages);
+    if (selectPathsWithLatestSqsMessage.getLeft().isEmpty()) {
+      return Pair.of(Option.empty(), selectPathsWithLatestSqsMessage.getRight());
+    } else {
+      return Pair.of(
+          Option.of(fromEventRecords(selectPathsWithLatestSqsMessage.getLeft())),
+          selectPathsWithLatestSqsMessage.getRight());
+    }
+  }
+
+  private Dataset<Row> fromEventRecords(List<String> jsonData) {

Review comment:
       If this is not going to be re-used, maybe we can just inline these 2 lines around line 72.
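       i.e., assuming the helper just wraps the JSON strings into a Dataset<Row> via sparkSession.read().json(...), something like (sketch only):

           return Pair.of(
               Option.of(sparkSession.read().json(
                   sparkSession.createDataset(selectPathsWithLatestSqsMessage.getLeft(), Encoders.STRING()))),
               selectPathsWithLatestSqsMessage.getRight());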

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsMetaSelector.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Cloud Objects Meta Selector Class. This class will provide the methods to process the messages
+ * from queue for CloudObjectsMetaSource.
+ */
+public class CloudObjectsMetaSelector extends CloudObjectsSelector {
+
+  /** Cloud Objects Meta Selector Class. {@link CloudObjectsSelector} */
+  public CloudObjectsMetaSelector(TypedProperties props) {
+    super(props);
+  }
+
+  /**
+   * Factory method for creating custom CloudObjectsMetaSelector. Default selector to use is {@link
+   * CloudObjectsMetaSelector}
+   */
+  public static CloudObjectsMetaSelector createSourceSelector(TypedProperties props) {
+    String sourceSelectorClass =
+        props.getString(
+            CloudObjectsMetaSelector.Config.SOURCE_INPUT_SELECTOR,
+            CloudObjectsMetaSelector.class.getName());
+    try {
+      CloudObjectsMetaSelector selector =
+          (CloudObjectsMetaSelector)
+              ReflectionUtils.loadClass(
+                  sourceSelectorClass, new Class<?>[] {TypedProperties.class}, props);
+
+      log.info("Using path selector " + selector.getClass().getName());
+      return selector;
+    } catch (Exception e) {
+      throw new HoodieException("Could not load source selector class " + sourceSelectorClass, e);
+    }
+  }
+
+  /**
+   * List messages from queue, filter out illegible events while doing so. It will also delete the
+   * ineligible messages from queue.
+   *
+   * @param processedMessages array of processed messages to add more messages
+   * @return the list of eligible records
+   */
+  protected List<Map<String, Object>> getEligibleEvents(
+      AmazonSQS sqs, List<Message> processedMessages) throws IOException {
+
+    List<Map<String, Object>> eligibleRecords = new ArrayList<>();
+    List<Message> ineligibleMessages = new ArrayList<>();
+
+    ReceiveMessageRequest receiveMessageRequest =
+        new ReceiveMessageRequest()
+            .withQueueUrl(this.queueUrl)
+            .withWaitTimeSeconds(this.longPollWait)
+            .withVisibilityTimeout(this.visibilityTimeout);
+    receiveMessageRequest.setMaxNumberOfMessages(this.maxMessagesEachRequest);
+
+    List<Message> messages =
+        getMessagesToProcess(
+            sqs,
+            this.queueUrl,
+            receiveMessageRequest,
+            this.maxMessageEachBatch,
+            this.maxMessagesEachRequest);
+
+    for (Message message : messages) {
+      boolean isMessageDelete = Boolean.TRUE;
+
+      JSONObject messageBody = new JSONObject(message.getBody());
+      Map<String, Object> messageMap;
+      ObjectMapper mapper = new ObjectMapper();
+
+      if (messageBody.has("Message")) {
+        // If this messages is from S3Event -> SNS -> SQS
+        messageMap =
+            (Map<String, Object>) mapper.readValue(messageBody.getString("Message"), Map.class);
+      } else {
+        // If this messages is from S3Event -> SQS
+        messageMap = (Map<String, Object>) mapper.readValue(messageBody.toString(), Map.class);
+      }
+      if (messageMap.containsKey("Records")) {
+        List<Map<String, Object>> records = (List<Map<String, Object>>) messageMap.get("Records");
+        for (Map<String, Object> record : records) {
+          String eventName = (String) record.get("eventName");
+
+          // filter only allowed s3 event types
+          if (ALLOWED_S3_EVENT_PREFIX.stream().anyMatch(eventName::startsWith)) {
+            eligibleRecords.add(record);
+            isMessageDelete = Boolean.FALSE;
+            processedMessages.add(message);
+
+          } else {
+            log.info("This S3 event " + eventName + " is not allowed, so ignoring it.");
+          }
+        }
+      } else {
+        log.info("Message is not expected format or it's s3:TestEvent");
+      }
+      if (isMessageDelete) {
+        ineligibleMessages.add(message);
+      }
+    }
+    if (!ineligibleMessages.isEmpty()) {
+      deleteBatchOfMessages(sqs, queueUrl, ineligibleMessages);
+    }
+
+    return eligibleRecords;
+  }
+
+  /**
+   * Get the list of events from queue.
+   *
+   * @param sparkContext JavaSparkContext to help parallelize certain operations
+   * @param lastCheckpointStr the last checkpoint time string, empty if first run
+   * @return the list of events
+   */
+  public Pair<List<String>, String> getNextEventsFromQueue(
+      AmazonSQS sqs,
+      JavaSparkContext sparkContext,
+      Option<String> lastCheckpointStr,
+      List<Message> processedMessages) {
+
+    processedMessages.clear();
+
+    log.info("Reading messages....");
+
+    try {
+      log.info("Start Checkpoint : " + lastCheckpointStr);
+
+      long lastCheckpointTime = lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE);

Review comment:
       If parsing fails, why set it to MIN_VALUE? A negative value does not make sense; at the least we should set it to 0. Or, if it's an epoch time, we should set it to the earliest epoch time (1970/01/01...).
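       i.e. something like (sketch):

           // fall back to the start of the epoch (0L) when there is no usable checkpoint
           long lastCheckpointTime = lastCheckpointStr.map(Long::parseLong).orElse(0L);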




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org