You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/09/13 15:51:10 UTC
[pulsar] branch master updated: Debezium Oracle Source (#11520)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e2bc52d Debezium Oracle Source (#11520)
e2bc52d is described below
commit e2bc52d40450fa00af258c4432a5b71d50a5c6e0
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Mon Sep 13 08:50:18 2021 -0700
Debezium Oracle Source (#11520)
---
.../workflows/ci-integration-pulsar-io-ora.yaml | 118 ++++++++++
build/run_integration_group.sh | 10 +-
distribution/io/src/assemble/io.xml | 1 +
pulsar-io/debezium/{ => oracle}/pom.xml | 39 +++-
.../io/debezium/oracle/DebeziumOracleSource.java | 37 +++
.../resources/META-INF/services/pulsar-io.yaml | 22 ++
.../resources/debezium-oracle-source-config.yaml | 38 ++++
pulsar-io/debezium/pom.xml | 1 +
.../docker-images/latest-version-image/Dockerfile | 15 ++
.../integration/containers/ChaosContainer.java | 12 +
.../containers/DebeziumOracleDbContainer.java | 62 +++++
.../tests/integration/io/sources/SourceTester.java | 13 +-
.../debezium/DebeziumOracleDbSourceTester.java | 253 +++++++++++++++++++++
.../debezium/PulsarDebeziumOracleSourceTest.java | 101 ++++++++
.../debezium/PulsarDebeziumSourcesTest.java | 17 +-
.../debezium/PulsarIODebeziumSourceRunner.java | 2 +-
.../tests/integration/utils/DockerUtils.java | 38 +++-
...ulsar-io-suite.xml => pulsar-io-ora-source.xml} | 13 +-
.../src/test/resources/pulsar-io-suite.xml | 1 +
tests/integration/src/test/resources/pulsar.xml | 1 +
20 files changed, 764 insertions(+), 30 deletions(-)
diff --git a/.github/workflows/ci-integration-pulsar-io-ora.yaml b/.github/workflows/ci-integration-pulsar-io-ora.yaml
new file mode 100644
index 0000000..8bb1cba
--- /dev/null
+++ b/.github/workflows/ci-integration-pulsar-io-ora.yaml
@@ -0,0 +1,118 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: CI - Integration - Pulsar-IO Oracle Source
+on:
+ pull_request:
+ branches:
+ - master
+ push:
+ branches:
+ - branch-*
+
+env:
+ MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3
+
+jobs:
+
+ pulsar-io:
+ name:
+ runs-on: ubuntu-latest
+ timeout-minutes: 120
+
+ steps:
+ - name: checkout
+ uses: actions/checkout@v2
+
+ - name: Tune Runner VM
+ uses: ./.github/actions/tune-runner-vm
+
+ - name: Detect changed files
+ id: changes
+ uses: apache/pulsar-test-infra/paths-filter@master
+ with:
+ filters: .github/changes-filter.yaml
+
+ - name: Check changed files
+ id: check_changes
+ run: echo "::set-output name=docs_only::${{ fromJSON(steps.changes.outputs.all_count) == fromJSON(steps.changes.outputs.docs_count) && fromJSON(steps.changes.outputs.docs_count) > 0 }}"
+
+ - name: Cache local Maven repository
+ if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
+ uses: actions/cache@v2
+ with:
+ path: |
+ ~/.m2/repository/*/*/*
+ !~/.m2/repository/org/apache/pulsar
+ key: ${{ runner.os }}-m2-dependencies-all-${{ hashFiles('**/pom.xml') }}
+ restore-keys: |
+ ${{ runner.os }}-m2-dependencies-core-modules-${{ hashFiles('**/pom.xml') }}
+ ${{ runner.os }}-m2-dependencies-core-modules-
+
+ - name: Set up JDK 11
+ uses: actions/setup-java@v2
+ if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
+ with:
+ distribution: 'adopt'
+ java-version: 11
+
+ - name: clean disk
+ if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
+ run: |
+ sudo swapoff -a
+ sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
+ sudo apt clean
+ docker rmi $(docker images -q) -f
+ df -h
+
+ - name: run install by skip tests
+ if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
+ run: mvn -q -B -ntp clean install -DskipTests
+
+ - name: build pulsar image
+ if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
+ run: mvn -B -f docker/pulsar/pom.xml install -am -Pdocker,-main -DskipTests -Ddocker.nocache=true
+
+ - name: build pulsar-all image
+ if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
+ run: mvn -B -f docker/pulsar-all/pom.xml install -am -Pdocker,-main -DskipTests -Ddocker.nocache=true
+
+ - name: build artifacts and docker image
+ if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
+ run: mvn -B -f tests/docker-images/pom.xml install -am -Pdocker,-main -DskipTests
+
+ - name: run integration tests
+ if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
+ run: ./build/run_integration_group.sh PULSAR_IO_ORA
+
+ - name: Upload container logs
+ uses: actions/upload-artifact@v2
+ if: ${{ cancelled() || failure() }}
+ continue-on-error: true
+ with:
+ name: container-logs
+ path: tests/integration/target/container-logs
+
+ - name: Upload surefire-reports
+ uses: actions/upload-artifact@v2
+ if: ${{ cancelled() || failure() }}
+ continue-on-error: true
+ with:
+ name: surefire-reports
+ path: tests/integration/target/surefire-reports
diff --git a/build/run_integration_group.sh b/build/run_integration_group.sh
index b589202..8639c43 100755
--- a/build/run_integration_group.sh
+++ b/build/run_integration_group.sh
@@ -129,10 +129,16 @@ test_group_sql() {
}
test_group_pulsar_io() {
- mvn_run_integration_test --retry "$@" -DintegrationTestSuiteFile=pulsar-io-suite.xml -DintegrationTests -Dgroups=source
- #mvn_run_integration_test --retry "$@" -DintegrationTestSuiteFile=pulsar-io-suite.xml -DintegrationTests -Dgroups=sink
+ mvn_run_integration_test --no-retry "$@" -DintegrationTestSuiteFile=pulsar-io-sources.xml -DintegrationTests -Dgroups=source
+ #mvn_run_integration_test --no-retry "$@" -DintegrationTestSuiteFile=pulsar-io-sinks.xml -DintegrationTests -Dgroups=sink
}
+test_group_pulsar_io_ora() {
+ mvn_run_integration_test --no-retry "$@" -DintegrationTestSuiteFile=pulsar-io-ora-source.xml -DintegrationTests -Dgroups=source -DtestRetryCount=0
+}
+
+
+
echo "Test Group : $TEST_GROUP"
test_group_function_name="test_group_$(echo "$TEST_GROUP" | tr '[:upper:]' '[:lower:]')"
if [[ "$(LC_ALL=C type -t $test_group_function_name)" == "function" ]]; then
diff --git a/distribution/io/src/assemble/io.xml b/distribution/io/src/assemble/io.xml
index b2f6608..8a82754 100644
--- a/distribution/io/src/assemble/io.xml
+++ b/distribution/io/src/assemble/io.xml
@@ -70,6 +70,7 @@
<file><source>${basedir}/../../pulsar-io/mongo/target/pulsar-io-mongo-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/debezium/mysql/target/pulsar-io-debezium-mysql-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/debezium/postgres/target/pulsar-io-debezium-postgres-${project.version}.nar</source></file>
+ <file><source>${basedir}/../../pulsar-io/debezium/oracle/target/pulsar-io-debezium-oracle-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/debezium/mongodb/target/pulsar-io-debezium-mongodb-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/influxdb/target/pulsar-io-influxdb-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/redis/target/pulsar-io-redis-${project.version}.nar</source></file>
diff --git a/pulsar-io/debezium/pom.xml b/pulsar-io/debezium/oracle/pom.xml
similarity index 56%
copy from pulsar-io/debezium/pom.xml
copy to pulsar-io/debezium/oracle/pom.xml
index 6600a7d..950c46a 100644
--- a/pulsar-io/debezium/pom.xml
+++ b/pulsar-io/debezium/oracle/pom.xml
@@ -19,23 +19,40 @@
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
- <packaging>pom</packaging>
<parent>
<groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-io</artifactId>
+ <artifactId>pulsar-io-debezium</artifactId>
<version>2.9.0-SNAPSHOT</version>
</parent>
- <artifactId>pulsar-io-debezium</artifactId>
- <name>Pulsar IO :: Debezium</name>
+ <artifactId>pulsar-io-debezium-oracle</artifactId>
+ <name>Pulsar IO :: Debezium :: oracle</name>
- <modules>
- <module>core</module>
- <module>mysql</module>
- <module>postgres</module>
- <module>mongodb</module>
- </modules>
+ <dependencies>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-debezium-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.debezium</groupId>
+ <artifactId>debezium-connector-oracle</artifactId>
+ <version>${debezium.version}</version>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-nar-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git a/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/DebeziumOracleSource.java b/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/DebeziumOracleSource.java
new file mode 100644
index 0000000..74a25fb
--- /dev/null
+++ b/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/DebeziumOracleSource.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.debezium.oracle;
+
+import java.util.Map;
+
+import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.pulsar.io.debezium.DebeziumSource;
+
+
+/**
+ * A pulsar source that runs debezium oracle source
+ */
+public class DebeziumOracleSource extends DebeziumSource {
+ private static final String DEFAULT_TASK = "io.debezium.connector.oracle.OracleConnectorTask";
+
+ @Override
+ public void setDbConnectorTask(Map<String, Object> config) throws Exception {
+ throwExceptionIfConfigNotMatch(config, TaskConfig.TASK_CLASS_CONFIG, DEFAULT_TASK);
+ }
+}
diff --git a/pulsar-io/debezium/oracle/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/debezium/oracle/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 0000000..de1c0b4
--- /dev/null
+++ b/pulsar-io/debezium/oracle/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: debezium-oracle
+description: Debezium Oracle Source
+sourceClass: org.apache.pulsar.io.debezium.oracle.DebeziumOracleSource
diff --git a/pulsar-io/debezium/oracle/src/main/resources/debezium-oracle-source-config.yaml b/pulsar-io/debezium/oracle/src/main/resources/debezium-oracle-source-config.yaml
new file mode 100644
index 0000000..94173d6
--- /dev/null
+++ b/pulsar-io/debezium/oracle/src/main/resources/debezium-oracle-source-config.yaml
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+tenant: "public"
+namespace: "default"
+name: "debezium-oracle-source"
+topicName: "debezium-oracle-topic"
+archive: "connectors/pulsar-io-debezium-oracle-2.9.0-SNAPSHOT.nar"
+
+parallelism: 1
+
+configs:
+ ## config for Oracle XE, docker image: https://github.com/MaksymBilenko/docker-oracle-12c
+ ## docker run -d -p 1521:1521 quay.io/maksymbilenko/oracle-12c
+ database.hostname: "localhost"
+ database.port: "1521"
+ database.user: "sysdba"
+ database.password: "oracle"
+ database.dbname: "XE"
+ database.server.name: "XE"
+
+ database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
diff --git a/pulsar-io/debezium/pom.xml b/pulsar-io/debezium/pom.xml
index 6600a7d..330ccbe 100644
--- a/pulsar-io/debezium/pom.xml
+++ b/pulsar-io/debezium/pom.xml
@@ -36,6 +36,7 @@
<module>mysql</module>
<module>postgres</module>
<module>mongodb</module>
+ <module>oracle</module>
</modules>
</project>
diff --git a/tests/docker-images/latest-version-image/Dockerfile b/tests/docker-images/latest-version-image/Dockerfile
index 720bcdf..04823ea 100644
--- a/tests/docker-images/latest-version-image/Dockerfile
+++ b/tests/docker-images/latest-version-image/Dockerfile
@@ -107,4 +107,19 @@ COPY --from=pulsar-all /pulsar/connectors/pulsar-io-kafka-*.nar /pulsar/connecto
COPY --from=pulsar-all /pulsar/connectors/pulsar-io-rabbitmq-*.nar /pulsar/connectors/
COPY --from=pulsar-all /pulsar/connectors/pulsar-io-batch-data-generator-*.nar /pulsar/connectors/
+# download Oracle JDBC driver for Oracle Debezium Connector tests
+RUN mkdir -p META-INF/bundled-dependencies
+RUN cd META-INF/bundled-dependencies && curl -sSLO https://search.maven.org/remotecontent?filepath=com/oracle/ojdbc/ojdbc8/19.3.0.0/ojdbc8-19.3.0.0.jar
+RUN cd META-INF/bundled-dependencies && curl -sSLO https://search.maven.org/remotecontent?filepath=com/oracle/ojdbc/ucp/19.3.0.0/ucp-19.3.0.0.jar
+RUN cd META-INF/bundled-dependencies && curl -sSLO https://search.maven.org/remotecontent?filepath=com/oracle/ojdbc/oraclepki/19.3.0.0/oraclepki-19.3.0.0.jar
+RUN cd META-INF/bundled-dependencies && curl -sSLO https://search.maven.org/remotecontent?filepath=com/oracle/ojdbc/osdt_cert/19.3.0.0/osdt_cert-19.3.0.0.jar
+RUN cd META-INF/bundled-dependencies && curl -sSLO https://search.maven.org/remotecontent?filepath=com/oracle/ojdbc/osdt_core/19.3.0.0/osdt_core-19.3.0.0.jar
+RUN cd META-INF/bundled-dependencies && curl -sSLO https://search.maven.org/remotecontent?filepath=com/oracle/ojdbc/simplefan/19.3.0.0/simplefan-19.3.0.0.jar
+RUN cd META-INF/bundled-dependencies && curl -sSLO https://search.maven.org/remotecontent?filepath=com/oracle/ojdbc/orai18n/19.3.0.0/orai18n-19.3.0.0.jar
+RUN cd META-INF/bundled-dependencies && curl -sSLO https://search.maven.org/remotecontent?filepath=com/oracle/ojdbc/xdb/19.3.0.0/xdb-19.3.0.0.jar
+RUN cd META-INF/bundled-dependencies && curl -sSLO https://search.maven.org/remotecontent?filepath=com/oracle/ojdbc/xmlparserv2/19.3.0.0/xmlparserv2-19.3.0.0.jar
+
+RUN jar uf connectors/pulsar-io-debezium-oracle-*.nar META-INF/bundled-dependencies/ojdbc8-19.3.0.0.jar META-INF/bundled-dependencies/ucp-19.3.0.0.jar META-INF/bundled-dependencies/oraclepki-19.3.0.0.jar META-INF/bundled-dependencies/osdt_cert-19.3.0.0.jar META-INF/bundled-dependencies/osdt_core-19.3.0.0.jar META-INF/bundled-dependencies/simplefan-19.3.0.0.jar META-INF/bundled-dependencies/orai18n-19.3.0.0.jar META-INF/bundled-dependencies/xdb-19.3.0.0.jar META-INF/bundled-dependencies/x [...]
+
+
CMD bash
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
index 3896777..06f8561 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
@@ -92,6 +92,18 @@ public class ChaosContainer<SelfT extends ChaosContainer<SelfT>> extends Generic
return DockerUtils.runCommandAsync(client, dockerId, commands);
}
+ public ContainerExecResult execCmdAsUser(String userId, String... commands) throws Exception {
+ DockerClient client = this.getDockerClient();
+ String dockerId = this.getContainerId();
+ return DockerUtils.runCommandAsUser(userId, client, dockerId, commands);
+ }
+
+ public CompletableFuture<ContainerExecResult> execCmdAsyncAsUser(String userId, String... commands) throws Exception {
+ DockerClient client = this.getDockerClient();
+ String dockerId = this.getContainerId();
+ return DockerUtils.runCommandAsyncAsUser(userId, client, dockerId, commands);
+ }
+
@Override
public boolean equals(Object o) {
if (!(o instanceof ChaosContainer)) {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumOracleDbContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumOracleDbContainer.java
new file mode 100644
index 0000000..93fdad6
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumOracleDbContainer.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.containers;
+
+
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+
+public class DebeziumOracleDbContainer extends ChaosContainer<DebeziumOracleDbContainer> {
+
+ public static final String NAME = "debezium-oracledb-12c";
+ static final Integer[] PORTS = { 1521 };
+
+ // https://github.com/MaksymBilenko/docker-oracle-12c
+ // Apache 2.0 license.
+ // Newer versions don't have LigMiner in XE (Standard) Edition and require Enterprise.
+ // Debezium 1.5 didn't work with 11g out of the box
+ // and it is not tested with 11.g according to https://debezium.io/releases/1.5/
+ private static final String IMAGE_NAME = "quay.io/maksymbilenko/oracle-12c:master";
+
+ public DebeziumOracleDbContainer(String clusterName) {
+ super(clusterName, IMAGE_NAME);
+ }
+
+ @Override
+ public String getContainerName() {
+ return clusterName;
+ }
+
+ @Override
+ protected void configure() {
+ super.configure();
+ this.withNetworkAliases(NAME)
+ .withExposedPorts(PORTS)
+ .withEnv("DBCA_TOTAL_MEMORY", "1024")
+ .withStartupTimeout(Duration.of(300, ChronoUnit.SECONDS))
+ .withCreateContainerCmdModifier(createContainerCmd -> {
+ createContainerCmd.withHostName(NAME);
+ createContainerCmd.withName(getContainerName());
+ })
+ .waitingFor(new HostPortWaitStrategy());
+ }
+
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java
index f50d3be..97613cb 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java
@@ -51,6 +51,7 @@ public abstract class SourceTester<ServiceContainerT extends GenericContainer> {
protected final Map<String, Object> sourceConfig;
protected int numEntriesToInsert = 1;
+ protected int numEntriesExpectAfterStart = 9;
public static final Set<String> DEBEZIUM_FIELD_SET = new HashSet<String>() {{
add("before");
@@ -113,7 +114,7 @@ public abstract class SourceTester<ServiceContainerT extends GenericContainer> {
public void validateSourceResultJson(Consumer<KeyValue<byte[], byte[]>> consumer, int number, String eventType) throws Exception {
int recordsNumber = 0;
- Message<KeyValue<byte[], byte[]>> msg = consumer.receive(2, TimeUnit.SECONDS);
+ Message<KeyValue<byte[], byte[]>> msg = consumer.receive(initialDelayForMsgReceive(), TimeUnit.SECONDS);
while(msg != null) {
recordsNumber ++;
final String key = new String(msg.getValue().getKey());
@@ -135,7 +136,7 @@ public abstract class SourceTester<ServiceContainerT extends GenericContainer> {
public void validateSourceResultAvro(Consumer<KeyValue<GenericRecord, GenericRecord>> consumer,
int number, String eventType) throws Exception {
int recordsNumber = 0;
- Message<KeyValue<GenericRecord, GenericRecord>> msg = consumer.receive(2, TimeUnit.SECONDS);
+ Message<KeyValue<GenericRecord, GenericRecord>> msg = consumer.receive(initialDelayForMsgReceive(), TimeUnit.SECONDS);
while(msg != null) {
recordsNumber ++;
GenericRecord keyRecord = msg.getValue().getKey();
@@ -164,11 +165,15 @@ public abstract class SourceTester<ServiceContainerT extends GenericContainer> {
log.info("Stop {} server container. topic: {} has {} records.", getSourceType(), consumer.getTopic(), recordsNumber);
}
- public String keyContains(){
+ public int initialDelayForMsgReceive() {
+ return 2;
+ }
+
+ public String keyContains() {
return "dbserver1.inventory.products.Key";
}
- public String valueContains(){
+ public String valueContains() {
return "dbserver1.inventory.products.Value";
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java
new file mode 100644
index 0000000..28fec95
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io.sources.debezium;
+
+import lombok.Getter;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.tests.integration.containers.DebeziumOracleDbContainer;
+import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.io.sources.SourceTester;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.testcontainers.shaded.com.google.common.base.Preconditions;
+import org.testng.util.Strings;
+
+import java.io.Closeable;
+import java.util.Map;
+
+/**
+ * A tester for testing Debezium OracleDb source.
+ */
+@Slf4j
+public class DebeziumOracleDbSourceTester extends SourceTester<DebeziumOracleDbContainer> implements Closeable {
+
+ private static final String NAME = "debezium-oracle";
+
+ private final String pulsarServiceUrl;
+
+ @Getter
+ private DebeziumOracleDbContainer debeziumOracleDbContainer;
+
+ private final PulsarCluster pulsarCluster;
+
+ public DebeziumOracleDbSourceTester(PulsarCluster cluster) {
+ super(NAME);
+ this.pulsarCluster = cluster;
+ this.numEntriesToInsert = 1;
+ this.numEntriesExpectAfterStart = 0;
+
+ pulsarServiceUrl = "pulsar://pulsar-proxy:" + PulsarContainer.BROKER_PORT;
+
+ sourceConfig.put("database.hostname", DebeziumOracleDbContainer.NAME);
+ sourceConfig.put("database.port", "1521");
+ sourceConfig.put("database.user", "dbzuser");
+ sourceConfig.put("database.password", "dbz");
+ sourceConfig.put("database.server.name", "XE");
+ sourceConfig.put("database.dbname", "XE");
+ sourceConfig.put("snapshot.mode", "schema_only");
+
+ sourceConfig.put("schema.include.list", "inv");
+ sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl);
+ sourceConfig.put("topic.namespace", "debezium/oracle");
+ }
+
+ @Override
+ public void setServiceContainer(DebeziumOracleDbContainer container) {
+ log.info("start debezium oracle server container.");
+ Preconditions.checkState(debeziumOracleDbContainer == null);
+ debeziumOracleDbContainer = container;
+ pulsarCluster.startService(DebeziumOracleDbContainer.NAME, debeziumOracleDbContainer);
+ }
+
+ @SneakyThrows
+ @Override
+ public void prepareSource() {
+ String[] minerCommands = {
+ "ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;",
+ "ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;",
+ "alter system switch logfile;"
+ };
+ String[] commands = {
+ "CREATE TABLESPACE inv DATAFILE 'tbs_inv01.dbf' SIZE 200M LOGGING;",
+ "CREATE USER inv identified by inv default tablespace inv;",
+ "GRANT CREATE TABLE TO inv;",
+ "GRANT LOCK ANY TABLE TO inv;",
+ "GRANT ALTER ANY TABLE TO inv;",
+ "GRANT CREATE SEQUENCE TO inv;",
+ "GRANT UNLIMITED TABLESPACE TO inv;",
+ "CREATE TABLE inv.customers (" +
+ "id NUMBER(9) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 1001) NOT NULL PRIMARY KEY," +
+ "first_name VARCHAR2(255) NOT NULL," +
+ "last_name VARCHAR2(255) NOT NULL," +
+ "email VARCHAR2(255) NOT NULL" +
+ ");",
+ "ALTER TABLE inv.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;",
+
+ // create user for debezium
+ "create role dbz_privs;",
+ "grant create session, execute_catalog_role, select any transaction, select any dictionary to dbz_privs;",
+ "grant select on SYSTEM.LOGMNR_COL$ to dbz_privs;",
+ "grant select on SYSTEM.LOGMNR_OBJ$ to dbz_privs;",
+ "grant select on SYSTEM.LOGMNR_USER$ to dbz_privs;",
+ "grant select on SYSTEM.LOGMNR_UID$ to dbz_privs;",
+ "create user dbzuser identified by dbz default tablespace users;",
+ "grant dbz_privs to dbzuser;",
+ "alter user dbzuser quota unlimited on users;",
+ "grant LOGMINING to dbz_privs;",
+
+ "GRANT CREATE SESSION TO dbzuser;",
+ "GRANT SET CONTAINER TO dbzuser;",
+ "GRANT SELECT ON V_$DATABASE to dbzuser;",
+ "GRANT FLASHBACK ANY TABLE TO dbzuser;",
+ "GRANT SELECT ANY TABLE TO dbzuser;",
+ "GRANT SELECT_CATALOG_ROLE TO dbzuser;",
+ "GRANT EXECUTE_CATALOG_ROLE TO dbzuser;",
+ "GRANT SELECT ANY TRANSACTION TO dbzuser;",
+ "GRANT LOGMINING TO dbzuser;",
+
+ "GRANT CREATE TABLE TO dbzuser;",
+ "GRANT LOCK ANY TABLE TO dbzuser;",
+ "GRANT ALTER ANY TABLE TO dbzuser;",
+ "GRANT CREATE SEQUENCE TO dbzuser;",
+
+ "GRANT EXECUTE ON DBMS_LOGMNR TO dbzuser;",
+ "GRANT EXECUTE ON DBMS_LOGMNR_D TO dbzuser;",
+
+ "GRANT SELECT ON V_$LOG TO dbzuser;",
+ "GRANT SELECT ON V_$LOG_HISTORY TO dbzuser;",
+ "GRANT SELECT ON V_$LOGMNR_LOGS TO dbzuser;",
+ "GRANT SELECT ON V_$LOGMNR_CONTENTS TO dbzuser;",
+ "GRANT SELECT ON V_$LOGMNR_PARAMETERS TO dbzuser;",
+ "GRANT SELECT ON V_$LOGFILE TO dbzuser;",
+ "GRANT SELECT ON V_$ARCHIVED_LOG TO dbzuser;",
+ "GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO dbzuser;"
+ };
+
+ // good first approximation but still not enough:
+ waitForOracleStatus("OPEN");
+ Thread.sleep(30000);
+
+ // configure logminer
+ runSqlCmd("shutdown immediate");
+
+ // good first approximation but still not enough:
+ waitForOracleStatus("ORACLE not available");
+ Thread.sleep(30000);
+
+ runSqlCmd("startup mount");
+ // good first approximation but still not enough:
+ waitForOracleStatus("MOUNTED");
+ Thread.sleep(30000);
+
+ runSqlCmd("alter database archivelog;");
+ runSqlCmd("alter database open;");
+ // good first approximation but still not enough:
+ waitForOracleStatus("OPEN");
+ Thread.sleep(30000);
+
+ for (String cmd: minerCommands) {
+ runSqlCmd(cmd);
+ }
+
+ // create user/tablespace/table
+ for (String cmd: commands) {
+ runSqlCmd(cmd);
+ }
+ // initial event
+ runSqlCmd("INSERT INTO inv.customers (first_name, last_name, email) VALUES ('James', 'Bond', 'jbond@null.dev');");
+ }
+
+ private void waitForOracleStatus(String status) throws Exception {
+ for (int i = 0; i < 1000; i++) {
+ ContainerExecResult response = runSqlCmd("SELECT INSTANCE_NAME, STATUS, DATABASE_STATUS FROM V$INSTANCE;");
+ if ((response.getStderr() != null && response.getStderr().contains(status))
+ || (response.getStdout() != null && response.getStdout().contains(status))) {
+ return;
+ }
+ Thread.sleep(1000);
+ }
+ throw new IllegalStateException("Oracle did not initialize properly");
+ }
+
+ private ContainerExecResult runSqlCmd(String cmd) throws Exception {
+ log.info("Executing \"{}\"", cmd);
+ ContainerExecResult response = this.debeziumOracleDbContainer
+ .execCmdAsUser("oracle",
+ "/bin/bash", "-c",
+ "echo \"exit;\" | echo \""
+ + cmd.replace("$", "\\$")
+ + "\" | sqlplus sys/oracle as sysdba"
+ );
+ if (Strings.isNullOrEmpty(response.getStderr())) {
+ log.info("Result of \"{}\":\n{}", cmd, response.getStdout());
+ } else {
+ log.warn("Result of \"{}\":\n{}\n{}", cmd, response.getStdout(), response.getStderr());
+ }
+ return response;
+ }
+
+ @Override
+ public void prepareInsertEvent() throws Exception {
+ runSqlCmd("INSERT INTO inv.customers (first_name, last_name, email) VALUES ('John', 'Doe', 'jdoe@null.dev');");
+ runSqlCmd("SELECT * FROM inv.customers WHERE last_name='Doe';");
+ }
+
+ @Override
+ public void prepareDeleteEvent() throws Exception {
+ runSqlCmd("DELETE FROM inv.customers WHERE last_name='Doe';");
+ runSqlCmd("SELECT * FROM inv.customers WHERE last_name='Doe';");
+ }
+
+ @Override
+ public void prepareUpdateEvent() throws Exception {
+ runSqlCmd("UPDATE inv.customers SET first_name='Jack' WHERE last_name='Doe';");
+ runSqlCmd("SELECT * FROM inv.customers WHERE last_name='Doe';");
+ }
+
+ @Override
+ public Map<String, String> produceSourceMessages(int numMessages) {
+ log.info("debezium oracle server already contains preconfigured data.");
+ return null;
+ }
+
+ @Override
+ public int initialDelayForMsgReceive() {
+ // LogMiner takes a lot of time to get messages out
+ return 30;
+ }
+
+ @Override
+ public String keyContains() {
+ return "XE.INV.CUSTOMERS.Key";
+ }
+
+ @Override
+ public String valueContains() {
+ return "XE.INV.CUSTOMERS.Value";
+ }
+
+ @Override
+ public void close() {
+ if (pulsarCluster != null) {
+ PulsarCluster.stopService(DebeziumOracleDbContainer.NAME, debeziumOracleDbContainer);
+ }
+ }
+
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumOracleSourceTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumOracleSourceTest.java
new file mode 100644
index 0000000..b2f686f
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumOracleSourceTest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io.sources.debezium;
+
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.tests.integration.containers.DebeziumOracleDbContainer;
+import org.apache.pulsar.tests.integration.io.PulsarIOTestBase;
+import org.testcontainers.shaded.com.google.common.collect.Sets;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Slf4j
+public class PulsarDebeziumOracleSourceTest extends PulsarIOTestBase {
+
+ protected final AtomicInteger testId = new AtomicInteger(0);
+
+ @Test(groups = "source", timeOut = 1800000)
+ public void testDebeziumOracleDbSource() throws Exception{
+ testDebeziumOracleDbConnect("org.apache.kafka.connect.json.JsonConverter", true);
+ }
+
+ private void testDebeziumOracleDbConnect(String converterClassName, boolean jsonWithEnvelope) throws Exception {
+ final String tenant = TopicName.PUBLIC_TENANT;
+ final String namespace = TopicName.DEFAULT_NAMESPACE;
+ final String outputTopicName = "debe-output-topic-name-" + testId.getAndIncrement();
+ final String consumeTopicName = "debezium/oracle/XE.INV.CUSTOMERS";
+ final String sourceName = "test-source-debezium-oracle-" + functionRuntimeType + "-" + randomName(8);
+
+ // This is the event count to be created by prepareSource.
+ final int numMessages = 1;
+
+ @Cleanup
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+ .build();
+
+ @Cleanup
+ PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build();
+ initNamespace(admin);
+
+ admin.topics().createNonPartitionedTopic(consumeTopicName);
+ admin.topics().createNonPartitionedTopic(outputTopicName);
+
+ @Cleanup
+ DebeziumOracleDbSourceTester sourceTester = new DebeziumOracleDbSourceTester(pulsarCluster);
+ sourceTester.getSourceConfig().put("json-with-envelope", jsonWithEnvelope);
+
+ // setup debezium oracle server
+ DebeziumOracleDbContainer debeziumOracleDbContainer = new DebeziumOracleDbContainer(pulsarCluster.getClusterName());
+ sourceTester.setServiceContainer(debeziumOracleDbContainer);
+
+ PulsarIODebeziumSourceRunner runner = new PulsarIODebeziumSourceRunner(pulsarCluster, functionRuntimeType.toString(),
+ converterClassName, tenant, namespace, sourceName, outputTopicName, numMessages, jsonWithEnvelope,
+ consumeTopicName, client);
+
+ runner.testSource(sourceTester);
+ }
+
+ protected void initNamespace(PulsarAdmin admin) {
+ log.info("[initNamespace] start.");
+ try {
+ admin.tenants().createTenant("debezium", new TenantInfoImpl(Sets.newHashSet(),
+ Sets.newHashSet(pulsarCluster.getClusterName())));
+ String [] namespaces = {
+ "debezium/oracle"
+ };
+ Policies policies = new Policies();
+ policies.retention_policies = new RetentionPolicies(-1, 50);
+ for (String ns: namespaces) {
+ admin.namespaces().createNamespace(ns, policies);
+ }
+ } catch (Exception e) {
+ log.info("[initNamespace] msg: {}", e.getMessage());
+ }
+ log.info("[initNamespace] finish.");
+ }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java
index 2e88dad..7136b89 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java
@@ -23,6 +23,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.tests.integration.containers.DebeziumMongoDbContainer;
@@ -192,10 +194,17 @@ public class PulsarDebeziumSourcesTest extends PulsarIOTestBase {
try {
admin.tenants().createTenant("debezium", new TenantInfoImpl(Sets.newHashSet(),
Sets.newHashSet(pulsarCluster.getClusterName())));
- admin.namespaces().createNamespace("debezium/mysql-json");
- admin.namespaces().createNamespace("debezium/mysql-avro");
- admin.namespaces().createNamespace("debezium/mongodb");
- admin.namespaces().createNamespace("debezium/postgresql");
+ String [] namespaces = {
+ "debezium/mysql-json",
+ "debezium/mysql-avro",
+ "debezium/mongodb",
+ "debezium/postgresql",
+ };
+ Policies policies = new Policies();
+ policies.retention_policies = new RetentionPolicies(-1, 50);
+ for (String ns: namespaces) {
+ admin.namespaces().createNamespace(ns, policies);
+ }
} catch (Exception e) {
log.info("[initNamespace] msg: {}", e.getMessage());
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarIODebeziumSourceRunner.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarIODebeziumSourceRunner.java
index 6f0bbfd..da1e759 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarIODebeziumSourceRunner.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarIODebeziumSourceRunner.java
@@ -89,7 +89,7 @@ public class PulsarIODebeziumSourceRunner extends PulsarIOSourceRunner {
log.info("[debezium mysql test] create consumer finish. converterName: {}", converterClassName);
// validate the source result
- sourceTester.validateSourceResult(consumer, 9, null, converterClassName);
+ sourceTester.validateSourceResult(consumer, sourceTester.getNumEntriesExpectAfterStart(), null, converterClassName);
final int numEntriesToInsert = sourceTester.getNumEntriesToInsert();
Preconditions.checkArgument(numEntriesToInsert >= 1);
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
index 7c90cd3..9246c58 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
@@ -186,16 +186,52 @@ public class DockerUtils {
}
}
+ public static ContainerExecResult runCommandAsUser(String userId,
+ DockerClient docker,
+ String containerId,
+ String... cmd)
+ throws ContainerExecException, ExecutionException, InterruptedException {
+ try {
+ return runCommandAsyncAsUser(userId, docker, containerId, cmd).get();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof ContainerExecException) {
+ throw (ContainerExecException) e.getCause();
+ }
+ throw e;
+ }
+ }
+
+ public static CompletableFuture<ContainerExecResult> runCommandAsyncAsUser(String userId,
+ DockerClient dockerClient,
+ String containerId,
+ String... cmd) {
+ String execId = dockerClient.execCreateCmd(containerId)
+ .withCmd(cmd)
+ .withAttachStderr(true)
+ .withAttachStdout(true)
+ .withUser(userId)
+ .exec()
+ .getId();
+ return runCommandAsync(execId, dockerClient, containerId, cmd);
+ }
+
public static CompletableFuture<ContainerExecResult> runCommandAsync(DockerClient dockerClient,
String containerId,
String... cmd) {
- CompletableFuture<ContainerExecResult> future = new CompletableFuture<>();
String execId = dockerClient.execCreateCmd(containerId)
.withCmd(cmd)
.withAttachStderr(true)
.withAttachStdout(true)
.exec()
.getId();
+ return runCommandAsync(execId, dockerClient, containerId, cmd);
+ }
+
+ private static CompletableFuture<ContainerExecResult> runCommandAsync(String execId,
+ DockerClient dockerClient,
+ String containerId,
+ String... cmd) {
+ CompletableFuture<ContainerExecResult> future = new CompletableFuture<>();
final String containerName = getContainerName(dockerClient, containerId);
String cmdString = String.join(" ", cmd);
StringBuilder stdout = new StringBuilder();
diff --git a/tests/integration/src/test/resources/pulsar-io-suite.xml b/tests/integration/src/test/resources/pulsar-io-ora-source.xml
similarity index 69%
copy from tests/integration/src/test/resources/pulsar-io-suite.xml
copy to tests/integration/src/test/resources/pulsar-io-ora-source.xml
index 33b6817..1c5bb5f 100644
--- a/tests/integration/src/test/resources/pulsar-io-suite.xml
+++ b/tests/integration/src/test/resources/pulsar-io-ora-source.xml
@@ -19,11 +19,10 @@
-->
<!DOCTYPE suite SYSTEM "https://testng.org/testng-1.0.dtd" >
-<!-- TODO: we have to put suite files in one file to avoid executing TESTNG test suites multiple times.
- see {@link https://github.com/cbeust/testng/issues/508} -->
-<suite name="Pulsar Test Suite" parallel="instances" thread-count="1">
- <suite-files>
- <suite-file path="./pulsar-io-sinks.xml" />
- <suite-file path="./pulsar-io-sources.xml" />
- </suite-files>
+<suite name="Pulsar IO Oracle Source Integration Tests" verbose="2" annotations="JDK">
+ <test name="pulsar-function-process-test-suite" preserve-order="true" >
+ <classes>
+ <class name="org.apache.pulsar.tests.integration.io.sources.debezium.PulsarDebeziumOracleSourceTest" />
+ </classes>
+ </test>
</suite>
diff --git a/tests/integration/src/test/resources/pulsar-io-suite.xml b/tests/integration/src/test/resources/pulsar-io-suite.xml
index 33b6817..64db138 100644
--- a/tests/integration/src/test/resources/pulsar-io-suite.xml
+++ b/tests/integration/src/test/resources/pulsar-io-suite.xml
@@ -25,5 +25,6 @@
<suite-files>
<suite-file path="./pulsar-io-sinks.xml" />
<suite-file path="./pulsar-io-sources.xml" />
+ <cuite-file path="./pulsar-io-ora-source.xml" />
</suite-files>
</suite>
diff --git a/tests/integration/src/test/resources/pulsar.xml b/tests/integration/src/test/resources/pulsar.xml
index a2011e4..7993d0c 100644
--- a/tests/integration/src/test/resources/pulsar.xml
+++ b/tests/integration/src/test/resources/pulsar.xml
@@ -35,5 +35,6 @@
<suite-file path="./pulsar-backwards-compatibility.xml" />
<suite-file path="./pulsar-io-sinks.xml" />
<suite-file path="./pulsar-io-sources.xml" />
+ <suite-file path="./pulsar-io-ora-source.xml" />
</suite-files>
</suite>