You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/09/13 15:51:10 UTC

[pulsar] branch master updated: Debezium Oracle Source (#11520)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e2bc52d  Debezium Oracle Source (#11520)
e2bc52d is described below

commit e2bc52d40450fa00af258c4432a5b71d50a5c6e0
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Mon Sep 13 08:50:18 2021 -0700

    Debezium Oracle Source (#11520)
---
 .../workflows/ci-integration-pulsar-io-ora.yaml    | 118 ++++++++++
 build/run_integration_group.sh                     |  10 +-
 distribution/io/src/assemble/io.xml                |   1 +
 pulsar-io/debezium/{ => oracle}/pom.xml            |  39 +++-
 .../io/debezium/oracle/DebeziumOracleSource.java   |  37 +++
 .../resources/META-INF/services/pulsar-io.yaml     |  22 ++
 .../resources/debezium-oracle-source-config.yaml   |  38 ++++
 pulsar-io/debezium/pom.xml                         |   1 +
 .../docker-images/latest-version-image/Dockerfile  |  15 ++
 .../integration/containers/ChaosContainer.java     |  12 +
 .../containers/DebeziumOracleDbContainer.java      |  62 +++++
 .../tests/integration/io/sources/SourceTester.java |  13 +-
 .../debezium/DebeziumOracleDbSourceTester.java     | 253 +++++++++++++++++++++
 .../debezium/PulsarDebeziumOracleSourceTest.java   | 101 ++++++++
 .../debezium/PulsarDebeziumSourcesTest.java        |  17 +-
 .../debezium/PulsarIODebeziumSourceRunner.java     |   2 +-
 .../tests/integration/utils/DockerUtils.java       |  38 +++-
 ...ulsar-io-suite.xml => pulsar-io-ora-source.xml} |  13 +-
 .../src/test/resources/pulsar-io-suite.xml         |   1 +
 tests/integration/src/test/resources/pulsar.xml    |   1 +
 20 files changed, 764 insertions(+), 30 deletions(-)

diff --git a/.github/workflows/ci-integration-pulsar-io-ora.yaml b/.github/workflows/ci-integration-pulsar-io-ora.yaml
new file mode 100644
index 0000000..8bb1cba
--- /dev/null
+++ b/.github/workflows/ci-integration-pulsar-io-ora.yaml
@@ -0,0 +1,118 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: CI - Integration - Pulsar-IO Oracle Source
+on:
+  pull_request:
+    branches:
+      - master
+  push:
+    branches:
+      - branch-*
+
+env:
+  MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3
+
+jobs:
+
+  pulsar-io:
+    name:
+    runs-on: ubuntu-latest
+    timeout-minutes: 120
+
+    steps:
+      - name: checkout
+        uses: actions/checkout@v2
+
+      - name: Tune Runner VM
+        uses: ./.github/actions/tune-runner-vm
+
+      - name: Detect changed files
+        id:   changes
+        uses: apache/pulsar-test-infra/paths-filter@master
+        with:
+          filters: .github/changes-filter.yaml
+
+      - name: Check changed files
+        id: check_changes
+        run: echo "::set-output name=docs_only::${{ fromJSON(steps.changes.outputs.all_count) == fromJSON(steps.changes.outputs.docs_count) && fromJSON(steps.changes.outputs.docs_count) > 0 }}"
+
+      - name: Cache local Maven repository
+        if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
+        uses: actions/cache@v2
+        with:
+          path: |
+            ~/.m2/repository/*/*/*
+            !~/.m2/repository/org/apache/pulsar
+          key: ${{ runner.os }}-m2-dependencies-all-${{ hashFiles('**/pom.xml') }}
+          restore-keys: |
+            ${{ runner.os }}-m2-dependencies-core-modules-${{ hashFiles('**/pom.xml') }}
+            ${{ runner.os }}-m2-dependencies-core-modules-
+
+      - name: Set up JDK 11
+        uses: actions/setup-java@v2
+        if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
+        with:
+          distribution: 'adopt'
+          java-version: 11
+
+      - name: clean disk
+        if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
+        run: |
+          sudo swapoff -a
+          sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
+          sudo apt clean
+          docker rmi $(docker images -q) -f
+          df -h
+
+      - name: run install by skip tests
+        if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
+        run: mvn -q -B -ntp clean install -DskipTests
+
+      - name: build pulsar image
+        if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
+        run: mvn -B -f docker/pulsar/pom.xml install -am -Pdocker,-main -DskipTests -Ddocker.nocache=true
+
+      - name: build pulsar-all image
+        if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
+        run: mvn -B -f docker/pulsar-all/pom.xml install -am -Pdocker,-main -DskipTests -Ddocker.nocache=true
+
+      - name: build artifacts and docker image
+        if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
+        run: mvn -B -f tests/docker-images/pom.xml install -am -Pdocker,-main -DskipTests
+
+      - name: run integration tests
+        if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
+        run: ./build/run_integration_group.sh PULSAR_IO_ORA
+
+      - name: Upload container logs
+        uses: actions/upload-artifact@v2
+        if: ${{ cancelled() || failure() }}
+        continue-on-error: true
+        with:
+          name: container-logs
+          path: tests/integration/target/container-logs
+
+      - name: Upload surefire-reports
+        uses: actions/upload-artifact@v2
+        if: ${{ cancelled() || failure() }}
+        continue-on-error: true
+        with:
+          name: surefire-reports
+          path: tests/integration/target/surefire-reports
diff --git a/build/run_integration_group.sh b/build/run_integration_group.sh
index b589202..8639c43 100755
--- a/build/run_integration_group.sh
+++ b/build/run_integration_group.sh
@@ -129,10 +129,16 @@ test_group_sql() {
 }
 
 test_group_pulsar_io() {
-  mvn_run_integration_test --retry "$@" -DintegrationTestSuiteFile=pulsar-io-suite.xml -DintegrationTests -Dgroups=source
-  #mvn_run_integration_test --retry "$@" -DintegrationTestSuiteFile=pulsar-io-suite.xml -DintegrationTests -Dgroups=sink
+  mvn_run_integration_test --no-retry "$@" -DintegrationTestSuiteFile=pulsar-io-sources.xml -DintegrationTests -Dgroups=source
+  #mvn_run_integration_test --no-retry "$@" -DintegrationTestSuiteFile=pulsar-io-sinks.xml -DintegrationTests -Dgroups=sink
 }
 
+test_group_pulsar_io_ora() {
+  mvn_run_integration_test --no-retry "$@" -DintegrationTestSuiteFile=pulsar-io-ora-source.xml -DintegrationTests -Dgroups=source -DtestRetryCount=0
+}
+
+
+
 echo "Test Group : $TEST_GROUP"
 test_group_function_name="test_group_$(echo "$TEST_GROUP" | tr '[:upper:]' '[:lower:]')"
 if [[ "$(LC_ALL=C type -t $test_group_function_name)" == "function" ]]; then
diff --git a/distribution/io/src/assemble/io.xml b/distribution/io/src/assemble/io.xml
index b2f6608..8a82754 100644
--- a/distribution/io/src/assemble/io.xml
+++ b/distribution/io/src/assemble/io.xml
@@ -70,6 +70,7 @@
     <file><source>${basedir}/../../pulsar-io/mongo/target/pulsar-io-mongo-${project.version}.nar</source></file>
     <file><source>${basedir}/../../pulsar-io/debezium/mysql/target/pulsar-io-debezium-mysql-${project.version}.nar</source></file>
     <file><source>${basedir}/../../pulsar-io/debezium/postgres/target/pulsar-io-debezium-postgres-${project.version}.nar</source></file>
+    <file><source>${basedir}/../../pulsar-io/debezium/oracle/target/pulsar-io-debezium-oracle-${project.version}.nar</source></file>
     <file><source>${basedir}/../../pulsar-io/debezium/mongodb/target/pulsar-io-debezium-mongodb-${project.version}.nar</source></file>
     <file><source>${basedir}/../../pulsar-io/influxdb/target/pulsar-io-influxdb-${project.version}.nar</source></file>
     <file><source>${basedir}/../../pulsar-io/redis/target/pulsar-io-redis-${project.version}.nar</source></file>
diff --git a/pulsar-io/debezium/pom.xml b/pulsar-io/debezium/oracle/pom.xml
similarity index 56%
copy from pulsar-io/debezium/pom.xml
copy to pulsar-io/debezium/oracle/pom.xml
index 6600a7d..950c46a 100644
--- a/pulsar-io/debezium/pom.xml
+++ b/pulsar-io/debezium/oracle/pom.xml
@@ -19,23 +19,40 @@
 
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
-  <packaging>pom</packaging>
   <parent>
     <groupId>org.apache.pulsar</groupId>
-    <artifactId>pulsar-io</artifactId>
+    <artifactId>pulsar-io-debezium</artifactId>
     <version>2.9.0-SNAPSHOT</version>
   </parent>
 
-  <artifactId>pulsar-io-debezium</artifactId>
-  <name>Pulsar IO :: Debezium</name>
+  <artifactId>pulsar-io-debezium-oracle</artifactId>
+  <name>Pulsar IO :: Debezium :: oracle</name>
 
-  <modules>
-    <module>core</module>
-    <module>mysql</module>
-    <module>postgres</module>
-    <module>mongodb</module>
-  </modules>
+  <dependencies>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-debezium-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.debezium</groupId>
+      <artifactId>debezium-connector-oracle</artifactId>
+      <version>${debezium.version}</version>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
 
 </project>
diff --git a/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/DebeziumOracleSource.java b/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/DebeziumOracleSource.java
new file mode 100644
index 0000000..74a25fb
--- /dev/null
+++ b/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/DebeziumOracleSource.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.debezium.oracle;
+
+import java.util.Map;
+
+import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.pulsar.io.debezium.DebeziumSource;
+
+
+/**
+ * A Pulsar source that runs the Debezium Oracle connector task ({@code OracleConnectorTask}).
+ */
+public class DebeziumOracleSource extends DebeziumSource {
+    private static final String DEFAULT_TASK = "io.debezium.connector.oracle.OracleConnectorTask";
+
+    @Override
+    public void setDbConnectorTask(Map<String, Object> config) throws Exception {
+        throwExceptionIfConfigNotMatch(config, TaskConfig.TASK_CLASS_CONFIG, DEFAULT_TASK);
+    }
+}
diff --git a/pulsar-io/debezium/oracle/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/debezium/oracle/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 0000000..de1c0b4
--- /dev/null
+++ b/pulsar-io/debezium/oracle/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: debezium-oracle
+description: Debezium Oracle Source
+sourceClass: org.apache.pulsar.io.debezium.oracle.DebeziumOracleSource
diff --git a/pulsar-io/debezium/oracle/src/main/resources/debezium-oracle-source-config.yaml b/pulsar-io/debezium/oracle/src/main/resources/debezium-oracle-source-config.yaml
new file mode 100644
index 0000000..94173d6
--- /dev/null
+++ b/pulsar-io/debezium/oracle/src/main/resources/debezium-oracle-source-config.yaml
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+tenant: "public"
+namespace: "default"
+name: "debezium-oracle-source"
+topicName: "debezium-oracle-topic"
+archive: "connectors/pulsar-io-debezium-oracle-2.9.0-SNAPSHOT.nar"
+
+parallelism: 1
+
+configs:
+  ## config for Oracle XE, docker image: https://github.com/MaksymBilenko/docker-oracle-12c
+  ## docker run -d -p 1521:1521 quay.io/maksymbilenko/oracle-12c
+  database.hostname: "localhost"
+  database.port: "1521"
+  database.user: "sysdba"
+  database.password: "oracle"
+  database.dbname: "XE"
+  database.server.name: "XE"
+
+  database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
diff --git a/pulsar-io/debezium/pom.xml b/pulsar-io/debezium/pom.xml
index 6600a7d..330ccbe 100644
--- a/pulsar-io/debezium/pom.xml
+++ b/pulsar-io/debezium/pom.xml
@@ -36,6 +36,7 @@
     <module>mysql</module>
     <module>postgres</module>
     <module>mongodb</module>
+    <module>oracle</module>
   </modules>
 
 </project>
diff --git a/tests/docker-images/latest-version-image/Dockerfile b/tests/docker-images/latest-version-image/Dockerfile
index 720bcdf..04823ea 100644
--- a/tests/docker-images/latest-version-image/Dockerfile
+++ b/tests/docker-images/latest-version-image/Dockerfile
@@ -107,4 +107,19 @@ COPY --from=pulsar-all /pulsar/connectors/pulsar-io-kafka-*.nar /pulsar/connecto
 COPY --from=pulsar-all /pulsar/connectors/pulsar-io-rabbitmq-*.nar /pulsar/connectors/
 COPY --from=pulsar-all /pulsar/connectors/pulsar-io-batch-data-generator-*.nar /pulsar/connectors/
 
+# download Oracle JDBC driver for Oracle Debezium Connector tests
+# -f fails the build on an HTTP error instead of saving the error page as a jar;
+# -o names the output file explicitly rather than relying on how curl -O derives
+# a name from a URL whose last '/' is inside the query string;
+# the URL is quoted so the shell never treats '?' as a glob character
+RUN mkdir -p META-INF/bundled-dependencies
+RUN cd META-INF/bundled-dependencies && curl -fsSL -o ojdbc8-19.3.0.0.jar "https://search.maven.org/remotecontent?filepath=com/oracle/ojdbc/ojdbc8/19.3.0.0/ojdbc8-19.3.0.0.jar"
+RUN cd META-INF/bundled-dependencies && curl -fsSL -o ucp-19.3.0.0.jar "https://search.maven.org/remotecontent?filepath=com/oracle/ojdbc/ucp/19.3.0.0/ucp-19.3.0.0.jar"
+RUN cd META-INF/bundled-dependencies && curl -fsSL -o oraclepki-19.3.0.0.jar "https://search.maven.org/remotecontent?filepath=com/oracle/ojdbc/oraclepki/19.3.0.0/oraclepki-19.3.0.0.jar"
+RUN cd META-INF/bundled-dependencies && curl -fsSL -o osdt_cert-19.3.0.0.jar "https://search.maven.org/remotecontent?filepath=com/oracle/ojdbc/osdt_cert/19.3.0.0/osdt_cert-19.3.0.0.jar"
+RUN cd META-INF/bundled-dependencies && curl -fsSL -o osdt_core-19.3.0.0.jar "https://search.maven.org/remotecontent?filepath=com/oracle/ojdbc/osdt_core/19.3.0.0/osdt_core-19.3.0.0.jar"
+RUN cd META-INF/bundled-dependencies && curl -fsSL -o simplefan-19.3.0.0.jar "https://search.maven.org/remotecontent?filepath=com/oracle/ojdbc/simplefan/19.3.0.0/simplefan-19.3.0.0.jar"
+RUN cd META-INF/bundled-dependencies && curl -fsSL -o orai18n-19.3.0.0.jar "https://search.maven.org/remotecontent?filepath=com/oracle/ojdbc/orai18n/19.3.0.0/orai18n-19.3.0.0.jar"
+RUN cd META-INF/bundled-dependencies && curl -fsSL -o xdb-19.3.0.0.jar "https://search.maven.org/remotecontent?filepath=com/oracle/ojdbc/xdb/19.3.0.0/xdb-19.3.0.0.jar"
+RUN cd META-INF/bundled-dependencies && curl -fsSL -o xmlparserv2-19.3.0.0.jar "https://search.maven.org/remotecontent?filepath=com/oracle/ojdbc/xmlparserv2/19.3.0.0/xmlparserv2-19.3.0.0.jar"
+
+RUN jar uf connectors/pulsar-io-debezium-oracle-*.nar META-INF/bundled-dependencies/ojdbc8-19.3.0.0.jar META-INF/bundled-dependencies/ucp-19.3.0.0.jar META-INF/bundled-dependencies/oraclepki-19.3.0.0.jar META-INF/bundled-dependencies/osdt_cert-19.3.0.0.jar META-INF/bundled-dependencies/osdt_core-19.3.0.0.jar META-INF/bundled-dependencies/simplefan-19.3.0.0.jar META-INF/bundled-dependencies/orai18n-19.3.0.0.jar META-INF/bundled-dependencies/xdb-19.3.0.0.jar META-INF/bundled-dependencies/x [...]
+
+
 CMD bash
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
index 3896777..06f8561 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
@@ -92,6 +92,18 @@ public class ChaosContainer<SelfT extends ChaosContainer<SelfT>> extends Generic
         return DockerUtils.runCommandAsync(client, dockerId, commands);
     }
 
