You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/09/24 06:16:08 UTC

[pulsar] branch master updated: Debezium: add PulsarDatabaseHistory for debezium (#2614)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new adaae57  Debezium: add PulsarDatabaseHistory for debezium (#2614)
adaae57 is described below

commit adaae57d45123d3fb49fc5d7e11c08e8be1f6f68
Author: Jia Zhai <ji...@users.noreply.github.com>
AuthorDate: Mon Sep 24 14:16:02 2018 +0800

    Debezium: add PulsarDatabaseHistory for debezium (#2614)
    
    ### Motivation
    
    add PulsarDatabaseHistory for debezium
    
    ### Modifications
    
    add PulsarDatabaseHistory for debezium and test for it.
    
    ### Result
    
    Unit tests pass.
---
 pom.xml                                            |   3 +-
 pulsar-io/debezium/pom.xml                         | 102 ++++++++
 .../pulsar/io/debezium/PulsarDatabaseHistory.java  | 256 +++++++++++++++++++++
 .../io/debezium/PulsarDatabaseHistoryTest.java     | 222 ++++++++++++++++++
 pulsar-io/pom.xml                                  |   1 +
 5 files changed, 583 insertions(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index a8d68a4..83b83fe 100644
--- a/pom.xml
+++ b/pom.xml
@@ -177,6 +177,7 @@ flexible messaging model and an intuitive client API.</description>
     <presto.version>0.206</presto.version>
     <flink.version>1.6.0</flink.version>
     <scala.binary.version>2.11</scala.binary.version>
+    <debezium-core.version>0.8.2</debezium-core.version>
 
     <!-- test dependencies -->
     <arquillian-cube.version>1.15.1</arquillian-cube.version>
@@ -1348,7 +1349,7 @@ flexible messaging model and an intuitive client API.</description>
     <profile>
       <id>docker</id>
     </profile>
-    
+
     <profile>
       <!-- Checks style and licensing requirements. This is a good
            idea to run for contributions and for the release process. While it would
diff --git a/pulsar-io/debezium/pom.xml b/pulsar-io/debezium/pom.xml
new file mode 100644
index 0000000..5fefa43
--- /dev/null
+++ b/pulsar-io/debezium/pom.xml
@@ -0,0 +1,102 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar-io</artifactId>
+    <version>2.2.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>pulsar-io-debezium</artifactId>
+  <name>Pulsar IO :: Debezium</name>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.debezium</groupId>
+      <artifactId>debezium-core</artifactId>
+      <version>${debezium-core.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_${scala.binary.version}</artifactId>
+      <version>${kafka-client.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-client-original</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-broker</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>managed-ledger-original</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-zookeeper-utils</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-broker</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+
+</project>
diff --git a/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java b/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
new file mode 100644
index 0000000..bc97fc6
--- /dev/null
+++ b/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.debezium;
+
+import static org.apache.commons.lang.StringUtils.isBlank;
+
+import io.debezium.annotation.ThreadSafe;
+import io.debezium.config.Configuration;
+import io.debezium.config.Field;
+import io.debezium.document.DocumentReader;
+import io.debezium.relational.history.AbstractDatabaseHistory;
+import io.debezium.relational.history.DatabaseHistory;
+import io.debezium.relational.history.DatabaseHistoryException;
+import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.HistoryRecordComparator;
+import java.io.IOException;
+import java.util.UUID;
+import java.util.function.Consumer;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigDef.Width;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
+
+/**
+ * A {@link DatabaseHistory} implementation that records schema changes as normal Pulsar messages on the
+ * specified topic, and that recovers the history by creating a Pulsar {@link Reader} which re-processes all
+ * messages on that topic, starting from the earliest one.
+ *
+ * <p>The Pulsar client and producer are created lazily: the producer by {@link #start()}, the client by any
+ * operation that needs to talk to the cluster. Both are released by {@link #stop()}.
+ */
+@Slf4j
+@ThreadSafe
+public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
+
+    /** Required setting: name of the Pulsar topic that stores the database schema history. */
+    public static final Field TOPIC = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.topic")
+        .withDisplayName("Database history topic name")
+        .withType(Type.STRING)
+        .withWidth(Width.LONG)
+        .withImportance(Importance.HIGH)
+        .withDescription("The name of the topic for the database schema history")
+        .withValidation(Field::isRequired);
+
+    /** Required setting: service url handed to the Pulsar client builder. */
+    public static final Field SERVICE_URL = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.service.url")
+        .withDisplayName("Pulsar service url")
+        .withType(Type.STRING)
+        .withWidth(Width.LONG)
+        .withImportance(Importance.HIGH)
+        .withDescription("Pulsar service url")
+        .withValidation(Field::isRequired);
+
+    /** All configuration fields validated by {@link #configure(Configuration, HistoryRecordComparator)}. */
+    public static final Field.Set ALL_FIELDS = Field.setOf(
+        TOPIC,
+        SERVICE_URL,
+        DatabaseHistory.NAME);
+
+    private final DocumentReader reader = DocumentReader.defaultReader();
+    private String topicName;
+    private String serviceUrl;
+    private String dbHistoryName;
+    private volatile PulsarClient pulsarClient;
+    private volatile Producer<String> producer;
+
+
+    /**
+     * Validates the configuration and remembers the topic name, the service url and the history name.
+     * No connection to Pulsar is made here.
+     *
+     * @param config     the configuration; must contain {@link #TOPIC} and {@link #SERVICE_URL}
+     * @param comparator the comparator passed through to the superclass
+     * @throws IllegalArgumentException if validation of {@link #ALL_FIELDS} fails
+     */
+    @Override
+    public void configure(Configuration config, HistoryRecordComparator comparator) {
+        super.configure(config, comparator);
+        if (!config.validateAndRecord(ALL_FIELDS, logger::error)) {
+            throw new IllegalArgumentException("Error configuring an instance of "
+                + getClass().getSimpleName() + "; check the logs for details");
+        }
+        this.topicName = config.getString(TOPIC);
+        this.serviceUrl = config.getString(SERVICE_URL);
+        // The history name doubles as the producer name; fall back to a random one when it is not configured.
+        this.dbHistoryName = config.getString(DatabaseHistory.NAME, UUID.randomUUID().toString());
+
+        log.info("Configure to store the debezium database history {} to pulsar topic {} at {}",
+            dbHistoryName, topicName, serviceUrl);
+    }
+
+    /**
+     * Makes sure the history topic exists by publishing one empty message to it.
+     *
+     * @throws RuntimeException if the temporary producer cannot be created or the message cannot be sent
+     */
+    @Override
+    public void initializeStorage() {
+        super.initializeStorage();
+
+        // The client is created lazily, so this must also work when called before start().
+        setupClientIfNeeded();
+
+        // Publishing an empty string is the simplest way to get the topic created;
+        // recoverRecords() skips blank messages, so this marker is never replayed.
+        try (Producer<String> p = pulsarClient.newProducer(Schema.STRING).topic(topicName).create()) {
+            p.send("");
+        } catch (PulsarClientException pce) {
+            log.error("Failed to initialize storage", pce);
+            throw new RuntimeException("Failed to initialize storage", pce);
+        }
+    }
+
+    /**
+     * Creates the Pulsar client on first use. Synchronized so that concurrent callers cannot each build
+     * (and then leak) a client of their own.
+     *
+     * @throws RuntimeException if the client cannot be built
+     */
+    synchronized void setupClientIfNeeded() {
+        if (null == this.pulsarClient) {
+            try {
+                pulsarClient = PulsarClient.builder()
+                    .serviceUrl(serviceUrl)
+                    .build();
+            } catch (PulsarClientException e) {
+                throw new RuntimeException("Failed to create pulsar client to pulsar cluster at "
+                    + serviceUrl, e);
+            }
+        }
+    }
+
+    /**
+     * Creates the long-lived producer (and the client, if necessary) on first use.
+     *
+     * @throws RuntimeException if the client or the producer cannot be created
+     */
+    synchronized void setupProducerIfNeeded() {
+        setupClientIfNeeded();
+        if (null == this.producer) {
+            try {
+                this.producer = pulsarClient.newProducer(Schema.STRING)
+                    .topic(topicName)
+                    .producerName(dbHistoryName)
+                    .blockIfQueueFull(true)
+                    .create();
+            } catch (PulsarClientException e) {
+                log.error("Failed to create pulsar producer to topic '{}' at cluster '{}'", topicName, serviceUrl, e);
+                throw new RuntimeException("Failed to create pulsar producer to topic '"
+                    + topicName + "' at cluster '" + serviceUrl + "'", e);
+            }
+        }
+    }
+
+    /**
+     * Starts the history and makes sure a producer is available. Calling it again while a producer already
+     * exists does not create another one.
+     */
+    @Override
+    public void start() {
+        super.start();
+        setupProducerIfNeeded();
+    }
+
+    /**
+     * Publishes the string form of the given record to the history topic.
+     *
+     * @param record the record to store
+     * @throws IllegalStateException    if no producer exists, i.e. {@link #start()} has not been called
+     * @throws DatabaseHistoryException if the send fails
+     */
+    @Override
+    protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
+        if (this.producer == null) {
+            throw new IllegalStateException("No producer is available. Ensure that 'start()'" +
+                " is called before storing database history records.");
+        }
+        if (log.isTraceEnabled()) {
+            log.trace("Storing record into database history: {}", record);
+        }
+        try {
+            producer.send(record.toString());
+        } catch (PulsarClientException e) {
+            throw new DatabaseHistoryException(e);
+        }
+    }
+
+    /**
+     * Flushes and closes the producer, then closes the client. Every step is attempted even when an earlier
+     * one fails, and both fields are cleared so that a later {@link #start()} creates fresh instances.
+     * Failures are logged rather than thrown.
+     */
+    @Override
+    public synchronized void stop() {
+        final Producer<String> currentProducer = this.producer;
+        this.producer = null;
+        if (currentProducer != null) {
+            try {
+                currentProducer.flush();
+            } catch (PulsarClientException pce) {
+                // Best effort only: a failed flush must not prevent the producer and client from being closed.
+                log.warn("Failed to flush pulsar producer", pce);
+            }
+            try {
+                currentProducer.close();
+            } catch (PulsarClientException pce) {
+                // Keep going so that the client below is still closed.
+                log.warn("Failed to close pulsar producer", pce);
+            }
+        }
+
+        final PulsarClient currentClient = this.pulsarClient;
+        this.pulsarClient = null;
+        if (currentClient != null) {
+            try {
+                currentClient.close();
+            } catch (PulsarClientException pe) {
+                log.warn("Failed to closing pulsar client", pe);
+            }
+        }
+    }
+
+    /**
+     * Reads the history topic from the earliest message until no more messages are available and passes every
+     * valid record to the given consumer. Blank messages are skipped, messages that cannot be deserialized are
+     * logged and skipped, and records reported as invalid are logged and skipped.
+     *
+     * @param records the consumer that receives each recovered record
+     * @throws RuntimeException if the reader cannot be created, read from, or closed
+     */
+    @Override
+    protected void recoverRecords(Consumer<HistoryRecord> records) {
+        setupClientIfNeeded();
+        try (Reader<String> historyReader = pulsarClient.newReader(Schema.STRING)
+            .topic(topicName)
+            .startMessageId(MessageId.earliest)
+            .create()
+        ) {
+            log.info("Scanning the database history topic '{}'", topicName);
+
+            // Id of the newest message handled so far; anything not strictly newer is ignored.
+            MessageId lastProcessedMessageId = null;
+
+            // read the topic until the end
+            while (historyReader.hasMessageAvailable()) {
+                Message<String> msg = historyReader.readNext();
+                try {
+                    if (null == lastProcessedMessageId || lastProcessedMessageId.compareTo(msg.getMessageId()) < 0) {
+                        if (!isBlank(msg.getValue())) {
+                            HistoryRecord recordObj = new HistoryRecord(reader.read(msg.getValue()));
+                            log.trace("Recovering database history: {}", recordObj);
+                            if (!recordObj.isValid()) {
+                                log.warn("Skipping invalid database history record '{}'. " +
+                                        "This is often not an issue, but if it happens repeatedly please check the '{}' topic.",
+                                    recordObj, topicName);
+                            } else {
+                                records.accept(recordObj);
+                                log.trace("Recovered database history: {}", recordObj);
+                            }
+                        }
+                        lastProcessedMessageId = msg.getMessageId();
+                    }
+                } catch (IOException ioe) {
+                    // A message that is not a readable document is skipped instead of aborting the recovery.
+                    log.error("Error while deserializing history record '{}'", msg.getValue(), ioe);
+                }
+            }
+            log.info("Successfully completed scanning the database history topic '{}'", topicName);
+        } catch (IOException ioe) {
+            log.error("Encountered issues on recovering history records", ioe);
+            throw new RuntimeException("Encountered issues on recovering history records", ioe);
+        }
+    }
+
+    /**
+     * Reports whether the history topic holds at least one message, checked with a reader positioned at the
+     * earliest message.
+     *
+     * @return {@code true} if a message is available on the topic, {@code false} otherwise
+     * @throws RuntimeException if the reader cannot be created, queried, or closed
+     */
+    @Override
+    public boolean exists() {
+        setupClientIfNeeded();
+        try (Reader<String> historyReader = pulsarClient.newReader(Schema.STRING)
+            .topic(topicName)
+            .startMessageId(MessageId.earliest)
+            .create()
+        ) {
+            return historyReader.hasMessageAvailable();
+        } catch (IOException e) {
+            log.error("Encountered issues on checking existence of database history", e);
+            throw new RuntimeException("Encountered issues on checking existence of database history", e);
+        }
+    }
+
+    /**
+     * Describes this history by topic and service url, or generically when it has not been configured yet.
+     */
+    @Override
+    public String toString() {
+        if (topicName != null) {
+            return "Pulsar topic (" + topicName + ") at " + serviceUrl;
+        }
+        return "Pulsar topic";
+    }
+}
diff --git a/pulsar-io/debezium/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java b/pulsar-io/debezium/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
new file mode 100644
index 0000000..e3b4fd9
--- /dev/null
+++ b/pulsar-io/debezium/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.debezium;
+
+import static org.junit.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import io.debezium.config.Configuration;
+import io.debezium.relational.Tables;
+import io.debezium.relational.ddl.DdlParserSql2003;
+import io.debezium.relational.ddl.LegacyDdlParser;
+import io.debezium.relational.history.DatabaseHistory;
+import io.debezium.text.ParsingException;
+import io.debezium.util.Collect;
+import java.util.Map;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.Schema;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Test the implementation of {@link PulsarDatabaseHistory}.
+ *
+ * <p>Each test method runs against the broker set up by {@link ProducerConsumerBase} and uses a fresh
+ * {@link PulsarDatabaseHistory} instance, which is stopped again after the method.
+ */
+public class PulsarDatabaseHistoryTest extends ProducerConsumerBase {
+
+    private PulsarDatabaseHistory history;
+    private Map<String, Object> position;
+    private Map<String, String> source;
+    private String topicName;
+    private String ddl;
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        source = Collect.hashMapOf("server", "my-server");
+        setLogPosition(0);
+        this.topicName = "persistent://my-property/my-ns/schema-changes-topic";
+        this.history = new PulsarDatabaseHistory();
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        try {
+            // Release the producer/client the history under test may still hold, e.g. when a test
+            // ends (or fails) before reaching its own stop() call.
+            if (history != null) {
+                history.stop();
+            }
+        } finally {
+            super.internalCleanup();
+        }
+    }
+
+    /**
+     * Records three batches of DDL through a started history, then recovers them with a second, freshly
+     * configured history at several log positions and compares the recovered tables with the expected state.
+     *
+     * @param skipUnparseableDDL value used for {@link DatabaseHistory#SKIP_UNPARSEABLE_DDL_STATEMENTS}
+     */
+    private void testHistoryTopicContent(boolean skipUnparseableDDL) {
+        // Start up the history ...
+        Configuration config = Configuration.create()
+            .with(PulsarDatabaseHistory.SERVICE_URL, brokerUrl.toString())
+            .with(PulsarDatabaseHistory.TOPIC, topicName)
+            .with(DatabaseHistory.NAME, "my-db-history")
+            .with(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, skipUnparseableDDL)
+            .build();
+        history.configure(config, null);
+        history.start();
+
+        // Should be able to call start more than once ...
+        history.start();
+
+        history.initializeStorage();
+
+        // Calling it another time to ensure we can work with the DB history topic already existing
+        history.initializeStorage();
+
+        LegacyDdlParser recoveryParser = new DdlParserSql2003();
+        LegacyDdlParser ddlParser = new DdlParserSql2003();
+        ddlParser.setCurrentSchema("db1"); // recover does this, so we need to as well
+        Tables tables1 = new Tables();
+        Tables tables2 = new Tables();
+        Tables tables3 = new Tables();
+
+        // Recover from the very beginning ...
+        setLogPosition(0);
+        history.recover(source, position, tables1, recoveryParser);
+
+        // There should have been nothing to recover ...
+        assertEquals(0, tables1.size());
+
+        // Now record schema changes, which writes out to pulsar but doesn't actually change the Tables ...
+        setLogPosition(10);
+        ddl = "CREATE TABLE foo ( name VARCHAR(255) NOT NULL PRIMARY KEY); \n" +
+            "CREATE TABLE customers ( id INTEGER NOT NULL PRIMARY KEY, name VARCHAR(100) NOT NULL ); \n" +
+            "CREATE TABLE products ( productId INTEGER NOT NULL PRIMARY KEY, desc VARCHAR(255) NOT NULL); \n";
+        history.record(source, position, "db1", ddl);
+
+        // Parse the DDL statement 3x and each time update a different Tables object ...
+        ddlParser.parse(ddl, tables1);
+        assertEquals(3, tables1.size());
+        ddlParser.parse(ddl, tables2);
+        assertEquals(3, tables2.size());
+        ddlParser.parse(ddl, tables3);
+        assertEquals(3, tables3.size());
+
+        // Record a drop statement and parse it for 2 of our 3 Tables...
+        setLogPosition(39);
+        ddl = "DROP TABLE foo;";
+        history.record(source, position, "db1", ddl);
+        ddlParser.parse(ddl, tables2);
+        assertEquals(2, tables2.size());
+        ddlParser.parse(ddl, tables3);
+        assertEquals(2, tables3.size());
+
+        // Record another DDL statement and parse it for 1 of our 3 Tables...
+        setLogPosition(10003);
+        ddl = "CREATE TABLE suppliers ( supplierId INTEGER NOT NULL PRIMARY KEY, name VARCHAR(255) NOT NULL);";
+        history.record(source, position, "db1", ddl);
+        ddlParser.parse(ddl, tables3);
+        assertEquals(3, tables3.size());
+
+        // Stop the history (which should stop the producer) ...
+        history.stop();
+        history = new PulsarDatabaseHistory();
+        history.configure(config, null);
+        // no need to start
+
+        // Recover from the very beginning to just past the first change ...
+        Tables recoveredTables = new Tables();
+        setLogPosition(15);
+        history.recover(source, position, recoveredTables, recoveryParser);
+        assertEquals(tables1, recoveredTables);
+
+        // Recover from the very beginning to just past the second change ...
+        recoveredTables = new Tables();
+        setLogPosition(50);
+        history.recover(source, position, recoveredTables, recoveryParser);
+        assertEquals(tables2, recoveredTables);
+
+        // Recover from the very beginning to just past the third change ...
+        recoveredTables = new Tables();
+        setLogPosition(10010);
+        history.recover(source, position, recoveredTables, recoveryParser);
+        assertEquals(tables3, recoveredTables);
+
+        // Recover from the very beginning to way past the third change ...
+        recoveredTables = new Tables();
+        setLogPosition(100000010);
+        history.recover(source, position, recoveredTables, recoveryParser);
+        assertEquals(tables3, recoveredTables);
+    }
+
+    /**
+     * Replaces the current log position with one pointing at the given offset of "my-txn-file.log".
+     *
+     * @param index the offset stored under the "position" key
+     */
+    protected void setLogPosition(int index) {
+        this.position = Collect.hashMapOf("filename", "my-txn-file.log",
+            "position", index);
+    }
+
+    @Test
+    public void shouldStartWithEmptyTopicAndStoreDataAndRecoverAllState() throws Exception {
+        // Create the empty topic ...
+        testHistoryTopicContent(false);
+    }
+
+    @Test
+    public void shouldIgnoreUnparseableMessages() throws Exception {
+        // Pre-populate the topic with blank, incomplete, malformed and unparseable-DDL messages ...
+        try (final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+            .topic(topicName)
+            .create()
+        ) {
+            producer.send("");
+            producer.send("{\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"DROP TABLE foo;\"}");
+            producer.send("{\"source\":{\"server\":\"my-server\"},\"databaseName\":\"db1\",\"ddl\":\"DROP TABLE foo;\"}");
+            producer.send("{\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"DROP TABLE foo;\"");
+            producer.send("\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"DROP TABLE foo;\"}");
+            producer.send("{\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"xxxDROP TABLE foo;\"}");
+        }
+
+        testHistoryTopicContent(true);
+    }
+
+    @Test(expectedExceptions = ParsingException.class)
+    public void shouldStopOnUnparseableSQL() throws Exception {
+        // A well-formed record whose DDL cannot be parsed; with skipping disabled the run must fail ...
+        try (final Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create()) {
+            producer.send("{\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"xxxDROP TABLE foo;\"}");
+        }
+
+        testHistoryTopicContent(false);
+    }
+
+
+    @Test
+    public void testExists() {
+        // happy path
+        testHistoryTopicContent(true);
+        assertTrue(history.exists());
+
+        // Set history to use dummy topic
+        Configuration config = Configuration.create()
+            .with(PulsarDatabaseHistory.SERVICE_URL, brokerUrl.toString())
+            .with(PulsarDatabaseHistory.TOPIC, "persistent://my-property/my-ns/dummytopic")
+            .with(DatabaseHistory.NAME, "my-db-history")
+            .with(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, true)
+            .build();
+
+        history.configure(config, null);
+        history.start();
+
+        // nothing has been published to dummytopic, so no history should be reported for it
+        assertFalse(history.exists());
+    }
+}
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index 10c67c9..5956d62 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -43,6 +43,7 @@
     <module>jdbc</module>
     <module>data-genenator</module>
     <module>elastic-search</module>
+    <module>debezium</module>
   </modules>
 
 </project>