Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/09/22 17:04:43 UTC

[GitHub] [hudi] codope commented on a diff in pull request #6665: [HUDI-4850] Incremental Ingestion from GCS

codope commented on code in PR #6665:
URL: https://github.com/apache/hudi/pull/6665#discussion_r977747273


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import com.google.pubsub.v1.ReceivedMessage;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.gcs.PubsubMessagesFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.MessageBatch;
+import org.apache.hudi.utilities.sources.helpers.gcs.MessageValidity;
+import org.apache.hudi.utilities.sources.helpers.gcs.MetadataMsg;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.ACK_MESSAGES;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.ACK_MESSAGES_DEFAULT_VALUE;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.BATCH_SIZE_CONF;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DEFAULT_BATCH_SIZE;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.GOOGLE_PROJECT_ID;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.PUBSUB_SUBSCRIPTION_ID;
+import static org.apache.hudi.utilities.sources.helpers.gcs.MessageValidity.DO_SKIP;
+
+/*
+ * An incremental source that fetches messages from a Google Cloud Pubsub topic (a subscription, to be precise)
+ * and ingests them into a Hudi table. The messages are assumed to be Cloud Storage Pubsub Notifications.
+ *
+ * You should set spark.driver.extraClassPath in spark-defaults.conf to
+ * look like below WITHOUT THE NEWLINES (or give the equivalent as CLI options if in cluster mode):
+ * (mysql-connector at the end is only needed if Hive Sync is enabled and Mysql is used for Hive Metastore).
+
+ absolute_path_to/protobuf-java-3.21.1.jar:absolute_path_to/failureaccess-1.0.1.jar:
+ absolute_path_to/31.1-jre/guava-31.1-jre.jar:
+ absolute_path_to/mysql-connector-java-8.0.30.jar
+
+This class can be invoked via spark-submit as follows. There's a bunch of optional hive sync flags at the end:
+$ bin/spark-submit \
+--driver-memory 4g \
+--executor-memory 4g \
+--packages com.google.cloud:google-cloud-pubsub:1.120.0 \
+--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
+absolute_path_to/hudi-utilities-bundle_2.12-0.13.0-SNAPSHOT.jar \
+--source-class org.apache.hudi.utilities.sources.GcsEventsSource \
+--op INSERT \
+--hoodie-conf hoodie.datasource.write.recordkey.field="id" \
+--source-ordering-field timeCreated \
+--hoodie-conf hoodie.index.type=GLOBAL_BLOOM \
+--filter-dupes \
+--allow-commit-on-no-checkpoint-change \
+--hoodie-conf hoodie.datasource.write.insert.drop.duplicates=true \
+--hoodie-conf hoodie.combine.before.insert=true \
+--hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator \
+--hoodie-conf hoodie.datasource.write.partitionpath.field=bucket \
+--hoodie-conf hoodie.deltastreamer.source.gcs.project.id=infra-dev-358110 \
+--hoodie-conf hoodie.deltastreamer.source.gcs.subscription.id=gcs-obj-8-sub-1 \
+--hoodie-conf hoodie.deltastreamer.source.gcs.ack=true \
+--table-type COPY_ON_WRITE \
+--target-base-path file:\/\/\/absolute_path_to/meta-gcs \
+--target-table gcs_meta \
+--continuous \
+--source-limit 100 \
+--min-sync-interval-seconds 100 \
+--enable-hive-sync \

Review Comment:
   Why would we want to enable hive sync for the metadata table?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS;
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy;
+import org.apache.hudi.utilities.sources.helpers.gcs.FileDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.FilePathsFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.QueryInfo;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_NUM_INSTANTS_PER_FETCH;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_SOURCE_FILE_FORMAT;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.HOODIE_SRC_BASE_PATH;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.MISSING_CHECKPOINT_STRATEGY;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.NUM_INSTANTS_PER_FETCH;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SOURCE_FILE_FORMAT;
+import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.calculateBeginAndEndInstants;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DEFAULT_ENABLE_EXISTS_CHECK;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.ENABLE_EXISTS_CHECK;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DATAFILE_FORMAT;
+
+/**
+ * An incremental source that detects new data in a source table containing metadata about GCS files,
+ * downloads the actual content of these files from GCS and stores them as records into a destination table.
+ * <p>
+ * You should set spark.driver.extraClassPath in spark-defaults.conf to
+ * look like below WITHOUT THE NEWLINES (or give the equivalent as CLI options if in cluster mode):
+ * (mysql-connector at the end is only needed if Hive Sync is enabled and Mysql is used for Hive Metastore).
+
+ absolute_path_to/protobuf-java-3.21.1.jar:absolute_path_to/failureaccess-1.0.1.jar:
+ absolute_path_to/31.1-jre/guava-31.1-jre.jar:
+ absolute_path_to/mysql-connector-java-8.0.30.jar

Review Comment:
   Are all these runtime jars absolutely necessary?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/QueryInfo.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers.gcs;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import static org.apache.hudi.DataSourceReadOptions.BEGIN_INSTANTTIME;
+import static org.apache.hudi.DataSourceReadOptions.END_INSTANTTIME;
+import static org.apache.hudi.DataSourceReadOptions.QUERY_TYPE;
+import static org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL;
+import static org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL;
+
+/**
+ * Uses the start and end instants of a DeltaStreamer Source to help construct the right kind
+ * of query for subsequent requests.
+ */
+public class QueryInfo {
+
+  private final String queryType;
+  private final String startInstant;
+  private final String endInstant;
+
+  private static final Logger LOG = LogManager.getLogger(QueryInfo.class);
+
+  public QueryInfo(Pair<String, Pair<String, String>> queryInfoPair) {
+    this.queryType = queryInfoPair.getLeft();
+    this.startInstant = queryInfoPair.getRight().getLeft();
+    this.endInstant = queryInfoPair.getRight().getRight();
+  }
+
+  public Dataset<Row> initializeSourceForFilenames(String srcPath, SparkSession sparkSession) {
+    if (isIncremental()) {
+      return incrementalQuery(sparkSession).load(srcPath);
+    }
+
+    // Issue a snapshot query.
+    return snapshotQuery(sparkSession).load(srcPath)
+            .filter(String.format("%s > '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, getStartInstant()));
+  }
+
+  public boolean areStartAndEndInstantsEqual() {
+    return getStartInstant().equals(getEndInstant());

Review Comment:
   Are startInstant and endInstant non-nullable?
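
   If they are meant to be non-null, a fail-fast check in the constructor would make that explicit. A minimal sketch, keeping the existing Pair-based constructor and assuming an `import java.util.Objects;`:

   ```java
   public QueryInfo(Pair<String, Pair<String, String>> queryInfoPair) {
     // fail fast if any instant is missing, so later comparisons cannot NPE
     this.queryType = Objects.requireNonNull(queryInfoPair.getLeft(), "queryType must not be null");
     this.startInstant = Objects.requireNonNull(queryInfoPair.getRight().getLeft(), "startInstant must not be null");
     this.endInstant = Objects.requireNonNull(queryInfoPair.getRight().getRight(), "endInstant must not be null");
   }
   ```

   With that in place, `areStartAndEndInstantsEqual()` can safely call `equals()` on either instant.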



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/PubsubMessagesFetcher.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers.gcs;
+
+import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
+import com.google.pubsub.v1.AcknowledgeRequest;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.PullRequest;
+import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.ReceivedMessage;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import static com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub.create;
+import java.io.IOException;
+import java.util.List;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DEFAULT_MAX_INBOUND_MESSAGE_SIZE;
+
+/**
+ * Fetch messages from a specified Google Cloud Pubsub subscription.
+ */
+public class PubsubMessagesFetcher {
+
+  private final String googleProjectId;
+  private final String pubsubSubscriptionId;
+
+  private final int batchSize;
+  private final SubscriberStubSettings subscriberStubSettings;
+
+  private final int maxInboundMessageSize;

Review Comment:
   Convert this to a local variable in the constructor.
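
   A sketch of what that could look like; the constructor isn't visible in this hunk, so the body below (signature and settings wiring) is illustrative rather than the PR's actual code:

   ```java
   public PubsubMessagesFetcher(String googleProjectId, String pubsubSubscriptionId, int batchSize) {
     this.googleProjectId = googleProjectId;
     this.pubsubSubscriptionId = pubsubSubscriptionId;
     this.batchSize = batchSize;
     // only needed while building the stub settings, so a local variable is enough
     int maxInboundMessageSize = DEFAULT_MAX_INBOUND_MESSAGE_SIZE;
     try {
       this.subscriberStubSettings = SubscriberStubSettings.newBuilder()
           .setTransportChannelProvider(
               SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
                   .setMaxInboundMessageSize(maxInboundMessageSize)
                   .build())
           .build();
     } catch (IOException e) {
       throw new HoodieException("Error creating SubscriberStubSettings", e);
     }
   }
   ```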



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/MetadataMsg.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers.gcs;
+
+import com.google.pubsub.v1.PubsubMessage;
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static org.apache.hudi.utilities.sources.helpers.gcs.MessageValidity.DO_PROCESS;
+import static org.apache.hudi.utilities.sources.helpers.gcs.MessageValidity.DO_SKIP;
+
+/**
+ * Wraps a PubsubMessage assuming it's from Cloud Storage Pubsub Notifications (CSPN), and
+ * adds relevant helper methods.
+ * For details of CSPN messages see: https://cloud.google.com/storage/docs/pubsub-notifications
+ */
+public class MetadataMsg {
+
+  // The CSPN message to wrap
+  private final PubsubMessage message;
+
+  private static final String EVENT_NAME_OBJECT_FINALIZE = "OBJECT_FINALIZE";
+
+  private static final String ATTR_EVENT_TYPE = "eventType";
+  private static final String ATTR_OBJECT_ID = "objectId";
+  private static final String ATTR_OVERWROTE_GENERATION = "overwroteGeneration";
+
+  public MetadataMsg(PubsubMessage message) {
+    this.message = message;
+  }
+
+  public String toStringUtf8() {
+    return message.getData().toStringUtf8();
+  }
+
+  /**
+   * Whether a message is valid to be ingested and stored by this Metadata puller.
+   * Ref: https://cloud.google.com/storage/docs/pubsub-notifications#events
+   */
+  public MessageValidity shouldBeProcessed() {
+    if (!isNewFileCreation()) {
+      return DO_SKIP.setReasonToSkip(
+              "eventType: " + getEventType() + ". Not a file creation message."
+      );
+    }
+
+    if (isOverwriteOfExistingFile()) {
+      return DO_SKIP.setReasonToSkip(
+              "eventType: " + getEventType()
+                      + ". Overwrite of existing objectId: " + getObjectId()
+                      + " with generation number: " + getOverwroteGeneration()
+      );
+    }
+
+    return DO_PROCESS;
+  }
+
+  /**
+   * Whether message represents an overwrite of an existing file.
+   * Ref: https://cloud.google.com/storage/docs/pubsub-notifications#replacing_objects
+   */
+  private boolean isOverwriteOfExistingFile() {
+    return !isNullOrEmpty(getOverwroteGeneration());
+  }
+
+  /**
+   * Returns true if message corresponds to new file creation, false if not.
+   * Ref: https://cloud.google.com/storage/docs/pubsub-notifications#events
+   */
+  private boolean isNewFileCreation() {

Review Comment:
   This can be static as well.
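
   A sketch of the static form, assuming `getEventType()` simply reads the `eventType` attribute of the wrapped message:

   ```java
   private static boolean isNewFileCreation(PubsubMessage message) {
     // OBJECT_FINALIZE is the notification type GCS sends when a new object (or new generation) is created
     return EVENT_NAME_OBJECT_FINALIZE.equals(message.getAttributesOrDefault(ATTR_EVENT_TYPE, ""));
   }
   ```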



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsIngestionConfig.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers.gcs;
+
+/**
+ * Config keys and defaults for GCS Ingestion
+ */
+public class GcsIngestionConfig {
+
+  /**
+   * The GCP Project Id where the Pubsub Subscription to ingest from resides. Needed to connect
+   * to the Pubsub subscription
+   */
+  public static final String GOOGLE_PROJECT_ID = "hoodie.deltastreamer.source.gcs.project.id";
+
+  /**
+   * The GCP Pubsub subscription id for the GCS Notifications. Needed to connect to the Pubsub
+   * subscription.
+   */
+  public static final String PUBSUB_SUBSCRIPTION_ID = "hoodie.deltastreamer.source.gcs.subscription.id";
+
+  /**
+   * How many messages to pull from Cloud Pubsub at a time. Also see {@link DEFAULT_BATCH_SIZE}.

Review Comment:
   nit: `DEFAULT_BATCH_SIZE` -> `#DEFAULT_BATCH_SIZE` for proper Javadoc rendering. Similarly for the configs below.
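
   For example, the `#` form makes Javadoc resolve the reference to the member of the enclosing class:

   ```java
   /**
    * How many messages to pull from Cloud Pubsub at a time. Also see {@link #DEFAULT_BATCH_SIZE}.
    */
   ```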



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/FilePathsFetcher.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers.gcs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.GCS_INCR_DATAFILE_EXTENSION;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.SELECT_RELATIVE_PATH_PREFIX;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.IGNORE_RELATIVE_PATH_PREFIX;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.IGNORE_RELATIVE_PATH_SUBSTR;
+
+/**
+ * Extracts a list of fully qualified GCS filepaths from a given Spark Dataset as input.
+ * Optionally:
+ * i) Match the filename and path against provided input filter strings
+ * ii) Check if each file exists on GCS, in which case it assumes SparkContext is already
+ * configured with GCS options through GcsEventsHoodieIncrSource.addGcsAccessConfs().
+ */
+public class FilePathsFetcher implements Serializable {
+
+  /**
+   * The default file format to assume if {@link GcsIngestionConfig.GCS_INCR_DATAFILE_EXTENSION} is not given.

Review Comment:
   nit: `GcsIngestionConfig.GCS_INCR_DATAFILE_EXTENSION` -> `GcsIngestionConfig#GCS_INCR_DATAFILE_EXTENSION` for proper comment rendering. Same below.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/MetadataMsg.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers.gcs;
+
+import com.google.pubsub.v1.PubsubMessage;
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static org.apache.hudi.utilities.sources.helpers.gcs.MessageValidity.DO_PROCESS;
+import static org.apache.hudi.utilities.sources.helpers.gcs.MessageValidity.DO_SKIP;
+
+/**
+ * Wraps a PubsubMessage assuming it's from Cloud Storage Pubsub Notifications (CSPN), and
+ * adds relevant helper methods.
+ * For details of CSPN messages see: https://cloud.google.com/storage/docs/pubsub-notifications
+ */
+public class MetadataMsg {

Review Comment:
   Please consider changing this class to be a helper class without requiring instantiation. All methods can be static if the PubSub message is passed to it. 
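
   A rough sketch of that shape (abbreviated; the constants and static imports stay as in the current class, and `DO_SKIP.setReasonToSkip(...)` is kept as-is):

   ```java
   public final class MetadataMsg {

     private MetadataMsg() {
       // static helper, not meant to be instantiated
     }

     public static String toStringUtf8(PubsubMessage message) {
       return message.getData().toStringUtf8();
     }

     public static MessageValidity shouldBeProcessed(PubsubMessage message) {
       String eventType = message.getAttributesOrDefault(ATTR_EVENT_TYPE, "");
       if (!EVENT_NAME_OBJECT_FINALIZE.equals(eventType)) {
         return DO_SKIP.setReasonToSkip("eventType: " + eventType + ". Not a file creation message.");
       }
       String overwroteGeneration = message.getAttributesOrDefault(ATTR_OVERWROTE_GENERATION, "");
       if (!isNullOrEmpty(overwroteGeneration)) {
         return DO_SKIP.setReasonToSkip("eventType: " + eventType
             + ". Overwrite of existing objectId: " + message.getAttributesOrDefault(ATTR_OBJECT_ID, "")
             + " with generation number: " + overwroteGeneration);
       }
       return DO_PROCESS;
     }
   }
   ```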



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS;
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy;
+import org.apache.hudi.utilities.sources.helpers.gcs.FileDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.FilePathsFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.QueryInfo;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_NUM_INSTANTS_PER_FETCH;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_SOURCE_FILE_FORMAT;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.HOODIE_SRC_BASE_PATH;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.MISSING_CHECKPOINT_STRATEGY;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.NUM_INSTANTS_PER_FETCH;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SOURCE_FILE_FORMAT;
+import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.calculateBeginAndEndInstants;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DEFAULT_ENABLE_EXISTS_CHECK;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.ENABLE_EXISTS_CHECK;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DATAFILE_FORMAT;
+
+/**
+ * An incremental source that detects new data in a source table containing metadata about GCS files,
+ * downloads the actual content of these files from GCS and stores them as records into a destination table.
+ * <p>
+ * You should set spark.driver.extraClassPath in spark-defaults.conf to
+ * look like below WITHOUT THE NEWLINES (or give the equivalent as CLI options if in cluster mode):
+ * (mysql-connector at the end is only needed if Hive Sync is enabled and Mysql is used for Hive Metastore).
+
+ absolute_path_to/protobuf-java-3.21.1.jar:absolute_path_to/failureaccess-1.0.1.jar:
+ absolute_path_to/31.1-jre/guava-31.1-jre.jar:
+ absolute_path_to/mysql-connector-java-8.0.30.jar
+
+ This class can be invoked via spark-submit as follows. There's a bunch of optional hive sync flags at the end.
+  $ bin/spark-submit \
+  --packages com.google.cloud:google-cloud-pubsub:1.120.0 \
+  --packages com.google.cloud.bigdataoss:gcs-connector:hadoop2-2.2.7 \
+  --driver-memory 4g \
+  --executor-memory 4g \
+  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
+  absolute_path_to/hudi-utilities-bundle_2.12-0.13.0-SNAPSHOT.jar \
+  --source-class org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource \
+  --op INSERT \
+  --hoodie-conf hoodie.deltastreamer.source.hoodieincr.file.format="parquet" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.select.file.extension="jsonl" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.datafile.format="json" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.select.relpath.prefix="country" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.ignore.relpath.prefix="blah" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.ignore.relpath.substring="blah" \
+  --hoodie-conf hoodie.datasource.write.recordkey.field=id \
+  --hoodie-conf hoodie.datasource.write.partitionpath.field= \
+  --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator \
+  --filter-dupes \
+  --hoodie-conf hoodie.datasource.write.insert.drop.duplicates=true \
+  --hoodie-conf hoodie.combine.before.insert=true \
+  --source-ordering-field id \
+  --table-type COPY_ON_WRITE \
+  --target-base-path file:\/\/\/absolute_path_to/data-gcs \
+  --target-table gcs_data \
+  --continuous \
+  --source-limit 100 \
+  --min-sync-interval-seconds 60 \
+  --hoodie-conf hoodie.deltastreamer.source.hoodieincr.path=file:\/\/\/absolute_path_to/meta-gcs \
+  --hoodie-conf hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT \
+  --enable-hive-sync \
+  --hoodie-conf hoodie.datasource.hive_sync.database=default \
+  --hoodie-conf hoodie.datasource.hive_sync.table=gcs_data \
+ */
+public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
+
+  private String srcPath;
+  private String fileFormat;
+  private final boolean checkIfFileExists;
+  private int numInstantsPerFetch;
+
+  private MissingCheckpointStrategy missingCheckpointStrategy;
+  private final FilePathsFetcher filePathsFetcher;
+  private final FileDataFetcher fileDataFetcher;
+
+  private static final Logger LOG = LogManager.getLogger(GcsEventsHoodieIncrSource.class);
+
+  public GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, SparkSession spark,
+                                   SchemaProvider schemaProvider) {
+
+    this(props, jsc, spark, schemaProvider,
+            new FilePathsFetcher(props, props.getString(SOURCE_FILE_FORMAT, DEFAULT_SOURCE_FILE_FORMAT)),
+            new FileDataFetcher(props, props.getString(DATAFILE_FORMAT, DEFAULT_SOURCE_FILE_FORMAT))
+    );
+  }
+
+  GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, SparkSession spark,
+                            SchemaProvider schemaProvider, FilePathsFetcher filePathsFetcher, FileDataFetcher fileDataFetcher) {
+    super(props, jsc, spark, schemaProvider);
+
+    DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(HOODIE_SRC_BASE_PATH));
+    srcPath = props.getString(HOODIE_SRC_BASE_PATH);
+    fileFormat = props.getString(SOURCE_FILE_FORMAT, DEFAULT_SOURCE_FILE_FORMAT);
+    missingCheckpointStrategy = getMissingCheckpointStrategy(props);
+    numInstantsPerFetch = props.getInteger(NUM_INSTANTS_PER_FETCH, DEFAULT_NUM_INSTANTS_PER_FETCH);
+    checkIfFileExists = props.getBoolean(ENABLE_EXISTS_CHECK, DEFAULT_ENABLE_EXISTS_CHECK);
+
+    this.filePathsFetcher = filePathsFetcher;
+    this.fileDataFetcher = fileDataFetcher;
+
+    addGcsAccessConfs(jsc);
+
+    LOG.info("srcPath: " + srcPath);
+    LOG.info("fileFormat: " + fileFormat);
+    LOG.info("missingCheckpointStrategy: " + missingCheckpointStrategy);
+    LOG.info("numInstantsPerFetch: " + numInstantsPerFetch);
+    LOG.info("checkIfFileExists: " + checkIfFileExists);
+  }
+
+  @Override
+  public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
+    QueryInfo queryInfo = getQueryInfo(lastCkptStr);
+
+    if (queryInfo.areStartAndEndInstantsEqual()) {
+      LOG.info("Already caught up. Begin Checkpoint was: " + queryInfo.getStartInstant());
+      return Pair.of(Option.empty(), queryInfo.getStartInstant());
+    }
+
+    Dataset<Row> sourceForFilenames = queryInfo.initializeSourceForFilenames(srcPath, sparkSession);
+
+    if (sourceForFilenames.isEmpty()) {
+      LOG.info("Source of file names is empty. Returning empty result and endInstant: "
+              + queryInfo.getEndInstant());
+      return Pair.of(Option.empty(), queryInfo.getEndInstant());
+    }
+
+    return extractData(queryInfo, sourceForFilenames);
+  }
+
+  private Pair<Option<Dataset<Row>>, String> extractData(QueryInfo queryInfo, Dataset<Row> sourceForFilenames) {
+    List<String> filepaths = filePathsFetcher.getGcsFilePaths(sparkContext, sourceForFilenames, checkIfFileExists);
+
+    LOG.info("Extracted " + filepaths.size() + " distinct files."
+            + " Some samples " + filepaths.stream().limit(10).collect(Collectors.toList()));
+
+    Option<Dataset<Row>> fileDataDs = fileDataFetcher.fetchFileDataFromGcs(sparkSession, filepaths);

Review Comment:
   nit: rename to `fileDataRows`



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS;
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy;
+import org.apache.hudi.utilities.sources.helpers.gcs.FileDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.FilePathsFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.QueryInfo;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_NUM_INSTANTS_PER_FETCH;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_SOURCE_FILE_FORMAT;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.HOODIE_SRC_BASE_PATH;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.MISSING_CHECKPOINT_STRATEGY;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.NUM_INSTANTS_PER_FETCH;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SOURCE_FILE_FORMAT;
+import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.calculateBeginAndEndInstants;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DEFAULT_ENABLE_EXISTS_CHECK;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.ENABLE_EXISTS_CHECK;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DATAFILE_FORMAT;
+
+/**
+ * An incremental source that detects new data in a source table containing metadata about GCS files,
+ * downloads the actual content of these files from GCS and stores them as records into a destination table.
+ * <p>
+ * You should set spark.driver.extraClassPath in spark-defaults.conf to
+ * look like below WITHOUT THE NEWLINES (or give the equivalent as CLI options if in cluster mode):
+ * (mysql-connector at the end is only needed if Hive Sync is enabled and Mysql is used for Hive Metastore).
+
+ absolute_path_to/protobuf-java-3.21.1.jar:absolute_path_to/failureaccess-1.0.1.jar:
+ absolute_path_to/31.1-jre/guava-31.1-jre.jar:
+ absolute_path_to/mysql-connector-java-8.0.30.jar
+
+ This class can be invoked via spark-submit as follows. There's a bunch of optional hive sync flags at the end.
+  $ bin/spark-submit \
+  --packages com.google.cloud:google-cloud-pubsub:1.120.0 \
+  --packages com.google.cloud.bigdataoss:gcs-connector:hadoop2-2.2.7 \
+  --driver-memory 4g \
+  --executor-memory 4g \
+  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
+  absolute_path_to/hudi-utilities-bundle_2.12-0.13.0-SNAPSHOT.jar \
+  --source-class org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource \
+  --op INSERT \
+  --hoodie-conf hoodie.deltastreamer.source.hoodieincr.file.format="parquet" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.select.file.extension="jsonl" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.datafile.format="json" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.select.relpath.prefix="country" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.ignore.relpath.prefix="blah" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.ignore.relpath.substring="blah" \
+  --hoodie-conf hoodie.datasource.write.recordkey.field=id \
+  --hoodie-conf hoodie.datasource.write.partitionpath.field= \
+  --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator \
+  --filter-dupes \
+  --hoodie-conf hoodie.datasource.write.insert.drop.duplicates=true \
+  --hoodie-conf hoodie.combine.before.insert=true \
+  --source-ordering-field id \
+  --table-type COPY_ON_WRITE \
+  --target-base-path file:\/\/\/absolute_path_to/data-gcs \
+  --target-table gcs_data \
+  --continuous \
+  --source-limit 100 \
+  --min-sync-interval-seconds 60 \
+  --hoodie-conf hoodie.deltastreamer.source.hoodieincr.path=file:\/\/\/absolute_path_to/meta-gcs \
+  --hoodie-conf hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT \
+  --enable-hive-sync \
+  --hoodie-conf hoodie.datasource.hive_sync.database=default \
+  --hoodie-conf hoodie.datasource.hive_sync.table=gcs_data \
+ */
+public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
+
+  private String srcPath;
+  private String fileFormat;

Review Comment:
   Even these can be final, right? Are they going to change throughout the lifecycle of ingestion?
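
   If they are only assigned in the constructor (as the hunk suggests), the whole block could simply be:

   ```java
   // all assigned exactly once in the constructor, so they can be final
   private final String srcPath;
   private final String fileFormat;
   private final boolean checkIfFileExists;
   private final int numInstantsPerFetch;
   private final MissingCheckpointStrategy missingCheckpointStrategy;
   ```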



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS;
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy;
+import org.apache.hudi.utilities.sources.helpers.gcs.FileDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.FilePathsFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.QueryInfo;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_NUM_INSTANTS_PER_FETCH;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_SOURCE_FILE_FORMAT;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.HOODIE_SRC_BASE_PATH;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.MISSING_CHECKPOINT_STRATEGY;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.NUM_INSTANTS_PER_FETCH;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SOURCE_FILE_FORMAT;
+import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.calculateBeginAndEndInstants;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DEFAULT_ENABLE_EXISTS_CHECK;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.ENABLE_EXISTS_CHECK;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DATAFILE_FORMAT;
+
+/**
+ * An incremental source that detects new data in a source table containing metadata about GCS files,
+ * downloads the actual content of these files from GCS and stores them as records into a destination table.
+ * <p>
+ * You should set spark.driver.extraClassPath in spark-defaults.conf to
+ * look like below WITHOUT THE NEWLINES (or give the equivalent as CLI options if in cluster mode):
+ * (mysql-connector at the end is only needed if Hive Sync is enabled and Mysql is used for Hive Metastore).
+
+ absolute_path_to/protobuf-java-3.21.1.jar:absolute_path_to/failureaccess-1.0.1.jar:
+ absolute_path_to/31.1-jre/guava-31.1-jre.jar:
+ absolute_path_to/mysql-connector-java-8.0.30.jar
+
+ This class can be invoked via spark-submit as follows. There's a bunch of optional hive sync flags at the end.
+  $ bin/spark-submit \
+  --packages com.google.cloud:google-cloud-pubsub:1.120.0 \
+  --packages com.google.cloud.bigdataoss:gcs-connector:hadoop2-2.2.7 \
+  --driver-memory 4g \
+  --executor-memory 4g \
+  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
+  absolute_path_to/hudi-utilities-bundle_2.12-0.13.0-SNAPSHOT.jar \
+  --source-class org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource \
+  --op INSERT \
+  --hoodie-conf hoodie.deltastreamer.source.hoodieincr.file.format="parquet" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.select.file.extension="jsonl" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.datafile.format="json" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.select.relpath.prefix="country" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.ignore.relpath.prefix="blah" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.ignore.relpath.substring="blah" \
+  --hoodie-conf hoodie.datasource.write.recordkey.field=id \
+  --hoodie-conf hoodie.datasource.write.partitionpath.field= \
+  --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator \
+  --filter-dupes \
+  --hoodie-conf hoodie.datasource.write.insert.drop.duplicates=true \
+  --hoodie-conf hoodie.combine.before.insert=true \
+  --source-ordering-field id \
+  --table-type COPY_ON_WRITE \
+  --target-base-path file:\/\/\/absolute_path_to/data-gcs \
+  --target-table gcs_data \
+  --continuous \
+  --source-limit 100 \
+  --min-sync-interval-seconds 60 \
+  --hoodie-conf hoodie.deltastreamer.source.hoodieincr.path=file:\/\/\/absolute_path_to/meta-gcs \
+  --hoodie-conf hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT \
+  --enable-hive-sync \
+  --hoodie-conf hoodie.datasource.hive_sync.database=default \
+  --hoodie-conf hoodie.datasource.hive_sync.table=gcs_data \
+ */
+public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
+
+  private String srcPath;
+  private String fileFormat;
+  private final boolean checkIfFileExists;
+  private int numInstantsPerFetch;
+
+  private MissingCheckpointStrategy missingCheckpointStrategy;

Review Comment:
   Even this can be final.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS;
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy;
+import org.apache.hudi.utilities.sources.helpers.gcs.FileDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.FilePathsFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.QueryInfo;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_NUM_INSTANTS_PER_FETCH;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_SOURCE_FILE_FORMAT;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.HOODIE_SRC_BASE_PATH;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.MISSING_CHECKPOINT_STRATEGY;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.NUM_INSTANTS_PER_FETCH;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SOURCE_FILE_FORMAT;
+import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.calculateBeginAndEndInstants;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DEFAULT_ENABLE_EXISTS_CHECK;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.ENABLE_EXISTS_CHECK;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DATAFILE_FORMAT;
+
+/**
+ * An incremental source that detects new data in a source table containing metadata about GCS files,
+ * downloads the actual content of these files from GCS and stores them as records into a destination table.
+ * <p>
+ * You should set spark.driver.extraClassPath in spark-defaults.conf to
+ * look like below WITHOUT THE NEWLINES (or give the equivalent as CLI options if in cluster mode):
+ * (mysql-connector at the end is only needed if Hive Sync is enabled and Mysql is used for Hive Metastore).
+
+ absolute_path_to/protobuf-java-3.21.1.jar:absolute_path_to/failureaccess-1.0.1.jar:
+ absolute_path_to/31.1-jre/guava-31.1-jre.jar:
+ absolute_path_to/mysql-connector-java-8.0.30.jar
+
+ This class can be invoked via spark-submit as follows. There's a bunch of optional hive sync flags at the end.
+  $ bin/spark-submit \
+  --packages com.google.cloud:google-cloud-pubsub:1.120.0 \
+  --packages com.google.cloud.bigdataoss:gcs-connector:hadoop2-2.2.7 \
+  --driver-memory 4g \
+  --executor-memory 4g \
+  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
+  absolute_path_to/hudi-utilities-bundle_2.12-0.13.0-SNAPSHOT.jar \
+  --source-class org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource \
+  --op INSERT \
+  --hoodie-conf hoodie.deltastreamer.source.hoodieincr.file.format="parquet" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.select.file.extension="jsonl" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.datafile.format="json" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.select.relpath.prefix="country" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.ignore.relpath.prefix="blah" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.ignore.relpath.substring="blah" \
+  --hoodie-conf hoodie.datasource.write.recordkey.field=id \
+  --hoodie-conf hoodie.datasource.write.partitionpath.field= \
+  --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator \
+  --filter-dupes \
+  --hoodie-conf hoodie.datasource.write.insert.drop.duplicates=true \
+  --hoodie-conf hoodie.combine.before.insert=true \
+  --source-ordering-field id \
+  --table-type COPY_ON_WRITE \
+  --target-base-path file:\/\/\/absolute_path_to/data-gcs \
+  --target-table gcs_data \
+  --continuous \
+  --source-limit 100 \
+  --min-sync-interval-seconds 60 \
+  --hoodie-conf hoodie.deltastreamer.source.hoodieincr.path=file:\/\/\/absolute_path_to/meta-gcs \
+  --hoodie-conf hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT \
+  --enable-hive-sync \
+  --hoodie-conf hoodie.datasource.hive_sync.database=default \
+  --hoodie-conf hoodie.datasource.hive_sync.table=gcs_data \
+ */
+public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
+
+  private String srcPath;
+  private String fileFormat;
+  private final boolean checkIfFileExists;
+  private int numInstantsPerFetch;
+
+  private MissingCheckpointStrategy missingCheckpointStrategy;
+  private final FilePathsFetcher filePathsFetcher;
+  private final FileDataFetcher fileDataFetcher;
+
+  private static final Logger LOG = LogManager.getLogger(GcsEventsHoodieIncrSource.class);
+
+  public GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, SparkSession spark,
+                                   SchemaProvider schemaProvider) {
+
+    this(props, jsc, spark, schemaProvider,
+            new FilePathsFetcher(props, props.getString(SOURCE_FILE_FORMAT, DEFAULT_SOURCE_FILE_FORMAT)),
+            new FileDataFetcher(props, props.getString(DATAFILE_FORMAT, DEFAULT_SOURCE_FILE_FORMAT))
+    );
+  }
+
+  GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, SparkSession spark,
+                            SchemaProvider schemaProvider, FilePathsFetcher filePathsFetcher, FileDataFetcher fileDataFetcher) {
+    super(props, jsc, spark, schemaProvider);
+
+    DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(HOODIE_SRC_BASE_PATH));
+    srcPath = props.getString(HOODIE_SRC_BASE_PATH);
+    fileFormat = props.getString(SOURCE_FILE_FORMAT, DEFAULT_SOURCE_FILE_FORMAT);
+    missingCheckpointStrategy = getMissingCheckpointStrategy(props);
+    numInstantsPerFetch = props.getInteger(NUM_INSTANTS_PER_FETCH, DEFAULT_NUM_INSTANTS_PER_FETCH);
+    checkIfFileExists = props.getBoolean(ENABLE_EXISTS_CHECK, DEFAULT_ENABLE_EXISTS_CHECK);
+
+    this.filePathsFetcher = filePathsFetcher;
+    this.fileDataFetcher = fileDataFetcher;
+
+    addGcsAccessConfs(jsc);
+
+    LOG.info("srcPath: " + srcPath);
+    LOG.info("fileFormat: " + fileFormat);
+    LOG.info("missingCheckpointStrategy: " + missingCheckpointStrategy);
+    LOG.info("numInstantsPerFetch: " + numInstantsPerFetch);
+    LOG.info("checkIfFileExists: " + checkIfFileExists);
+  }
+
+  @Override
+  public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
+    QueryInfo queryInfo = getQueryInfo(lastCkptStr);
+
+    if (queryInfo.areStartAndEndInstantsEqual()) {
+      LOG.info("Already caught up. Begin Checkpoint was: " + queryInfo.getStartInstant());
+      return Pair.of(Option.empty(), queryInfo.getStartInstant());
+    }
+
+    Dataset<Row> sourceForFilenames = queryInfo.initializeSourceForFilenames(srcPath, sparkSession);
+
+    if (sourceForFilenames.isEmpty()) {
+      LOG.info("Source of file names is empty. Returning empty result and endInstant: "
+              + queryInfo.getEndInstant());
+      return Pair.of(Option.empty(), queryInfo.getEndInstant());
+    }
+
+    return extractData(queryInfo, sourceForFilenames);
+  }
+
+  private Pair<Option<Dataset<Row>>, String> extractData(QueryInfo queryInfo, Dataset<Row> sourceForFilenames) {
+    List<String> filepaths = filePathsFetcher.getGcsFilePaths(sparkContext, sourceForFilenames, checkIfFileExists);
+
+    LOG.info("Extracted " + filepaths.size() + " distinct files."
+            + " Some samples " + filepaths.stream().limit(10).collect(Collectors.toList()));

Review Comment:
   Why is the limit of 10 hardcoded? Should it be configurable?
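
   If it should be configurable, a hypothetical sketch (the config key name below is illustrative, not part of the PR, and assumes the `props` field from the parent Source is reachable here):

   ```java
   // hypothetical config key and default for how many sample paths to log
   private static final String SAMPLE_LOG_LIMIT_CONF = "hoodie.deltastreamer.source.gcsincr.sample.log.limit";
   private static final int DEFAULT_SAMPLE_LOG_LIMIT = 10;

   // in extractData(...), read the limit from props instead of hardcoding 10
   int sampleLimit = props.getInteger(SAMPLE_LOG_LIMIT_CONF, DEFAULT_SAMPLE_LOG_LIMIT);
   LOG.info("Extracted " + filepaths.size() + " distinct files."
       + " Some samples " + filepaths.stream().limit(sampleLimit).collect(Collectors.toList()));
   ```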



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/MetadataMsg.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers.gcs;
+
+import com.google.pubsub.v1.PubsubMessage;
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static org.apache.hudi.utilities.sources.helpers.gcs.MessageValidity.DO_PROCESS;
+import static org.apache.hudi.utilities.sources.helpers.gcs.MessageValidity.DO_SKIP;
+
+/**
+ * Wraps a PubsubMessage assuming it's from Cloud Storage Pubsub Notifications (CSPN), and
+ * adds relevant helper methods.
+ * For details of CSPN messages see: https://cloud.google.com/storage/docs/pubsub-notifications
+ */
+public class MetadataMsg {

Review Comment:
   nit: rename to `MetadataMessage`



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/QueryInfo.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers.gcs;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import static org.apache.hudi.DataSourceReadOptions.BEGIN_INSTANTTIME;
+import static org.apache.hudi.DataSourceReadOptions.END_INSTANTTIME;
+import static org.apache.hudi.DataSourceReadOptions.QUERY_TYPE;
+import static org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL;
+import static org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL;
+
+/**
+ * Uses the start and end instants of a DeltaStreamer Source to help construct the right kind
+ * of query for subsequent requests.
+ */
+public class QueryInfo {
+
+  private final String queryType;
+  private final String startInstant;
+  private final String endInstant;
+
+  private static final Logger LOG = LogManager.getLogger(QueryInfo.class);
+
+  public QueryInfo(Pair<String, Pair<String, String>> queryInfoPair) {

Review Comment:
   Let's use class attributes instead of a Pair; it's more readable that way.
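   Roughly something like this (sketch, untested):
   ```java
   public QueryInfo(String queryType, String startInstant, String endInstant) {
     this.queryType = queryType;
     this.startInstant = startInstant;
     this.endInstant = endInstant;
   }
   ```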



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/FileDataFetcher.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers.gcs;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.SPARK_DATASOURCE_OPTIONS;
+
+/**
+ * Connects to GCS from Spark and downloads data from a given list of files.
+ * Assumes SparkContext is already configured with GCS options through GcsEventsHoodieIncrSource.addGcsAccessConfs().
+ */
+public class FileDataFetcher implements Serializable {
+
+  private String fileFormat;
+  private TypedProperties props;
+
+  private static final Logger LOG = LogManager.getLogger(FileDataFetcher.class);
+  private static final long serialVersionUID = 1L;
+
+  public FileDataFetcher(TypedProperties props, String fileFormat) {
+    this.fileFormat = fileFormat;
+    this.props = props;
+  }
+
+  /**
+   * @param filepaths Files in GCS from which to fetch data
+   * @return Data in the given list of files, as a Spark DataSet
+   */
+  public Option<Dataset<Row>> fetchFileDataFromGcs(SparkSession spark, List<String> filepaths) {

Review Comment:
   We can reuse the corresponding method from the S3 incremental source. Let's move it to a util class?
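   A possible shape for the shared helper (class and method names below are only placeholders, untested):
   ```java
   // Hypothetical shared helper reused by both the S3 and GCS incremental sources.
   public class CloudDataFetcher {
     public static Option<Dataset<Row>> fetchFileData(SparkSession spark, List<String> filepaths, String fileFormat) {
       if (filepaths.isEmpty()) {
         return Option.empty();
       }
       // Load all files of the given format into a single Dataset.
       return Option.of(spark.read().format(fileFormat).load(filepaths.toArray(new String[0])));
     }
   }
   ```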



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsSource.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.ReceivedMessage;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.gcs.PubsubMessagesFetcher;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.GOOGLE_PROJECT_ID;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.PUBSUB_SUBSCRIPTION_ID;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestGcsEventsSource extends UtilitiesTestBase {
+
+  @Mock
+  PubsubMessagesFetcher pubsubMessagesFetcher;
+
+  protected FilebasedSchemaProvider schemaProvider;
+  private TypedProperties props;
+
+  private static final String CHECKPOINT_VALUE_ZERO = "0";
+
+  @BeforeAll
+  public static void beforeAll() throws Exception {
+    UtilitiesTestBase.initTestServices(false, false);
+  }
+
+  @AfterAll
+  public static void afterAll() {
+    UtilitiesTestBase.cleanupClass();
+  }
+
+  @BeforeEach
+  public void beforeEach() throws Exception {
+    super.setup();
+    schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS(), jsc);
+    MockitoAnnotations.initMocks(this);
+
+    props = new TypedProperties();
+    props.put(GOOGLE_PROJECT_ID, "dummy-project");
+    props.put(PUBSUB_SUBSCRIPTION_ID, "dummy-subscription");
+  }
+
+  @AfterEach
+  public void afterEach() throws Exception {
+    super.teardown();
+  }
+
+  @Test
+  public void shouldReturnEmptyOnNoMessages() {
+    when(pubsubMessagesFetcher.fetchMessages()).thenReturn(Collections.emptyList());
+
+    GcsEventsSource source = new GcsEventsSource(props, jsc, sparkSession, null,
+            pubsubMessagesFetcher);
+
+    Pair<Option<Dataset<Row>>, String> expected = Pair.of(Option.empty(), "0");
+    Pair<Option<Dataset<Row>>, String> dataAndCheckpoint = source.fetchNextBatch(Option.of("0"), 100);
+
+    assertEquals(expected, dataAndCheckpoint);
+  }
+
+  @Test
+  public void shouldReturnDataOnValidMessages() throws IOException {

Review Comment:
   Let's remove the unused exceptions from all the tests.
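   For example:
   ```suggestion
     public void shouldReturnDataOnValidMessages() {
   ```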



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsIngestionConfig.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers.gcs;
+
+/**
+ * Config keys and defaults for GCS Ingestion
+ */
+public class GcsIngestionConfig {
+
+  /**
+   * The GCP Project Id where the Pubsub Subscription to ingest from resides. Needed to connect
+   * to the Pubsub subscription
+   */
+  public static final String GOOGLE_PROJECT_ID = "hoodie.deltastreamer.source.gcs.project.id";
+
+  /**
+   * The GCP Pubsub subscription id for the GCS Notifications. Needed to connect to the Pubsub
+   * subscription.
+   */
+  public static final String PUBSUB_SUBSCRIPTION_ID = "hoodie.deltastreamer.source.gcs.subscription.id";
+
+  /**
+   * How many messages to pull from Cloud Pubsub at a time. Also see {@link DEFAULT_BATCH_SIZE}.
+   */
+  public static final String BATCH_SIZE_CONF = "hoodie.deltastreamer.source.gcs.batch.size";
+
+  /**
+   * Provide a reasonable setting for default batch size.
+   * If batch size is too big, two possible issues can happen:
+   * i) Acknowledgement takes too long (given that Hudi needs to commit first). That means Pubsub
+   * will keep delivering the same message since it wasn't acked in time.
+   * ii) The size of the request that acks outstanding messages may exceed the limit,
+   * which is 512KB as per Google's docs. See: https://cloud.google.com/pubsub/quotas#resource_limits
+   */
+  public static final int DEFAULT_BATCH_SIZE = 10;
+
+  // Size of inbound messages when pulling data, in bytes
+  public static final int DEFAULT_MAX_INBOUND_MESSAGE_SIZE = 20 * 1024 * 1024; // bytes
+
+  /**
+   * Whether to acknowledge messages or not. Not acknowledging means Pubsub will keep redelivering the
+   * same messages. In Prod this should always be true. So this is mainly useful during dev and testing.
+   */
+  public static final String ACK_MESSAGES = "hoodie.deltastreamer.source.gcs.ack";
+
+  /**
+   * Default value for {@link ACK_MESSAGES}
+   */
+  public static final boolean ACK_MESSAGES_DEFAULT_VALUE = true;
+
+  /**
+   * Check whether file exists before attempting to pull it
+   */
+  public static final String ENABLE_EXISTS_CHECK = "hoodie.deltastreamer.source.gcsincr.check.file.exists";

Review Comment:
   nit: change `gcsincr` to `gcs.incr`
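   For example:
   ```suggestion
     public static final String ENABLE_EXISTS_CHECK = "hoodie.deltastreamer.source.gcs.incr.check.file.exists";
   ```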



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS;
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy;
+import org.apache.hudi.utilities.sources.helpers.gcs.FileDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.FilePathsFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.QueryInfo;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_NUM_INSTANTS_PER_FETCH;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_SOURCE_FILE_FORMAT;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.HOODIE_SRC_BASE_PATH;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.MISSING_CHECKPOINT_STRATEGY;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.NUM_INSTANTS_PER_FETCH;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SOURCE_FILE_FORMAT;
+import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.calculateBeginAndEndInstants;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DEFAULT_ENABLE_EXISTS_CHECK;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.ENABLE_EXISTS_CHECK;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DATAFILE_FORMAT;
+
+/**
+ * An incremental source that detects new data in a source table containing metadata about GCS files,
+ * downloads the actual content of these files from GCS and stores them as records into a destination table.
+ * <p>
+ * You should set spark.driver.extraClassPath in spark-defaults.conf to
+ * look like below WITHOUT THE NEWLINES (or give the equivalent as CLI options if in cluster mode):
+ * (mysql-connector at the end is only needed if Hive Sync is enabled and Mysql is used for Hive Metastore).
+
+ absolute_path_to/protobuf-java-3.21.1.jar:absolute_path_to/failureaccess-1.0.1.jar:
+ absolute_path_to/31.1-jre/guava-31.1-jre.jar:
+ absolute_path_to/mysql-connector-java-8.0.30.jar
+
+ This class can be invoked via spark-submit as follows. There's a bunch of optional hive sync flags at the end.
+  $ bin/spark-submit \
+  --packages com.google.cloud:google-cloud-pubsub:1.120.0 \
+  --packages com.google.cloud.bigdataoss:gcs-connector:hadoop2-2.2.7 \
+  --driver-memory 4g \
+  --executor-memory 4g \
+  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
+  absolute_path_to/hudi-utilities-bundle_2.12-0.13.0-SNAPSHOT.jar \
+  --source-class org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource \
+  --op INSERT \
+  --hoodie-conf hoodie.deltastreamer.source.hoodieincr.file.format="parquet" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.select.file.extension="jsonl" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.datafile.format="json" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.select.relpath.prefix="country" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.ignore.relpath.prefix="blah" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.ignore.relpath.substring="blah" \
+  --hoodie-conf hoodie.datasource.write.recordkey.field=id \
+  --hoodie-conf hoodie.datasource.write.partitionpath.field= \
+  --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator \
+  --filter-dupes \
+  --hoodie-conf hoodie.datasource.write.insert.drop.duplicates=true \
+  --hoodie-conf hoodie.combine.before.insert=true \
+  --source-ordering-field id \
+  --table-type COPY_ON_WRITE \
+  --target-base-path file:\/\/\/absolute_path_to/data-gcs \
+  --target-table gcs_data \
+  --continuous \
+  --source-limit 100 \
+  --min-sync-interval-seconds 60 \
+  --hoodie-conf hoodie.deltastreamer.source.hoodieincr.path=file:\/\/\/absolute_path_to/meta-gcs \
+  --hoodie-conf hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT \
+  --enable-hive-sync \
+  --hoodie-conf hoodie.datasource.hive_sync.database=default \
+  --hoodie-conf hoodie.datasource.hive_sync.table=gcs_data \
+ */
+public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
+
+  private String srcPath;
+  private String fileFormat;

Review Comment:
   Looks like `fileFormat` is not being used. Either remove it or pass it to the fetchers.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/MetadataMsg.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers.gcs;
+
+import com.google.pubsub.v1.PubsubMessage;
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static org.apache.hudi.utilities.sources.helpers.gcs.MessageValidity.DO_PROCESS;
+import static org.apache.hudi.utilities.sources.helpers.gcs.MessageValidity.DO_SKIP;
+
+/**
+ * Wraps a PubsubMessage assuming it's from Cloud Storage Pubsub Notifications (CSPN), and
+ * adds relevant helper methods.
+ * For details of CSPN messages see: https://cloud.google.com/storage/docs/pubsub-notifications
+ */
+public class MetadataMsg {
+
+  // The CSPN message to wrap
+  private final PubsubMessage message;
+
+  private static final String EVENT_NAME_OBJECT_FINALIZE = "OBJECT_FINALIZE";
+
+  private static final String ATTR_EVENT_TYPE = "eventType";
+  private static final String ATTR_OBJECT_ID = "objectId";
+  private static final String ATTR_OVERWROTE_GENERATION = "overwroteGeneration";
+
+  public MetadataMsg(PubsubMessage message) {
+    this.message = message;
+  }
+
+  public String toStringUtf8() {

Review Comment:
   This can be a static method if we pass the PubsubMessage as an argument.
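   e.g., assuming the method just decodes the message payload (sketch):
   ```java
   public static String toStringUtf8(PubsubMessage message) {
     return message.getData().toStringUtf8();
   }
   ```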



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/PubsubMessagesFetcher.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers.gcs;
+
+import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
+import com.google.pubsub.v1.AcknowledgeRequest;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.PullRequest;
+import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.ReceivedMessage;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import static com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub.create;
+import java.io.IOException;
+import java.util.List;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DEFAULT_MAX_INBOUND_MESSAGE_SIZE;
+
+/**
+ * Fetch messages from a specified Google Cloud Pubsub subscription.
+ */
+public class PubsubMessagesFetcher {

Review Comment:
   Why not subclass `CloudObjectsSelector`? I know it has certain S3-specific things, but those can be refactored.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/PubsubMessagesFetcher.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers.gcs;
+
+import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
+import com.google.pubsub.v1.AcknowledgeRequest;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.PullRequest;
+import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.ReceivedMessage;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import static com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub.create;
+import java.io.IOException;
+import java.util.List;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DEFAULT_MAX_INBOUND_MESSAGE_SIZE;
+
+/**
+ * Fetch messages from a specified Google Cloud Pubsub subscription.
+ */
+public class PubsubMessagesFetcher {
+
+  private final String googleProjectId;
+  private final String pubsubSubscriptionId;
+
+  private final int batchSize;
+  private final SubscriberStubSettings subscriberStubSettings;
+
+  private final int maxInboundMessageSize;
+
+  private static final Logger LOG = LogManager.getLogger(PubsubMessagesFetcher.class);
+
+  public PubsubMessagesFetcher(String googleProjectId, String pubsubSubscriptionId, int batchSize) {
+    this.googleProjectId = googleProjectId;
+    this.pubsubSubscriptionId = pubsubSubscriptionId;
+    this.batchSize = batchSize;
+
+    maxInboundMessageSize = DEFAULT_MAX_INBOUND_MESSAGE_SIZE;
+
+    try {
+      /** For details of timeout and retry configs,
+       * see {@link com.google.cloud.pubsub.v1.stub.SubscriberStubSettings#initDefaults()},
+       * and the static code block in SubscriberStubSettings */
+      subscriberStubSettings =
+              SubscriberStubSettings.newBuilder()
+                      .setTransportChannelProvider(
+                              SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
+                                      .setMaxInboundMessageSize(maxInboundMessageSize)
+                                      .build())
+                      .build();
+    } catch (IOException e) {
+      throw new HoodieException("Error creating subscriber stub settings", e);
+    }
+  }
+
+  public List<ReceivedMessage> fetchMessages() {

Review Comment:
   Does Pubsub also have delete events?
   In general, do the two sources handle deletion of objects from GCS? There were some caveats with S3. See https://github.com/apache/hudi/pull/3433#pullrequestreview-728946157 for more details.
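   For reference, GCS Pubsub notifications do emit OBJECT_DELETE events; one option is to skip everything except OBJECT_FINALIZE in the message wrapper, roughly like this (sketch based on the constants in `MetadataMsg`, untested):
   ```java
   public MessageValidity shouldBeProcessed() {
     // Only freshly created/finalized objects are ingested; deletes and other events are skipped.
     String eventType = message.getAttributesOrDefault(ATTR_EVENT_TYPE, "");
     return EVENT_NAME_OBJECT_FINALIZE.equals(eventType) ? DO_PROCESS : DO_SKIP;
   }
   ```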



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/FilePathsFetcher.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers.gcs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.GCS_INCR_DATAFILE_EXTENSION;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.SELECT_RELATIVE_PATH_PREFIX;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.IGNORE_RELATIVE_PATH_PREFIX;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.IGNORE_RELATIVE_PATH_SUBSTR;
+
+/**
+ * Extracts a list of fully qualified GCS filepaths from a given Spark Dataset as input.
+ * Optionally:
+ * i) Match the filename and path against provided input filter strings
+ * ii) Check if each file exists on GCS, in which case it assumes SparkContext is already
+ * configured with GCS options through GcsEventsHoodieIncrSource.addGcsAccessConfs().
+ */
+public class FilePathsFetcher implements Serializable {
+
+  /**
+   * The default file format to assume if {@link GcsIngestionConfig.GCS_INCR_DATAFILE_EXTENSION} is not given.
+   */
+  private final String fileFormat;
+  private final TypedProperties props;
+
+  private static final String GCS_PREFIX = "gs://";
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger LOG = LogManager.getLogger(FilePathsFetcher.class);
+
+  /**
+   * @param fileFormat The default file format to assume if {@link GcsIngestionConfig.GCS_INCR_DATAFILE_EXTENSION}
+   *                   is not given.
+   */
+  public FilePathsFetcher(TypedProperties props, String fileFormat) {
+    this.props = props;
+    this.fileFormat = fileFormat;
+  }
+
+  /**
+   * @param sourceForFilenames a Dataset that contains metadata about files on GCS. Assumed to be a persisted form
+   *                           of a Cloud Storage Pubsub Notification event.
+   * @param checkIfExists      Check if each file exists, before returning its full path
+   * @return A list of fully qualifieed GCS file paths.

Review Comment:
   ```suggestion
      * @return A list of fully qualified GCS file paths.
   ```



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/FilePathsFetcher.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers.gcs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.GCS_INCR_DATAFILE_EXTENSION;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.SELECT_RELATIVE_PATH_PREFIX;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.IGNORE_RELATIVE_PATH_PREFIX;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.IGNORE_RELATIVE_PATH_SUBSTR;
+
+/**
+ * Extracts a list of fully qualified GCS filepaths from a given Spark Dataset as input.
+ * Optionally:
+ * i) Match the filename and path against provided input filter strings
+ * ii) Check if each file exists on GCS, in which case it assumes SparkContext is already
+ * configured with GCS options through GcsEventsHoodieIncrSource.addGcsAccessConfs().
+ */
+public class FilePathsFetcher implements Serializable {
+
+  /**
+   * The default file format to assume if {@link GcsIngestionConfig.GCS_INCR_DATAFILE_EXTENSION} is not given.
+   */
+  private final String fileFormat;
+  private final TypedProperties props;
+
+  private static final String GCS_PREFIX = "gs://";
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger LOG = LogManager.getLogger(FilePathsFetcher.class);
+
+  /**
+   * @param fileFormat The default file format to assume if {@link GcsIngestionConfig.GCS_INCR_DATAFILE_EXTENSION}
+   *                   is not given.
+   */
+  public FilePathsFetcher(TypedProperties props, String fileFormat) {
+    this.props = props;
+    this.fileFormat = fileFormat;
+  }
+
+  /**
+   * @param sourceForFilenames a Dataset that contains metadata about files on GCS. Assumed to be a persisted form
+   *                           of a Cloud Storage Pubsub Notification event.
+   * @param checkIfExists      Check if each file exists, before returning its full path
+   * @return A list of fully qualifieed GCS file paths.
+   */
+  public List<String> getGcsFilePaths(JavaSparkContext jsc, Dataset<Row> sourceForFilenames, boolean checkIfExists) {
+    String filter = createFilter();
+    LOG.info("Adding filter string to Dataset: " + filter);
+
+    SerializableConfiguration serializableConfiguration = new SerializableConfiguration(
+            jsc.hadoopConfiguration());
+
+    return sourceForFilenames
+            .filter(filter)
+            .select("bucket", "name")
+            .distinct()
+            .rdd().toJavaRDD().mapPartitions(
+                    getCloudFilesPerPartition(serializableConfiguration, checkIfExists)
+            ).collect();
+  }
+
+  private FlatMapFunction<Iterator<Row>, String> getCloudFilesPerPartition(
+          SerializableConfiguration serializableConfiguration, boolean checkIfExists) {
+
+    return rows -> {
+      List<String> cloudFilesPerPartition = new ArrayList<>();
+      rows.forEachRemaining(row -> {
+        addFileToList(row, cloudFilesPerPartition, serializableConfiguration, checkIfExists);
+      });
+
+      return cloudFilesPerPartition.iterator();
+    };
+  }
+
+  private void addFileToList(Row row, List<String> cloudFilesPerPartition,
+                             SerializableConfiguration serializableConfiguration, boolean checkIfExists) {
+    final Configuration configuration = serializableConfiguration.newCopy();
+
+    String bucket = row.getString(0);
+    String filePath = GCS_PREFIX + bucket + "/" + row.getString(1);
+
+    try {
+      addCloudFile(GCS_PREFIX, bucket, filePath, cloudFilesPerPartition, configuration, checkIfExists);
+    } catch (Exception exception) {
+      LOG.warn(String.format("Failed to add cloud file %s", filePath), exception);
+      throw new HoodieException(String.format("Failed to add cloud file %s", filePath), exception);
+    }
+  }
+
+  private void addCloudFile(String gcsPrefix, String bucket, String filePath,

Review Comment:
   Can we move these methods to some util class? The logic is similar to S3, and there is nothing specific to S3 or GCS in it. The util method can then be reused across S3 and GCS.
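   A rough sketch of such a util method (names are placeholders; the prefix would be "s3://" or "gs://" depending on the source; untested):
   ```java
   // Hypothetical shared helper; not part of this PR.
   public static void addCloudFileIfValid(String prefix, String bucket, String relativePath,
                                          List<String> result, Configuration conf,
                                          boolean checkIfExists) throws IOException {
     String filePath = prefix + bucket + "/" + relativePath;
     if (checkIfExists) {
       // Optionally verify the object still exists before adding it to the list.
       FileSystem fs = FSUtils.getFs(prefix + bucket, conf);
       if (!fs.exists(new Path(filePath))) {
         return;
       }
     }
     result.add(filePath);
   }
   ```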



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS;
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy;
+import org.apache.hudi.utilities.sources.helpers.gcs.FileDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.FilePathsFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.QueryInfo;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_NUM_INSTANTS_PER_FETCH;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_SOURCE_FILE_FORMAT;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.HOODIE_SRC_BASE_PATH;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.MISSING_CHECKPOINT_STRATEGY;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.NUM_INSTANTS_PER_FETCH;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SOURCE_FILE_FORMAT;
+import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.calculateBeginAndEndInstants;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DEFAULT_ENABLE_EXISTS_CHECK;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.ENABLE_EXISTS_CHECK;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DATAFILE_FORMAT;
+
+/**
+ * An incremental source that detects new data in a source table containing metadata about GCS files,
+ * downloads the actual content of these files from GCS and stores them as records into a destination table.
+ * <p>
+ * You should set spark.driver.extraClassPath in spark-defaults.conf to
+ * look like below WITHOUT THE NEWLINES (or give the equivalent as CLI options if in cluster mode):
+ * (mysql-connector at the end is only needed if Hive Sync is enabled and Mysql is used for Hive Metastore).
+
+ absolute_path_to/protobuf-java-3.21.1.jar:absolute_path_to/failureaccess-1.0.1.jar:
+ absolute_path_to/31.1-jre/guava-31.1-jre.jar:
+ absolute_path_to/mysql-connector-java-8.0.30.jar
+
+ This class can be invoked via spark-submit as follows. There's a bunch of optional hive sync flags at the end.
+  $ bin/spark-submit \
+  --packages com.google.cloud:google-cloud-pubsub:1.120.0 \
+  --packages com.google.cloud.bigdataoss:gcs-connector:hadoop2-2.2.7 \
+  --driver-memory 4g \
+  --executor-memory 4g \
+  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
+  absolute_path_to/hudi-utilities-bundle_2.12-0.13.0-SNAPSHOT.jar \
+  --source-class org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource \
+  --op INSERT \
+  --hoodie-conf hoodie.deltastreamer.source.hoodieincr.file.format="parquet" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.select.file.extension="jsonl" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.datafile.format="json" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.select.relpath.prefix="country" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.ignore.relpath.prefix="blah" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.ignore.relpath.substring="blah" \
+  --hoodie-conf hoodie.datasource.write.recordkey.field=id \
+  --hoodie-conf hoodie.datasource.write.partitionpath.field= \
+  --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator \
+  --filter-dupes \
+  --hoodie-conf hoodie.datasource.write.insert.drop.duplicates=true \
+  --hoodie-conf hoodie.combine.before.insert=true \
+  --source-ordering-field id \
+  --table-type COPY_ON_WRITE \
+  --target-base-path file:\/\/\/absolute_path_to/data-gcs \
+  --target-table gcs_data \
+  --continuous \
+  --source-limit 100 \
+  --min-sync-interval-seconds 60 \
+  --hoodie-conf hoodie.deltastreamer.source.hoodieincr.path=file:\/\/\/absolute_path_to/meta-gcs \
+  --hoodie-conf hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT \
+  --enable-hive-sync \
+  --hoodie-conf hoodie.datasource.hive_sync.database=default \
+  --hoodie-conf hoodie.datasource.hive_sync.table=gcs_data \
+ */
+public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
+
+  private String srcPath;
+  private String fileFormat;
+  private final boolean checkIfFileExists;
+  private int numInstantsPerFetch;
+
+  private MissingCheckpointStrategy missingCheckpointStrategy;
+  private final FilePathsFetcher filePathsFetcher;
+  private final FileDataFetcher fileDataFetcher;
+
+  private static final Logger LOG = LogManager.getLogger(GcsEventsHoodieIncrSource.class);
+
+  public GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, SparkSession spark,

Review Comment:
   Do we need two constructors right now?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS;
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy;
+import org.apache.hudi.utilities.sources.helpers.gcs.FileDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.FilePathsFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.QueryInfo;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_NUM_INSTANTS_PER_FETCH;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_SOURCE_FILE_FORMAT;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.HOODIE_SRC_BASE_PATH;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.MISSING_CHECKPOINT_STRATEGY;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.NUM_INSTANTS_PER_FETCH;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SOURCE_FILE_FORMAT;
+import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.calculateBeginAndEndInstants;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DEFAULT_ENABLE_EXISTS_CHECK;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.ENABLE_EXISTS_CHECK;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DATAFILE_FORMAT;
+
+/**
+ * An incremental source that detects new data in a source table containing metadata about GCS files,
+ * downloads the actual content of these files from GCS and stores them as records into a destination table.
+ * <p>
+ * You should set spark.driver.extraClassPath in spark-defaults.conf to
+ * look like below WITHOUT THE NEWLINES (or give the equivalent as CLI options if in cluster mode):
+ * (mysql-connector at the end is only needed if Hive Sync is enabled and Mysql is used for Hive Metastore).
+
+ absolute_path_to/protobuf-java-3.21.1.jar:absolute_path_to/failureaccess-1.0.1.jar:
+ absolute_path_to/31.1-jre/guava-31.1-jre.jar:
+ absolute_path_to/mysql-connector-java-8.0.30.jar
+
+ This class can be invoked via spark-submit as follows. There's a bunch of optional hive sync flags at the end.
+  $ bin/spark-submit \
+  --packages com.google.cloud:google-cloud-pubsub:1.120.0 \
+  --packages com.google.cloud.bigdataoss:gcs-connector:hadoop2-2.2.7 \
+  --driver-memory 4g \
+  --executor-memory 4g \
+  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
+  absolute_path_to/hudi-utilities-bundle_2.12-0.13.0-SNAPSHOT.jar \
+  --source-class org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource \
+  --op INSERT \
+  --hoodie-conf hoodie.deltastreamer.source.hoodieincr.file.format="parquet" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.select.file.extension="jsonl" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.datafile.format="json" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.select.relpath.prefix="country" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.ignore.relpath.prefix="blah" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.ignore.relpath.substring="blah" \
+  --hoodie-conf hoodie.datasource.write.recordkey.field=id \
+  --hoodie-conf hoodie.datasource.write.partitionpath.field= \
+  --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator \
+  --filter-dupes \
+  --hoodie-conf hoodie.datasource.write.insert.drop.duplicates=true \
+  --hoodie-conf hoodie.combine.before.insert=true \
+  --source-ordering-field id \
+  --table-type COPY_ON_WRITE \
+  --target-base-path file:\/\/\/absolute_path_to/data-gcs \
+  --target-table gcs_data \
+  --continuous \
+  --source-limit 100 \
+  --min-sync-interval-seconds 60 \
+  --hoodie-conf hoodie.deltastreamer.source.hoodieincr.path=file:\/\/\/absolute_path_to/meta-gcs \
+  --hoodie-conf hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT \
+  --enable-hive-sync \
+  --hoodie-conf hoodie.datasource.hive_sync.database=default \
+  --hoodie-conf hoodie.datasource.hive_sync.table=gcs_data \
+ */
+public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
+
+  private String srcPath;
+  private String fileFormat;
+  private final boolean checkIfFileExists;
+  private int numInstantsPerFetch;
+
+  private MissingCheckpointStrategy missingCheckpointStrategy;
+  private final FilePathsFetcher filePathsFetcher;
+  private final FileDataFetcher fileDataFetcher;
+
+  private static final Logger LOG = LogManager.getLogger(GcsEventsHoodieIncrSource.class);
+
+  public GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, SparkSession spark,
+                                   SchemaProvider schemaProvider) {
+
+    this(props, jsc, spark, schemaProvider,
+            new FilePathsFetcher(props, props.getString(SOURCE_FILE_FORMAT, DEFAULT_SOURCE_FILE_FORMAT)),
+            new FileDataFetcher(props, props.getString(DATAFILE_FORMAT, DEFAULT_SOURCE_FILE_FORMAT))
+    );
+  }
+
+  GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, SparkSession spark,
+                            SchemaProvider schemaProvider, FilePathsFetcher filePathsFetcher, FileDataFetcher fileDataFetcher) {
+    super(props, jsc, spark, schemaProvider);
+
+    DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(HOODIE_SRC_BASE_PATH));
+    srcPath = props.getString(HOODIE_SRC_BASE_PATH);
+    fileFormat = props.getString(SOURCE_FILE_FORMAT, DEFAULT_SOURCE_FILE_FORMAT);
+    missingCheckpointStrategy = getMissingCheckpointStrategy(props);
+    numInstantsPerFetch = props.getInteger(NUM_INSTANTS_PER_FETCH, DEFAULT_NUM_INSTANTS_PER_FETCH);
+    checkIfFileExists = props.getBoolean(ENABLE_EXISTS_CHECK, DEFAULT_ENABLE_EXISTS_CHECK);
+
+    this.filePathsFetcher = filePathsFetcher;
+    this.fileDataFetcher = fileDataFetcher;
+
+    addGcsAccessConfs(jsc);
+
+    LOG.info("srcPath: " + srcPath);
+    LOG.info("fileFormat: " + fileFormat);
+    LOG.info("missingCheckpointStrategy: " + missingCheckpointStrategy);
+    LOG.info("numInstantsPerFetch: " + numInstantsPerFetch);
+    LOG.info("checkIfFileExists: " + checkIfFileExists);
+  }
+
+  @Override
+  public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
+    QueryInfo queryInfo = getQueryInfo(lastCkptStr);
+
+    if (queryInfo.areStartAndEndInstantsEqual()) {
+      LOG.info("Already caught up. Begin Checkpoint was: " + queryInfo.getStartInstant());
+      return Pair.of(Option.empty(), queryInfo.getStartInstant());
+    }
+
+    Dataset<Row> sourceForFilenames = queryInfo.initializeSourceForFilenames(srcPath, sparkSession);
+
+    if (sourceForFilenames.isEmpty()) {
+      LOG.info("Source of file names is empty. Returning empty result and endInstant: "
+              + queryInfo.getEndInstant());
+      return Pair.of(Option.empty(), queryInfo.getEndInstant());
+    }
+
+    return extractData(queryInfo, sourceForFilenames);
+  }
+
+  private Pair<Option<Dataset<Row>>, String> extractData(QueryInfo queryInfo, Dataset<Row> sourceForFilenames) {
+    List<String> filepaths = filePathsFetcher.getGcsFilePaths(sparkContext, sourceForFilenames, checkIfFileExists);
+
+    LOG.info("Extracted " + filepaths.size() + " distinct files."
+            + " Some samples " + filepaths.stream().limit(10).collect(Collectors.toList()));
+
+    Option<Dataset<Row>> fileDataDs = fileDataFetcher.fetchFileDataFromGcs(sparkSession, filepaths);
+    return Pair.of(fileDataDs, queryInfo.getEndInstant());
+  }
+
+  private QueryInfo getQueryInfo(Option<String> lastCkptStr) {
+    Option<String> beginInstant = getBeginInstant(lastCkptStr);
+
+    QueryInfo queryInfo = new QueryInfo(
+            calculateBeginAndEndInstants(
+                    sparkContext, srcPath, numInstantsPerFetch, beginInstant, missingCheckpointStrategy
+            )
+    );
+
+    queryInfo.logDetails();

Review Comment:
   Is it necessary? If it is, it's better to log at debug level.
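   e.g., inside `QueryInfo` (sketch):
   ```java
   public void logDetails() {
     if (LOG.isDebugEnabled()) {
       LOG.debug("queryType: " + queryType + ", startInstant: " + startInstant + ", endInstant: " + endInstant);
     }
   }
   ```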



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org