You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/12/11 00:08:40 UTC

[GitHub] sijie closed pull request #3154: Debezium: create a simple debezium integration test

sijie closed pull request #3154: Debezium: create a simple debezium integration test
URL: https://github.com/apache/pulsar/pull/3154
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub does not display the diff of a merged fork pull request):

diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java
new file mode 100644
index 0000000000..2d0613e167
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.containers;
+
+
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+
+public class DebeziumMySQLContainer extends ChaosContainer<DebeziumMySQLContainer> {
+
+    public static final String NAME = "mysql";
+    static final Integer[] PORTS = { 3306 };
+
+    private static final String IMAGE_NAME = "debezium/example-mysql:0.8";
+
+    public DebeziumMySQLContainer(String clusterName) {
+        super(clusterName, IMAGE_NAME);
+        this.withEnv("MYSQL_USER", "mysqluser");
+        this.withEnv("MYSQL_PASSWORD", "mysqlpw");
+        this.withEnv("MYSQL_ROOT_PASSWORD", "debezium");
+
+    }
+
+    @Override
+    public String getContainerName() {
+        return clusterName;
+    }
+
+    @Override
+    protected void configure() {
+        super.configure();
+        this.withNetworkAliases(NAME)
+            .withExposedPorts(PORTS)
+            .withCreateContainerCmdModifier(createContainerCmd -> {
+                createContainerCmd.withHostName(NAME);
+                createContainerCmd.withName(getContainerName());
+            })
+            .waitingFor(new HostPortWaitStrategy());
+    }
+
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index f100c21a13..94aad281e9 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -49,6 +49,7 @@
 import org.apache.pulsar.common.policies.data.SourceStatus;
 import org.apache.pulsar.functions.api.examples.AutoSchemaFunction;
 import org.apache.pulsar.functions.api.examples.serde.CustomObject;
+import org.apache.pulsar.tests.integration.containers.DebeziumMySQLContainer;
 import org.apache.pulsar.tests.integration.docker.ContainerExecException;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
@@ -100,6 +101,11 @@ public void testElasticSearchSink() throws Exception {
         testSink(new ElasticSearchSinkTester(), true);
     }
 
+    @Test
+    public void testDebeziumMySqlSource() throws Exception {
+        testDebeziumMySqlConnect();
+    }
+
     private void testSink(SinkTester tester, boolean builtin) throws Exception {
         tester.startServiceContainer(pulsarCluster);
         try {
@@ -1130,4 +1136,63 @@ private static void publishAndConsumeAvroMessages(String inputTopic,
             assertEquals("value-" + i, msg.getValue());
         }
     }
-}
\ No newline at end of file
+
+    private  void testDebeziumMySqlConnect()
+        throws Exception {
+
+        final String tenant = TopicName.PUBLIC_TENANT;
+        final String namespace = TopicName.DEFAULT_NAMESPACE;
+        final String outputTopicName = "debe-output-topic-name";
+        final String consumeTopicName = "dbserver1.inventory.products";
+        final String sourceName = "test-source-connector-"
+            + functionRuntimeType + "-name-" + randomName(8);
+
+        // This is the binlog count that is contained in the mysql container.
+        final int numMessages = 47;
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+            .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+            .build();
+
+        @Cleanup
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+            .topic(consumeTopicName)
+            .subscriptionName("debezium-source-tester")
+            .subscriptionType(SubscriptionType.Exclusive)
+            .subscribe();
+
+        DebeziumMySqlSourceTester sourceTester = new DebeziumMySqlSourceTester(pulsarCluster);
+
+        // setup debezium mysql server
+        DebeziumMySQLContainer mySQLContainer = new DebeziumMySQLContainer(pulsarCluster.getClusterName());
+        sourceTester.setServiceContainer(mySQLContainer);
+
+        // prepare the testing environment for source
+        prepareSource(sourceTester);
+
+        // submit the source connector
+        submitSourceConnector(sourceTester, tenant, namespace, sourceName, outputTopicName);
+
+        // get source info
+        getSourceInfoSuccess(sourceTester, tenant, namespace, sourceName);
+
+        // get source status
+        getSourceStatus(tenant, namespace, sourceName);
+
+        // wait for source to process messages
+        waitForProcessingSourceMessages(tenant, namespace, sourceName, numMessages);
+
+        // validate the source result
+        sourceTester.validateSourceResult(consumer, null);
+
+        // delete the source
+        deleteSource(tenant, namespace, sourceName);
+
+        // get source info (source should be deleted)
+        getSourceInfoNotFound(tenant, namespace, sourceName);
+
+        pulsarCluster.stopService("mysql", sourceTester.getDebeziumMySqlContainer());
+    }
+
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java
new file mode 100644
index 0000000000..b122ccc967
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.tests.integration.containers.DebeziumMySQLContainer;
+import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.testng.Assert;
+
+/**
+ * A tester for testing Debezium MySQL source.
+ *
+ * It reads the binlog from MySQL and stores the Debezium output in Pulsar.
+ * This test verifies that the target topic contains the expected number of messages.
+ *
+ * The Debezium MySQL container image is "debezium/example-mysql:0.8",
+ * which is a MySQL database server preconfigured with an inventory database.
+ */
+@Slf4j
+public class DebeziumMySqlSourceTester extends SourceTester<DebeziumMySQLContainer> {
+
+    private static final String NAME = "kafka-connect-adaptor";
+
+    private final String pulsarServiceUrl;
+
+    @Getter
+    private DebeziumMySQLContainer debeziumMySqlContainer;
+
+    private final PulsarCluster pulsarCluster;
+
+    public DebeziumMySqlSourceTester(PulsarCluster cluster) {
+        super(NAME);
+        this.pulsarCluster = cluster;
+        pulsarServiceUrl = "pulsar://pulsar-proxy:" + PulsarContainer.BROKER_PORT;
+
+        sourceConfig.put("task.class", "io.debezium.connector.mysql.MySqlConnectorTask");
+        sourceConfig.put("database.hostname", "mysql");
+        sourceConfig.put("database.port", "3306");
+        sourceConfig.put("database.user", "debezium");
+        sourceConfig.put("database.password", "dbz");
+        sourceConfig.put("database.server.id", "184054");
+        sourceConfig.put("database.server.name", "dbserver1");
+        sourceConfig.put("database.whitelist", "inventory");
+        sourceConfig.put("database.history", "org.apache.pulsar.io.debezium.PulsarDatabaseHistory");
+        sourceConfig.put("database.history.pulsar.topic", "history-topic");
+        sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl);
+        sourceConfig.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        sourceConfig.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        sourceConfig.put("pulsar.service.url", pulsarServiceUrl);
+        sourceConfig.put("offset.storage.topic", "offset-topic");
+    }
+
+    @Override
+    public void setServiceContainer(DebeziumMySQLContainer container) {
+        log.info("start debezium mysql server container.");
+        debeziumMySqlContainer = container;
+        pulsarCluster.startService("mysql", debeziumMySqlContainer);
+    }
+
+    @Override
+    public void prepareSource() throws Exception {
+        log.info("debezium mysql server already contains preconfigured data.");
+    }
+
+    @Override
+    public Map<String, String> produceSourceMessages(int numMessages) throws Exception {
+        log.info("debezium mysql server already contains preconfigured data.");
+        return null;
+    }
+
+    public void validateSourceResult(Consumer<String> consumer, Map<String, String> kvs) throws Exception {
+        int recordsNumber = 0;
+        Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
+        while(msg != null) {
+            recordsNumber ++;
+            log.info("Received message: {}.", msg.getValue());
+            Assert.assertTrue(msg.getValue().contains("dbserver1.inventory.products"));
+            consumer.acknowledge(msg);
+            msg = consumer.receive(1, TimeUnit.SECONDS);
+        }
+
+        Assert.assertEquals(recordsNumber, 9);
+        log.info("Stop debezium mysql server container. topic: {} has {} records.", consumer.getTopic(), recordsNumber);
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services