Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/05/07 01:02:35 UTC

[GitHub] [hudi] nsivabalan commented on a change in pull request #2915: [HUDI-251] Adds JDBC source support for DeltaStreamer

nsivabalan commented on a change in pull request #2915:
URL: https://github.com/apache/hudi/pull/2915#discussion_r627846007



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JdbcSource.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.SqlQueryBuilder;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.storage.StorageLevel;
+import org.jetbrains.annotations.NotNull;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Reads data from RDBMS data sources.
+ */
+
+public class JdbcSource extends RowSource {
+
+  private static final Logger LOG = LogManager.getLogger(JdbcSource.class);
+  private static final List<String> DB_LIMIT_CLAUSE = Arrays.asList("mysql", "postgresql", "h2");
+  private static final String URI_JDBC_PREFIX = "jdbc:";
+
+  public JdbcSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
+                    SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider);
+  }
+
+  /**
+   * Validates all user properties and prepares the {@link DataFrameReader} to read from RDBMS.
+   *
+   * @param session    The {@link SparkSession}.
+   * @param properties The JDBC connection properties and data source options.
+   * @return The {@link DataFrameReader} to read from RDBMS
+   * @throws HoodieException
+   */
+  private static DataFrameReader validatePropsAndGetDataFrameReader(final SparkSession session,
+                                                                    final TypedProperties properties)
+      throws HoodieException {
+    DataFrameReader dataFrameReader;
+    FSDataInputStream passwordFileStream = null;
+    try {
+      dataFrameReader = session.read().format("jdbc");
+      dataFrameReader = dataFrameReader.option(Config.URL_PROP, properties.getString(Config.URL));
+      dataFrameReader = dataFrameReader.option(Config.USER_PROP, properties.getString(Config.USER));
+      dataFrameReader = dataFrameReader.option(Config.DRIVER_PROP, properties.getString(Config.DRIVER_CLASS));
+      dataFrameReader = dataFrameReader
+          .option(Config.RDBMS_TABLE_PROP, properties.getString(Config.RDBMS_TABLE_NAME));
+
+      if (properties.containsKey(Config.PASSWORD)) {
+        LOG.info("Reading JDBC password from properties file....");
+        dataFrameReader = dataFrameReader.option(Config.PASSWORD_PROP, properties.getString(Config.PASSWORD));
+      } else if (properties.containsKey(Config.PASSWORD_FILE)
+          && !StringUtils.isNullOrEmpty(properties.getString(Config.PASSWORD_FILE))) {
+        LOG.info(String.format("Reading JDBC password from password file %s", properties.getString(Config.PASSWORD_FILE)));
+        FileSystem fileSystem = FileSystem.get(session.sparkContext().hadoopConfiguration());
+        passwordFileStream = fileSystem.open(new Path(properties.getString(Config.PASSWORD_FILE)));
+        byte[] bytes = new byte[passwordFileStream.available()];
+        passwordFileStream.read(bytes);
+        dataFrameReader = dataFrameReader.option(Config.PASSWORD_PROP, new String(bytes));
+      } else {
+        throw new IllegalArgumentException(String.format("JDBCSource needs either a %s or %s to connect to RDBMS "
+            + "datasource", Config.PASSWORD_FILE, Config.PASSWORD));
+      }
+
+      addExtraJdbcOptions(properties, dataFrameReader);
+
+      if (properties.getBoolean(Config.IS_INCREMENTAL)) {
+        DataSourceUtils.checkRequiredProperties(properties, Collections.singletonList(Config.INCREMENTAL_COLUMN));
+      }
+      return dataFrameReader;
+    } catch (Exception e) {
+      throw new HoodieException(e);
+    } finally {
+      IOUtils.closeStream(passwordFileStream);
+    }
+  }
+
+  /**
+   * Accepts spark JDBC options from the user in terms of EXTRA_OPTIONS adds them to {@link DataFrameReader} Example: In
+   * a normal spark code you would do something like: session.read.format('jdbc') .option(fetchSize,1000)
+   * .option(timestampFormat,"yyyy-mm-dd hh:mm:ss")
+   * <p>
+   * The way to pass these properties to HUDI is through the config file. Any property starting with
+   * hoodie.datasource.jdbc.extra.options. will be added.
+   * <p>
+   * Example: hoodie.datasource.jdbc.extra.options.fetchSize=100
+   * hoodie.datasource.jdbc.extra.options.upperBound=1

Review comment:
       Can you help me understand how these upperBound/lowerBound options will be used? We already have sourceLimit and the auto checkpointing mechanism, so I'm not sure how this will pan out alongside that.
   Not asking to get it fixed in this PR, just to clarify things.
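
   For context, a minimal sketch (not from this PR) of how Spark's JDBC reader itself consumes lowerBound/upperBound together with partitionColumn/numPartitions; every connection, table and column value below is made up for illustration:

       import org.apache.spark.sql.Dataset;
       import org.apache.spark.sql.Row;
       import org.apache.spark.sql.SparkSession;

       class JdbcBoundsExample {
         // Splits the read into numPartitions parallel range queries over partitionColumn,
         // using lowerBound/upperBound as the assumed min/max for stride computation
         // (they do not filter rows, only control partitioning).
         static Dataset<Row> read(SparkSession spark) {
           return spark.read()
               .format("jdbc")
               .option("url", "jdbc:mysql://host:3306/db")
               .option("dbtable", "triprec")
               .option("user", "user")
               .option("password", "pass")
               .option("partitionColumn", "id")   // numeric column to split reads on
               .option("lowerBound", "1")         // assumed min of partitionColumn
               .option("upperBound", "100000")    // assumed max of partitionColumn
               .option("numPartitions", "8")      // parallel JDBC connections
               .option("fetchsize", "1000")
               .load();
         }
       }

   So my question is really whether these bounds are meant purely for read parallelism, or whether they overlap with what sourceLimit and checkpointing already give us.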

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JdbcSource.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.SqlQueryBuilder;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.storage.StorageLevel;
+import org.jetbrains.annotations.NotNull;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Reads data from RDBMS data sources.
+ */
+
+public class JdbcSource extends RowSource {
+
+  private static final Logger LOG = LogManager.getLogger(JdbcSource.class);
+  private static final List<String> DB_LIMIT_CLAUSE = Arrays.asList("mysql", "postgresql", "h2");
+  private static final String URI_JDBC_PREFIX = "jdbc:";
+
+  public JdbcSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
+                    SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider);
+  }
+
+  /**
+   * Validates all user properties and prepares the {@link DataFrameReader} to read from RDBMS.
+   *
+   * @param session    The {@link SparkSession}.
+   * @param properties The JDBC connection properties and data source options.
+   * @return The {@link DataFrameReader} to read from RDBMS
+   * @throws HoodieException
+   */
+  private static DataFrameReader validatePropsAndGetDataFrameReader(final SparkSession session,
+                                                                    final TypedProperties properties)
+      throws HoodieException {
+    DataFrameReader dataFrameReader;
+    FSDataInputStream passwordFileStream = null;
+    try {
+      dataFrameReader = session.read().format("jdbc");
+      dataFrameReader = dataFrameReader.option(Config.URL_PROP, properties.getString(Config.URL));
+      dataFrameReader = dataFrameReader.option(Config.USER_PROP, properties.getString(Config.USER));
+      dataFrameReader = dataFrameReader.option(Config.DRIVER_PROP, properties.getString(Config.DRIVER_CLASS));
+      dataFrameReader = dataFrameReader
+          .option(Config.RDBMS_TABLE_PROP, properties.getString(Config.RDBMS_TABLE_NAME));
+
+      if (properties.containsKey(Config.PASSWORD)) {
+        LOG.info("Reading JDBC password from properties file....");
+        dataFrameReader = dataFrameReader.option(Config.PASSWORD_PROP, properties.getString(Config.PASSWORD));
+      } else if (properties.containsKey(Config.PASSWORD_FILE)
+          && !StringUtils.isNullOrEmpty(properties.getString(Config.PASSWORD_FILE))) {
+        LOG.info(String.format("Reading JDBC password from password file %s", properties.getString(Config.PASSWORD_FILE)));
+        FileSystem fileSystem = FileSystem.get(session.sparkContext().hadoopConfiguration());
+        passwordFileStream = fileSystem.open(new Path(properties.getString(Config.PASSWORD_FILE)));
+        byte[] bytes = new byte[passwordFileStream.available()];
+        passwordFileStream.read(bytes);
+        dataFrameReader = dataFrameReader.option(Config.PASSWORD_PROP, new String(bytes));
+      } else {
+        throw new IllegalArgumentException(String.format("JDBCSource needs either a %s or %s to connect to RDBMS "
+            + "datasource", Config.PASSWORD_FILE, Config.PASSWORD));
+      }
+
+      addExtraJdbcOptions(properties, dataFrameReader);
+
+      if (properties.getBoolean(Config.IS_INCREMENTAL)) {
+        DataSourceUtils.checkRequiredProperties(properties, Collections.singletonList(Config.INCREMENTAL_COLUMN));
+      }
+      return dataFrameReader;
+    } catch (Exception e) {
+      throw new HoodieException(e);
+    } finally {
+      IOUtils.closeStream(passwordFileStream);
+    }
+  }
+
+  /**
+   * Accepts spark JDBC options from the user in terms of EXTRA_OPTIONS adds them to {@link DataFrameReader} Example: In
+   * a normal spark code you would do something like: session.read.format('jdbc') .option(fetchSize,1000)
+   * .option(timestampFormat,"yyyy-mm-dd hh:mm:ss")
+   * <p>
+   * The way to pass these properties to HUDI is through the config file. Any property starting with
+   * hoodie.datasource.jdbc.extra.options. will be added.
+   * <p>
+   * Example: hoodie.datasource.jdbc.extra.options.fetchSize=100
+   * hoodie.datasource.jdbc.extra.options.upperBound=1
+   * hoodie.datasource.jdbc.extra.options.lowerBound=100
+   *
+   * @param properties      The JDBC connection properties and data source options.
+   * @param dataFrameReader The {@link DataFrameReader} to which data source options will be added.
+   */
+  private static void addExtraJdbcOptions(TypedProperties properties, DataFrameReader dataFrameReader) {
+    Set<Object> objects = properties.keySet();
+    for (Object property : objects) {
+      String prop = (String) property;
+      if (prop.startsWith(Config.EXTRA_OPTIONS)) {
+        String key = String.join("", prop.split(Config.EXTRA_OPTIONS));
+        String value = properties.getString(prop);
+        if (!StringUtils.isNullOrEmpty(value)) {
+          LOG.info(String.format("Adding %s -> %s to jdbc options", key, value));
+          dataFrameReader.option(key, value);
+        }
+      }
+    }
+  }
+
+  @Override
+  protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
+    try {
+      DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.URL, Config.DRIVER_CLASS, Config.USER, Config.RDBMS_TABLE_NAME, Config.IS_INCREMENTAL));
+      return fetch(lastCkptStr, sourceLimit);
+    } catch (Exception e) {
+      LOG.error("Exception while running JDBCSource ", e);
+      return Pair.of(Option.empty(), null);
+    }
+  }
+
+  /**
+   * Decide to do a full RDBMS table scan or an incremental scan based on the lastCkptStr. If previous checkpoint
+   * value exists then we do an incremental scan with a PPD query or else we do a full scan. In certain cases where the
+   * incremental query fails, we fallback to a full scan.
+   *
+   * @param lastCkptStr Last checkpoint.
+   * @return The pair of {@link Dataset} and current checkpoint.
+   */
+  @NotNull
+  private Pair<Option<Dataset<Row>>, String> fetch(Option<String> lastCkptStr, long sourceLimit) {
+    Dataset<Row> dataset;
+    if (lastCkptStr.isPresent() && !StringUtils.isNullOrEmpty(lastCkptStr.get())) {

Review comment:
       If a user wants to start an incremental fetch for the first time, are they expected to set a checkpoint string? If not, this falls back to a full scan, right? Did you consider whether we could automatically choose default start values depending on the incremental column's type?
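
   Roughly what I have in mind, as a sketch only (the method name and the chosen floor values are mine, not something in this PR):

       import org.apache.spark.sql.types.DataType;
       import org.apache.spark.sql.types.DataTypes;

       class DefaultCheckpointExample {
         // Returns a type-based floor value to seed the first incremental pull
         // when no checkpoint exists; null means fall back to a full scan.
         static String defaultCheckpointFor(DataType incrementalColumnType) {
           if (DataTypes.TimestampType.equals(incrementalColumnType)
               || DataTypes.DateType.equals(incrementalColumnType)) {
             return "1970-01-01 00:00:00";           // epoch floor for time-based columns
           } else if (DataTypes.LongType.equals(incrementalColumnType)
               || DataTypes.IntegerType.equals(incrementalColumnType)) {
             return String.valueOf(Long.MIN_VALUE);  // numeric floor
           }
           return null;                              // unknown type -> full scan
         }
       }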

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JdbcSource.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.SqlQueryBuilder;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.storage.StorageLevel;
+import org.jetbrains.annotations.NotNull;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Reads data from RDBMS data sources.
+ */
+
+public class JdbcSource extends RowSource {
+
+  private static final Logger LOG = LogManager.getLogger(JdbcSource.class);
+  private static final List<String> DB_LIMIT_CLAUSE = Arrays.asList("mysql", "postgresql", "h2");
+  private static final String URI_JDBC_PREFIX = "jdbc:";
+
+  public JdbcSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
+                    SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider);
+  }
+
+  /**
+   * Validates all user properties and prepares the {@link DataFrameReader} to read from RDBMS.
+   *
+   * @param session    The {@link SparkSession}.
+   * @param properties The JDBC connection properties and data source options.
+   * @return The {@link DataFrameReader} to read from RDBMS
+   * @throws HoodieException
+   */
+  private static DataFrameReader validatePropsAndGetDataFrameReader(final SparkSession session,
+                                                                    final TypedProperties properties)
+      throws HoodieException {
+    DataFrameReader dataFrameReader;
+    FSDataInputStream passwordFileStream = null;
+    try {
+      dataFrameReader = session.read().format("jdbc");
+      dataFrameReader = dataFrameReader.option(Config.URL_PROP, properties.getString(Config.URL));
+      dataFrameReader = dataFrameReader.option(Config.USER_PROP, properties.getString(Config.USER));
+      dataFrameReader = dataFrameReader.option(Config.DRIVER_PROP, properties.getString(Config.DRIVER_CLASS));
+      dataFrameReader = dataFrameReader
+          .option(Config.RDBMS_TABLE_PROP, properties.getString(Config.RDBMS_TABLE_NAME));
+
+      if (properties.containsKey(Config.PASSWORD)) {
+        LOG.info("Reading JDBC password from properties file....");
+        dataFrameReader = dataFrameReader.option(Config.PASSWORD_PROP, properties.getString(Config.PASSWORD));
+      } else if (properties.containsKey(Config.PASSWORD_FILE)
+          && !StringUtils.isNullOrEmpty(properties.getString(Config.PASSWORD_FILE))) {
+        LOG.info(String.format("Reading JDBC password from password file %s", properties.getString(Config.PASSWORD_FILE)));
+        FileSystem fileSystem = FileSystem.get(session.sparkContext().hadoopConfiguration());
+        passwordFileStream = fileSystem.open(new Path(properties.getString(Config.PASSWORD_FILE)));
+        byte[] bytes = new byte[passwordFileStream.available()];
+        passwordFileStream.read(bytes);
+        dataFrameReader = dataFrameReader.option(Config.PASSWORD_PROP, new String(bytes));
+      } else {
+        throw new IllegalArgumentException(String.format("JDBCSource needs either a %s or %s to connect to RDBMS "
+            + "datasource", Config.PASSWORD_FILE, Config.PASSWORD));
+      }
+
+      addExtraJdbcOptions(properties, dataFrameReader);
+
+      if (properties.getBoolean(Config.IS_INCREMENTAL)) {
+        DataSourceUtils.checkRequiredProperties(properties, Collections.singletonList(Config.INCREMENTAL_COLUMN));

Review comment:
       Not sure if this would be overkill, but do we need to validate the incremental column's datatype? For example, what if a byte[] column were chosen as the incremental column? Another validation worth considering is checking that the column actually exists in the table.
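
   The kind of check I mean, sketched against the Dataset schema (class and method names here are illustrative, not part of the PR):

       import java.util.Arrays;

       import org.apache.spark.sql.Dataset;
       import org.apache.spark.sql.Row;
       import org.apache.spark.sql.types.BinaryType;
       import org.apache.spark.sql.types.StructField;

       class IncrementalColumnValidationExample {
         // Fails fast if the configured incremental column is missing from the
         // source table, or has a type we cannot order/compare on (e.g. byte[]).
         static void validate(Dataset<Row> dataset, String incrementalColumn) {
           StructField field = Arrays.stream(dataset.schema().fields())
               .filter(f -> f.name().equalsIgnoreCase(incrementalColumn))
               .findFirst()
               .orElseThrow(() -> new IllegalArgumentException(
                   "Incremental column " + incrementalColumn + " not found in source table"));
           if (field.dataType() instanceof BinaryType) {
             throw new IllegalArgumentException(
                 "Incremental column " + incrementalColumn + " is binary and cannot be used for incremental pulls");
           }
         }
       }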

##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java
##########
@@ -0,0 +1,522 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.storage.StorageLevel;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Tests {@link JdbcSource}.
+ */
+public class TestJdbcSource extends UtilitiesTestBase {
+
+  private static final Logger LOG = LogManager.getLogger(TestJdbcSource.class);
+  private static final TypedProperties PROPS = new TypedProperties();
+
+  private static Connection connection;
+  private static final HoodieTestDataGenerator DATA_GENERATOR = new HoodieTestDataGenerator();
+
+  @BeforeEach
+  public void setup() throws Exception {
+    super.setup();
+    PROPS.setProperty("hoodie.datasource.jdbc.url", "jdbc:h2:mem:test_mem");
+    PROPS.setProperty("hoodie.datasource.jdbc.driver.class", "org.h2.Driver");
+    PROPS.setProperty("hoodie.datasource.jdbc.user", "test");
+    PROPS.setProperty("hoodie.datasource.jdbc.password", "jdbc");
+    PROPS.setProperty("hoodie.datasource.jdbc.table.name", "triprec");
+    connection = DriverManager.getConnection("jdbc:h2:mem:test_mem", "test", "jdbc");
+  }
+
+  @AfterEach
+  public void teardown() throws Exception {
+    super.teardown();
+    close(connection);
+  }
+
+  @Test
+  public void testSingleCommit() {
+    PROPS.setProperty("hoodie.datasource.jdbc.incremental.pull", "true");
+    PROPS.setProperty("hoodie.datasource.jdbc.table.incremental.column.name", "last_insert");
+
+    try {
+      int numRecords = 100;
+      String commitTime = "000";
+
+      // Insert 100 records with commit time
+      clearAndInsert(commitTime, numRecords);
+
+      // Validate if we have specified records in db
+      assertEquals(numRecords, count());
+
+      // Start JdbcSource
+      Dataset<Row> rowDataset = runSource(Option.empty(), numRecords);
+      assertEquals(numRecords, rowDataset.count());
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testInsertAndUpdate() {
+    PROPS.setProperty("hoodie.datasource.jdbc.incremental.pull", "true");
+    PROPS.setProperty("hoodie.datasource.jdbc.table.incremental.column.name", "last_insert");
+
+    try {
+      final String commitTime = "000";
+      final int numRecords = 100;
+
+      // Add 100 records. Update half of them with commit time "007".
+      update("007",
+          clearAndInsert(commitTime, numRecords)
+              .stream()
+              .limit(50)
+              .collect(Collectors.toList())
+      );
+      // Check if database has 100 records
+      assertEquals(numRecords, count());
+
+      // Start JdbcSource
+      Dataset<Row> rowDataset = runSource(Option.empty(), 100);
+      assertEquals(100, rowDataset.count());
+
+      Dataset<Row> firstCommit = rowDataset.where("commit_time=000");
+      assertEquals(50, firstCommit.count());
+
+      Dataset<Row> secondCommit = rowDataset.where("commit_time=007");
+      assertEquals(50, secondCommit.count());
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testTwoCommits() {
+    PROPS.setProperty("hoodie.datasource.jdbc.incremental.pull", "true");
+    PROPS.setProperty("hoodie.datasource.jdbc.table.incremental.column.name", "last_insert");
+
+    try {
+      // Add 10 records with commit time "000"
+      clearAndInsert("000", 10);
+
+      // Start JdbcSource
+      Dataset<Row> rowDataset = runSource(Option.empty(), 10);
+      assertEquals(10, rowDataset.where("commit_time=000").count());
+
+      // Add 10 records with commit time 001
+      insert("001", 5);
+      rowDataset = runSource(Option.empty(), 15);
+      assertEquals(15, rowDataset.count());
+      assertEquals(5, rowDataset.where("commit_time=001").count());
+      assertEquals(10, rowDataset.where("commit_time=000").count());
+
+      // Start second commit and check if all records are pulled
+      rowDataset = runSource(Option.empty(), 15);
+      assertEquals(15, rowDataset.count());
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testIncrementalFetchWithCommitTime() {
+    PROPS.setProperty("hoodie.datasource.jdbc.incremental.pull", "true");
+    PROPS.setProperty("hoodie.datasource.jdbc.table.incremental.column.name", "last_insert");
+
+    try {
+      // Add 10 records with commit time "000"
+      clearAndInsert("000", 10);
+
+      // Start JdbcSource
+      Dataset<Row> rowDataset = runSource(Option.empty(), 10);
+      assertEquals(10, rowDataset.count());
+
+      // Get max of incremental column
+      Column incrementalColumn = rowDataset
+          .col(PROPS.getString("hoodie.datasource.jdbc.table.incremental.column.name"));
+      final String max = rowDataset.agg(functions.max(incrementalColumn).cast(DataTypes.StringType)).first()
+          .getString(0);
+      LOG.info(String.format("Incremental max value: %s", max));

Review comment:
       Was this added while testing locally? Can we remove it if it's not required?

##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java
##########
@@ -0,0 +1,522 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.storage.StorageLevel;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Tests {@link JdbcSource}.
+ */
+public class TestJdbcSource extends UtilitiesTestBase {
+
+  private static final Logger LOG = LogManager.getLogger(TestJdbcSource.class);
+  private static final TypedProperties PROPS = new TypedProperties();
+
+  private static Connection connection;
+  private static final HoodieTestDataGenerator DATA_GENERATOR = new HoodieTestDataGenerator();
+
+  @BeforeEach
+  public void setup() throws Exception {
+    super.setup();
+    PROPS.setProperty("hoodie.datasource.jdbc.url", "jdbc:h2:mem:test_mem");
+    PROPS.setProperty("hoodie.datasource.jdbc.driver.class", "org.h2.Driver");
+    PROPS.setProperty("hoodie.datasource.jdbc.user", "test");
+    PROPS.setProperty("hoodie.datasource.jdbc.password", "jdbc");
+    PROPS.setProperty("hoodie.datasource.jdbc.table.name", "triprec");
+    connection = DriverManager.getConnection("jdbc:h2:mem:test_mem", "test", "jdbc");
+  }
+
+  @AfterEach
+  public void teardown() throws Exception {
+    super.teardown();
+    close(connection);
+  }
+
+  @Test
+  public void testSingleCommit() {
+    PROPS.setProperty("hoodie.datasource.jdbc.incremental.pull", "true");
+    PROPS.setProperty("hoodie.datasource.jdbc.table.incremental.column.name", "last_insert");
+
+    try {
+      int numRecords = 100;
+      String commitTime = "000";
+
+      // Insert 100 records with commit time
+      clearAndInsert(commitTime, numRecords);
+
+      // Validate if we have specified records in db
+      assertEquals(numRecords, count());
+
+      // Start JdbcSource
+      Dataset<Row> rowDataset = runSource(Option.empty(), numRecords);
+      assertEquals(numRecords, rowDataset.count());
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testInsertAndUpdate() {
+    PROPS.setProperty("hoodie.datasource.jdbc.incremental.pull", "true");
+    PROPS.setProperty("hoodie.datasource.jdbc.table.incremental.column.name", "last_insert");
+
+    try {
+      final String commitTime = "000";
+      final int numRecords = 100;
+
+      // Add 100 records. Update half of them with commit time "007".
+      update("007",
+          clearAndInsert(commitTime, numRecords)
+              .stream()
+              .limit(50)
+              .collect(Collectors.toList())
+      );
+      // Check if database has 100 records
+      assertEquals(numRecords, count());
+
+      // Start JdbcSource
+      Dataset<Row> rowDataset = runSource(Option.empty(), 100);
+      assertEquals(100, rowDataset.count());
+
+      Dataset<Row> firstCommit = rowDataset.where("commit_time=000");
+      assertEquals(50, firstCommit.count());
+
+      Dataset<Row> secondCommit = rowDataset.where("commit_time=007");
+      assertEquals(50, secondCommit.count());
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testTwoCommits() {
+    PROPS.setProperty("hoodie.datasource.jdbc.incremental.pull", "true");
+    PROPS.setProperty("hoodie.datasource.jdbc.table.incremental.column.name", "last_insert");
+
+    try {
+      // Add 10 records with commit time "000"
+      clearAndInsert("000", 10);
+
+      // Start JdbcSource
+      Dataset<Row> rowDataset = runSource(Option.empty(), 10);
+      assertEquals(10, rowDataset.where("commit_time=000").count());
+
+      // Add 10 records with commit time 001
+      insert("001", 5);
+      rowDataset = runSource(Option.empty(), 15);
+      assertEquals(15, rowDataset.count());
+      assertEquals(5, rowDataset.where("commit_time=001").count());
+      assertEquals(10, rowDataset.where("commit_time=000").count());
+
+      // Start second commit and check if all records are pulled
+      rowDataset = runSource(Option.empty(), 15);
+      assertEquals(15, rowDataset.count());
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testIncrementalFetchWithCommitTime() {
+    PROPS.setProperty("hoodie.datasource.jdbc.incremental.pull", "true");
+    PROPS.setProperty("hoodie.datasource.jdbc.table.incremental.column.name", "last_insert");
+
+    try {
+      // Add 10 records with commit time "000"
+      clearAndInsert("000", 10);
+
+      // Start JdbcSource
+      Dataset<Row> rowDataset = runSource(Option.empty(), 10);
+      assertEquals(10, rowDataset.count());
+
+      // Get max of incremental column
+      Column incrementalColumn = rowDataset
+          .col(PROPS.getString("hoodie.datasource.jdbc.table.incremental.column.name"));
+      final String max = rowDataset.agg(functions.max(incrementalColumn).cast(DataTypes.StringType)).first()
+          .getString(0);
+      LOG.info(String.format("Incremental max value: %s", max));
+
+      // Add 10 records with commit time "001"
+      insert("001", 10);
+
+      // Start incremental scan
+      Dataset<Row> rowDataset1 = runSource(Option.of(max), 10);
+      assertEquals(10, rowDataset1.count());
+      assertEquals(10, rowDataset1.where("commit_time=001").count());
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testIncrementalFetchFallbackToFullFetchWhenError() {
+    PROPS.setProperty("hoodie.datasource.jdbc.incremental.pull", "true");
+    PROPS.setProperty("hoodie.datasource.jdbc.table.incremental.column.name", "last_insert");
+
+    try {
+      // Add 10 records with commit time "000"
+      clearAndInsert("000", 10);
+
+      // Start JdbcSource
+      Dataset<Row> rowDataset = runSource(Option.empty(), 10);
+      assertEquals(10, rowDataset.count());
+
+      // Get max of incremental column
+      Column incrementalColumn = rowDataset
+          .col(PROPS.getString("hoodie.datasource.jdbc.table.incremental.column.name"));
+      final String max = rowDataset.agg(functions.max(incrementalColumn).cast(DataTypes.StringType)).first()
+          .getString(0);
+      LOG.info(String.format("Incremental max value: %s", max));
+
+      // Add 10 records with commit time "001"
+      insert("001", 10);
+
+      PROPS.setProperty("hoodie.datasource.jdbc.table.incremental.column.name", "dummy_col");
+
+      // Start incremental scan with negative sourceLimit.
+      // This will throw an exception as limit clause does not accept negative values.
+      Dataset<Row> rowDataset1 = runSource(Option.of(max), -1);
+      assertEquals(20, rowDataset1.count());
+      assertEquals(10, rowDataset1.where("commit_time=000").count());
+      assertEquals(10, rowDataset1.where("commit_time=001").count());
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testFullFetchWithCommitTime() {
+    PROPS.setProperty("hoodie.datasource.jdbc.incremental.pull", "false");
+
+    try {
+      // Add 10 records with commit time "000"
+      clearAndInsert("000", 10);
+
+      // Start JdbcSource
+      Dataset<Row> rowDataset = runSource(Option.empty(), 10);
+      assertEquals(10, rowDataset.count());
+      // Add 10 records with commit time "001"
+      insert("001", 10);
+
+      // Start full fetch
+      rowDataset = runSource(Option.empty(), 20);
+      assertEquals(20, rowDataset.count());
+      assertEquals(10, rowDataset.where("commit_time=000").count());
+      assertEquals(10, rowDataset.where("commit_time=001").count());
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testFullFetchWithCheckpoint() {
+    PROPS.setProperty("hoodie.datasource.jdbc.incremental.pull", "false");
+    PROPS.setProperty("hoodie.datasource.jdbc.table.incremental.column.name", "last_insert");
+
+    try {
+      // Add 10 records with commit time "000"
+      clearAndInsert("000", 10);
+
+      // Start JdbcSource
+      Dataset<Row> rowDataset = runSource(Option.empty(), 10);
+      assertEquals(10, rowDataset.count());
+
+      // Get max of incremental column
+      Column incrementalColumn = rowDataset

Review comment:
       You could move this to a private method (fetching the incremental column's max value); the same logic is repeated across several of these tests.
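
   Something like this, lifted from the code these tests already share (a sketch only; it assumes it lives in TestJdbcSource, where Column, functions and DataTypes are already imported):

       private static String getMaxIncrementalValue(Dataset<Row> rowDataset) {
         // Max of the configured incremental column, cast to String for checkpointing.
         Column incrementalColumn = rowDataset
             .col(PROPS.getString("hoodie.datasource.jdbc.table.incremental.column.name"));
         return rowDataset.agg(functions.max(incrementalColumn).cast(DataTypes.StringType))
             .first().getString(0);
       }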

##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java
##########
@@ -0,0 +1,522 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.storage.StorageLevel;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Tests {@link JdbcSource}.
+ */
+public class TestJdbcSource extends UtilitiesTestBase {
+
+  private static final Logger LOG = LogManager.getLogger(TestJdbcSource.class);
+  private static final TypedProperties PROPS = new TypedProperties();
+
+  private static Connection connection;
+  private static final HoodieTestDataGenerator DATA_GENERATOR = new HoodieTestDataGenerator();
+
+  @BeforeEach
+  public void setup() throws Exception {
+    super.setup();
+    PROPS.setProperty("hoodie.datasource.jdbc.url", "jdbc:h2:mem:test_mem");
+    PROPS.setProperty("hoodie.datasource.jdbc.driver.class", "org.h2.Driver");
+    PROPS.setProperty("hoodie.datasource.jdbc.user", "test");
+    PROPS.setProperty("hoodie.datasource.jdbc.password", "jdbc");
+    PROPS.setProperty("hoodie.datasource.jdbc.table.name", "triprec");
+    connection = DriverManager.getConnection("jdbc:h2:mem:test_mem", "test", "jdbc");
+  }
+
+  @AfterEach
+  public void teardown() throws Exception {
+    super.teardown();
+    close(connection);
+  }
+
+  @Test
+  public void testSingleCommit() {
+    PROPS.setProperty("hoodie.datasource.jdbc.incremental.pull", "true");
+    PROPS.setProperty("hoodie.datasource.jdbc.table.incremental.column.name", "last_insert");
+
+    try {
+      int numRecords = 100;
+      String commitTime = "000";
+
+      // Insert 100 records with commit time
+      clearAndInsert(commitTime, numRecords);
+
+      // Validate if we have specified records in db
+      assertEquals(numRecords, count());
+
+      // Start JdbcSource
+      Dataset<Row> rowDataset = runSource(Option.empty(), numRecords);
+      assertEquals(numRecords, rowDataset.count());
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testInsertAndUpdate() {
+    PROPS.setProperty("hoodie.datasource.jdbc.incremental.pull", "true");
+    PROPS.setProperty("hoodie.datasource.jdbc.table.incremental.column.name", "last_insert");
+
+    try {
+      final String commitTime = "000";
+      final int numRecords = 100;
+
+      // Add 100 records. Update half of them with commit time "007".
+      update("007",
+          clearAndInsert(commitTime, numRecords)
+              .stream()
+              .limit(50)
+              .collect(Collectors.toList())
+      );
+      // Check if database has 100 records
+      assertEquals(numRecords, count());
+
+      // Start JdbcSource
+      Dataset<Row> rowDataset = runSource(Option.empty(), 100);
+      assertEquals(100, rowDataset.count());
+
+      Dataset<Row> firstCommit = rowDataset.where("commit_time=000");
+      assertEquals(50, firstCommit.count());
+
+      Dataset<Row> secondCommit = rowDataset.where("commit_time=007");
+      assertEquals(50, secondCommit.count());
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testTwoCommits() {
+    PROPS.setProperty("hoodie.datasource.jdbc.incremental.pull", "true");
+    PROPS.setProperty("hoodie.datasource.jdbc.table.incremental.column.name", "last_insert");
+
+    try {
+      // Add 10 records with commit time "000"
+      clearAndInsert("000", 10);
+
+      // Start JdbcSource
+      Dataset<Row> rowDataset = runSource(Option.empty(), 10);
+      assertEquals(10, rowDataset.where("commit_time=000").count());
+
+      // Add 10 records with commit time 001
+      insert("001", 5);
+      rowDataset = runSource(Option.empty(), 15);
+      assertEquals(15, rowDataset.count());
+      assertEquals(5, rowDataset.where("commit_time=001").count());
+      assertEquals(10, rowDataset.where("commit_time=000").count());
+
+      // Start second commit and check if all records are pulled
+      rowDataset = runSource(Option.empty(), 15);
+      assertEquals(15, rowDataset.count());
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testIncrementalFetchWithCommitTime() {
+    PROPS.setProperty("hoodie.datasource.jdbc.incremental.pull", "true");
+    PROPS.setProperty("hoodie.datasource.jdbc.table.incremental.column.name", "last_insert");
+
+    try {
+      // Add 10 records with commit time "000"
+      clearAndInsert("000", 10);
+
+      // Start JdbcSource
+      Dataset<Row> rowDataset = runSource(Option.empty(), 10);
+      assertEquals(10, rowDataset.count());
+
+      // Get max of incremental column
+      Column incrementalColumn = rowDataset
+          .col(PROPS.getString("hoodie.datasource.jdbc.table.incremental.column.name"));
+      final String max = rowDataset.agg(functions.max(incrementalColumn).cast(DataTypes.StringType)).first()
+          .getString(0);
+      LOG.info(String.format("Incremental max value: %s", max));
+
+      // Add 10 records with commit time "001"
+      insert("001", 10);
+
+      // Start incremental scan
+      Dataset<Row> rowDataset1 = runSource(Option.of(max), 10);
+      assertEquals(10, rowDataset1.count());
+      assertEquals(10, rowDataset1.where("commit_time=001").count());
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testIncrementalFetchFallbackToFullFetchWhenError() {
+    PROPS.setProperty("hoodie.datasource.jdbc.incremental.pull", "true");
+    PROPS.setProperty("hoodie.datasource.jdbc.table.incremental.column.name", "last_insert");
+
+    try {
+      // Add 10 records with commit time "000"
+      clearAndInsert("000", 10);
+
+      // Start JdbcSource
+      Dataset<Row> rowDataset = runSource(Option.empty(), 10);
+      assertEquals(10, rowDataset.count());
+
+      // Get max of incremental column
+      Column incrementalColumn = rowDataset
+          .col(PROPS.getString("hoodie.datasource.jdbc.table.incremental.column.name"));
+      final String max = rowDataset.agg(functions.max(incrementalColumn).cast(DataTypes.StringType)).first()
+          .getString(0);
+      LOG.info(String.format("Incremental max value: %s", max));
+
+      // Add 10 records with commit time "001"
+      insert("001", 10);
+
+      PROPS.setProperty("hoodie.datasource.jdbc.table.incremental.column.name", "dummy_col");
+
+      // Start incremental scan with negative sourceLimit.
+      // This will throw an exception as limit clause does not accept negative values.
+      Dataset<Row> rowDataset1 = runSource(Option.of(max), -1);
+      assertEquals(20, rowDataset1.count());
+      assertEquals(10, rowDataset1.where("commit_time=000").count());
+      assertEquals(10, rowDataset1.where("commit_time=001").count());
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testFullFetchWithCommitTime() {
+    PROPS.setProperty("hoodie.datasource.jdbc.incremental.pull", "false");

Review comment:
       You could make a private method and reuse it across different tests. For example, this test and testIncrementalFetchWithCommitTime could share some common code. Similarly, try to reuse code wherever possible, even in tests. Tests are critical, and shared helpers make adding more tests easier than growing monolithic ones; see the sketch below.
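
   For instance, a helper for the prologue these tests repeat (a rough sketch assuming it sits in TestJdbcSource; the name and signature are mine, and it relies on the clearAndInsert/runSource helpers these tests already call):

       private Dataset<Row> prepareAndRunFirstBatch(boolean incrementalPull, int numRecords) throws SQLException {
         // Common setup: toggle incremental mode, seed records with commit time "000",
         // then run JdbcSource once with no prior checkpoint.
         PROPS.setProperty("hoodie.datasource.jdbc.incremental.pull", String.valueOf(incrementalPull));
         PROPS.setProperty("hoodie.datasource.jdbc.table.incremental.column.name", "last_insert");
         clearAndInsert("000", numRecords);
         return runSource(Option.empty(), numRecords);
       }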

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JdbcSource.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.SqlQueryBuilder;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.storage.StorageLevel;
+import org.jetbrains.annotations.NotNull;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Reads data from RDBMS data sources.
+ */
+
+public class JdbcSource extends RowSource {
+
+  private static final Logger LOG = LogManager.getLogger(JdbcSource.class);
+  private static final List<String> DB_LIMIT_CLAUSE = Arrays.asList("mysql", "postgresql", "h2");
+  private static final String URI_JDBC_PREFIX = "jdbc:";
+
+  public JdbcSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
+                    SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider);
+  }
+
+  /**
+   * Validates all user properties and prepares the {@link DataFrameReader} to read from RDBMS.
+   *
+   * @param session    The {@link SparkSession}.
+   * @param properties The JDBC connection properties and data source options.
+   * @return The {@link DataFrameReader} to read from RDBMS
+   * @throws HoodieException
+   */
+  private static DataFrameReader validatePropsAndGetDataFrameReader(final SparkSession session,
+                                                                    final TypedProperties properties)
+      throws HoodieException {
+    DataFrameReader dataFrameReader;
+    FSDataInputStream passwordFileStream = null;
+    try {
+      dataFrameReader = session.read().format("jdbc");
+      dataFrameReader = dataFrameReader.option(Config.URL_PROP, properties.getString(Config.URL));
+      dataFrameReader = dataFrameReader.option(Config.USER_PROP, properties.getString(Config.USER));
+      dataFrameReader = dataFrameReader.option(Config.DRIVER_PROP, properties.getString(Config.DRIVER_CLASS));
+      dataFrameReader = dataFrameReader
+          .option(Config.RDBMS_TABLE_PROP, properties.getString(Config.RDBMS_TABLE_NAME));
+
+      if (properties.containsKey(Config.PASSWORD)) {
+        LOG.info("Reading JDBC password from properties file....");
+        dataFrameReader = dataFrameReader.option(Config.PASSWORD_PROP, properties.getString(Config.PASSWORD));
+      } else if (properties.containsKey(Config.PASSWORD_FILE)
+          && !StringUtils.isNullOrEmpty(properties.getString(Config.PASSWORD_FILE))) {
+        LOG.info(String.format("Reading JDBC password from password file %s", properties.getString(Config.PASSWORD_FILE)));
+        FileSystem fileSystem = FileSystem.get(session.sparkContext().hadoopConfiguration());
+        passwordFileStream = fileSystem.open(new Path(properties.getString(Config.PASSWORD_FILE)));
+        byte[] bytes = new byte[passwordFileStream.available()];
+        passwordFileStream.read(bytes);
+        dataFrameReader = dataFrameReader.option(Config.PASSWORD_PROP, new String(bytes));
+      } else {
+        throw new IllegalArgumentException(String.format("JDBCSource needs either a %s or %s to connect to RDBMS "
+            + "datasource", Config.PASSWORD_FILE, Config.PASSWORD));
+      }
+
+      addExtraJdbcOptions(properties, dataFrameReader);
+
+      if (properties.getBoolean(Config.IS_INCREMENTAL)) {
+        DataSourceUtils.checkRequiredProperties(properties, Collections.singletonList(Config.INCREMENTAL_COLUMN));
+      }
+      return dataFrameReader;
+    } catch (Exception e) {
+      throw new HoodieException(e);
+    } finally {
+      IOUtils.closeStream(passwordFileStream);
+    }
+  }
+
+  /**
+   * Accepts spark JDBC options from the user in terms of EXTRA_OPTIONS adds them to {@link DataFrameReader} Example: In
+   * a normal spark code you would do something like: session.read.format('jdbc') .option(fetchSize,1000)
+   * .option(timestampFormat,"yyyy-mm-dd hh:mm:ss")
+   * <p>
+   * The way to pass these properties to HUDI is through the config file. Any property starting with
+   * hoodie.datasource.jdbc.extra.options. will be added.
+   * <p>
+   * Example: hoodie.datasource.jdbc.extra.options.fetchSize=100
+   * hoodie.datasource.jdbc.extra.options.upperBound=1
+   * hoodie.datasource.jdbc.extra.options.lowerBound=100
+   *
+   * @param properties      The JDBC connection properties and data source options.
+   * @param dataFrameReader The {@link DataFrameReader} to which data source options will be added.
+   */
+  private static void addExtraJdbcOptions(TypedProperties properties, DataFrameReader dataFrameReader) {
+    Set<Object> objects = properties.keySet();
+    for (Object property : objects) {
+      String prop = (String) property;
+      if (prop.startsWith(Config.EXTRA_OPTIONS)) {
+        String key = String.join("", prop.split(Config.EXTRA_OPTIONS));
+        String value = properties.getString(prop);
+        if (!StringUtils.isNullOrEmpty(value)) {
+          LOG.info(String.format("Adding %s -> %s to jdbc options", key, value));
+          dataFrameReader.option(key, value);
+        }
+      }
+    }
+  }
+
+  @Override
+  protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
+    try {
+      DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.URL, Config.DRIVER_CLASS, Config.USER, Config.RDBMS_TABLE_NAME, Config.IS_INCREMENTAL));
+      return fetch(lastCkptStr, sourceLimit);
+    } catch (Exception e) {
+      LOG.error("Exception while running JDBCSource ", e);
+      return Pair.of(Option.empty(), null);
+    }
+  }
+
+  /**
+   * Decide to do a full RDBMS table scan or an incremental scan based on the lastCkptStr. If previous checkpoint
+   * value exists then we do an incremental scan with a PPD query or else we do a full scan. In certain cases where the
+   * incremental query fails, we fallback to a full scan.
+   *
+   * @param lastCkptStr Last checkpoint.
+   * @return The pair of {@link Dataset} and current checkpoint.
+   */
+  @NotNull
+  private Pair<Option<Dataset<Row>>, String> fetch(Option<String> lastCkptStr, long sourceLimit) {
+    Dataset<Row> dataset;
+    if (lastCkptStr.isPresent() && !StringUtils.isNullOrEmpty(lastCkptStr.get())) {
+      dataset = incrementalFetch(lastCkptStr, sourceLimit);
+    } else {
+      LOG.info("No checkpoint references found. Doing a full rdbms table fetch");
+      dataset = fullFetch();
+    }
+
+    if (props.containsKey(Config.STORAGE_LEVEL) && !StringUtils.isNullOrEmpty(props.getString(Config.STORAGE_LEVEL))) {
+      dataset.persist(StorageLevel.fromString(props.getString(Config.STORAGE_LEVEL)));
+    } else {
+      dataset.persist(StorageLevel.MEMORY_AND_DISK_SER());
+    }
+
+    boolean isIncremental = props.getBoolean(Config.IS_INCREMENTAL);
+    Pair<Option<Dataset<Row>>, String> pair = Pair.of(Option.of(dataset), checkpoint(dataset, isIncremental));
+    dataset.unpersist();
+
+    return pair;
+  }
+
+  /**
+   * Does an incremental scan with PPQ query prepared on the bases of previous checkpoint.
+   *
+   * @param lastCheckpoint Last checkpoint
+   * @return The {@link Dataset} after incremental fetch from RDBMS.
+   */
+  @NotNull
+  private Dataset<Row> incrementalFetch(Option<String> lastCheckpoint, long sourceLimit) {
+    try {
+      final String ppdQuery = "(%s) rdbms_table";
+      final SqlQueryBuilder queryBuilder = SqlQueryBuilder.select("*")
+          .from(props.getString(Config.RDBMS_TABLE_NAME))
+          .where(String.format(" %s > '%s'", props.getString(Config.INCREMENTAL_COLUMN), lastCheckpoint.get()));
+
+      if (sourceLimit > 0) {
+        URI jdbcURI = URI.create(props.getString(Config.URL).substring(URI_JDBC_PREFIX.length()));
+        if (DB_LIMIT_CLAUSE.contains(jdbcURI.getScheme())) {
+          queryBuilder.orderBy(props.getString(Config.INCREMENTAL_COLUMN)).limit(sourceLimit);
+        }
+      }
+
+      String query = String.format(ppdQuery, queryBuilder.toString());
+      LOG.error("PPD QUERY: " + query);
+      LOG.info(String.format("Referenced last checkpoint and prepared new predicate pushdown query for jdbc pull %s", query));
+
+      return validatePropsAndGetDataFrameReader(sparkSession, props).option(Config.RDBMS_TABLE_PROP, query).load();
+    } catch (Exception e) {
+      LOG.error("Error while performing an incremental fetch. Not all database support the PPD query we generate to do an incremental scan", e);
+      LOG.warn("Falling back to full scan.");
+
+      return fullFetch();
+    }
+  }
+
+  /**
+   * Does a full scan on the RDBMS data source.
+   *
+   * @return The {@link Dataset} after running full scan.
+   */
+  private Dataset<Row> fullFetch() {
+    return validatePropsAndGetDataFrameReader(sparkSession, props).load();
+  }
+
+  private String checkpoint(Dataset<Row> rowDataset, boolean isIncremental) {
+    try {
+      if (isIncremental) {
+        Column incrementalColumn = rowDataset.col(props.getString(Config.INCREMENTAL_COLUMN));
+        final String max = rowDataset.agg(functions.max(incrementalColumn).cast(DataTypes.StringType)).first().getString(0);
+        LOG.info(String.format("Checkpointing column %s with value: %s ", incrementalColumn, max));
+        return max;
+      } else {
+        return null;
+      }
+    } catch (Exception e) {
+      return null;

Review comment:
       Would prefer to throw on any unexpected exceptions here (log and throw) instead of silently returning null.
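
       For illustration, a rough sketch of the log-and-throw variant of checkpoint(), reusing only names already in this patch (HoodieException is already imported in the file):

           private String checkpoint(Dataset<Row> rowDataset, boolean isIncremental) {
             if (!isIncremental) {
               return null;
             }
             try {
               Column incrementalColumn = rowDataset.col(props.getString(Config.INCREMENTAL_COLUMN));
               final String max = rowDataset.agg(functions.max(incrementalColumn).cast(DataTypes.StringType)).first().getString(0);
               LOG.info(String.format("Checkpointing column %s with value: %s", incrementalColumn, max));
               return max;
             } catch (Exception e) {
               // Log and rethrow so a failed checkpoint is surfaced to the caller instead of silently becoming null.
               LOG.error("Failed to checkpoint the incremental column", e);
               throw new HoodieException("Failed to checkpoint the incremental column", e);
             }
           }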

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JdbcSource.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.SqlQueryBuilder;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.storage.StorageLevel;
+import org.jetbrains.annotations.NotNull;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Reads data from RDBMS data sources.
+ */
+
+public class JdbcSource extends RowSource {
+
+  private static final Logger LOG = LogManager.getLogger(JdbcSource.class);
+  private static final List<String> DB_LIMIT_CLAUSE = Arrays.asList("mysql", "postgresql", "h2");
+  private static final String URI_JDBC_PREFIX = "jdbc:";
+
+  public JdbcSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
+                    SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider);
+  }
+
+  /**
+   * Validates all user properties and prepares the {@link DataFrameReader} to read from RDBMS.
+   *
+   * @param session    The {@link SparkSession}.
+   * @param properties The JDBC connection properties and data source options.
+   * @return The {@link DataFrameReader} to read from RDBMS
+   * @throws HoodieException
+   */
+  private static DataFrameReader validatePropsAndGetDataFrameReader(final SparkSession session,
+                                                                    final TypedProperties properties)
+      throws HoodieException {
+    DataFrameReader dataFrameReader;
+    FSDataInputStream passwordFileStream = null;
+    try {
+      dataFrameReader = session.read().format("jdbc");
+      dataFrameReader = dataFrameReader.option(Config.URL_PROP, properties.getString(Config.URL));
+      dataFrameReader = dataFrameReader.option(Config.USER_PROP, properties.getString(Config.USER));
+      dataFrameReader = dataFrameReader.option(Config.DRIVER_PROP, properties.getString(Config.DRIVER_CLASS));
+      dataFrameReader = dataFrameReader
+          .option(Config.RDBMS_TABLE_PROP, properties.getString(Config.RDBMS_TABLE_NAME));
+
+      if (properties.containsKey(Config.PASSWORD)) {
+        LOG.info("Reading JDBC password from properties file....");
+        dataFrameReader = dataFrameReader.option(Config.PASSWORD_PROP, properties.getString(Config.PASSWORD));
+      } else if (properties.containsKey(Config.PASSWORD_FILE)
+          && !StringUtils.isNullOrEmpty(properties.getString(Config.PASSWORD_FILE))) {
+        LOG.info(String.format("Reading JDBC password from password file %s", properties.getString(Config.PASSWORD_FILE)));
+        FileSystem fileSystem = FileSystem.get(session.sparkContext().hadoopConfiguration());
+        passwordFileStream = fileSystem.open(new Path(properties.getString(Config.PASSWORD_FILE)));
+        byte[] bytes = new byte[passwordFileStream.available()];
+        passwordFileStream.read(bytes);
+        dataFrameReader = dataFrameReader.option(Config.PASSWORD_PROP, new String(bytes));
+      } else {
+        throw new IllegalArgumentException(String.format("JDBCSource needs either a %s or %s to connect to RDBMS "
+            + "datasource", Config.PASSWORD_FILE, Config.PASSWORD));
+      }
+
+      addExtraJdbcOptions(properties, dataFrameReader);
+
+      if (properties.getBoolean(Config.IS_INCREMENTAL)) {
+        DataSourceUtils.checkRequiredProperties(properties, Collections.singletonList(Config.INCREMENTAL_COLUMN));
+      }
+      return dataFrameReader;
+    } catch (Exception e) {
+      throw new HoodieException(e);
+    } finally {
+      IOUtils.closeStream(passwordFileStream);
+    }
+  }
+
+  /**
+   * Accepts extra Spark JDBC options from the user via EXTRA_OPTIONS and adds them to the {@link DataFrameReader}.
+   * Example: in plain Spark code you would write something like:
+   * session.read().format("jdbc").option("fetchSize", 1000).option("timestampFormat", "yyyy-mm-dd hh:mm:ss")
+   * <p>
+   * The way to pass these properties to Hudi is through the config file. Any property starting with
+   * hoodie.datasource.jdbc.extra.options. will be added.
+   * <p>
+   * Example: hoodie.datasource.jdbc.extra.options.fetchSize=100
+   * hoodie.datasource.jdbc.extra.options.upperBound=100
+   * hoodie.datasource.jdbc.extra.options.lowerBound=1
+   *
+   * @param properties      The JDBC connection properties and data source options.
+   * @param dataFrameReader The {@link DataFrameReader} to which data source options will be added.
+   */
+  private static void addExtraJdbcOptions(TypedProperties properties, DataFrameReader dataFrameReader) {
+    Set<Object> objects = properties.keySet();
+    for (Object property : objects) {
+      String prop = (String) property;
+      if (prop.startsWith(Config.EXTRA_OPTIONS)) {
+        String key = String.join("", prop.split(Config.EXTRA_OPTIONS));
+        String value = properties.getString(prop);
+        if (!StringUtils.isNullOrEmpty(value)) {
+          LOG.info(String.format("Adding %s -> %s to jdbc options", key, value));
+          dataFrameReader.option(key, value);
+        }
+      }
+    }
+  }
+
+  @Override
+  protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
+    try {
+      DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.URL, Config.DRIVER_CLASS, Config.USER, Config.RDBMS_TABLE_NAME, Config.IS_INCREMENTAL));
+      return fetch(lastCkptStr, sourceLimit);
+    } catch (Exception e) {
+      LOG.error("Exception while running JDBCSource ", e);
+      return Pair.of(Option.empty(), null);
+    }
+  }
+
+  /**
+   * Decides whether to do a full RDBMS table scan or an incremental scan, based on the lastCkptStr. If a previous
+   * checkpoint value exists, we do an incremental scan with a PPD query; otherwise we do a full scan. In certain
+   * cases where the incremental query fails, we fall back to a full scan.
+   *
+   * @param lastCkptStr Last checkpoint.
+   * @return The pair of {@link Dataset} and current checkpoint.
+   */
+  @NotNull
+  private Pair<Option<Dataset<Row>>, String> fetch(Option<String> lastCkptStr, long sourceLimit) {
+    Dataset<Row> dataset;
+    if (lastCkptStr.isPresent() && !StringUtils.isNullOrEmpty(lastCkptStr.get())) {
+      dataset = incrementalFetch(lastCkptStr, sourceLimit);
+    } else {
+      LOG.info("No checkpoint references found. Doing a full rdbms table fetch");
+      dataset = fullFetch();
+    }
+
+    if (props.containsKey(Config.STORAGE_LEVEL) && !StringUtils.isNullOrEmpty(props.getString(Config.STORAGE_LEVEL))) {
+      dataset.persist(StorageLevel.fromString(props.getString(Config.STORAGE_LEVEL)));
+    } else {
+      dataset.persist(StorageLevel.MEMORY_AND_DISK_SER());
+    }
+
+    boolean isIncremental = props.getBoolean(Config.IS_INCREMENTAL);
+    Pair<Option<Dataset<Row>>, String> pair = Pair.of(Option.of(dataset), checkpoint(dataset, isIncremental));
+    dataset.unpersist();
+
+    return pair;
+  }
+
+  /**
+   * Does an incremental scan with a PPD query prepared on the basis of the previous checkpoint.
+   *
+   * @param lastCheckpoint Last checkpoint
+   * @return The {@link Dataset} after incremental fetch from RDBMS.
+   */
+  @NotNull
+  private Dataset<Row> incrementalFetch(Option<String> lastCheckpoint, long sourceLimit) {
+    try {
+      final String ppdQuery = "(%s) rdbms_table";
+      final SqlQueryBuilder queryBuilder = SqlQueryBuilder.select("*")
+          .from(props.getString(Config.RDBMS_TABLE_NAME))
+          .where(String.format(" %s > '%s'", props.getString(Config.INCREMENTAL_COLUMN), lastCheckpoint.get()));
+
+      if (sourceLimit > 0) {
+        URI jdbcURI = URI.create(props.getString(Config.URL).substring(URI_JDBC_PREFIX.length()));
+        if (DB_LIMIT_CLAUSE.contains(jdbcURI.getScheme())) {
+          queryBuilder.orderBy(props.getString(Config.INCREMENTAL_COLUMN)).limit(sourceLimit);
+        }
+      }
+
+      String query = String.format(ppdQuery, queryBuilder.toString());
+      LOG.error("PPD QUERY: " + query);
+      LOG.info(String.format("Referenced last checkpoint and prepared new predicate pushdown query for jdbc pull %s", query));
+
+      return validatePropsAndGetDataFrameReader(sparkSession, props).option(Config.RDBMS_TABLE_PROP, query).load();
+    } catch (Exception e) {
+      LOG.error("Error while performing an incremental fetch. Not all database support the PPD query we generate to do an incremental scan", e);
+      LOG.warn("Falling back to full scan.");

Review comment:
       Open to discussion: wondering if we should just throw the exception and let the user resubmit the job with a full scan if required. These tables could be large, so I'm not sure we should automatically fall back to a full scan when the incremental config is set. Or at least introduce a config like "full_scan_on_exception_with_incremental" or something.
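
       For illustration, a minimal sketch of the config-gated fallback idea. The config constant and key name below are placeholders following this patch's naming, not part of the PR; the snippet would replace the catch block in incrementalFetch():

           // Hypothetical config constant (placeholder name), e.g. in Config:
           // private static final String FALLBACK_TO_FULL_FETCH = "hoodie.datasource.jdbc.incremental.fallback.to.full.fetch";

           } catch (Exception e) {
             boolean fallbackEnabled = props.containsKey(Config.FALLBACK_TO_FULL_FETCH)
                 && props.getBoolean(Config.FALLBACK_TO_FULL_FETCH);
             if (fallbackEnabled) {
               LOG.warn("Incremental fetch failed. Falling back to a full scan as configured.", e);
               return fullFetch();
             }
             // Default: surface the failure and let the user decide whether to resubmit with a full scan.
             throw new HoodieException("Incremental fetch failed", e);
           }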

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JdbcSource.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.SqlQueryBuilder;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.storage.StorageLevel;
+import org.jetbrains.annotations.NotNull;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Reads data from RDBMS data sources.
+ */
+
+public class JdbcSource extends RowSource {
+
+  private static final Logger LOG = LogManager.getLogger(JdbcSource.class);
+  private static final List<String> DB_LIMIT_CLAUSE = Arrays.asList("mysql", "postgresql", "h2");
+  private static final String URI_JDBC_PREFIX = "jdbc:";
+
+  public JdbcSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
+                    SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider);
+  }
+
+  /**
+   * Validates all user properties and prepares the {@link DataFrameReader} to read from RDBMS.
+   *
+   * @param session    The {@link SparkSession}.
+   * @param properties The JDBC connection properties and data source options.
+   * @return The {@link DataFrameReader} to read from RDBMS
+   * @throws HoodieException
+   */
+  private static DataFrameReader validatePropsAndGetDataFrameReader(final SparkSession session,
+                                                                    final TypedProperties properties)
+      throws HoodieException {
+    DataFrameReader dataFrameReader;
+    FSDataInputStream passwordFileStream = null;
+    try {
+      dataFrameReader = session.read().format("jdbc");
+      dataFrameReader = dataFrameReader.option(Config.URL_PROP, properties.getString(Config.URL));
+      dataFrameReader = dataFrameReader.option(Config.USER_PROP, properties.getString(Config.USER));
+      dataFrameReader = dataFrameReader.option(Config.DRIVER_PROP, properties.getString(Config.DRIVER_CLASS));
+      dataFrameReader = dataFrameReader
+          .option(Config.RDBMS_TABLE_PROP, properties.getString(Config.RDBMS_TABLE_NAME));
+
+      if (properties.containsKey(Config.PASSWORD)) {
+        LOG.info("Reading JDBC password from properties file....");
+        dataFrameReader = dataFrameReader.option(Config.PASSWORD_PROP, properties.getString(Config.PASSWORD));
+      } else if (properties.containsKey(Config.PASSWORD_FILE)
+          && !StringUtils.isNullOrEmpty(properties.getString(Config.PASSWORD_FILE))) {
+        LOG.info(String.format("Reading JDBC password from password file %s", properties.getString(Config.PASSWORD_FILE)));
+        FileSystem fileSystem = FileSystem.get(session.sparkContext().hadoopConfiguration());
+        passwordFileStream = fileSystem.open(new Path(properties.getString(Config.PASSWORD_FILE)));
+        byte[] bytes = new byte[passwordFileStream.available()];
+        passwordFileStream.read(bytes);
+        dataFrameReader = dataFrameReader.option(Config.PASSWORD_PROP, new String(bytes));
+      } else {
+        throw new IllegalArgumentException(String.format("JDBCSource needs either a %s or %s to connect to RDBMS "
+            + "datasource", Config.PASSWORD_FILE, Config.PASSWORD));
+      }
+
+      addExtraJdbcOptions(properties, dataFrameReader);
+
+      if (properties.getBoolean(Config.IS_INCREMENTAL)) {
+        DataSourceUtils.checkRequiredProperties(properties, Collections.singletonList(Config.INCREMENTAL_COLUMN));
+      }
+      return dataFrameReader;
+    } catch (Exception e) {
+      throw new HoodieException(e);
+    } finally {
+      IOUtils.closeStream(passwordFileStream);
+    }
+  }
+
+  /**
+   * Accepts extra Spark JDBC options from the user via EXTRA_OPTIONS and adds them to the {@link DataFrameReader}.
+   * Example: in plain Spark code you would write something like:
+   * session.read().format("jdbc").option("fetchSize", 1000).option("timestampFormat", "yyyy-mm-dd hh:mm:ss")
+   * <p>
+   * The way to pass these properties to Hudi is through the config file. Any property starting with
+   * hoodie.datasource.jdbc.extra.options. will be added.
+   * <p>
+   * Example: hoodie.datasource.jdbc.extra.options.fetchSize=100
+   * hoodie.datasource.jdbc.extra.options.upperBound=100
+   * hoodie.datasource.jdbc.extra.options.lowerBound=1
+   *
+   * @param properties      The JDBC connection properties and data source options.
+   * @param dataFrameReader The {@link DataFrameReader} to which data source options will be added.
+   */
+  private static void addExtraJdbcOptions(TypedProperties properties, DataFrameReader dataFrameReader) {
+    Set<Object> objects = properties.keySet();
+    for (Object property : objects) {
+      String prop = (String) property;
+      if (prop.startsWith(Config.EXTRA_OPTIONS)) {
+        String key = String.join("", prop.split(Config.EXTRA_OPTIONS));
+        String value = properties.getString(prop);
+        if (!StringUtils.isNullOrEmpty(value)) {
+          LOG.info(String.format("Adding %s -> %s to jdbc options", key, value));
+          dataFrameReader.option(key, value);
+        }
+      }
+    }
+  }
+
+  @Override
+  protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
+    try {
+      DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.URL, Config.DRIVER_CLASS, Config.USER, Config.RDBMS_TABLE_NAME, Config.IS_INCREMENTAL));
+      return fetch(lastCkptStr, sourceLimit);
+    } catch (Exception e) {
+      LOG.error("Exception while running JDBCSource ", e);
+      return Pair.of(Option.empty(), null);
+    }
+  }
+
+  /**
+   * Decides whether to do a full RDBMS table scan or an incremental scan, based on the lastCkptStr. If a previous
+   * checkpoint value exists, we do an incremental scan with a PPD query; otherwise we do a full scan. In certain
+   * cases where the incremental query fails, we fall back to a full scan.
+   *
+   * @param lastCkptStr Last checkpoint.
+   * @return The pair of {@link Dataset} and current checkpoint.
+   */
+  @NotNull
+  private Pair<Option<Dataset<Row>>, String> fetch(Option<String> lastCkptStr, long sourceLimit) {
+    Dataset<Row> dataset;
+    if (lastCkptStr.isPresent() && !StringUtils.isNullOrEmpty(lastCkptStr.get())) {
+      dataset = incrementalFetch(lastCkptStr, sourceLimit);
+    } else {
+      LOG.info("No checkpoint references found. Doing a full rdbms table fetch");
+      dataset = fullFetch();
+    }
+
+    if (props.containsKey(Config.STORAGE_LEVEL) && !StringUtils.isNullOrEmpty(props.getString(Config.STORAGE_LEVEL))) {
+      dataset.persist(StorageLevel.fromString(props.getString(Config.STORAGE_LEVEL)));
+    } else {
+      dataset.persist(StorageLevel.MEMORY_AND_DISK_SER());
+    }
+
+    boolean isIncremental = props.getBoolean(Config.IS_INCREMENTAL);
+    Pair<Option<Dataset<Row>>, String> pair = Pair.of(Option.of(dataset), checkpoint(dataset, isIncremental));
+    dataset.unpersist();
+
+    return pair;
+  }
+
+  /**
+   * Does an incremental scan with a PPD query prepared on the basis of the previous checkpoint.
+   *
+   * @param lastCheckpoint Last checkpoint
+   * @return The {@link Dataset} after incremental fetch from RDBMS.
+   */
+  @NotNull
+  private Dataset<Row> incrementalFetch(Option<String> lastCheckpoint, long sourceLimit) {
+    try {
+      final String ppdQuery = "(%s) rdbms_table";
+      final SqlQueryBuilder queryBuilder = SqlQueryBuilder.select("*")
+          .from(props.getString(Config.RDBMS_TABLE_NAME))
+          .where(String.format(" %s > '%s'", props.getString(Config.INCREMENTAL_COLUMN), lastCheckpoint.get()));
+
+      if (sourceLimit > 0) {
+        URI jdbcURI = URI.create(props.getString(Config.URL).substring(URI_JDBC_PREFIX.length()));
+        if (DB_LIMIT_CLAUSE.contains(jdbcURI.getScheme())) {
+          queryBuilder.orderBy(props.getString(Config.INCREMENTAL_COLUMN)).limit(sourceLimit);
+        }
+      }
+
+      String query = String.format(ppdQuery, queryBuilder.toString());
+      LOG.error("PPD QUERY: " + query);
+      LOG.info(String.format("Referenced last checkpoint and prepared new predicate pushdown query for jdbc pull %s", query));
+
+      return validatePropsAndGetDataFrameReader(sparkSession, props).option(Config.RDBMS_TABLE_PROP, query).load();
+    } catch (Exception e) {
+      LOG.error("Error while performing an incremental fetch. Not all database support the PPD query we generate to do an incremental scan", e);
+      LOG.warn("Falling back to full scan.");
+
+      return fullFetch();
+    }
+  }
+
+  /**
+   * Does a full scan on the RDBMS data source.
+   *
+   * @return The {@link Dataset} after running full scan.
+   */
+  private Dataset<Row> fullFetch() {
+    return validatePropsAndGetDataFrameReader(sparkSession, props).load();
+  }
+
+  private String checkpoint(Dataset<Row> rowDataset, boolean isIncremental) {
+    try {
+      if (isIncremental) {
+        Column incrementalColumn = rowDataset.col(props.getString(Config.INCREMENTAL_COLUMN));
+        final String max = rowDataset.agg(functions.max(incrementalColumn).cast(DataTypes.StringType)).first().getString(0);
+        LOG.info(String.format("Checkpointing column %s with value: %s ", incrementalColumn, max));
+        return max;
+      } else {
+        return null;
+      }
+    } catch (Exception e) {
+      return null;
+    }
+  }
+
+  /**
+   * Inner class with config keys.
+   */
+  protected static class Config {
+
+    /**
+     * {@value #URL} is the jdbc url for the Hoodie datasource.
+     */
+    private static final String URL = "hoodie.datasource.jdbc.url";
+
+    private static final String URL_PROP = "url";
+
+    /**
+     * {@value #USER} is the username used for JDBC connection.
+     */
+    private static final String USER = "hoodie.datasource.jdbc.user";
+
+    /**
+     * {@value #USER_PROP} used internally to build jdbc params.
+     */
+    private static final String USER_PROP = "user";
+
+    /**
+     * {@value #PASSWORD} is the password used for JDBC connection.
+     */
+    private static final String PASSWORD = "hoodie.datasource.jdbc.password";
+
+    /**
+     * {@value #PASSWORD_FILE} is the base-path for the JDBC password file.
+     */
+    private static final String PASSWORD_FILE = "hoodie.datasource.jdbc.password.file";
+
+    /**
+     * {@value #PASSWORD_PROP} used internally to build jdbc params.
+     */
+    private static final String PASSWORD_PROP = "password";
+
+    /**
+     * {@value #DRIVER_CLASS} used for JDBC connection.
+     */
+    private static final String DRIVER_CLASS = "hoodie.datasource.jdbc.driver.class";
+
+    /**
+     * {@value #DRIVER_PROP} used internally to build jdbc params.
+     */
+    private static final String DRIVER_PROP = "driver";
+
+    /**
+     * {@value #RDBMS_TABLE_NAME} RDBMS table to pull.
+     */
+    private static final String RDBMS_TABLE_NAME = "hoodie.datasource.jdbc.table.name";
+
+    /**
+     * {@value #RDBMS_TABLE_PROP} used internally for jdbc.
+     */
+    private static final String RDBMS_TABLE_PROP = "dbtable";
+
+    /**
+     * {@value #INCREMENTAL_COLUMN} when run in incremental mode, this column is used to pull new data incrementally.
+     */
+    private static final String INCREMENTAL_COLUMN = "hoodie.datasource.jdbc.table.incremental.column.name";
+
+    /**
+     * {@value #IS_INCREMENTAL} determines whether the JDBC source does an incremental pull.
+     */
+    private static final String IS_INCREMENTAL = "hoodie.datasource.jdbc.incremental.pull";
+
+    /**
+     * {@value #EXTRA_OPTIONS} used to set any extra options the user specifies for jdbc.
+     */
+    private static final String EXTRA_OPTIONS = "hoodie.datasource.jdbc.extra.options.";
+
+    /**
+     * {@value #STORAGE_LEVEL} is used to control the persistence level. Default value: MEMORY_AND_DISK_SER.
+     */
+    private static final String STORAGE_LEVEL = "hoodie.datasource.jdbc.storage.level";
+  }
+}

Review comment:
       line break at EOF

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JdbcSource.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.SqlQueryBuilder;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.storage.StorageLevel;
+import org.jetbrains.annotations.NotNull;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Reads data from RDBMS data sources.
+ */
+
+public class JdbcSource extends RowSource {
+
+  private static final Logger LOG = LogManager.getLogger(JdbcSource.class);
+  private static final List<String> DB_LIMIT_CLAUSE = Arrays.asList("mysql", "postgresql", "h2");
+  private static final String URI_JDBC_PREFIX = "jdbc:";
+
+  public JdbcSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
+                    SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider);
+  }
+
+  /**
+   * Validates all user properties and prepares the {@link DataFrameReader} to read from RDBMS.
+   *
+   * @param session    The {@link SparkSession}.
+   * @param properties The JDBC connection properties and data source options.
+   * @return The {@link DataFrameReader} to read from RDBMS
+   * @throws HoodieException
+   */
+  private static DataFrameReader validatePropsAndGetDataFrameReader(final SparkSession session,
+                                                                    final TypedProperties properties)
+      throws HoodieException {
+    DataFrameReader dataFrameReader;
+    FSDataInputStream passwordFileStream = null;
+    try {
+      dataFrameReader = session.read().format("jdbc");
+      dataFrameReader = dataFrameReader.option(Config.URL_PROP, properties.getString(Config.URL));
+      dataFrameReader = dataFrameReader.option(Config.USER_PROP, properties.getString(Config.USER));
+      dataFrameReader = dataFrameReader.option(Config.DRIVER_PROP, properties.getString(Config.DRIVER_CLASS));
+      dataFrameReader = dataFrameReader
+          .option(Config.RDBMS_TABLE_PROP, properties.getString(Config.RDBMS_TABLE_NAME));
+
+      if (properties.containsKey(Config.PASSWORD)) {
+        LOG.info("Reading JDBC password from properties file....");
+        dataFrameReader = dataFrameReader.option(Config.PASSWORD_PROP, properties.getString(Config.PASSWORD));
+      } else if (properties.containsKey(Config.PASSWORD_FILE)
+          && !StringUtils.isNullOrEmpty(properties.getString(Config.PASSWORD_FILE))) {
+        LOG.info(String.format("Reading JDBC password from password file %s", properties.getString(Config.PASSWORD_FILE)));
+        FileSystem fileSystem = FileSystem.get(session.sparkContext().hadoopConfiguration());
+        passwordFileStream = fileSystem.open(new Path(properties.getString(Config.PASSWORD_FILE)));
+        byte[] bytes = new byte[passwordFileStream.available()];
+        passwordFileStream.read(bytes);
+        dataFrameReader = dataFrameReader.option(Config.PASSWORD_PROP, new String(bytes));
+      } else {
+        throw new IllegalArgumentException(String.format("JDBCSource needs either a %s or %s to connect to RDBMS "
+            + "datasource", Config.PASSWORD_FILE, Config.PASSWORD));
+      }
+
+      addExtraJdbcOptions(properties, dataFrameReader);
+
+      if (properties.getBoolean(Config.IS_INCREMENTAL)) {
+        DataSourceUtils.checkRequiredProperties(properties, Collections.singletonList(Config.INCREMENTAL_COLUMN));
+      }
+      return dataFrameReader;
+    } catch (Exception e) {
+      throw new HoodieException(e);
+    } finally {
+      IOUtils.closeStream(passwordFileStream);
+    }
+  }
+
+  /**
+   * Accepts extra Spark JDBC options from the user via EXTRA_OPTIONS and adds them to the {@link DataFrameReader}.
+   * Example: in plain Spark code you would write something like:
+   * session.read().format("jdbc").option("fetchSize", 1000).option("timestampFormat", "yyyy-mm-dd hh:mm:ss")
+   * <p>
+   * The way to pass these properties to Hudi is through the config file. Any property starting with
+   * hoodie.datasource.jdbc.extra.options. will be added.
+   * <p>
+   * Example: hoodie.datasource.jdbc.extra.options.fetchSize=100
+   * hoodie.datasource.jdbc.extra.options.upperBound=100
+   * hoodie.datasource.jdbc.extra.options.lowerBound=1
+   *
+   * @param properties      The JDBC connection properties and data source options.
+   * @param dataFrameReader The {@link DataFrameReader} to which data source options will be added.
+   */
+  private static void addExtraJdbcOptions(TypedProperties properties, DataFrameReader dataFrameReader) {
+    Set<Object> objects = properties.keySet();
+    for (Object property : objects) {
+      String prop = (String) property;
+      if (prop.startsWith(Config.EXTRA_OPTIONS)) {
+        String key = String.join("", prop.split(Config.EXTRA_OPTIONS));
+        String value = properties.getString(prop);
+        if (!StringUtils.isNullOrEmpty(value)) {
+          LOG.info(String.format("Adding %s -> %s to jdbc options", key, value));
+          dataFrameReader.option(key, value);
+        }
+      }
+    }
+  }
+
+  @Override
+  protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
+    try {
+      DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.URL, Config.DRIVER_CLASS, Config.USER, Config.RDBMS_TABLE_NAME, Config.IS_INCREMENTAL));
+      return fetch(lastCkptStr, sourceLimit);
+    } catch (Exception e) {
+      LOG.error("Exception while running JDBCSource ", e);

Review comment:
       I guess we should throw the exception here rather than swallowing it. What's the next step if one plans to run deltastreamer in continuous mode and an exception is thrown? It will just keep running in a loop without doing any work.
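
       For illustration, a minimal sketch of surfacing the failure instead of swallowing it (HoodieException is already imported in this file):

           @Override
           protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
             try {
               DataSourceUtils.checkRequiredProperties(props,
                   Arrays.asList(Config.URL, Config.DRIVER_CLASS, Config.USER, Config.RDBMS_TABLE_NAME, Config.IS_INCREMENTAL));
               return fetch(lastCkptStr, sourceLimit);
             } catch (Exception e) {
               // Log and rethrow so continuous-mode deltastreamer fails fast instead of looping without doing any work.
               LOG.error("Exception while running JDBCSource", e);
               throw new HoodieException("Error fetching next batch from JDBC source", e);
             }
           }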
   

##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java
##########
@@ -0,0 +1,522 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.storage.StorageLevel;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Tests {@link JdbcSource}.
+ */
+public class TestJdbcSource extends UtilitiesTestBase {
+
+  private static final Logger LOG = LogManager.getLogger(TestJdbcSource.class);
+  private static final TypedProperties PROPS = new TypedProperties();
+
+  private static Connection connection;
+  private static final HoodieTestDataGenerator DATA_GENERATOR = new HoodieTestDataGenerator();
+
+  @BeforeEach
+  public void setup() throws Exception {
+    super.setup();
+    PROPS.setProperty("hoodie.datasource.jdbc.url", "jdbc:h2:mem:test_mem");
+    PROPS.setProperty("hoodie.datasource.jdbc.driver.class", "org.h2.Driver");
+    PROPS.setProperty("hoodie.datasource.jdbc.user", "test");
+    PROPS.setProperty("hoodie.datasource.jdbc.password", "jdbc");
+    PROPS.setProperty("hoodie.datasource.jdbc.table.name", "triprec");
+    connection = DriverManager.getConnection("jdbc:h2:mem:test_mem", "test", "jdbc");
+  }
+
+  @AfterEach
+  public void teardown() throws Exception {
+    super.teardown();
+    close(connection);
+  }
+
+  @Test
+  public void testSingleCommit() {
+    PROPS.setProperty("hoodie.datasource.jdbc.incremental.pull", "true");
+    PROPS.setProperty("hoodie.datasource.jdbc.table.incremental.column.name", "last_insert");
+
+    try {
+      int numRecords = 100;
+      String commitTime = "000";
+
+      // Insert 100 records with commit time
+      clearAndInsert(commitTime, numRecords);
+
+      // Validate if we have specified records in db
+      assertEquals(numRecords, count());
+
+      // Start JdbcSource
+      Dataset<Row> rowDataset = runSource(Option.empty(), numRecords);
+      assertEquals(numRecords, rowDataset.count());
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testInsertAndUpdate() {
+    PROPS.setProperty("hoodie.datasource.jdbc.incremental.pull", "true");
+    PROPS.setProperty("hoodie.datasource.jdbc.table.incremental.column.name", "last_insert");
+
+    try {
+      final String commitTime = "000";
+      final int numRecords = 100;
+
+      // Add 100 records. Update half of them with commit time "007".
+      update("007",
+          clearAndInsert(commitTime, numRecords)
+              .stream()
+              .limit(50)
+              .collect(Collectors.toList())
+      );
+      // Check if database has 100 records
+      assertEquals(numRecords, count());
+
+      // Start JdbcSource
+      Dataset<Row> rowDataset = runSource(Option.empty(), 100);
+      assertEquals(100, rowDataset.count());
+
+      Dataset<Row> firstCommit = rowDataset.where("commit_time=000");
+      assertEquals(50, firstCommit.count());
+
+      Dataset<Row> secondCommit = rowDataset.where("commit_time=007");
+      assertEquals(50, secondCommit.count());
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testTwoCommits() {
+    PROPS.setProperty("hoodie.datasource.jdbc.incremental.pull", "true");
+    PROPS.setProperty("hoodie.datasource.jdbc.table.incremental.column.name", "last_insert");
+
+    try {
+      // Add 10 records with commit time "000"
+      clearAndInsert("000", 10);
+
+      // Start JdbcSource
+      Dataset<Row> rowDataset = runSource(Option.empty(), 10);
+      assertEquals(10, rowDataset.where("commit_time=000").count());
+
+      // Add 5 records with commit time 001
+      insert("001", 5);
+      rowDataset = runSource(Option.empty(), 15);
+      assertEquals(15, rowDataset.count());
+      assertEquals(5, rowDataset.where("commit_time=001").count());
+      assertEquals(10, rowDataset.where("commit_time=000").count());
+
+      // Start second commit and check if all records are pulled
+      rowDataset = runSource(Option.empty(), 15);
+      assertEquals(15, rowDataset.count());
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testIncrementalFetchWithCommitTime() {
+    PROPS.setProperty("hoodie.datasource.jdbc.incremental.pull", "true");
+    PROPS.setProperty("hoodie.datasource.jdbc.table.incremental.column.name", "last_insert");
+
+    try {
+      // Add 10 records with commit time "000"
+      clearAndInsert("000", 10);
+
+      // Start JdbcSource
+      Dataset<Row> rowDataset = runSource(Option.empty(), 10);
+      assertEquals(10, rowDataset.count());
+
+      // Get max of incremental column
+      Column incrementalColumn = rowDataset
+          .col(PROPS.getString("hoodie.datasource.jdbc.table.incremental.column.name"));
+      final String max = rowDataset.agg(functions.max(incrementalColumn).cast(DataTypes.StringType)).first()
+          .getString(0);
+      LOG.info(String.format("Incremental max value: %s", max));
+
+      // Add 10 records with commit time "001"
+      insert("001", 10);
+
+      // Start incremental scan
+      Dataset<Row> rowDataset1 = runSource(Option.of(max), 10);
+      assertEquals(10, rowDataset1.count());
+      assertEquals(10, rowDataset1.where("commit_time=001").count());
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testIncrementalFetchFallbackToFullFetchWhenError() {
+    PROPS.setProperty("hoodie.datasource.jdbc.incremental.pull", "true");
+    PROPS.setProperty("hoodie.datasource.jdbc.table.incremental.column.name", "last_insert");
+
+    try {
+      // Add 10 records with commit time "000"
+      clearAndInsert("000", 10);
+
+      // Start JdbcSource
+      Dataset<Row> rowDataset = runSource(Option.empty(), 10);
+      assertEquals(10, rowDataset.count());
+
+      // Get max of incremental column
+      Column incrementalColumn = rowDataset
+          .col(PROPS.getString("hoodie.datasource.jdbc.table.incremental.column.name"));
+      final String max = rowDataset.agg(functions.max(incrementalColumn).cast(DataTypes.StringType)).first()
+          .getString(0);
+      LOG.info(String.format("Incremental max value: %s", max));
+
+      // Add 10 records with commit time "001"
+      insert("001", 10);
+
+      PROPS.setProperty("hoodie.datasource.jdbc.table.incremental.column.name", "dummy_col");
+
+      // Start incremental scan. The invalid incremental column makes the incremental query fail,
+      // which should trigger the fallback to a full fetch.
+      Dataset<Row> rowDataset1 = runSource(Option.of(max), -1);
+      assertEquals(20, rowDataset1.count());
+      assertEquals(10, rowDataset1.where("commit_time=000").count());
+      assertEquals(10, rowDataset1.where("commit_time=001").count());
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testFullFetchWithCommitTime() {
+    PROPS.setProperty("hoodie.datasource.jdbc.incremental.pull", "false");
+
+    try {
+      // Add 10 records with commit time "000"
+      clearAndInsert("000", 10);
+
+      // Start JdbcSource
+      Dataset<Row> rowDataset = runSource(Option.empty(), 10);
+      assertEquals(10, rowDataset.count());
+      // Add 10 records with commit time "001"
+      insert("001", 10);
+
+      // Start full fetch
+      rowDataset = runSource(Option.empty(), 20);
+      assertEquals(20, rowDataset.count());
+      assertEquals(10, rowDataset.where("commit_time=000").count());
+      assertEquals(10, rowDataset.where("commit_time=001").count());
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testFullFetchWithCheckpoint() {
+    PROPS.setProperty("hoodie.datasource.jdbc.incremental.pull", "false");
+    PROPS.setProperty("hoodie.datasource.jdbc.table.incremental.column.name", "last_insert");
+
+    try {
+      // Add 10 records with commit time "000"
+      clearAndInsert("000", 10);
+
+      // Start JdbcSource
+      Dataset<Row> rowDataset = runSource(Option.empty(), 10);
+      assertEquals(10, rowDataset.count());
+
+      // Get max of incremental column
+      Column incrementalColumn = rowDataset
+          .col(PROPS.getString("hoodie.datasource.jdbc.table.incremental.column.name"));
+      final String max = rowDataset.agg(functions.max(incrementalColumn).cast(DataTypes.StringType)).first()
+          .getString(0);
+      LOG.info(String.format("Incremental max value: %s", max));
+
+      // Add 10 records with commit time "001"
+      insert("001", 10);
+
+      // Start incremental scan
+      rowDataset = runSource(Option.of(max), 10);
+      assertEquals(10, rowDataset.count());
+      assertEquals(10, rowDataset.where("commit_time=001").count());
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testSourceWithPasswordOnFs() {
+    try {
+      // Write secret string to fs in a file
+      writeSecretToFs();
+      // Remove secret string from props
+      PROPS.remove("hoodie.datasource.jdbc.password");
+      // Set property to read secret from fs file
+      PROPS.setProperty("hoodie.datasource.jdbc.password.file", "file:///tmp/hudi/config/secret");
+      PROPS.setProperty("hoodie.datasource.jdbc.incremental.pull", "false");
+      // Add 10 records with commit time 000
+      clearAndInsert("000", 10);
+      Dataset<Row> rowDataset = runSource(Option.empty(), 10);
+      assertEquals(10, rowDataset.count());
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testSourceWithNoPasswordThrowsException() {
+    assertThrows(NoSuchElementException.class, () -> {
+      // Write secret string to fs in a file
+      writeSecretToFs();
+      // Remove secret string from props
+      PROPS.remove("hoodie.datasource.jdbc.password");
+      PROPS.setProperty("hoodie.datasource.jdbc.incremental.pull", "false");
+      // Add 10 records with commit time 000
+      clearAndInsert("000", 10);
+      Dataset<Row> rowDataset = runSource(Option.empty(), 10);
+      assertEquals(10, rowDataset.count());
+    });
+  }
+
+  @Test
+  public void testSourceWithExtraOptions() {
+    PROPS.setProperty("hoodie.datasource.jdbc.extra.options.fetchsize", "10");
+    PROPS.setProperty("hoodie.datasource.jdbc.incremental.pull", "false");
+    try {
+      // Add 20 records with commit time 000
+      clearAndInsert("000", 20);
+      Dataset<Row> rowDataset = runSource(Option.empty(), 10);
+      assertEquals(20, rowDataset.count());
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testSourceWithStorageLevel() {
+    PROPS.setProperty("hoodie.datasource.jdbc.storage.level", "NONE");
+    PROPS.setProperty("hoodie.datasource.jdbc.incremental.pull", "false");
+    try {
+      // Add 10 records with commit time 000
+      clearAndInsert("000", 10);
+      Dataset<Row> rowDataset = runSource(Option.empty(), 10);
+      assertEquals(10, rowDataset.count());
+      LOG.error("Storage Level: " + rowDataset.storageLevel().toString());
+      assertEquals(StorageLevel.NONE(), rowDataset.storageLevel());
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+  }
+
+  private void writeSecretToFs() throws IOException {
+    FileSystem fs = FileSystem.get(new Configuration());
+    FSDataOutputStream outputStream = fs.create(new Path("file:///tmp/hudi/config/secret"));
+    outputStream.writeBytes("jdbc");
+    outputStream.close();
+  }
+
+  private static List<HoodieRecord> clearAndInsert(String commitTime, int numRecords)
+      throws SQLException {
+    execute("DROP TABLE triprec", "Table does not exists");
+    execute("CREATE TABLE triprec ("
+        + "id INT NOT NULL AUTO_INCREMENT(1, 1),"
+        + "commit_time VARCHAR(50),"
+        + "row_key VARCHAR(50),"
+        + "rider VARCHAR(50),"
+        + "driver VARCHAR(50),"
+        + "begin_lat DOUBLE PRECISION,"
+        + "begin_lon DOUBLE PRECISION,"
+        + "end_lat DOUBLE PRECISION,"
+        + "end_lon DOUBLE PRECISION,"
+        + "fare DOUBLE PRECISION,"
+        + "last_insert TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP)", "Table already exists");
+
+    return insert(commitTime, numRecords);
+  }
+
+  private static List<HoodieRecord> insert(String commitTime, int numRecords) throws SQLException {
+    PreparedStatement insertStatement =
+        connection.prepareStatement("INSERT INTO triprec ("
+            + "commit_time,"
+            + "row_key,"
+            + "rider,"
+            + "driver,"
+            + "begin_lat,"
+            + "begin_lon,"
+            + "end_lat,"
+            + "end_lon,"
+            + "fare) "
+            + "values(?,?,?,?,?,?,?,?,?)");
+    List<HoodieRecord> hoodieRecords = DATA_GENERATOR.generateInserts(commitTime, numRecords);
+
+    hoodieRecords
+        .stream()
+        .map(r -> {
+          try {
+            return ((GenericRecord) r.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA, PROPS).get());
+          } catch (IOException e) {
+            return null;
+          }
+        })
+        .filter(Objects::nonNull)
+        .forEach(record -> {
+          try {
+            insertStatement.setString(1, commitTime);
+            insertStatement.setString(2, record.get("_row_key").toString());
+            insertStatement.setString(3, record.get("rider").toString());
+            insertStatement.setString(4, record.get("driver").toString());
+            insertStatement.setDouble(5, Double.parseDouble(record.get("begin_lat").toString()));
+            insertStatement.setDouble(6, Double.parseDouble(record.get("begin_lon").toString()));
+            insertStatement.setDouble(7, Double.parseDouble(record.get("end_lat").toString()));
+            insertStatement.setDouble(8, Double.parseDouble(record.get("end_lon").toString()));
+            insertStatement.setDouble(9, Double.parseDouble(((GenericRecord) record.get("fare")).get("amount").toString()));
+            insertStatement.addBatch();
+          } catch (SQLException e) {
+            LOG.warn(e.getMessage());
+          }
+        });
+    insertStatement.executeBatch();
+    close(insertStatement);
+    return hoodieRecords;
+  }
+
+  private static List<HoodieRecord> update(String commitTime, List<HoodieRecord> inserts) throws SQLException, IOException {
+    PreparedStatement updateStatement =
+        connection.prepareStatement("UPDATE triprec set commit_time=?,"
+            + "row_key=?,"
+            + "rider=?,"
+            + "driver=?,"
+            + "begin_lat=?,"
+            + "begin_lon=?,"
+            + "end_lat=?,"
+            + "end_lon=?,"
+            + "fare=?"
+            + "where row_key=?");
+
+    List<HoodieRecord> updateRecords = DATA_GENERATOR.generateUpdates(commitTime, inserts);
+    updateRecords.stream().map(m -> {
+      try {
+        return m.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA, PROPS).get();
+      } catch (IOException e) {
+        return null;
+      }
+    }).filter(Objects::nonNull)
+        .map(r -> ((GenericRecord) r))
+        .sequential()
+        .forEach(r -> {
+          try {
+            updateStatement.setString(1, commitTime);
+            updateStatement.setString(2, r.get("_row_key").toString());
+            updateStatement.setString(3, r.get("rider").toString());
+            updateStatement.setString(4, r.get("driver").toString());
+            updateStatement.setDouble(5, Double.parseDouble(r.get("begin_lat").toString()));
+            updateStatement.setDouble(6, Double.parseDouble(r.get("begin_lon").toString()));
+            updateStatement.setDouble(7, Double.parseDouble(r.get("end_lat").toString()));
+            updateStatement.setDouble(8, Double.parseDouble(r.get("end_lon").toString()));
+            updateStatement.setDouble(9, Double.parseDouble(((GenericRecord) r.get("fare")).get("amount").toString()));
+            updateStatement.setString(10, r.get("_row_key").toString());
+            updateStatement.addBatch();
+          } catch (SQLException e) {
+            LOG.warn(e.getMessage());
+          }
+        });
+    updateStatement.executeBatch();
+    close(updateStatement);
+    return updateRecords;
+  }
+
+  private static void execute(String query, String message) {
+    try (Statement statement = connection.createStatement()) {
+      statement.executeUpdate(query);
+    } catch (SQLException e) {
+      LOG.error(message);

Review comment:
       Shouldn't the test fail here instead of just logging the error?
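
       For illustration, a small sketch of what failing the test could look like (fail() is already statically imported in this test class). The best-effort DROP in clearAndInsert would then need DROP TABLE IF EXISTS so that a missing table no longer counts as a failure:

           private static void execute(String query, String message) {
             try (Statement statement = connection.createStatement()) {
               statement.executeUpdate(query);
             } catch (SQLException e) {
               // Fail loudly instead of hiding setup problems behind a log line.
               LOG.error(message, e);
               fail(message + ": " + e.getMessage());
             }
           }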




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org