You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/09/24 06:16:08 UTC
[pulsar] branch master updated: Debezium: add PulsarDatabaseHistory
for debezium (#2614)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new adaae57 Debezium: add PulsarDatabaseHistory for debezium (#2614)
adaae57 is described below
commit adaae57d45123d3fb49fc5d7e11c08e8be1f6f68
Author: Jia Zhai <ji...@users.noreply.github.com>
AuthorDate: Mon Sep 24 14:16:02 2018 +0800
Debezium: add PulsarDatabaseHistory for debezium (#2614)
### Motivation
add PulsarDatabaseHistory for debezium
### Modifications
add PulsarDatabaseHistory for debezium and test for it.
### Result
ut pass
---
pom.xml | 3 +-
pulsar-io/debezium/pom.xml | 102 ++++++++
.../pulsar/io/debezium/PulsarDatabaseHistory.java | 256 +++++++++++++++++++++
.../io/debezium/PulsarDatabaseHistoryTest.java | 222 ++++++++++++++++++
pulsar-io/pom.xml | 1 +
5 files changed, 583 insertions(+), 1 deletion(-)
diff --git a/pom.xml b/pom.xml
index a8d68a4..83b83fe 100644
--- a/pom.xml
+++ b/pom.xml
@@ -177,6 +177,7 @@ flexible messaging model and an intuitive client API.</description>
<presto.version>0.206</presto.version>
<flink.version>1.6.0</flink.version>
<scala.binary.version>2.11</scala.binary.version>
+ <debezium-core.version>0.8.2</debezium-core.version>
<!-- test dependencies -->
<arquillian-cube.version>1.15.1</arquillian-cube.version>
@@ -1348,7 +1349,7 @@ flexible messaging model and an intuitive client API.</description>
<profile>
<id>docker</id>
</profile>
-
+
<profile>
<!-- Checks style and licensing requirements. This is a good
idea to run for contributions and for the release process. While it would
diff --git a/pulsar-io/debezium/pom.xml b/pulsar-io/debezium/pom.xml
new file mode 100644
index 0000000..5fefa43
--- /dev/null
+++ b/pulsar-io/debezium/pom.xml
@@ -0,0 +1,102 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-io</artifactId>
+ <version>2.2.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>pulsar-io-debezium</artifactId>
+ <name>Pulsar IO :: Debezium</name>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.debezium</groupId>
+ <artifactId>debezium-core</artifactId>
+ <version>${debezium-core.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_${scala.binary.version}</artifactId>
+ <version>${kafka-client.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-client-original</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-broker</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>managed-ledger-original</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-zookeeper-utils</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-broker</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-nar-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+
+</project>
diff --git a/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java b/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
new file mode 100644
index 0000000..bc97fc6
--- /dev/null
+++ b/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.debezium;
+
+import static org.apache.commons.lang.StringUtils.isBlank;
+
+import io.debezium.annotation.ThreadSafe;
+import io.debezium.config.Configuration;
+import io.debezium.config.Field;
+import io.debezium.document.DocumentReader;
+import io.debezium.relational.history.AbstractDatabaseHistory;
+import io.debezium.relational.history.DatabaseHistory;
+import io.debezium.relational.history.DatabaseHistoryException;
+import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.HistoryRecordComparator;
+import java.io.IOException;
+import java.util.UUID;
+import java.util.function.Consumer;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigDef.Width;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
+
+/**
+ * A {@link DatabaseHistory} implementation that records schema changes as normal Pulsar messages on the specified topic,
+ * and that recovers the history by creating a Pulsar {@link Reader} that re-processes all messages on that topic.
+ */
+@Slf4j
+@ThreadSafe
+public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
+
+ /** Required setting: name of the Pulsar topic on which the database schema history is stored. */
+ public static final Field TOPIC = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.topic")
+         .withDisplayName("Database history topic name")
+         .withType(Type.STRING)
+         .withWidth(Width.LONG)
+         .withImportance(Importance.HIGH)
+         .withDescription("The name of the topic for the database schema history")
+         .withValidation(Field::isRequired);
+
+ /** Required setting: service URL of the Pulsar cluster that hosts the history topic. */
+ public static final Field SERVICE_URL = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.service.url")
+         // The display name previously read "Kafka broker addresses", which is wrong for a Pulsar service URL.
+         .withDisplayName("Pulsar service url")
+         .withType(Type.STRING)
+         .withWidth(Width.LONG)
+         .withImportance(Importance.HIGH)
+         .withDescription("Pulsar service url")
+         .withValidation(Field::isRequired);
+
+ /** The fields validated by {@code configure}; declared final so the shared set cannot be reassigned. */
+ public static final Field.Set ALL_FIELDS = Field.setOf(
+         TOPIC,
+         SERVICE_URL,
+         DatabaseHistory.NAME);
+
+ // Turns the text of a stored message back into a document when the history is recovered.
+ private final DocumentReader reader = DocumentReader.defaultReader();
+ // Values captured from the configuration in configure().
+ private String topicName;
+ private String serviceUrl;
+ private String dbHistoryName;
+ // Created on demand by setupClientIfNeeded()/setupProducerIfNeeded() and reset to null by stop().
+ private volatile PulsarClient pulsarClient;
+ private volatile Producer<String> producer;
+
+
+ @Override
+ public void configure(Configuration config, HistoryRecordComparator comparator) {
+ super.configure(config, comparator);
+ if (!config.validateAndRecord(ALL_FIELDS, logger::error)) {
+ throw new IllegalArgumentException("Error configuring an instance of "
+ + getClass().getSimpleName() + "; check the logs for details");
+ }
+ this.topicName = config.getString(TOPIC);
+ this.serviceUrl = config.getString(SERVICE_URL);
+ // Copy the relevant portions of the configuration and add useful defaults ...
+ this.dbHistoryName = config.getString(DatabaseHistory.NAME, UUID.randomUUID().toString());
+
+ log.info("Configure to store the debezium database history {} to pulsar topic {} at {}",
+ dbHistoryName, topicName, serviceUrl);
+ }
+
+ /**
+  * Initializes the history storage by publishing a single empty message to the configured topic, which is
+  * done to make the topic exist. Blank messages are skipped again when the history is recovered.
+  *
+  * @throws RuntimeException if the Pulsar client or the temporary producer cannot be created, used or closed
+  */
+ @Override
+ public void initializeStorage() {
+     super.initializeStorage();
+
+     // The client is created lazily; without this call the method fails with a NullPointerException
+     // whenever it runs before start() or a recovery has created the client.
+     setupClientIfNeeded();
+
+     // Simply try to publish an empty string to create the topic.
+     try (Producer<String> p = pulsarClient.newProducer(Schema.STRING).topic(topicName).create()) {
+         p.send("");
+     } catch (PulsarClientException pce) {
+         log.error("Failed to initialize storage", pce);
+         throw new RuntimeException("Failed to initialize storage", pce);
+     }
+ }
+
+ /**
+  * Creates the Pulsar client for the configured service URL unless one is already present.
+  *
+  * @throws RuntimeException if the client cannot be built
+  */
+ void setupClientIfNeeded() {
+     if (this.pulsarClient != null) {
+         return;
+     }
+     try {
+         this.pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build();
+     } catch (PulsarClientException cause) {
+         throw new RuntimeException("Failed to create pulsar client to pulsar cluster at "
+                 + serviceUrl, cause);
+     }
+ }
+
+ /**
+  * Makes sure the Pulsar client exists and then creates the history producer unless one is already present.
+  * The producer is named after the configured history name and blocks when its queue is full.
+  *
+  * @throws RuntimeException if the client or the producer cannot be created
+  */
+ void setupProducerIfNeeded() {
+     setupClientIfNeeded();
+     if (this.producer != null) {
+         return;
+     }
+     try {
+         this.producer = pulsarClient.newProducer(Schema.STRING)
+                 .topic(topicName)
+                 .producerName(dbHistoryName)
+                 .blockIfQueueFull(true)
+                 .create();
+     } catch (PulsarClientException e) {
+         // Pass the exception to the logger so the stack trace is kept, as initializeStorage() does.
+         log.error("Failed to create pulsar producer to topic '{}' at cluster '{}'", topicName, serviceUrl, e);
+         throw new RuntimeException("Failed to create pulsar producer to topic '"
+                 + topicName + "' at cluster '" + serviceUrl + "'", e);
+     }
+ }
+
+ /**
+ * Starts the history: runs the superclass start logic and then makes sure the producer used by
+ * {@code storeRecord} exists. The producer is only created when none is present.
+ */
+ @Override
+ public void start() {
+ super.start();
+ setupProducerIfNeeded();
+ }
+
+ @Override
+ protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
+ if (this.producer == null) {
+ throw new IllegalStateException("No producer is available. Ensure that 'start()'" +
+ " is called before storing database history records.");
+ }
+ if (log.isTraceEnabled()) {
+ log.trace("Storing record into database history: {}", record);
+ }
+ try {
+ producer.send(record.toString());
+ } catch (PulsarClientException e) {
+ throw new DatabaseHistoryException(e);
+ }
+ }
+
+ /**
+  * Stops the history by flushing and closing the producer and then closing the Pulsar client.
+  * Each step is attempted even if an earlier one fails; failures are logged and never thrown.
+  * Both fields are cleared first, so a later {@code start()} creates fresh instances.
+  */
+ @Override
+ public void stop() {
+     final Producer<String> closingProducer = this.producer;
+     this.producer = null;
+     if (closingProducer != null) {
+         try {
+             closingProducer.flush();
+         } catch (PulsarClientException pce) {
+             // Keep going so that the producer and the client are still closed.
+             log.warn("Failed to flush pulsar producer before closing it", pce);
+         }
+         try {
+             closingProducer.close();
+         } catch (PulsarClientException pce) {
+             // A failed producer close used to skip closing the client; log it and carry on instead.
+             log.warn("Failed to close pulsar producer", pce);
+         }
+     }
+
+     final PulsarClient closingClient = this.pulsarClient;
+     this.pulsarClient = null;
+     if (closingClient != null) {
+         try {
+             closingClient.close();
+         } catch (PulsarClientException pe) {
+             log.warn("Failed to closing pulsar client", pe);
+         }
+     }
+ }
+
+ @Override
+ protected void recoverRecords(Consumer<HistoryRecord> records) {
+ setupClientIfNeeded();
+ try (Reader<String> historyReader = pulsarClient.newReader(Schema.STRING)
+ .topic(topicName)
+ .startMessageId(MessageId.earliest)
+ .create()
+ ) {
+ log.info("Scanning the database history topic '{}'", topicName);
+
+ // Read all messages in the topic ...
+ MessageId lastProcessedMessageId = null;
+
+ // read the topic until the end
+ while (historyReader.hasMessageAvailable()) {
+ Message<String> msg = historyReader.readNext();
+ try {
+ if (null == lastProcessedMessageId || lastProcessedMessageId.compareTo(msg.getMessageId()) < 0) {
+ if (!isBlank(msg.getValue())) {
+ HistoryRecord recordObj = new HistoryRecord(reader.read(msg.getValue()));
+ if (log.isTraceEnabled()) {
+ log.trace("Recovering database history: {}", recordObj);
+ }
+ if (recordObj == null || !recordObj.isValid()) {
+ log.warn("Skipping invalid database history record '{}'. " +
+ "This is often not an issue, but if it happens repeatedly please check the '{}' topic.",
+ recordObj, topicName);
+ } else {
+ records.accept(recordObj);
+ log.trace("Recovered database history: {}", recordObj);
+ }
+ }
+ lastProcessedMessageId = msg.getMessageId();
+ }
+ } catch (IOException ioe) {
+ log.error("Error while deserializing history record '{}'", msg.getValue(), ioe);
+ } catch (final Exception e) {
+ throw e;
+ }
+ }
+ log.info("Successfully completed scanning the database history topic '{}'", topicName);
+ } catch (IOException ioe) {
+ log.error("Encountered issues on recovering history records", ioe);
+ throw new RuntimeException("Encountered issues on recovering history records", ioe);
+ }
+ }
+
+ /**
+  * Reports whether the history topic has at least one message available when read from the earliest position.
+  *
+  * @return {@code true} if a reader opened at the earliest position has a message available
+  * @throws RuntimeException if the reader cannot be created, queried or closed
+  */
+ @Override
+ public boolean exists() {
+     setupClientIfNeeded();
+     try (Reader<String> probe = pulsarClient.newReader(Schema.STRING)
+             .topic(topicName)
+             .startMessageId(MessageId.earliest)
+             .create()) {
+         final boolean hasHistory = probe.hasMessageAvailable();
+         return hasHistory;
+     } catch (IOException cause) {
+         log.error("Encountered issues on checking existence of database history", cause);
+         throw new RuntimeException("Encountered issues on checking existence of database history", cause);
+     }
+ }
+
+ /**
+  * Describes this history by its topic and service URL, or just as "Pulsar topic" when no topic is set yet.
+  *
+  * @return a short human-readable description of where the history is stored
+  */
+ @Override
+ public String toString() {
+     return (topicName == null)
+             ? "Pulsar topic"
+             : "Pulsar topic (" + topicName + ") at " + serviceUrl;
+ }
+}
diff --git a/pulsar-io/debezium/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java b/pulsar-io/debezium/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
new file mode 100644
index 0000000..e3b4fd9
--- /dev/null
+++ b/pulsar-io/debezium/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.debezium;
+
+import static org.junit.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import io.debezium.config.Configuration;
+import io.debezium.relational.Tables;
+import io.debezium.relational.ddl.DdlParserSql2003;
+import io.debezium.relational.ddl.LegacyDdlParser;
+import io.debezium.relational.history.DatabaseHistory;
+import io.debezium.text.ParsingException;
+import io.debezium.util.Collect;
+import java.util.Map;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.Schema;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Test the implementation of {@link PulsarDatabaseHistory}.
+ */
+public class PulsarDatabaseHistoryTest extends ProducerConsumerBase {
+
+ // History instance under test; testHistoryTopicContent replaces it with a fresh instance part-way through.
+ private PulsarDatabaseHistory history;
+ // Log position passed to record()/recover(); rebuilt by setLogPosition.
+ private Map<String, Object> position;
+ // Source map passed to record()/recover(); set to server=my-server in setup.
+ private Map<String, String> source;
+ // Pulsar topic that holds the history in each test.
+ private String topicName;
+ // DDL text most recently recorded by testHistoryTopicContent.
+ private String ddl;
+
+ /**
+  * Runs the base-class setup steps and then creates a fresh history plus the source, position and topic
+  * name used by every test method.
+  */
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+     super.internalSetup();
+     super.producerBaseSetup();
+
+     this.history = new PulsarDatabaseHistory();
+     this.topicName = "persistent://my-property/my-ns/schema-changes-topic";
+     this.source = Collect.hashMapOf("server", "my-server");
+     setLogPosition(0);
+ }
+
+ /**
+  * Stops the history under test, so its Pulsar client and producer do not leak into the next test
+  * method, and then runs the base-class cleanup.
+  */
+ @AfterMethod
+ @Override
+ protected void cleanup() throws Exception {
+     try {
+         // history is null if setup failed before creating it.
+         if (history != null) {
+             history.stop();
+         }
+     } finally {
+         super.internalCleanup();
+     }
+ }
+
+ /**
+ * Drives a full record / stop / recover cycle against the history topic and compares the recovered
+ * tables with tables built by parsing the same DDL directly.
+ *
+ * @param skipUnparseableDDL value stored under {@link DatabaseHistory#SKIP_UNPARSEABLE_DDL_STATEMENTS}
+ */
+ private void testHistoryTopicContent(boolean skipUnparseableDDL) {
+ // Start up the history ...
+ Configuration config = Configuration.create()
+ .with(PulsarDatabaseHistory.SERVICE_URL, brokerUrl.toString())
+ .with(PulsarDatabaseHistory.TOPIC, topicName)
+ .with(DatabaseHistory.NAME, "my-db-history")
+ .with(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, skipUnparseableDDL)
+ .build();
+ history.configure(config, null);
+ history.start();
+
+ // Should be able to call start more than once ...
+ history.start();
+
+ history.initializeStorage();
+
+ // Calling it another time to ensure we can work with the DB history topic already existing
+ history.initializeStorage();
+
+ LegacyDdlParser recoveryParser = new DdlParserSql2003();
+ LegacyDdlParser ddlParser = new DdlParserSql2003();
+ ddlParser.setCurrentSchema("db1"); // recover does this, so we need to as well
+ Tables tables1 = new Tables();
+ Tables tables2 = new Tables();
+ Tables tables3 = new Tables();
+
+ // Recover from the very beginning ...
+ setLogPosition(0);
+ history.recover(source, position, tables1, recoveryParser);
+
+ // There should have been nothing to recover ...
+ assertEquals(tables1.size(), 0);
+
+ // Now record schema changes, which writes out to Pulsar but doesn't actually change the Tables ...
+ setLogPosition(10);
+ ddl = "CREATE TABLE foo ( name VARCHAR(255) NOT NULL PRIMARY KEY); \n" +
+ "CREATE TABLE customers ( id INTEGER NOT NULL PRIMARY KEY, name VARCHAR(100) NOT NULL ); \n" +
+ "CREATE TABLE products ( productId INTEGER NOT NULL PRIMARY KEY, desc VARCHAR(255) NOT NULL); \n";
+ history.record(source, position, "db1", ddl);
+
+ // Parse the DDL statement 3x and each time update a different Tables object ...
+ ddlParser.parse(ddl, tables1);
+ assertEquals(3, tables1.size());
+ ddlParser.parse(ddl, tables2);
+ assertEquals(3, tables2.size());
+ ddlParser.parse(ddl, tables3);
+ assertEquals(3, tables3.size());
+
+ // Record a drop statement and parse it for 2 of our 3 Tables...
+ setLogPosition(39);
+ ddl = "DROP TABLE foo;";
+ history.record(source, position, "db1", ddl);
+ ddlParser.parse(ddl, tables2);
+ assertEquals(2, tables2.size());
+ ddlParser.parse(ddl, tables3);
+ assertEquals(2, tables3.size());
+
+ // Record another DDL statement and parse it for 1 of our 3 Tables...
+ setLogPosition(10003);
+ ddl = "CREATE TABLE suppliers ( supplierId INTEGER NOT NULL PRIMARY KEY, name VARCHAR(255) NOT NULL);";
+ history.record(source, position, "db1", ddl);
+ ddlParser.parse(ddl, tables3);
+ assertEquals(3, tables3.size());
+
+ // Stop the history (which should stop the producer) ...
+ history.stop();
+ history = new PulsarDatabaseHistory();
+ history.configure(config, null);
+ // no need to start
+
+ // Recover from the very beginning to just past the first change ...
+ Tables recoveredTables = new Tables();
+ setLogPosition(15);
+ history.recover(source, position, recoveredTables, recoveryParser);
+ assertEquals(recoveredTables, tables1);
+
+ // Recover from the very beginning to just past the second change ...
+ recoveredTables = new Tables();
+ setLogPosition(50);
+ history.recover(source, position, recoveredTables, recoveryParser);
+ assertEquals(recoveredTables, tables2);
+
+ // Recover from the very beginning to just past the third change ...
+ recoveredTables = new Tables();
+ setLogPosition(10010);
+ history.recover(source, position, recoveredTables, recoveryParser);
+ assertEquals(recoveredTables, tables3);
+
+ // Recover from the very beginning to way past the third change ...
+ recoveredTables = new Tables();
+ setLogPosition(100000010);
+ history.recover(source, position, recoveredTables, recoveryParser);
+ assertEquals(recoveredTables, tables3);
+ }
+
+ /**
+ * Replaces the current log position with a new map holding the fixed file name "my-txn-file.log"
+ * and the given position.
+ *
+ * @param index value stored under the "position" key
+ */
+ protected void setLogPosition(int index) {
+ this.position = Collect.hashMapOf("filename", "my-txn-file.log",
+ "position", index);
+ }
+
+ /**
+ * Runs the record-and-recover scenario without publishing anything to the topic beforehand and with
+ * skipping of unparseable DDL turned off.
+ */
+ @Test
+ public void shouldStartWithEmptyTopicAndStoreDataAndRecoverAllState() throws Exception {
+ // Create the empty topic ...
+ testHistoryTopicContent(false);
+ }
+
+ @Test
+ public void shouldIgnoreUnparseableMessages() throws Exception {
+ try (final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topicName)
+ .create()
+ ) {
+ producer.send("");
+ producer.send("{\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"DROP TABLE foo;\"}");
+ producer.send("{\"source\":{\"server\":\"my-server\"},\"databaseName\":\"db1\",\"ddl\":\"DROP TABLE foo;\"}");
+ producer.send("{\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"DROP TABLE foo;\"");
+ producer.send("\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"DROP TABLE foo;\"}");
+ producer.send("{\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"xxxDROP TABLE foo;\"}");
+ }
+
+ testHistoryTopicContent(true);
+ }
+
+ @Test(expectedExceptions = ParsingException.class)
+ public void shouldStopOnUnparseableSQL() throws Exception {
+ try (final Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create()) {
+ producer.send("{\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"xxxDROP TABLE foo;\"}");
+ }
+
+ testHistoryTopicContent(false);
+ }
+
+
+ /**
+  * Checks that {@code exists()} is true after the record-and-recover scenario has written to the history
+  * topic, and false once the history is reconfigured to use a different topic.
+  */
+ @Test
+ public void testExists() {
+     // happy path
+     testHistoryTopicContent(true);
+     assertTrue(history.exists());
+
+     // Point the history at a dummy topic
+     final String dummyTopic = "persistent://my-property/my-ns/dummytopic";
+     final Configuration dummyTopicConfig = Configuration.create()
+             .with(PulsarDatabaseHistory.SERVICE_URL, brokerUrl.toString())
+             .with(PulsarDatabaseHistory.TOPIC, dummyTopic)
+             .with(DatabaseHistory.NAME, "my-db-history")
+             .with(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, true)
+             .build();
+     history.configure(dummyTopicConfig, null);
+     history.start();
+
+     // dummytopic should not exist yet
+     assertFalse(history.exists());
+ }
+}
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index 10c67c9..5956d62 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -43,6 +43,7 @@
<module>jdbc</module>
<module>data-genenator</module>
<module>elastic-search</module>
+ <module>debezium</module>
</modules>
</project>