+    /**
+     * Delegates to {@link DockerUtils#runCommandAsUser} with this container's Docker client
+     * and container id, passing {@code userId} and {@code commands} through unchanged.
+     */
+    public ContainerExecResult execCmdAsUser(String userId, String... commands) throws Exception {
+        final DockerClient dockerClient = getDockerClient();
+        final String containerId = getContainerId();
+        return DockerUtils.runCommandAsUser(userId, dockerClient, containerId, commands);
+    }
+
+    /**
+     * Delegates to {@link DockerUtils#runCommandAsyncAsUser} with this container's Docker client
+     * and container id, passing {@code userId} and {@code commands} through unchanged.
+     */
+    public CompletableFuture<ContainerExecResult> execCmdAsyncAsUser(String userId, String... commands) throws Exception {
+        final DockerClient dockerClient = getDockerClient();
+        final String containerId = getContainerId();
+        return DockerUtils.runCommandAsyncAsUser(userId, dockerClient, containerId, commands);
+    }
+
     @Override
     public boolean equals(Object o) {
         if (!(o instanceof ChaosContainer)) {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumOracleDbContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumOracleDbContainer.java
new file mode 100644
index 0000000..93fdad6
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumOracleDbContainer.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.containers;
+
+
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+
+public class DebeziumOracleDbContainer extends ChaosContainer<DebeziumOracleDbContainer> {
+
+    public static final String NAME = "debezium-oracledb-12c";
+    static final Integer[] PORTS = { 1521 };
+
+    // Image source: https://github.com/MaksymBilenko/docker-oracle-12c
+    // Apache 2.0 license.
+    // Newer versions don't have LogMiner in XE (Standard) Edition and require Enterprise.
+    // Debezium 1.5 didn't work with 11g out of the box
+    // and it is not tested with 11g according to https://debezium.io/releases/1.5/
+    private static final String IMAGE_NAME = "quay.io/maksymbilenko/oracle-12c:master";
+
+    public DebeziumOracleDbContainer(String clusterName) {
+        super(clusterName, IMAGE_NAME);
+    }
+
+    @Override
+    public String getContainerName() {
+        return clusterName;
+    }
+
+    @Override
+    protected void configure() {
+        super.configure();
+        this.withNetworkAliases(NAME)
+            .withExposedPorts(PORTS)
+            .withEnv("DBCA_TOTAL_MEMORY", "1024")
+            .withStartupTimeout(Duration.of(300, ChronoUnit.SECONDS))
+            .withCreateContainerCmdModifier(createContainerCmd -> {
+                createContainerCmd.withHostName(NAME);
+                createContainerCmd.withName(getContainerName());
+            })
+            .waitingFor(new HostPortWaitStrategy());
+    }
+
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java
index f50d3be..97613cb 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java
@@ -51,6 +51,7 @@ public abstract class SourceTester<ServiceContainerT extends GenericContainer> {
     protected final Map<String, Object> sourceConfig;
 
     protected int numEntriesToInsert = 1;
+    protected int numEntriesExpectAfterStart = 9;
 
     public static final Set<String> DEBEZIUM_FIELD_SET = new HashSet<String>() {{
         add("before");
@@ -113,7 +114,7 @@ public abstract class SourceTester<ServiceContainerT extends GenericContainer> {
 
     public void validateSourceResultJson(Consumer<KeyValue<byte[], byte[]>> consumer, int number, String eventType) throws Exception {
         int recordsNumber = 0;
-        Message<KeyValue<byte[], byte[]>> msg = consumer.receive(2, TimeUnit.SECONDS);
+        Message<KeyValue<byte[], byte[]>> msg = consumer.receive(initialDelayForMsgReceive(), TimeUnit.SECONDS);
         while(msg != null) {
             recordsNumber ++;
             final String key = new String(msg.getValue().getKey());
@@ -135,7 +136,7 @@ public abstract class SourceTester<ServiceContainerT extends GenericContainer> {
     public void validateSourceResultAvro(Consumer<KeyValue<GenericRecord, GenericRecord>> consumer,
                                      int number, String eventType) throws Exception {
         int recordsNumber = 0;
-        Message<KeyValue<GenericRecord, GenericRecord>> msg = consumer.receive(2, TimeUnit.SECONDS);
+        Message<KeyValue<GenericRecord, GenericRecord>> msg = consumer.receive(initialDelayForMsgReceive(), TimeUnit.SECONDS);
         while(msg != null) {
             recordsNumber ++;
             GenericRecord keyRecord = msg.getValue().getKey();
@@ -164,11 +165,15 @@ public abstract class SourceTester<ServiceContainerT extends GenericContainer> {
         log.info("Stop {} server container. topic: {} has {} records.", getSourceType(), consumer.getTopic(), recordsNumber);
     }
 
-    public String keyContains(){
+    public int initialDelayForMsgReceive() {
+        return 2;
+    }
+
+    public String keyContains() {
         return "dbserver1.inventory.products.Key";
     }
 
-    public String valueContains(){
+    public String valueContains() {
         return "dbserver1.inventory.products.Value";
     }
 
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java
new file mode 100644
index 0000000..28fec95
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io.sources.debezium;
+
+import lombok.Getter;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.tests.integration.containers.DebeziumOracleDbContainer;
+import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.io.sources.SourceTester;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.testcontainers.shaded.com.google.common.base.Preconditions;
+import org.testng.util.Strings;
+
+import java.io.Closeable;
+import java.util.Map;
+
+/**
+ * A tester for testing Debezium OracleDb source.
+ */
+@Slf4j
+public class DebeziumOracleDbSourceTester extends SourceTester<DebeziumOracleDbContainer> implements Closeable {
+
+    private static final String NAME = "debezium-oracle";
+
+    private final String pulsarServiceUrl;
+
+    @Getter
+    private DebeziumOracleDbContainer debeziumOracleDbContainer;
+
+    private final PulsarCluster pulsarCluster;
+
+    /**
+     * Creates a tester bound to {@code cluster} and fills in the Debezium source
+     * configuration used against the Oracle test database.
+     */
+    public DebeziumOracleDbSourceTester(PulsarCluster cluster) {
+        super(NAME);
+        this.pulsarServiceUrl = "pulsar://pulsar-proxy:" + PulsarContainer.BROKER_PORT;
+        this.pulsarCluster = cluster;
+
+        // NOTE(review): expecting zero records after start presumably follows from
+        // snapshot.mode=schema_only below - confirm
+        this.numEntriesExpectAfterStart = 0;
+        this.numEntriesToInsert = 1;
+
+        // connection settings; user/password are the dbzuser account created in prepareSource()
+        sourceConfig.put("database.hostname", DebeziumOracleDbContainer.NAME);
+        sourceConfig.put("database.port", "1521");
+        sourceConfig.put("database.user", "dbzuser");
+        sourceConfig.put("database.password", "dbz");
+        sourceConfig.put("database.server.name", "XE");
+        sourceConfig.put("database.dbname", "XE");
+        sourceConfig.put("snapshot.mode", "schema_only");
+
+        // capture only the inv schema; history and topics go through the Pulsar proxy URL
+        sourceConfig.put("schema.include.list", "inv");
+        sourceConfig.put("database.history.pulsar.service.url", this.pulsarServiceUrl);
+        sourceConfig.put("topic.namespace", "debezium/oracle");
+    }
+
+    /**
+     * Registers {@code container} as this tester's Oracle container and starts it
+     * through the Pulsar cluster under {@link DebeziumOracleDbContainer#NAME}.
+     */
+    @Override
+    public void setServiceContainer(DebeziumOracleDbContainer container) {
+        log.info("start debezium oracle server container.");
+        // a container may only be assigned once
+        Preconditions.checkState(this.debeziumOracleDbContainer == null);
+        this.debeziumOracleDbContainer = container;
+        this.pulsarCluster.startService(DebeziumOracleDbContainer.NAME, container);
+    }
+
+    @SneakyThrows
+    @Override
+    public void prepareSource() {
+        String[] minerCommands = {
+            "ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;",
+            "ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;",
+            "alter system switch logfile;"
+        };
+        String[] commands = {
+            "CREATE TABLESPACE inv DATAFILE 'tbs_inv01.dbf' SIZE 200M LOGGING;",
+            "CREATE USER inv identified by inv default tablespace inv;",
+            "GRANT CREATE TABLE TO inv;",
+            "GRANT LOCK ANY TABLE TO inv;",
+            "GRANT ALTER ANY TABLE TO inv;",
+            "GRANT CREATE SEQUENCE TO inv;",
+            "GRANT UNLIMITED TABLESPACE TO inv;",
+            "CREATE TABLE inv.customers (" +
+                "id NUMBER(9) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 1001) NOT NULL PRIMARY KEY," +
+                "first_name VARCHAR2(255) NOT NULL," +
+                "last_name VARCHAR2(255) NOT NULL," +
+                "email VARCHAR2(255) NOT NULL" +
+                ");",
+            "ALTER TABLE inv.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;",
+
+            // create the dbz_privs role and the dbzuser account used by Debezium
+            "create role dbz_privs;",
+            "grant create session, execute_catalog_role, select any transaction, select any dictionary to dbz_privs;",
+            "grant select on SYSTEM.LOGMNR_COL$ to dbz_privs;",
+            "grant select on SYSTEM.LOGMNR_OBJ$ to dbz_privs;",
+            "grant select on SYSTEM.LOGMNR_USER$ to dbz_privs;",
+            "grant select on SYSTEM.LOGMNR_UID$ to dbz_privs;",
+            "create user dbzuser identified by dbz default tablespace users;",
+            "grant dbz_privs to dbzuser;",
+            "alter user dbzuser quota unlimited on users;",
+            "grant LOGMINING to dbz_privs;",
+
+            "GRANT CREATE SESSION TO dbzuser;",
+            "GRANT SET CONTAINER TO dbzuser;",
+            "GRANT SELECT ON V_$DATABASE to dbzuser;",
+            "GRANT FLASHBACK ANY TABLE TO dbzuser;",
+            "GRANT SELECT ANY TABLE TO dbzuser;",
+            "GRANT SELECT_CATALOG_ROLE TO dbzuser;",
+            "GRANT EXECUTE_CATALOG_ROLE TO dbzuser;",
+            "GRANT SELECT ANY TRANSACTION TO dbzuser;",
+            "GRANT LOGMINING TO dbzuser;",
+
+            "GRANT CREATE TABLE TO dbzuser;",
+            "GRANT LOCK ANY TABLE TO dbzuser;",
+            "GRANT ALTER ANY TABLE TO dbzuser;",
+            "GRANT CREATE SEQUENCE TO dbzuser;",
+
+            "GRANT EXECUTE ON DBMS_LOGMNR TO dbzuser;",
+            "GRANT EXECUTE ON DBMS_LOGMNR_D TO dbzuser;",
+
+            "GRANT SELECT ON V_$LOG TO dbzuser;",
+            "GRANT SELECT ON V_$LOG_HISTORY TO dbzuser;",
+            "GRANT SELECT ON V_$LOGMNR_LOGS TO dbzuser;",
+            "GRANT SELECT ON V_$LOGMNR_CONTENTS TO dbzuser;",
+            "GRANT SELECT ON V_$LOGMNR_PARAMETERS TO dbzuser;",
+            "GRANT SELECT ON V_$LOGFILE TO dbzuser;",
+            "GRANT SELECT ON V_$ARCHIVED_LOG TO dbzuser;",
+            "GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO dbzuser;"
+        };
+
+        // seeing OPEN in the status query is only a first approximation of readiness, so also sleep before continuing:
+        waitForOracleStatus("OPEN");
+        Thread.sleep(30000);
+
+        // configure LogMiner: shut down, start in MOUNT state, switch to ARCHIVELOG mode, then reopen
+        runSqlCmd("shutdown immediate");
+
+        // wait until the status query output contains "ORACLE not available"; that alone is not enough, so also sleep:
+        waitForOracleStatus("ORACLE not available");
+        Thread.sleep(30000);
+
+        runSqlCmd("startup mount");
+        // seeing MOUNTED is only a first approximation of the mount having completed, so also sleep:
+        waitForOracleStatus("MOUNTED");
+        Thread.sleep(30000);
+
+        runSqlCmd("alter database archivelog;");
+        runSqlCmd("alter database open;");
+        // seeing OPEN is only a first approximation of the database being usable again, so also sleep:
+        waitForOracleStatus("OPEN");
+        Thread.sleep(30000);
+
+        for (String cmd: minerCommands) {
+            runSqlCmd(cmd);
+        }
+
+        // create the inv tablespace/user/table, then the dbzuser account and its grants
+        for (String cmd: commands) {
+            runSqlCmd(cmd);
+        }
+        // initial event: insert the first row into inv.customers
+        runSqlCmd("INSERT INTO inv.customers (first_name, last_name, email) VALUES ('James', 'Bond', 'jbond@null.dev');");
+    }
+
+    /**
+     * Polls the Oracle instance until the sqlplus output mentions {@code status}.
+     *
+     * <p>Queries V$INSTANCE through {@code runSqlCmd} up to 1000 times, sleeping one second after each
+     * unsuccessful attempt, and returns as soon as either stdout or stderr contains the text.
+     *
+     * @param status text to look for in the command output
+     * @throws IllegalStateException if the text never shows up within the allotted attempts
+     * @throws Exception if running the query or sleeping fails
+     */
+    private void waitForOracleStatus(String status) throws Exception {
+        int remainingAttempts = 1000;
+        while (remainingAttempts > 0) {
+            ContainerExecResult result = runSqlCmd("SELECT INSTANCE_NAME, STATUS, DATABASE_STATUS FROM V$INSTANCE;");
+            String errText = result.getStderr();
+            String outText = result.getStdout();
+            boolean inStderr = errText != null && errText.contains(status);
+            boolean inStdout = outText != null && outText.contains(status);
+            if (inStderr || inStdout) {
+                return;
+            }
+            Thread.sleep(1000);
+            remainingAttempts--;
+        }
+        throw new IllegalStateException("Oracle did not initialize properly");
+    }
+
+    private ContainerExecResult runSqlCmd(String cmd) throws Exception {
+        log.info("Executing \"{}\"", cmd);
+        ContainerExecResult response = this.debeziumOracleDbContainer
+                .execCmdAsUser("oracle",
+                "/bin/bash", "-c",
+                "echo \"exit;\" | echo \""
+                        + cmd.replace("$", "\\$")
+                        + "\" | sqlplus sys/oracle as sysdba"
+                );
+        if (Strings.isNullOrEmpty(response.getStderr())) {
+            log.info("Result of \"{}\":\n{}", cmd, response.getStdout());
+        } else {
+            log.warn("Result of \"{}\":\n{}\n{}", cmd, response.getStdout(), response.getStderr());
+        }
+        return response;
+    }
+
+    /** Inserts the 'John Doe' customer row, then selects rows with last name 'Doe'. */
+    @Override
+    public void prepareInsertEvent() throws Exception {
+        final String insertSql =
+                "INSERT INTO inv.customers (first_name, last_name, email) VALUES ('John', 'Doe', 'jdoe@null.dev');";
+        final String checkSql = "SELECT * FROM inv.customers WHERE last_name='Doe';";
+        runSqlCmd(insertSql);
+        runSqlCmd(checkSql);
+    }
+
+    /** Deletes the customer rows with last name 'Doe', then selects on the same predicate. */
+    @Override
+    public void prepareDeleteEvent() throws Exception {
+        final String deleteSql = "DELETE FROM inv.customers WHERE last_name='Doe';";
+        final String checkSql = "SELECT * FROM inv.customers WHERE last_name='Doe';";
+        runSqlCmd(deleteSql);
+        runSqlCmd(checkSql);
+    }
+
+    /** Renames the 'Doe' customer's first name to 'Jack', then selects rows with last name 'Doe'. */
+    @Override
+    public void prepareUpdateEvent() throws Exception {
+        final String updateSql = "UPDATE inv.customers SET first_name='Jack' WHERE last_name='Doe';";
+        final String checkSql = "SELECT * FROM inv.customers WHERE last_name='Doe';";
+        runSqlCmd(updateSql);
+        runSqlCmd(checkSql);
+    }
+
+    /**
+     * Produces nothing: only logs that the database is already populated.
+     *
+     * @param numMessages not used by this implementation
+     * @return always {@code null}
+     */
+    @Override
+    public Map<String, String> produceSourceMessages(int numMessages) {
+        final Map<String, String> nothingProduced = null;
+        log.info("debezium oracle server already contains preconfigured data.");
+        return nothingProduced;
+    }
+
+    /**
+     * @return the initial delay before messages are expected (unit is defined by the caller, not
+     *         visible here)
+     */
+    @Override
+    public int initialDelayForMsgReceive() {
+        // LogMiner needs a long time before the first messages come out.
+        final int delay = 30;
+        return delay;
+    }
+
+    /** @return the substring expected in message keys for the XE.INV.CUSTOMERS table */
+    @Override
+    public String keyContains() {
+        final String expectedKeyFragment = "XE.INV.CUSTOMERS.Key";
+        return expectedKeyFragment;
+    }
+
+    /** @return the substring expected in message values for the XE.INV.CUSTOMERS table */
+    @Override
+    public String valueContains() {
+        final String expectedValueFragment = "XE.INV.CUSTOMERS.Value";
+        return expectedValueFragment;
+    }
+
+    /** Stops the Oracle service container; does nothing when no cluster is attached. */
+    @Override
+    public void close() {
+        if (pulsarCluster == null) {
+            return;
+        }
+        PulsarCluster.stopService(DebeziumOracleDbContainer.NAME, debeziumOracleDbContainer);
+    }
+
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumOracleSourceTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumOracleSourceTest.java
new file mode 100644
index 0000000..b2f686f
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumOracleSourceTest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io.sources.debezium;
+
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.tests.integration.containers.DebeziumOracleDbContainer;
+import org.apache.pulsar.tests.integration.io.PulsarIOTestBase;
+import org.testcontainers.shaded.com.google.common.collect.Sets;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Slf4j
+public class PulsarDebeziumOracleSourceTest extends PulsarIOTestBase {
+
+    protected final AtomicInteger testId = new AtomicInteger(0);
+
+    @Test(groups = "source", timeOut = 1800000)
+    public void testDebeziumOracleDbSource() throws Exception{
+        testDebeziumOracleDbConnect("org.apache.kafka.connect.json.JsonConverter", true);
+    }
+
+    private void testDebeziumOracleDbConnect(String converterClassName, boolean jsonWithEnvelope) throws Exception {
+        final String tenant = TopicName.PUBLIC_TENANT;
+        final String namespace = TopicName.DEFAULT_NAMESPACE;
+        final String outputTopicName = "debe-output-topic-name-" + testId.getAndIncrement();
+        final String consumeTopicName = "debezium/oracle/XE.INV.CUSTOMERS";
+        final String sourceName = "test-source-debezium-oracle-" + functionRuntimeType + "-" + randomName(8);
+
+        // This is the event count to be created by prepareSource.
+        final int numMessages = 1;
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+                .build();
+
+        @Cleanup
+        PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build();
+        initNamespace(admin);
+
+        admin.topics().createNonPartitionedTopic(consumeTopicName);
+        admin.topics().createNonPartitionedTopic(outputTopicName);
+
+        @Cleanup
+        DebeziumOracleDbSourceTester sourceTester = new DebeziumOracleDbSourceTester(pulsarCluster);
+        sourceTester.getSourceConfig().put("json-with-envelope", jsonWithEnvelope);
+
+        // setup debezium oracle server
+        DebeziumOracleDbContainer debeziumOracleDbContainer = new DebeziumOracleDbContainer(pulsarCluster.getClusterName());
+        sourceTester.setServiceContainer(debeziumOracleDbContainer);
+
+        PulsarIODebeziumSourceRunner runner = new PulsarIODebeziumSourceRunner(pulsarCluster, functionRuntimeType.toString(),
+                converterClassName, tenant, namespace, sourceName, outputTopicName, numMessages, jsonWithEnvelope,
+                consumeTopicName, client);
+
+        runner.testSource(sourceTester);
+    }
+
+    protected void initNamespace(PulsarAdmin admin) {
+        log.info("[initNamespace] start.");
+        try {
+            admin.tenants().createTenant("debezium", new TenantInfoImpl(Sets.newHashSet(),
+                    Sets.newHashSet(pulsarCluster.getClusterName())));
+            String [] namespaces = {
+                "debezium/oracle"
+            };
+            Policies policies = new Policies();
+            policies.retention_policies = new RetentionPolicies(-1, 50);
+            for (String ns: namespaces) {
+                admin.namespaces().createNamespace(ns, policies);
+            }
+        } catch (Exception e) {
+            log.info("[initNamespace] msg: {}", e.getMessage());
+        }
+        log.info("[initNamespace] finish.");
+    }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java
index 2e88dad..7136b89 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java
@@ -23,6 +23,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.tests.integration.containers.DebeziumMongoDbContainer;
@@ -192,10 +194,17 @@ public class PulsarDebeziumSourcesTest extends PulsarIOTestBase {
         try {
             admin.tenants().createTenant("debezium", new TenantInfoImpl(Sets.newHashSet(),
                     Sets.newHashSet(pulsarCluster.getClusterName())));
-            admin.namespaces().createNamespace("debezium/mysql-json");
-            admin.namespaces().createNamespace("debezium/mysql-avro");
-            admin.namespaces().createNamespace("debezium/mongodb");
-            admin.namespaces().createNamespace("debezium/postgresql");
+            String [] namespaces = {
+                "debezium/mysql-json",
+                "debezium/mysql-avro",
+                "debezium/mongodb",
+                "debezium/postgresql",
+            };
+            Policies policies = new Policies();
+            policies.retention_policies = new RetentionPolicies(-1, 50);
+            for (String ns: namespaces) {
+                admin.namespaces().createNamespace(ns, policies);
+            }
         } catch (Exception e) {
             log.info("[initNamespace] msg: {}", e.getMessage());
         }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarIODebeziumSourceRunner.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarIODebeziumSourceRunner.java
index 6f0bbfd..da1e759 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarIODebeziumSourceRunner.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarIODebeziumSourceRunner.java
@@ -89,7 +89,7 @@ public class PulsarIODebeziumSourceRunner extends PulsarIOSourceRunner {
         log.info("[debezium mysql test] create consumer finish. converterName: {}", converterClassName);
 
         // validate the source result
-        sourceTester.validateSourceResult(consumer, 9, null, converterClassName);
+        sourceTester.validateSourceResult(consumer, sourceTester.getNumEntriesExpectAfterStart(), null, converterClassName);
 
         final int numEntriesToInsert = sourceTester.getNumEntriesToInsert();
         Preconditions.checkArgument(numEntriesToInsert >= 1);
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
index 7c90cd3..9246c58 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
@@ -186,16 +186,52 @@ public class DockerUtils {
         }
     }
 
+    /**
+     * Runs a command in a container as the given user and blocks until it completes.
+     *
+     * <p>Delegates to {@code runCommandAsyncAsUser} and waits on the returned future. When the future
+     * fails with a {@link ContainerExecException}, that exception is rethrown directly instead of the
+     * wrapping {@link ExecutionException}.
+     *
+     * @param userId user to run the command as
+     * @param docker docker client used to create the exec
+     * @param containerId target container
+     * @param cmd command and its arguments
+     * @return the result produced by the asynchronous execution
+     * @throws ContainerExecException if that was the cause of the failed future
+     * @throws ExecutionException for any other failure of the future
+     * @throws InterruptedException if the wait is interrupted
+     */
+    public static ContainerExecResult runCommandAsUser(String userId,
+                                                       DockerClient docker,
+                                                       String containerId,
+                                                       String... cmd)
+            throws ContainerExecException, ExecutionException, InterruptedException {
+        final CompletableFuture<ContainerExecResult> pending =
+                runCommandAsyncAsUser(userId, docker, containerId, cmd);
+        try {
+            return pending.get();
+        } catch (ExecutionException wrapped) {
+            final Throwable cause = wrapped.getCause();
+            if (!(cause instanceof ContainerExecException)) {
+                throw wrapped;
+            }
+            throw (ContainerExecException) cause;
+        }
+    }
+
+    /**
+     * Creates a docker exec for {@code cmd} that runs as {@code userId} with stdout and stderr
+     * attached, then hands the exec id to the shared {@code runCommandAsync} helper.
+     *
+     * @param userId user to run the command as
+     * @param dockerClient docker client used to create the exec
+     * @param containerId target container
+     * @param cmd command and its arguments
+     * @return the future returned by the shared asynchronous runner
+     */
+    public static CompletableFuture<ContainerExecResult> runCommandAsyncAsUser(String userId,
+                                                                               DockerClient dockerClient,
+                                                                               String containerId,
+                                                                               String... cmd) {
+        final String execId = dockerClient.execCreateCmd(containerId)
+                .withUser(userId)
+                .withAttachStdout(true)
+                .withAttachStderr(true)
+                .withCmd(cmd)
+                .exec()
+                .getId();
+        final CompletableFuture<ContainerExecResult> pending =
+                runCommandAsync(execId, dockerClient, containerId, cmd);
+        return pending;
+    }
+
     public static CompletableFuture<ContainerExecResult> runCommandAsync(DockerClient dockerClient,
                                                                          String containerId,
                                                                          String... cmd) {
-        CompletableFuture<ContainerExecResult> future = new CompletableFuture<>();
         String execId = dockerClient.execCreateCmd(containerId)
                 .withCmd(cmd)
                 .withAttachStderr(true)
                 .withAttachStdout(true)
                 .exec()
                 .getId();
+        return runCommandAsync(execId, dockerClient, containerId, cmd);
+    }
+
+    private static CompletableFuture<ContainerExecResult> runCommandAsync(String execId,
+                                                                          DockerClient dockerClient,
+                                                                          String containerId,
+                                                                          String... cmd) {
+        CompletableFuture<ContainerExecResult> future = new CompletableFuture<>();
         final String containerName = getContainerName(dockerClient, containerId);
         String cmdString = String.join(" ", cmd);
         StringBuilder stdout = new StringBuilder();
diff --git a/tests/integration/src/test/resources/pulsar-io-suite.xml b/tests/integration/src/test/resources/pulsar-io-ora-source.xml
similarity index 69%
copy from tests/integration/src/test/resources/pulsar-io-suite.xml
copy to tests/integration/src/test/resources/pulsar-io-ora-source.xml
index 33b6817..1c5bb5f 100644
--- a/tests/integration/src/test/resources/pulsar-io-suite.xml
+++ b/tests/integration/src/test/resources/pulsar-io-ora-source.xml
@@ -19,11 +19,10 @@
 
 -->
 <!DOCTYPE suite SYSTEM "https://testng.org/testng-1.0.dtd" >
-<!-- TODO: we have to put suite files in one file to avoid executing TESTNG test suites multiple times.
-           see {@link https://github.com/cbeust/testng/issues/508} -->
-<suite name="Pulsar Test Suite" parallel="instances" thread-count="1">
-    <suite-files>
-        <suite-file path="./pulsar-io-sinks.xml" />
-        <suite-file path="./pulsar-io-sources.xml" />
-    </suite-files>
+<suite name="Pulsar IO Oracle Source Integration Tests" verbose="2" annotations="JDK">
+    <test name="pulsar-function-process-test-suite" preserve-order="true" >
+        <classes>
+            <class name="org.apache.pulsar.tests.integration.io.sources.debezium.PulsarDebeziumOracleSourceTest" />
+        </classes>
+    </test>
 </suite>
diff --git a/tests/integration/src/test/resources/pulsar-io-suite.xml b/tests/integration/src/test/resources/pulsar-io-suite.xml
index 33b6817..64db138 100644
--- a/tests/integration/src/test/resources/pulsar-io-suite.xml
+++ b/tests/integration/src/test/resources/pulsar-io-suite.xml
@@ -25,5 +25,6 @@
     <suite-files>
         <suite-file path="./pulsar-io-sinks.xml" />
         <suite-file path="./pulsar-io-sources.xml" />
+        <suite-file path="./pulsar-io-ora-source.xml" />
     </suite-files>
 </suite>
diff --git a/tests/integration/src/test/resources/pulsar.xml b/tests/integration/src/test/resources/pulsar.xml
index a2011e4..7993d0c 100644
--- a/tests/integration/src/test/resources/pulsar.xml
+++ b/tests/integration/src/test/resources/pulsar.xml
@@ -35,5 +35,6 @@
         <suite-file path="./pulsar-backwards-compatibility.xml" />
         <suite-file path="./pulsar-io-sinks.xml" />
         <suite-file path="./pulsar-io-sources.xml" />
+        <suite-file path="./pulsar-io-ora-source.xml" />
     </suite-files>
 </suite>