You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2021/08/13 23:34:59 UTC

[pulsar] branch master updated: [Issue 8502] Upgrade Debezium to a newer version (#11204)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 283ae57  [Issue 8502] Upgrade Debezium to a newer version (#11204)
283ae57 is described below

commit 283ae572a001830cf02275097fdde92f6d4a648b
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Fri Aug 13 16:34:20 2021 -0700

    [Issue 8502] Upgrade Debezium to a newer version (#11204)
    
    Fixes #8502
    
    ### Motivation
    
    Upgrade Debezium to a newer version
    
    ### Modifications
    
    Upgraded Debezium to v1.5.4 (the latest version built with Java 8; v1.6.x is built with Java 11)
    Upgraded kafka-client to 2.7 (the version Debezium is tested with)
    Upgraded scala-library to 2.13.6 (for kafka-client)
    
    Dealt with API changes, tests etc.
    
    This PR is built on top of https://github.com/apache/pulsar/pull/11154 so that the Debezium integration tests run on CI.
---
 pom.xml                                            |  8 +++---
 pulsar-io/debezium/core/pom.xml                    |  7 +++++
 .../pulsar/io/debezium/PulsarDatabaseHistory.java  |  5 ++++
 .../io/debezium/PulsarDatabaseHistoryTest.java     | 12 ++++----
 .../kafka/connect/PulsarKafkaSinkTaskContext.java  | 33 ++++++++--------------
 .../io/kafka/connect/PulsarOffsetBackingStore.java | 10 +------
 .../connect/PulsarOffsetBackingStoreTest.java      | 22 ++-------------
 .../tests/integration/io/sources/SourceTester.java |  4 +++
 8 files changed, 42 insertions(+), 59 deletions(-)

diff --git a/pom.xml b/pom.xml
index c332149..1f11163 100644
--- a/pom.xml
+++ b/pom.xml
@@ -139,7 +139,7 @@ flexible messaging model and an intuitive client API.</description>
     <hbc-core.version>2.2.0</hbc-core.version>
     <cassandra-driver-core.version>3.6.0</cassandra-driver-core.version>
     <aerospike-client.version>4.4.8</aerospike-client.version>
-    <kafka-client.version>2.3.0</kafka-client.version>
+    <kafka-client.version>2.7.0</kafka-client.version>
     <rabbitmq-client.version>5.1.1</rabbitmq-client.version>
     <aws-sdk.version>1.11.774</aws-sdk.version>
     <avro.version>1.10.2</avro.version>
@@ -153,9 +153,9 @@ flexible messaging model and an intuitive client API.</description>
     <hdfs-offload-version3>3.3.0</hdfs-offload-version3>
     <elasticsearch.version>7.9.1</elasticsearch.version>
     <presto.version>332</presto.version>
-    <scala.binary.version>2.11</scala.binary.version>
-    <scala-library.version>2.11.12</scala-library.version>
-    <debezium.version>1.0.0.Final</debezium.version>
+    <scala.binary.version>2.13</scala.binary.version>
+    <scala-library.version>2.13.6</scala-library.version>
+    <debezium.version>1.5.4.Final</debezium.version>
     <jsonwebtoken.version>0.11.1</jsonwebtoken.version>
     <opencensus.version>0.18.0</opencensus.version>
     <hbase.version>2.3.0</hbase.version>
diff --git a/pulsar-io/debezium/core/pom.xml b/pulsar-io/debezium/core/pom.xml
index 4bd97ed..0b92898 100644
--- a/pulsar-io/debezium/core/pom.xml
+++ b/pulsar-io/debezium/core/pom.xml
@@ -90,6 +90,13 @@
       <type>test-jar</type>
     </dependency>
 
+    <dependency>
+      <groupId>io.debezium</groupId>
+      <artifactId>debezium-connector-mysql</artifactId>
+      <version>${debezium.version}</version>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
 </project>
diff --git a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
index c3d95a7..9e9a7f1 100644
--- a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
+++ b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
@@ -251,6 +251,11 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
     }
 
     @Override
+    public boolean storageExists() {
+        return true;
+    }
+
+    @Override
     public String toString() {
         if (topicName != null) {
             return "Pulsar topic (" + topicName + ") at " + serviceUrl;
diff --git a/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java b/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
index 6a21812..ba3bc6a 100644
--- a/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
+++ b/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
@@ -23,9 +23,9 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
 import io.debezium.config.Configuration;
+import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
 import io.debezium.relational.Tables;
-import io.debezium.relational.ddl.DdlParserSql2003;
-import io.debezium.relational.ddl.LegacyDdlParser;
+import io.debezium.relational.ddl.DdlParser;
 import io.debezium.relational.history.DatabaseHistory;
 import io.debezium.relational.history.DatabaseHistoryListener;
 import io.debezium.text.ParsingException;
@@ -86,8 +86,8 @@ public class PulsarDatabaseHistoryTest extends ProducerConsumerBase {
         // Calling it another time to ensure we can work with the DB history topic already existing
         history.initializeStorage();
 
-        LegacyDdlParser recoveryParser = new DdlParserSql2003();
-        LegacyDdlParser ddlParser = new DdlParserSql2003();
+        DdlParser recoveryParser = new MySqlAntlrDdlParser();
+        DdlParser ddlParser = new MySqlAntlrDdlParser();
         ddlParser.setCurrentSchema("db1"); // recover does this, so we need to as well
         Tables tables1 = new Tables();
         Tables tables2 = new Tables();
@@ -102,9 +102,9 @@ public class PulsarDatabaseHistoryTest extends ProducerConsumerBase {
 
         // Now record schema changes, which writes out to kafka but doesn't actually change the Tables ...
         setLogPosition(10);
-        ddl = "CREATE TABLE foo ( name VARCHAR(255) NOT NULL PRIMARY KEY); \n" +
+        ddl = "CREATE TABLE foo ( first VARCHAR(22) NOT NULL ); \n" +
             "CREATE TABLE customers ( id INTEGER NOT NULL PRIMARY KEY, name VARCHAR(100) NOT NULL ); \n" +
-            "CREATE TABLE products ( productId INTEGER NOT NULL PRIMARY KEY, desc VARCHAR(255) NOT NULL); \n";
+            "CREATE TABLE products ( productId INTEGER NOT NULL PRIMARY KEY, description VARCHAR(255) NOT NULL ); \n";
         history.record(source, position, "db1", ddl);
 
         // Parse the DDL statement 3x and each time update a different Tables object ...
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
index b04ca16..ef2faa8 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
@@ -97,30 +97,21 @@ public class PulsarKafkaSinkTaskContext implements SinkTaskContext {
             List<ByteBuffer> req = Lists.newLinkedList();
             ByteBuffer key = topicPartitionAsKey(topicPartition);
             req.add(key);
-            CompletableFuture<Long> offsetFuture = new CompletableFuture<>();
-            offsetStore.get(req, (Throwable ex, Map<ByteBuffer, ByteBuffer> result) -> {
-                if (ex == null) {
-                    if (result != null && result.size() != 0) {
-                        Optional<ByteBuffer> val = result.entrySet().stream()
-                                .filter(entry -> entry.getKey().equals(key))
-                                .findFirst().map(entry -> entry.getValue());
-                        if (val.isPresent()) {
-                            long received = val.get().getLong();
-                            if (log.isDebugEnabled()) {
-                                log.debug("read initial offset for {} == {}", topicPartition, received);
-                            }
-                            offsetFuture.complete(received);
-                            return;
+            try {
+                Map<ByteBuffer, ByteBuffer> result = offsetStore.get(req).get();
+                if (result != null && result.size() != 0) {
+                    Optional<ByteBuffer> val = result.entrySet().stream()
+                            .filter(entry -> entry.getKey().equals(key))
+                            .findFirst().map(entry -> entry.getValue());
+                    if (val.isPresent()) {
+                        long received = val.get().getLong();
+                        if (log.isDebugEnabled()) {
+                            log.debug("read initial offset for {} == {}", topicPartition, received);
                         }
+                        return received;
                     }
-                    offsetFuture.complete(-1L);
-                } else {
-                    offsetFuture.completeExceptionally(ex);
                 }
-            });
-
-            try {
-                return offsetFuture.get();
+                return -1L;
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
                 log.error("error getting initial state of {}", topicPartition, e);
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
index 4638b00..774b8f1 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
@@ -175,8 +175,7 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore {
     }
 
     @Override
-    public Future<Map<ByteBuffer, ByteBuffer>> get(Collection<ByteBuffer> keys,
-                                                   Callback<Map<ByteBuffer, ByteBuffer>> callback) {
+    public Future<Map<ByteBuffer, ByteBuffer>> get(Collection<ByteBuffer> keys) {
         CompletableFuture<Void> endFuture = new CompletableFuture<>();
         readToEnd(endFuture);
         return endFuture.thenApply(ignored -> {
@@ -190,14 +189,7 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore {
                     values.put(key, value);
                 }
             }
-            if (null != callback) {
-                callback.onCompletion(null, values);
-            }
             return values;
-        }).whenComplete((ignored, cause) -> {
-            if (null != cause && null != callback) {
-                callback.onCompletion(cause, null);
-            }
         });
     }
 
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java
index 8d8b754..bb2eced 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.connect.util.Callback;
@@ -86,28 +87,11 @@ public class PulsarOffsetBackingStoreTest extends ProducerConsumerBase {
     @Test
     public void testGetFromEmpty() throws Exception {
         assertTrue(offsetBackingStore.get(
-            Arrays.asList(ByteBuffer.wrap("empty-key".getBytes(UTF_8))),
-            null
+            Arrays.asList(ByteBuffer.wrap("empty-key".getBytes(UTF_8)))
         ).get().isEmpty());
     }
 
     @Test
-    public void testGetFromEmptyCallback() throws Exception {
-        CompletableFuture<Map<ByteBuffer, ByteBuffer>> callbackFuture = new CompletableFuture<>();
-        assertTrue(offsetBackingStore.get(
-            Arrays.asList(ByteBuffer.wrap("empty-key".getBytes(UTF_8))),
-            (error, result) -> {
-                if (null != error) {
-                    callbackFuture.completeExceptionally(error);
-                } else {
-                    callbackFuture.complete(result);
-                }
-            }
-        ).get().isEmpty());
-        assertTrue(callbackFuture.get().isEmpty());
-    }
-
-    @Test
     public void testGetSet() throws Exception {
         testGetSet(false);
     }
@@ -144,7 +128,7 @@ public class PulsarOffsetBackingStoreTest extends ProducerConsumerBase {
         }
 
         Map<ByteBuffer, ByteBuffer> result =
-            offsetBackingStore.get(keys, null).get();
+            offsetBackingStore.get(keys).get();
         assertEquals(numKeys, result.size());
         AtomicInteger count = new AtomicInteger();
         new TreeMap<>(result).forEach((key, value) -> {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java
index 14f0bd4..f50d3be 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java
@@ -58,6 +58,7 @@ public abstract class SourceTester<ServiceContainerT extends GenericContainer> {
         add("source");
         add("op");
         add("ts_ms");
+        add("transaction");
     }};
 
     protected SourceTester(String sourceType) {
@@ -144,7 +145,10 @@ public abstract class SourceTester<ServiceContainerT extends GenericContainer> {
             GenericRecord valueRecord = msg.getValue().getValue();
             Assert.assertNotNull(valueRecord.getFields());
             Assert.assertTrue(valueRecord.getFields().size() > 0);
+
+            log.info("Received message: key = {}, value = {}.", keyRecord.getNativeObject(), valueRecord.getNativeObject());
             for (Field field : valueRecord.getFields()) {
+                log.info("validating field {}", field.getName());
                 Assert.assertTrue(DEBEZIUM_FIELD_SET.contains(field.getName()));
             }