You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2021/06/19 14:12:25 UTC

[hudi] branch master updated: [HUDI-251] Adds JDBC source support for DeltaStreamer (#2915)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1cbdb49  [HUDI-251] Adds JDBC source support for DeltaStreamer (#2915)
1cbdb49 is described below

commit 1cbdb49816ac779f803d0b7397f911a2f81ecca5
Author: Sagar Sumit <sa...@gmail.com>
AuthorDate: Sat Jun 19 19:42:11 2021 +0530

    [HUDI-251] Adds JDBC source support for DeltaStreamer (#2915)
    
    As discussed in RFC-14, this change implements the first phase of JDBC incremental puller.
    It consists of the following changes:
    
    - JdbcSource: This class extends RowSource and implements
      fetchNextBatch(Option<String> lastCkptStr, long sourceLimit)
    
    - SqlQueryBuilder: A simple utility class to build sql queries fluently.
    
    - Implements two modes of fetching: full and incremental.
      Full is a complete scan of RDBMS table.
      Incremental is delta since last checkpoint.
      Incremental mode falls back to full fetch in case of any exception.
---
 .../org/apache/hudi/utilities/SqlQueryBuilder.java | 146 +++++++
 .../apache/hudi/utilities/sources/JdbcSource.java  | 343 ++++++++++++++++
 .../apache/hudi/utilities/TestSqlQueryBuilder.java |  64 +++
 .../functional/TestHoodieDeltaStreamer.java        |  44 ++
 .../hudi/utilities/sources/TestJdbcSource.java     | 443 +++++++++++++++++++++
 .../hudi/utilities/testutils/JdbcTestUtils.java    | 195 +++++++++
 6 files changed, 1235 insertions(+)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/SqlQueryBuilder.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/SqlQueryBuilder.java
new file mode 100644
index 0000000..a333f44
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/SqlQueryBuilder.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.common.util.StringUtils;
+
+/**
+ * SQL query builder. Current support for: SELECT, FROM, JOIN, ON, WHERE, ORDER BY, LIMIT clauses.
+ */
+public class SqlQueryBuilder {
+
+  private final StringBuilder sqlBuilder;
+
+  private SqlQueryBuilder(StringBuilder sqlBuilder) {
+    this.sqlBuilder = sqlBuilder;
+  }
+
+  /**
+   * Creates a SELECT query.
+   *
+   * @param columns The column names to select.
+   * @return The new {@link SqlQueryBuilder} instance.
+   */
+  public static SqlQueryBuilder select(String... columns) {
+    if (columns == null || columns.length == 0) {
+      throw new IllegalArgumentException("No columns provided with SELECT statement. Please mention column names or '*' to select all columns.");
+    }
+    StringBuilder sqlBuilder = new StringBuilder();
+    sqlBuilder.append("select ");
+    sqlBuilder.append(String.join(", ", columns));
+    return new SqlQueryBuilder(sqlBuilder);
+  }
+
+  /**
+   * Appends a FROM clause to a query.
+   *
+   * @param tables The table names to select from.
+   * @return The {@link SqlQueryBuilder} instance.
+   */
+  public SqlQueryBuilder from(String... tables) {
+    if (tables == null || tables.length == 0) {
+      throw new IllegalArgumentException("No table name provided with FROM clause. Please provide a table name to select from.");
+    }
+    sqlBuilder.append(" from ");
+    sqlBuilder.append(String.join(", ", tables));
+    return this;
+  }
+
+  /**
+   * Appends a JOIN clause to a query.
+   *
+   * @param table The table to join with.
+   * @return The {@link SqlQueryBuilder} instance.
+   */
+  public SqlQueryBuilder join(String table) {
+    if (StringUtils.isNullOrEmpty(table)) {
+      throw new IllegalArgumentException("No table name provided with JOIN clause. Please provide a table name to join with.");
+    }
+    sqlBuilder.append(" join ");
+    sqlBuilder.append(table);
+    return this;
+  }
+
+  /**
+   * Appends an ON clause to a query.
+   *
+   * @param predicate The predicate to join on; must not be null or empty.
+   * @return The {@link SqlQueryBuilder} instance.
+   */
+  public SqlQueryBuilder on(String predicate) {
+    if (StringUtils.isNullOrEmpty(predicate)) {
+      throw new IllegalArgumentException("No predicate provided with ON clause. Please provide a predicate to join on.");
+    }
+    sqlBuilder.append(" on ");
+    sqlBuilder.append(predicate);
+    return this;
+  }
+
+  /**
+   * Appends a WHERE clause to a query.
+   *
+   * @param predicate The predicate for WHERE clause.
+   * @return The {@link SqlQueryBuilder} instance.
+   */
+  public SqlQueryBuilder where(String predicate) {
+    if (StringUtils.isNullOrEmpty(predicate)) {
+      throw new IllegalArgumentException("No predicate provided with WHERE clause. Please provide a predicate to filter records.");
+    }
+    sqlBuilder.append(" where ");
+    sqlBuilder.append(predicate);
+    return this;
+  }
+
+  /**
+   * Appends an ORDER BY clause to a query. By default, records are ordered in ascending order by the given column.
+   * To order in descending order use DESC after the column name, e.g. queryBuilder.orderBy("update_time desc").
+   *
+   * @param columns Column names to order by.
+   * @return The {@link SqlQueryBuilder} instance.
+   */
+  public SqlQueryBuilder orderBy(String... columns) {
+    if (columns == null || columns.length == 0) {
+      throw new IllegalArgumentException("No columns provided with ORDER BY clause. Please provide a column name to order records.");
+    }
+    sqlBuilder.append(" order by ");
+    sqlBuilder.append(String.join(", ", columns));
+    return this;
+  }
+
+  /**
+   * Appends a "limit" clause to a query.
+   *
+   * @param count The limit count; zero is accepted, negative values are rejected with an IllegalArgumentException.
+   * @return The {@link SqlQueryBuilder} instance.
+   */
+  public SqlQueryBuilder limit(long count) {
+    if (count < 0) {
+      throw new IllegalArgumentException("Please provide a non-negative number for the LIMIT clause.");
+    }
+    sqlBuilder.append(" limit ");
+    sqlBuilder.append(count);
+    return this;
+  }
+
+  @Override
+  public String toString() {
+    return sqlBuilder.toString();
+  }
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JdbcSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JdbcSource.java
new file mode 100644
index 0000000..58b970f
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JdbcSource.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.SqlQueryBuilder;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.storage.StorageLevel;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Reads data from RDBMS data sources.
+ */
+
+public class JdbcSource extends RowSource {
+
+  private static final Logger LOG = LogManager.getLogger(JdbcSource.class);
+  private static final List<String> DB_LIMIT_CLAUSE = Arrays.asList("mysql", "postgresql", "h2");
+  private static final String URI_JDBC_PREFIX = "jdbc:";
+
+  public JdbcSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
+                    SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider);
+  }
+
+  /**
+   * Validates all user properties and prepares the {@link DataFrameReader} to read from RDBMS.
+   *
+   * @param session    The {@link SparkSession}.
+   * @param properties The JDBC connection properties and data source options.
+   * @return The {@link DataFrameReader} to read from RDBMS, with the password taken from the password property or file.
+   * @throws HoodieException if validating the properties or reading the password file fails.
+   */
+  private static DataFrameReader validatePropsAndGetDataFrameReader(final SparkSession session,
+                                                                    final TypedProperties properties)
+      throws HoodieException {
+    DataFrameReader dataFrameReader;
+    FSDataInputStream passwordFileStream = null;
+    try {
+      dataFrameReader = session.read().format("jdbc");
+      dataFrameReader = dataFrameReader.option(Config.URL_PROP, properties.getString(Config.URL));
+      dataFrameReader = dataFrameReader.option(Config.USER_PROP, properties.getString(Config.USER));
+      dataFrameReader = dataFrameReader.option(Config.DRIVER_PROP, properties.getString(Config.DRIVER_CLASS));
+      dataFrameReader = dataFrameReader.option(Config.RDBMS_TABLE_PROP, properties.getString(Config.RDBMS_TABLE_NAME));
+
+      if (properties.containsKey(Config.PASSWORD)) {
+        LOG.info("Reading JDBC password from properties file....");
+        dataFrameReader = dataFrameReader.option(Config.PASSWORD_PROP, properties.getString(Config.PASSWORD));
+      } else if (properties.containsKey(Config.PASSWORD_FILE)
+          && !StringUtils.isNullOrEmpty(properties.getString(Config.PASSWORD_FILE))) {
+        LOG.info(String.format("Reading JDBC password from password file %s", properties.getString(Config.PASSWORD_FILE)));
+        FileSystem fileSystem = FileSystem.get(session.sparkContext().hadoopConfiguration());
+        Path passwordFilePath = new Path(properties.getString(Config.PASSWORD_FILE));
+        passwordFileStream = fileSystem.open(passwordFilePath);
+        byte[] bytes = new byte[(int) fileSystem.getFileStatus(passwordFilePath).getLen()]; // available() is only an estimate
+        passwordFileStream.readFully(bytes); // a single read() may return fewer bytes than requested
+        dataFrameReader = dataFrameReader.option(Config.PASSWORD_PROP, new String(bytes, java.nio.charset.StandardCharsets.UTF_8));
+      } else {
+        throw new IllegalArgumentException(String.format("JDBCSource needs either a %s or %s to connect to RDBMS "
+            + "datasource", Config.PASSWORD_FILE, Config.PASSWORD));
+      }
+
+      addExtraJdbcOptions(properties, dataFrameReader);
+
+      if (properties.getBoolean(Config.IS_INCREMENTAL)) {
+        DataSourceUtils.checkRequiredProperties(properties, Collections.singletonList(Config.INCREMENTAL_COLUMN));
+      }
+      return dataFrameReader;
+    } catch (Exception e) {
+      throw new HoodieException("Failed to validate properties", e);
+    } finally {
+      IOUtils.closeStream(passwordFileStream);
+    }
+  }
+
+  /**
+   * Accepts spark JDBC options from the user in terms of EXTRA_OPTIONS and adds them to the {@link DataFrameReader}.
+   * Example: in normal spark code you would do something like: session.read.format('jdbc').option(fetchSize,1000)
+   * .option(timestampFormat,"yyyy-mm-dd hh:mm:ss")
+   * <p>
+   * The way to pass these properties to HUDI is through the config file. Any property starting with
+   * hoodie.deltastreamer.jdbc.extra.options. will be added, with that prefix stripped and empty values skipped.
+   * <p>
+   * Example: hoodie.deltastreamer.jdbc.extra.options.fetchSize=100
+   * hoodie.deltastreamer.jdbc.extra.options.lowerBound=1
+   * hoodie.deltastreamer.jdbc.extra.options.upperBound=100
+   *
+   * @param properties      The JDBC connection properties and data source options.
+   * @param dataFrameReader The {@link DataFrameReader} to which data source options will be added.
+   */
+  private static void addExtraJdbcOptions(TypedProperties properties, DataFrameReader dataFrameReader) {
+    Set<Object> objects = properties.keySet();
+    for (Object property : objects) {
+      String prop = property.toString();
+      if (prop.startsWith(Config.EXTRA_OPTIONS)) {
+        String key = prop.substring(Config.EXTRA_OPTIONS.length()); // strip the prefix literally; split() would treat it as a regex
+        String value = properties.getString(prop);
+        if (!StringUtils.isNullOrEmpty(value)) {
+          LOG.info(String.format("Adding %s -> %s to jdbc options", key, value));
+          dataFrameReader.option(key, value);
+        }
+      }
+    }
+  }
+
+  @Override
+  protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) throws HoodieException {
+    try {
+      DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.URL, Config.DRIVER_CLASS, Config.USER, Config.RDBMS_TABLE_NAME, Config.IS_INCREMENTAL));
+      return fetch(lastCkptStr, sourceLimit);
+    } catch (HoodieException e) {
+      LOG.error("Exception while running JDBCSource ", e);
+      throw e;
+    } catch (Exception e) {
+      LOG.error("Exception while running JDBCSource ", e);
+      throw new HoodieException("Error fetching next batch from JDBC source. Last checkpoint: " + lastCkptStr.orElse(null), e);
+    }
+  }
+
+  /**
+   * Decide to do a full RDBMS table scan or an incremental scan based on the lastCkptStr. If previous checkpoint
+   * value exists then we do an incremental scan with a PPD query or else we do a full scan. In certain cases where the
+   * incremental query fails, we fallback to a full scan.
+   *
+   * @param lastCkptStr Last checkpoint.
+   * @return The pair of {@link Dataset} and current checkpoint.
+   */
+  private Pair<Option<Dataset<Row>>, String> fetch(Option<String> lastCkptStr, long sourceLimit) {
+    Dataset<Row> dataset;
+    if (lastCkptStr.isPresent() && !StringUtils.isNullOrEmpty(lastCkptStr.get())) {
+      dataset = incrementalFetch(lastCkptStr, sourceLimit);
+    } else {
+      LOG.info("No checkpoint references found. Doing a full rdbms table fetch");
+      dataset = fullFetch(sourceLimit);
+    }
+    dataset.persist(StorageLevel.fromString(props.getString(Config.STORAGE_LEVEL, "MEMORY_AND_DISK_SER")));
+    boolean isIncremental = props.getBoolean(Config.IS_INCREMENTAL);
+    Pair<Option<Dataset<Row>>, String> pair = Pair.of(Option.of(dataset), checkpoint(dataset, isIncremental, lastCkptStr));
+    dataset.unpersist();
+    return pair;
+  }
+
+  /**
+   * Does an incremental scan with a PPD (predicate pushdown) query prepared on the basis of the previous checkpoint.
+   *
+   * @param lastCheckpoint Last checkpoint. Records fetched are exclusive of it (i.e. incremental column value > lastCheckpoint).
+   * @param sourceLimit    Maximum rows to fetch; only applied (ordered by the incremental column) when positive and the JDBC URL scheme is mysql, postgresql or h2.
+   * @return The {@link Dataset} after incremental fetch from RDBMS.
+   */
+  private Dataset<Row> incrementalFetch(Option<String> lastCheckpoint, long sourceLimit) {
+    try {
+      final String ppdQuery = "(%s) rdbms_table";
+      final SqlQueryBuilder queryBuilder = SqlQueryBuilder.select("*")
+          .from(props.getString(Config.RDBMS_TABLE_NAME))
+          .where(String.format(" %s > '%s'", props.getString(Config.INCREMENTAL_COLUMN), lastCheckpoint.get()));
+
+      if (sourceLimit > 0) {
+        URI jdbcURI = URI.create(props.getString(Config.URL).substring(URI_JDBC_PREFIX.length()));
+        if (DB_LIMIT_CLAUSE.contains(jdbcURI.getScheme())) {
+          queryBuilder.orderBy(props.getString(Config.INCREMENTAL_COLUMN)).limit(sourceLimit);
+        }
+      }
+      String query = String.format(ppdQuery, queryBuilder.toString());
+      LOG.info("PPD QUERY: " + query);
+      LOG.info(String.format("Referenced last checkpoint and prepared new predicate pushdown query for jdbc pull %s", query));
+      return validatePropsAndGetDataFrameReader(sparkSession, props).option(Config.RDBMS_TABLE_PROP, query).load();
+    } catch (Exception e) {
+      LOG.error("Error while performing an incremental fetch. Not all database support the PPD query we generate to do an incremental scan", e);
+      if (props.containsKey(Config.FALLBACK_TO_FULL_FETCH) && props.getBoolean(Config.FALLBACK_TO_FULL_FETCH)) {
+        LOG.warn("Falling back to full scan.");
+        return fullFetch(sourceLimit);
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Does a full scan on the RDBMS data source.
+   *
+   * @return The {@link Dataset} after running full scan.
+   */
+  private Dataset<Row> fullFetch(long sourceLimit) {
+    final String ppdQuery = "(%s) rdbms_table";
+    final SqlQueryBuilder queryBuilder = SqlQueryBuilder.select("*")
+        .from(props.getString(Config.RDBMS_TABLE_NAME));
+    if (sourceLimit > 0) {
+      URI jdbcURI = URI.create(props.getString(Config.URL).substring(URI_JDBC_PREFIX.length()));
+      if (DB_LIMIT_CLAUSE.contains(jdbcURI.getScheme())) {
+        if (props.containsKey(Config.INCREMENTAL_COLUMN)) {
+          queryBuilder.orderBy(props.getString(Config.INCREMENTAL_COLUMN)).limit(sourceLimit);
+        } else {
+          queryBuilder.limit(sourceLimit);
+        }
+      }
+    }
+    String query = String.format(ppdQuery, queryBuilder.toString());
+    return validatePropsAndGetDataFrameReader(sparkSession, props).option(Config.RDBMS_TABLE_PROP, query).load();
+  }
+
+  private String checkpoint(Dataset<Row> rowDataset, boolean isIncremental, Option<String> lastCkptStr) {
+    try {
+      if (isIncremental) {
+        Column incrementalColumn = rowDataset.col(props.getString(Config.INCREMENTAL_COLUMN));
+        final String max = rowDataset.agg(functions.max(incrementalColumn).cast(DataTypes.StringType)).first().getString(0);
+        LOG.info(String.format("Checkpointing column %s with value: %s ", incrementalColumn, max));
+        if (max != null) {
+          return max;
+        }
+        return lastCkptStr.isPresent() && !StringUtils.isNullOrEmpty(lastCkptStr.get()) ? lastCkptStr.get() : StringUtils.EMPTY_STRING;
+      } else {
+        return StringUtils.EMPTY_STRING;
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to checkpoint");
+      throw new HoodieException("Failed to checkpoint. Last checkpoint: " + lastCkptStr.orElse(null), e);
+    }
+  }
+
+  /**
+   * Inner class with config keys.
+   */
+  protected static class Config {
+
+    /**
+     * {@value #URL} is the jdbc url for the Hoodie datasource.
+     */
+    private static final String URL = "hoodie.deltastreamer.jdbc.url";
+
+    private static final String URL_PROP = "url";
+
+    /**
+     * {@value #USER} is the username used for JDBC connection.
+     */
+    private static final String USER = "hoodie.deltastreamer.jdbc.user";
+
+    /**
+     * {@value #USER_PROP} used internally to build jdbc params.
+     */
+    private static final String USER_PROP = "user";
+
+    /**
+     * {@value #PASSWORD} is the password used for JDBC connection.
+     */
+    private static final String PASSWORD = "hoodie.deltastreamer.jdbc.password";
+
+    /**
+     * {@value #PASSWORD_FILE} is the base-path for the JDBC password file.
+     */
+    private static final String PASSWORD_FILE = "hoodie.deltastreamer.jdbc.password.file";
+
+    /**
+     * {@value #PASSWORD_PROP} used internally to build jdbc params.
+     */
+    private static final String PASSWORD_PROP = "password";
+
+    /**
+     * {@value #DRIVER_CLASS} used for JDBC connection.
+     */
+    private static final String DRIVER_CLASS = "hoodie.deltastreamer.jdbc.driver.class";
+
+    /**
+     * {@value #DRIVER_PROP} used internally to build jdbc params.
+     */
+    private static final String DRIVER_PROP = "driver";
+
+    /**
+     * {@value #RDBMS_TABLE_NAME} RDBMS table to pull.
+     */
+    private static final String RDBMS_TABLE_NAME = "hoodie.deltastreamer.jdbc.table.name";
+
+    /**
+     * {@value #RDBMS_TABLE_PROP} used internally for jdbc.
+     */
+    private static final String RDBMS_TABLE_PROP = "dbtable";
+
+    /**
+     * {@value #INCREMENTAL_COLUMN} if ran in incremental mode, this field will be used to pull new data incrementally.
+     */
+    private static final String INCREMENTAL_COLUMN = "hoodie.deltastreamer.jdbc.table.incr.column.name";
+
+    /**
+     * {@value #IS_INCREMENTAL} will the JDBC source do an incremental pull?
+     */
+    private static final String IS_INCREMENTAL = "hoodie.deltastreamer.jdbc.incr.pull";
+
+    /**
+     * {@value #EXTRA_OPTIONS} used to set any extra options the user specifies for jdbc.
+     */
+    private static final String EXTRA_OPTIONS = "hoodie.deltastreamer.jdbc.extra.options.";
+
+    /**
+     * {@value #STORAGE_LEVEL} is used to control the persistence level. Default value: MEMORY_AND_DISK_SER.
+     */
+    private static final String STORAGE_LEVEL = "hoodie.deltastreamer.jdbc.storage.level";
+
+    /**
+     * {@value #FALLBACK_TO_FULL_FETCH} is a boolean, which if set true, makes incremental fetch to fallback to full fetch in case of any error.
+     */
+    private static final String FALLBACK_TO_FULL_FETCH = "hoodie.deltastreamer.jdbc.incr.fallback.to.full.fetch";
+  }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSqlQueryBuilder.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSqlQueryBuilder.java
new file mode 100644
index 0000000..bfc66ae
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSqlQueryBuilder.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Tests {@link SqlQueryBuilder}.
+ */
+public class TestSqlQueryBuilder {
+
+  @Test
+  public void testSelect() {
+    String sql = SqlQueryBuilder.select("id", "rider", "time")
+        .from("trips")
+        .join("users").on("trips.rider = users.id")
+        .where("(trips.time > 100 or trips.time < 200)")
+        .orderBy("id", "time")
+        .limit(10).toString();
+
+    assertEquals("select id, rider, time from trips "
+        + "join users on trips.rider = users.id "
+        + "where (trips.time > 100 or trips.time < 200) "
+        + "order by id, time "
+        + "limit 10", sql);
+  }
+
+  @Test
+  public void testIncorrectQueries() {
+    assertThrows(IllegalArgumentException.class, () -> SqlQueryBuilder.select().toString());
+
+    assertThrows(IllegalArgumentException.class, () -> SqlQueryBuilder.select("*").from().toString());
+
+    assertThrows(IllegalArgumentException.class, () -> SqlQueryBuilder.select("id").from("trips").where("").toString());
+
+    assertThrows(IllegalArgumentException.class, () -> SqlQueryBuilder.select("id").from("trips").join("").toString());
+
+    assertThrows(IllegalArgumentException.class, () -> SqlQueryBuilder.select("id").from("trips").join("riders").on("").toString());
+
+    assertThrows(IllegalArgumentException.class, () -> SqlQueryBuilder.select("id").from("trips").join("riders").where("id > 0").orderBy().toString());
+
+    assertThrows(IllegalArgumentException.class, () -> SqlQueryBuilder.select("id").from("trips").join("riders").where("id > 0").orderBy("id").limit(-1).toString());
+  }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index eb3de6d..64fc531 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.utilities.functional;
 
+import java.sql.Connection;
+import java.sql.DriverManager;
 import java.util.ConcurrentModificationException;
 import java.util.concurrent.ExecutorService;
 import org.apache.hudi.DataSourceWriteOptions;
@@ -50,10 +52,12 @@ import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.sources.CsvDFSSource;
 import org.apache.hudi.utilities.sources.HoodieIncrSource;
 import org.apache.hudi.utilities.sources.InputBatch;
+import org.apache.hudi.utilities.sources.JdbcSource;
 import org.apache.hudi.utilities.sources.JsonKafkaSource;
 import org.apache.hudi.utilities.sources.ParquetDFSSource;
 import org.apache.hudi.utilities.sources.TestDataSource;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config;
+import org.apache.hudi.utilities.testutils.JdbcTestUtils;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 import org.apache.hudi.utilities.testutils.sources.DistributedTestDataSource;
 import org.apache.hudi.utilities.testutils.sources.config.SourceConfigs;
@@ -109,6 +113,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 /**
  * Basic tests against {@link HoodieDeltaStreamer}, by issuing bulk_inserts, upserts, inserts. Check counts at the end.
@@ -1594,6 +1599,45 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     testCsvDFSSource(false, '\t', true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
   }
 
+  @Test
+  public void testJdbcSourceIncrementalFetchInContinuousMode() {
+    try (Connection connection = DriverManager.getConnection("jdbc:h2:mem:test_mem", "test", "jdbc")) {
+      TypedProperties props = new TypedProperties();
+      props.setProperty("hoodie.deltastreamer.jdbc.url", "jdbc:h2:mem:test_mem");
+      props.setProperty("hoodie.deltastreamer.jdbc.driver.class", "org.h2.Driver");
+      props.setProperty("hoodie.deltastreamer.jdbc.user", "test");
+      props.setProperty("hoodie.deltastreamer.jdbc.password", "jdbc");
+      props.setProperty("hoodie.deltastreamer.jdbc.table.name", "triprec");
+      props.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
+      props.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "id");
+
+      props.setProperty("hoodie.datasource.write.keygenerator.class", SimpleKeyGenerator.class.getName());
+      props.setProperty("hoodie.datasource.write.recordkey.field", "ID");
+      props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
+
+      UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/test-jdbc-source.properties");
+
+      int numRecords = 1000;
+      int sourceLimit = 100;
+      String tableBasePath = dfsBasePath + "/triprec";
+      HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, JdbcSource.class.getName(),
+          null, "test-jdbc-source.properties", false,
+          false, sourceLimit, false, null, null, "timestamp");
+      cfg.continuousMode = true;
+      // Add 1000 records
+      JdbcTestUtils.clearAndInsert("000", numRecords, connection, new HoodieTestDataGenerator(), props);
+
+      HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
+      deltaStreamerTestRunner(deltaStreamer, cfg, (r) -> {
+        TestHelpers.assertAtleastNCompactionCommits(numRecords / sourceLimit + ((numRecords % sourceLimit == 0) ? 0 : 1), tableBasePath, dfs);
+        TestHelpers.assertRecordCount(numRecords, tableBasePath + "/*/*.parquet", sqlContext);
+        return true;
+      });
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+  }
+
   /**
    * UDF to calculate Haversine distance.
    */
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java
new file mode 100644
index 0000000..62aebe3
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.storage.StorageLevel;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.utilities.testutils.JdbcTestUtils.clearAndInsert;
+import static org.apache.hudi.utilities.testutils.JdbcTestUtils.close;
+import static org.apache.hudi.utilities.testutils.JdbcTestUtils.count;
+import static org.apache.hudi.utilities.testutils.JdbcTestUtils.insert;
+import static org.apache.hudi.utilities.testutils.JdbcTestUtils.update;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Tests {@link JdbcSource}.
+ */
+public class TestJdbcSource extends UtilitiesTestBase {
+
+  private static final TypedProperties PROPS = new TypedProperties();
+  private static final HoodieTestDataGenerator DATA_GENERATOR = new HoodieTestDataGenerator();
+  private static Connection connection;
+
+  /**
+   * Resets the shared JDBC properties to the common defaults and opens a connection to the
+   * in-memory H2 database used by the tests.
+   *
+   * <p>{@code PROPS} is a static instance that individual tests add keys to and remove keys from,
+   * so it is emptied first; otherwise a setting changed by one test would still be in effect for
+   * whichever test happens to run next.
+   */
+  @BeforeEach
+  public void setup() throws Exception {
+    super.setup();
+    // Start every test from a clean slate instead of inheriting keys from the previous test.
+    PROPS.clear();
+    PROPS.setProperty("hoodie.deltastreamer.jdbc.url", "jdbc:h2:mem:test_mem");
+    PROPS.setProperty("hoodie.deltastreamer.jdbc.driver.class", "org.h2.Driver");
+    PROPS.setProperty("hoodie.deltastreamer.jdbc.user", "test");
+    PROPS.setProperty("hoodie.deltastreamer.jdbc.password", "jdbc");
+    PROPS.setProperty("hoodie.deltastreamer.jdbc.table.name", "triprec");
+    connection = DriverManager.getConnection("jdbc:h2:mem:test_mem", "test", "jdbc");
+  }
+
+  /**
+   * Runs the base-class teardown and closes the JDBC connection opened in {@link #setup()}.
+   *
+   * <p>The connection is closed in a {@code finally} block so that it is released even when the
+   * base-class teardown throws.
+   */
+  @AfterEach
+  public void teardown() throws Exception {
+    try {
+      super.teardown();
+    } finally {
+      close(connection);
+    }
+  }
+
+  /**
+   * Inserts 100 rows and checks that a first fetch without a checkpoint returns all of them.
+   */
+  @Test
+  public void testSingleCommit() {
+    PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
+    PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "last_insert");
+
+    try {
+      int numRecords = 100;
+      String commitTime = "000";
+
+      // Insert 100 records with commit time
+      clearAndInsert(commitTime, numRecords, connection, DATA_GENERATOR, PROPS);
+
+      // Validate if we have specified records in db
+      assertEquals(numRecords, count(connection, "triprec"));
+
+      // Start JdbcSource
+      Dataset<Row> rowDataset = runSource(Option.empty(), numRecords).getBatch().get();
+      assertEquals(numRecords, rowDataset.count());
+    } catch (SQLException e) {
+      // Keep the original exception as the cause so its stack trace is reported.
+      fail(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Inserts 100 rows with commit time "000", updates the first 50 of them with commit time "007",
+   * and checks that a fetch returns 100 rows split evenly between the two commit times.
+   */
+  @Test
+  public void testInsertAndUpdate() {
+    PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
+    PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "last_insert");
+
+    try {
+      final String commitTime = "000";
+      final int numRecords = 100;
+
+      // Add 100 records. Update half of them with commit time "007".
+      update("007",
+          clearAndInsert(commitTime, numRecords, connection, DATA_GENERATOR, PROPS)
+              .stream()
+              .limit(50)
+              .collect(Collectors.toList()),
+          connection, DATA_GENERATOR, PROPS
+      );
+      // Check if database has 100 records
+      assertEquals(numRecords, count(connection, "triprec"));
+
+      // Start JdbcSource
+      Dataset<Row> rowDataset = runSource(Option.empty(), 100).getBatch().get();
+      assertEquals(100, rowDataset.count());
+
+      Dataset<Row> firstCommit = rowDataset.where("commit_time=000");
+      assertEquals(50, firstCommit.count());
+
+      Dataset<Row> secondCommit = rowDataset.where("commit_time=007");
+      assertEquals(50, secondCommit.count());
+    } catch (Exception e) {
+      // Keep the original exception as the cause so its stack trace is reported.
+      fail(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Inserts 10 rows, then 5 more with a different commit time, fetching without a checkpoint
+   * after each step and checking the row counts per commit time.
+   */
+  @Test
+  public void testTwoCommits() {
+    PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
+    PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "last_insert");
+
+    try {
+      // Add 10 records with commit time "000"
+      clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS);
+
+      // Start JdbcSource
+      Dataset<Row> rowDataset = runSource(Option.empty(), 10).getBatch().get();
+      assertEquals(10, rowDataset.where("commit_time=000").count());
+
+      // Add 5 records with commit time "001"
+      insert("001", 5, connection, DATA_GENERATOR, PROPS);
+      rowDataset = runSource(Option.empty(), 15).getBatch().get();
+      assertEquals(15, rowDataset.count());
+      assertEquals(5, rowDataset.where("commit_time=001").count());
+      assertEquals(10, rowDataset.where("commit_time=000").count());
+
+      // Start second commit and check if all records are pulled
+      rowDataset = runSource(Option.empty(), 15).getBatch().get();
+      assertEquals(15, rowDataset.count());
+    } catch (Exception e) {
+      // Keep the original exception as the cause so its stack trace is reported.
+      fail(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Fetches an initial batch of 10 rows, inserts 10 more, and checks that a fetch resuming from
+   * the first batch's checkpoint returns only the 10 newly inserted rows.
+   */
+  @Test
+  public void testIncrementalFetchWithCommitTime() {
+    PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
+    PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "last_insert");
+
+    try {
+      // Add 10 records with commit time "000"
+      clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS);
+
+      // Start JdbcSource
+      InputBatch<Dataset<Row>> batch = runSource(Option.empty(), 10);
+      Dataset<Row> rowDataset = batch.getBatch().get();
+      assertEquals(10, rowDataset.count());
+
+      // Add 10 records with commit time "001"
+      insert("001", 10, connection, DATA_GENERATOR, PROPS);
+
+      // Start incremental scan
+      rowDataset = runSource(Option.of(batch.getCheckpointForNextBatch()), 10).getBatch().get();
+      assertEquals(10, rowDataset.count());
+      assertEquals(10, rowDataset.where("commit_time=001").count());
+    } catch (Exception e) {
+      // Keep the original exception as the cause so its stack trace is reported.
+      fail(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Fetches an initial batch of 10 rows and checks that a second fetch from that batch's
+   * checkpoint, with no new rows inserted, returns an empty dataset.
+   */
+  @Test
+  public void testIncrementalFetchWithNoMatchingRows() {
+    PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
+    PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "last_insert");
+
+    try {
+      // Add 10 records with commit time "000"
+      clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS);
+
+      // Start JdbcSource
+      InputBatch<Dataset<Row>> batch = runSource(Option.empty(), 10);
+      Dataset<Row> rowDataset = batch.getBatch().get();
+      assertEquals(10, rowDataset.count());
+
+      // Start incremental scan
+      rowDataset = runSource(Option.of(batch.getCheckpointForNextBatch()), 10).getBatch().get();
+      assertEquals(0, rowDataset.count());
+    } catch (Exception e) {
+      // Keep the original exception as the cause so its stack trace is reported.
+      fail(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Uses the {@code id} column as the incremental column and checks that, with 100 unread rows
+   * in the table, a fetch limited to 60 returns 60 rows and the following fetch (limit 75)
+   * returns the remaining 40.
+   */
+  @Test
+  public void testIncrementalFetchWhenTableRecordsMoreThanSourceLimit() {
+    PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
+    PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "id");
+
+    try {
+      // Add 100 records with commit time "000"
+      clearAndInsert("000", 100, connection, DATA_GENERATOR, PROPS);
+
+      // Start JdbcSource
+      InputBatch<Dataset<Row>> batch = runSource(Option.empty(), 100);
+      Dataset<Row> rowDataset = batch.getBatch().get();
+      assertEquals(100, rowDataset.count());
+
+      // Add 100 records with commit time "001"
+      insert("001", 100, connection, DATA_GENERATOR, PROPS);
+
+      // Start incremental scan. Now there are 100 more records but with sourceLimit set to 60, only 60 records should be fetched.
+      // Those 60 records should be of the commit_time=001 because records with commit_time=000 have already been processed.
+      batch = runSource(Option.of(batch.getCheckpointForNextBatch()), 60);
+      rowDataset = batch.getBatch().get();
+      assertEquals(60, rowDataset.count());
+      assertEquals(60, rowDataset.where("commit_time=001").count());
+      // No more records added, but sourceLimit is now set to 75. Still, only the remaining 40 records should be fetched.
+      rowDataset = runSource(Option.of(batch.getCheckpointForNextBatch()), 75).getBatch().get();
+      assertEquals(40, rowDataset.count());
+    } catch (Exception e) {
+      // Keep the original exception as the cause so its stack trace is reported.
+      fail(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Uses the {@code id} column as the incremental column, checks that the first batch of 100 rows
+   * yields checkpoint "100", and checks that a fetch from checkpoint "200" returns no rows even
+   * after 100 more rows have been inserted.
+   */
+  @Test
+  public void testIncrementalFetchWhenLastCheckpointMoreThanTableRecords() {
+    PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
+    PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "id");
+
+    try {
+      // Add 100 records with commit time "000"
+      clearAndInsert("000", 100, connection, DATA_GENERATOR, PROPS);
+
+      // Start JdbcSource
+      InputBatch<Dataset<Row>> batch = runSource(Option.empty(), 100);
+      Dataset<Row> rowDataset = batch.getBatch().get();
+      assertEquals(100, rowDataset.count());
+      assertEquals("100", batch.getCheckpointForNextBatch());
+
+      // Add 100 records with commit time "001"
+      insert("001", 100, connection, DATA_GENERATOR, PROPS);
+
+      // Start incremental scan. With checkpoint greater than the number of records, there should not be any dataset to fetch.
+      batch = runSource(Option.of("200"), 50);
+      rowDataset = batch.getBatch().get();
+      assertEquals(0, rowDataset.count());
+    } catch (Exception e) {
+      // Keep the original exception as the cause so its stack trace is reported.
+      fail(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Points the incremental column at a non-existent column and checks that an incremental fetch
+   * throws {@link HoodieException} both without the fallback option and with
+   * {@code hoodie.deltastreamer.jdbc.incr.fallback.to.full.fetch} enabled; in the latter case the
+   * message is expected to contain "Failed to checkpoint".
+   */
+  @Test
+  public void testIncrementalFetchFallbackToFullFetchWhenError() {
+    PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
+    PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "last_insert");
+
+    try {
+      // Add 10 records with commit time "000"
+      clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS);
+
+      // Start JdbcSource
+      InputBatch<Dataset<Row>> batch = runSource(Option.empty(), 10);
+      Dataset<Row> rowDataset = batch.getBatch().get();
+      assertEquals(10, rowDataset.count());
+
+      // Add 10 records with commit time "001"
+      insert("001", 10, connection, DATA_GENERATOR, PROPS);
+
+      PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "dummy_col");
+      assertThrows(HoodieException.class, () -> {
+        // Start incremental scan with a dummy column that does not exist.
+        // This will throw an exception as the default behavior is to not fallback to full fetch.
+        runSource(Option.of(batch.getCheckpointForNextBatch()), -1);
+      });
+
+      PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.fallback.to.full.fetch", "true");
+
+      // Start incremental scan with a dummy column that does not exist.
+      // This will fall back to full fetch mode but still throw an exception, as checkpointing will fail.
+      Exception exception = assertThrows(HoodieException.class, () -> {
+        runSource(Option.of(batch.getCheckpointForNextBatch()), -1);
+      });
+      assertTrue(exception.getMessage().contains("Failed to checkpoint"));
+    } catch (Exception e) {
+      // Keep the original exception as the cause so its stack trace is reported.
+      fail(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * With incremental pull disabled, inserts two groups of 10 rows and checks that a fetch
+   * without a checkpoint returns all 20 rows, 10 per commit time.
+   */
+  @Test
+  public void testFullFetchWithCommitTime() {
+    PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false");
+
+    try {
+      // Add 10 records with commit time "000"
+      clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS);
+
+      // Start JdbcSource
+      Dataset<Row> rowDataset = runSource(Option.empty(), 10).getBatch().get();
+      assertEquals(10, rowDataset.count());
+      // Add 10 records with commit time "001"
+      insert("001", 10, connection, DATA_GENERATOR, PROPS);
+
+      // Start full fetch
+      rowDataset = runSource(Option.empty(), 20).getBatch().get();
+      assertEquals(20, rowDataset.count());
+      assertEquals(10, rowDataset.where("commit_time=000").count());
+      assertEquals(10, rowDataset.where("commit_time=001").count());
+    } catch (Exception e) {
+      // Keep the original exception as the cause so its stack trace is reported.
+      fail(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * With incremental pull disabled, checks that the first batch carries an empty checkpoint, then
+   * passes the maximum {@code last_insert} value of that batch as the checkpoint for a second
+   * fetch and checks that the 10 rows returned all have commit time "001".
+   */
+  @Test
+  public void testFullFetchWithCheckpoint() {
+    PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false");
+    PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "last_insert");
+
+    try {
+      // Add 10 records with commit time "000"
+      clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS);
+
+      // Start JdbcSource
+      InputBatch<Dataset<Row>> batch = runSource(Option.empty(), 10);
+      Dataset<Row> rowDataset = batch.getBatch().get();
+      assertEquals(10, rowDataset.count());
+      assertEquals("", batch.getCheckpointForNextBatch());
+
+      // Get max of incremental column
+      Column incrementalColumn = rowDataset
+          .col(PROPS.getString("hoodie.deltastreamer.jdbc.table.incr.column.name"));
+      final String max = rowDataset.agg(functions.max(incrementalColumn).cast(DataTypes.StringType)).first()
+          .getString(0);
+
+      // Add 10 records with commit time "001"
+      insert("001", 10, connection, DATA_GENERATOR, PROPS);
+
+      // Start incremental scan
+      rowDataset = runSource(Option.of(max), 10).getBatch().get();
+      assertEquals(10, rowDataset.count());
+      assertEquals(10, rowDataset.where("commit_time=001").count());
+    } catch (Exception e) {
+      // Keep the original exception as the cause so its stack trace is reported.
+      fail(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Replaces the inline password property with {@code hoodie.deltastreamer.jdbc.password.file}
+   * pointing at a file written by {@link #writeSecretToFs()} and checks that a fetch still
+   * returns the 10 inserted rows.
+   */
+  @Test
+  public void testSourceWithPasswordOnFs() {
+    try {
+      // Write secret string to fs in a file
+      writeSecretToFs();
+      // Remove secret string from props
+      PROPS.remove("hoodie.deltastreamer.jdbc.password");
+      // Set property to read secret from fs file
+      PROPS.setProperty("hoodie.deltastreamer.jdbc.password.file", "file:///tmp/hudi/config/secret");
+      PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false");
+      // Add 10 records with commit time 000
+      clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS);
+      Dataset<Row> rowDataset = runSource(Option.empty(), 10).getBatch().get();
+      assertEquals(10, rowDataset.count());
+    } catch (Exception e) {
+      // Keep the original exception as the cause so its stack trace is reported.
+      fail(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Checks that running the source with neither the password property nor the password-file
+   * property set throws {@link HoodieException}.
+   */
+  @Test
+  public void testSourceWithNoPasswordThrowsException() {
+    assertThrows(HoodieException.class, () -> {
+      // Write secret string to fs in a file
+      writeSecretToFs();
+      // Remove secret string from props
+      PROPS.remove("hoodie.deltastreamer.jdbc.password");
+      // PROPS is shared between tests: also drop any password file location another test may have
+      // configured, otherwise this test would depend on execution order.
+      PROPS.remove("hoodie.deltastreamer.jdbc.password.file");
+      PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false");
+      // Add 10 records with commit time 000
+      clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS);
+      runSource(Option.empty(), 10);
+    });
+  }
+
+  /**
+   * Sets the extra option {@code hoodie.deltastreamer.jdbc.extra.options.fetchsize}, inserts 20
+   * rows and checks that a fetch with a source limit of 10 returns 10 rows.
+   */
+  @Test
+  public void testSourceWithExtraOptions() {
+    PROPS.setProperty("hoodie.deltastreamer.jdbc.extra.options.fetchsize", "10");
+    PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false");
+    PROPS.remove("hoodie.deltastreamer.jdbc.table.incr.column.name");
+    try {
+      // Add 20 records with commit time 000
+      clearAndInsert("000", 20, connection, DATA_GENERATOR, PROPS);
+      Dataset<Row> rowDataset = runSource(Option.empty(), 10).getBatch().get();
+      assertEquals(10, rowDataset.count());
+    } catch (Exception e) {
+      // Keep the original exception as the cause so its stack trace is reported.
+      fail(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Sets {@code hoodie.deltastreamer.jdbc.storage.level} to "NONE" and checks that the fetched
+   * dataset has 10 rows and reports {@code StorageLevel.NONE()} as its storage level.
+   */
+  @Test
+  public void testSourceWithStorageLevel() {
+    PROPS.setProperty("hoodie.deltastreamer.jdbc.storage.level", "NONE");
+    PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false");
+    try {
+      // Add 10 records with commit time 000
+      clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS);
+      Dataset<Row> rowDataset = runSource(Option.empty(), 10).getBatch().get();
+      assertEquals(10, rowDataset.count());
+      assertEquals(StorageLevel.NONE(), rowDataset.storageLevel());
+    } catch (Exception e) {
+      // Keep the original exception as the cause so its stack trace is reported.
+      fail(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Writes the JDBC password ("jdbc") to {@code file:///tmp/hudi/config/secret} so that tests can
+   * point the password-file property at it.
+   *
+   * @throws IOException if the file cannot be created or written
+   */
+  private void writeSecretToFs() throws IOException {
+    FileSystem fs = FileSystem.get(new Configuration());
+    // try-with-resources closes the stream even if the write fails.
+    try (FSDataOutputStream outputStream = fs.create(new Path("file:///tmp/hudi/config/secret"))) {
+      outputStream.writeBytes("jdbc");
+    }
+  }
+
+  private InputBatch<Dataset<Row>> runSource(Option<String> lastCkptStr, long sourceLimit) {
+    Source<Dataset<Row>> jdbcSource = new JdbcSource(PROPS, jsc, sparkSession, null);
+    return jdbcSource.fetchNewData(lastCkptStr, sourceLimit);
+  }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/JdbcTestUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/JdbcTestUtils.java
new file mode 100644
index 0000000..377063e
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/JdbcTestUtils.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.testutils;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Helper class used in testing {@link org.apache.hudi.utilities.sources.JdbcSource}.
+ */
+public class JdbcTestUtils {
+
+  private static final Logger LOG = LogManager.getLogger(JdbcTestUtils.class);
+
+  /**
+   * Recreates the {@code triprec} table from scratch and fills it with freshly generated records.
+   *
+   * <p>The drop uses {@code IF EXISTS} so that the normal first-run case, where the table is not
+   * there yet, does not surface as a logged error.
+   *
+   * @param commitTime    value written to the {@code commit_time} column of every row
+   * @param numRecords    number of records to generate and insert
+   * @param connection    open JDBC connection to the test database
+   * @param dataGenerator generator used to produce the records
+   * @param props         properties passed along when converting records for insertion
+   * @return the generated records, as returned by {@code insert}
+   * @throws SQLException if inserting the records fails
+   */
+  public static List<HoodieRecord> clearAndInsert(String commitTime, int numRecords, Connection connection, HoodieTestDataGenerator dataGenerator, TypedProperties props)
+      throws SQLException {
+    execute(connection, "DROP TABLE IF EXISTS triprec", "Failed to drop table triprec");
+    execute(connection, "CREATE TABLE triprec ("
+        + "id INT NOT NULL AUTO_INCREMENT(1, 1),"
+        + "commit_time VARCHAR(50),"
+        + "_row_key VARCHAR(50),"
+        + "rider VARCHAR(50),"
+        + "driver VARCHAR(50),"
+        + "begin_lat DOUBLE PRECISION,"
+        + "begin_lon DOUBLE PRECISION,"
+        + "end_lat DOUBLE PRECISION,"
+        + "end_lon DOUBLE PRECISION,"
+        + "fare DOUBLE PRECISION,"
+        + "last_insert TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP)", "Table already exists");
+
+    return insert(commitTime, numRecords, connection, dataGenerator, props);
+  }
+
+  public static List<HoodieRecord> insert(String commitTime, int numRecords, Connection connection, HoodieTestDataGenerator dataGenerator, TypedProperties props) throws SQLException {
+    PreparedStatement insertStatement =
+        connection.prepareStatement("INSERT INTO triprec ("
+            + "commit_time,"
+            + "_row_key,"
+            + "rider,"
+            + "driver,"
+            + "begin_lat,"
+            + "begin_lon,"
+            + "end_lat,"
+            + "end_lon,"
+            + "fare) "
+            + "values(?,?,?,?,?,?,?,?,?)");
+    List<HoodieRecord> hoodieRecords = dataGenerator.generateInserts(commitTime, numRecords);
+
+    hoodieRecords
+        .stream()
+        .map(r -> {
+          try {
+            return ((GenericRecord) r.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA, props).get());
+          } catch (IOException e) {
+            return null;
+          }
+        })
+        .filter(Objects::nonNull)
+        .forEach(record -> {
+          try {
+            insertStatement.setString(1, commitTime);
+            insertStatement.setString(2, record.get("_row_key").toString());
+            insertStatement.setString(3, record.get("rider").toString());
+            insertStatement.setString(4, record.get("driver").toString());
+            insertStatement.setDouble(5, Double.parseDouble(record.get("begin_lat").toString()));
+            insertStatement.setDouble(6, Double.parseDouble(record.get("begin_lon").toString()));
+            insertStatement.setDouble(7, Double.parseDouble(record.get("end_lat").toString()));
+            insertStatement.setDouble(8, Double.parseDouble(record.get("end_lon").toString()));
+            insertStatement.setDouble(9, Double.parseDouble(((GenericRecord) record.get("fare")).get("amount").toString()));
+            insertStatement.addBatch();
+          } catch (SQLException e) {
+            LOG.warn(e.getMessage());
+          }
+        });
+    insertStatement.executeBatch();
+    close(insertStatement);
+    return hoodieRecords;
+  }
+
+  public static List<HoodieRecord> update(String commitTime, List<HoodieRecord> inserts, Connection connection, HoodieTestDataGenerator dataGenerator, TypedProperties props)
+      throws SQLException, IOException {
+    PreparedStatement updateStatement =
+        connection.prepareStatement("UPDATE triprec set commit_time=?,"
+            + "_row_key=?,"
+            + "rider=?,"
+            + "driver=?,"
+            + "begin_lat=?,"
+            + "begin_lon=?,"
+            + "end_lat=?,"
+            + "end_lon=?,"
+            + "fare=?"
+            + "where _row_key=?");
+
+    List<HoodieRecord> updateRecords = dataGenerator.generateUpdates(commitTime, inserts);
+    updateRecords.stream().map(m -> {
+      try {
+        return m.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA, props).get();
+      } catch (IOException e) {
+        return null;
+      }
+    }).filter(Objects::nonNull)
+        .map(r -> ((GenericRecord) r))
+        .sequential()
+        .forEach(r -> {
+          try {
+            updateStatement.setString(1, commitTime);
+            updateStatement.setString(2, r.get("_row_key").toString());
+            updateStatement.setString(3, r.get("rider").toString());
+            updateStatement.setString(4, r.get("driver").toString());
+            updateStatement.setDouble(5, Double.parseDouble(r.get("begin_lat").toString()));
+            updateStatement.setDouble(6, Double.parseDouble(r.get("begin_lon").toString()));
+            updateStatement.setDouble(7, Double.parseDouble(r.get("end_lat").toString()));
+            updateStatement.setDouble(8, Double.parseDouble(r.get("end_lon").toString()));
+            updateStatement.setDouble(9, Double.parseDouble(((GenericRecord) r.get("fare")).get("amount").toString()));
+            updateStatement.setString(10, r.get("_row_key").toString());
+            updateStatement.addBatch();
+          } catch (SQLException e) {
+            LOG.warn(e.getMessage());
+          }
+        });
+    updateStatement.executeBatch();
+    close(updateStatement);
+    return updateRecords;
+  }
+
+  /**
+   * Runs a single update statement; if it fails, {@code message} is logged at error level and
+   * the exception is not rethrown.
+   *
+   * @param connection open JDBC connection
+   * @param query      SQL to execute
+   * @param message    text to log when the statement fails
+   */
+  private static void execute(Connection connection, String query, String message) {
+    try (Statement stmt = connection.createStatement()) {
+      stmt.executeUpdate(query);
+    } catch (SQLException sqlException) {
+      LOG.error(message);
+    }
+  }
+
+  /**
+   * Closes the given statement when it is non-null. A failure to close is logged and not
+   * rethrown.
+   */
+  private static void close(Statement statement) {
+    if (statement == null) {
+      return;
+    }
+    try {
+      statement.close();
+    } catch (SQLException sqlException) {
+      LOG.error("Error while closing statement. " + sqlException.getMessage());
+    }
+  }
+
+  public static void close(Connection connection) {
+    try {
+      if (connection != null) {
+        connection.close();
+      }
+    } catch (SQLException e) {
+      LOG.error("Error while closing connection. " + e.getMessage());
+    }
+  }
+
+  public static int count(Connection connection, String tableName) {
+    try (Statement statement = connection.createStatement()) {
+      ResultSet rs = statement.executeQuery(String.format("select count(*) from %s", tableName));
+      rs.next();
+      return rs.getInt(1);
+    } catch (SQLException e) {
+      LOG.warn("Error while counting records. " + e.getMessage());
+      return 0;
+    }
+  }
+}