You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2021/08/13 23:34:59 UTC
[pulsar] branch master updated: [Issue 8502] Upgrade Debezium to a
newer version (#11204)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 283ae57 [Issue 8502] Upgrade Debezium to a newer version (#11204)
283ae57 is described below
commit 283ae572a001830cf02275097fdde92f6d4a648b
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Fri Aug 13 16:34:20 2021 -0700
[Issue 8502] Upgrade Debezium to a newer version (#11204)
Fixes #8502
### Motivation
Upgrade Debezium to a newer version
### Modifications
Upgraded Debezium to v.1.5.4 (latest built with Java 8, v.1.6.x built with Java 11)
Upgraded kafka-client to 2.7 (version debezium tested with)
Upgraded scala-library to 2.13.6 (for kafka-client)
Dealt with API changes, tests etc.
PR is on top of https://github.com/apache/pulsar/pull/11154 to have Debezium integration tests on CI.
---
pom.xml | 8 +++---
pulsar-io/debezium/core/pom.xml | 7 +++++
.../pulsar/io/debezium/PulsarDatabaseHistory.java | 5 ++++
.../io/debezium/PulsarDatabaseHistoryTest.java | 12 ++++----
.../kafka/connect/PulsarKafkaSinkTaskContext.java | 33 ++++++++--------------
.../io/kafka/connect/PulsarOffsetBackingStore.java | 10 +------
.../connect/PulsarOffsetBackingStoreTest.java | 22 ++-------------
.../tests/integration/io/sources/SourceTester.java | 4 +++
8 files changed, 42 insertions(+), 59 deletions(-)
diff --git a/pom.xml b/pom.xml
index c332149..1f11163 100644
--- a/pom.xml
+++ b/pom.xml
@@ -139,7 +139,7 @@ flexible messaging model and an intuitive client API.</description>
<hbc-core.version>2.2.0</hbc-core.version>
<cassandra-driver-core.version>3.6.0</cassandra-driver-core.version>
<aerospike-client.version>4.4.8</aerospike-client.version>
- <kafka-client.version>2.3.0</kafka-client.version>
+ <kafka-client.version>2.7.0</kafka-client.version>
<rabbitmq-client.version>5.1.1</rabbitmq-client.version>
<aws-sdk.version>1.11.774</aws-sdk.version>
<avro.version>1.10.2</avro.version>
@@ -153,9 +153,9 @@ flexible messaging model and an intuitive client API.</description>
<hdfs-offload-version3>3.3.0</hdfs-offload-version3>
<elasticsearch.version>7.9.1</elasticsearch.version>
<presto.version>332</presto.version>
- <scala.binary.version>2.11</scala.binary.version>
- <scala-library.version>2.11.12</scala-library.version>
- <debezium.version>1.0.0.Final</debezium.version>
+ <scala.binary.version>2.13</scala.binary.version>
+ <scala-library.version>2.13.6</scala-library.version>
+ <debezium.version>1.5.4.Final</debezium.version>
<jsonwebtoken.version>0.11.1</jsonwebtoken.version>
<opencensus.version>0.18.0</opencensus.version>
<hbase.version>2.3.0</hbase.version>
diff --git a/pulsar-io/debezium/core/pom.xml b/pulsar-io/debezium/core/pom.xml
index 4bd97ed..0b92898 100644
--- a/pulsar-io/debezium/core/pom.xml
+++ b/pulsar-io/debezium/core/pom.xml
@@ -90,6 +90,13 @@
<type>test-jar</type>
</dependency>
+ <dependency>
+ <groupId>io.debezium</groupId>
+ <artifactId>debezium-connector-mysql</artifactId>
+ <version>${debezium.version}</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
</project>
diff --git a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
index c3d95a7..9e9a7f1 100644
--- a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
+++ b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
@@ -251,6 +251,11 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
}
@Override
+ public boolean storageExists() {
+ return true;
+ }
+
+ @Override
public String toString() {
if (topicName != null) {
return "Pulsar topic (" + topicName + ") at " + serviceUrl;
diff --git a/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java b/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
index 6a21812..ba3bc6a 100644
--- a/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
+++ b/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
@@ -23,9 +23,9 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import io.debezium.config.Configuration;
+import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.relational.Tables;
-import io.debezium.relational.ddl.DdlParserSql2003;
-import io.debezium.relational.ddl.LegacyDdlParser;
+import io.debezium.relational.ddl.DdlParser;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryListener;
import io.debezium.text.ParsingException;
@@ -86,8 +86,8 @@ public class PulsarDatabaseHistoryTest extends ProducerConsumerBase {
// Calling it another time to ensure we can work with the DB history topic already existing
history.initializeStorage();
- LegacyDdlParser recoveryParser = new DdlParserSql2003();
- LegacyDdlParser ddlParser = new DdlParserSql2003();
+ DdlParser recoveryParser = new MySqlAntlrDdlParser();
+ DdlParser ddlParser = new MySqlAntlrDdlParser();
ddlParser.setCurrentSchema("db1"); // recover does this, so we need to as well
Tables tables1 = new Tables();
Tables tables2 = new Tables();
@@ -102,9 +102,9 @@ public class PulsarDatabaseHistoryTest extends ProducerConsumerBase {
// Now record schema changes, which writes out to kafka but doesn't actually change the Tables ...
setLogPosition(10);
- ddl = "CREATE TABLE foo ( name VARCHAR(255) NOT NULL PRIMARY KEY); \n" +
+ ddl = "CREATE TABLE foo ( first VARCHAR(22) NOT NULL ); \n" +
"CREATE TABLE customers ( id INTEGER NOT NULL PRIMARY KEY, name VARCHAR(100) NOT NULL ); \n" +
- "CREATE TABLE products ( productId INTEGER NOT NULL PRIMARY KEY, desc VARCHAR(255) NOT NULL); \n";
+ "CREATE TABLE products ( productId INTEGER NOT NULL PRIMARY KEY, description VARCHAR(255) NOT NULL ); \n";
history.record(source, position, "db1", ddl);
// Parse the DDL statement 3x and each time update a different Tables object ...
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
index b04ca16..ef2faa8 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
@@ -97,30 +97,21 @@ public class PulsarKafkaSinkTaskContext implements SinkTaskContext {
List<ByteBuffer> req = Lists.newLinkedList();
ByteBuffer key = topicPartitionAsKey(topicPartition);
req.add(key);
- CompletableFuture<Long> offsetFuture = new CompletableFuture<>();
- offsetStore.get(req, (Throwable ex, Map<ByteBuffer, ByteBuffer> result) -> {
- if (ex == null) {
- if (result != null && result.size() != 0) {
- Optional<ByteBuffer> val = result.entrySet().stream()
- .filter(entry -> entry.getKey().equals(key))
- .findFirst().map(entry -> entry.getValue());
- if (val.isPresent()) {
- long received = val.get().getLong();
- if (log.isDebugEnabled()) {
- log.debug("read initial offset for {} == {}", topicPartition, received);
- }
- offsetFuture.complete(received);
- return;
+ try {
+ Map<ByteBuffer, ByteBuffer> result = offsetStore.get(req).get();
+ if (result != null && result.size() != 0) {
+ Optional<ByteBuffer> val = result.entrySet().stream()
+ .filter(entry -> entry.getKey().equals(key))
+ .findFirst().map(entry -> entry.getValue());
+ if (val.isPresent()) {
+ long received = val.get().getLong();
+ if (log.isDebugEnabled()) {
+ log.debug("read initial offset for {} == {}", topicPartition, received);
}
+ return received;
}
- offsetFuture.complete(-1L);
- } else {
- offsetFuture.completeExceptionally(ex);
}
- });
-
- try {
- return offsetFuture.get();
+ return -1L;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("error getting initial state of {}", topicPartition, e);
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
index 4638b00..774b8f1 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
@@ -175,8 +175,7 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore {
}
@Override
- public Future<Map<ByteBuffer, ByteBuffer>> get(Collection<ByteBuffer> keys,
- Callback<Map<ByteBuffer, ByteBuffer>> callback) {
+ public Future<Map<ByteBuffer, ByteBuffer>> get(Collection<ByteBuffer> keys) {
CompletableFuture<Void> endFuture = new CompletableFuture<>();
readToEnd(endFuture);
return endFuture.thenApply(ignored -> {
@@ -190,14 +189,7 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore {
values.put(key, value);
}
}
- if (null != callback) {
- callback.onCompletion(null, values);
- }
return values;
- }).whenComplete((ignored, cause) -> {
- if (null != cause && null != callback) {
- callback.onCompletion(cause, null);
- }
});
}
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java
index 8d8b754..bb2eced 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java
@@ -32,6 +32,7 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.util.Callback;
@@ -86,28 +87,11 @@ public class PulsarOffsetBackingStoreTest extends ProducerConsumerBase {
@Test
public void testGetFromEmpty() throws Exception {
assertTrue(offsetBackingStore.get(
- Arrays.asList(ByteBuffer.wrap("empty-key".getBytes(UTF_8))),
- null
+ Arrays.asList(ByteBuffer.wrap("empty-key".getBytes(UTF_8)))
).get().isEmpty());
}
@Test
- public void testGetFromEmptyCallback() throws Exception {
- CompletableFuture<Map<ByteBuffer, ByteBuffer>> callbackFuture = new CompletableFuture<>();
- assertTrue(offsetBackingStore.get(
- Arrays.asList(ByteBuffer.wrap("empty-key".getBytes(UTF_8))),
- (error, result) -> {
- if (null != error) {
- callbackFuture.completeExceptionally(error);
- } else {
- callbackFuture.complete(result);
- }
- }
- ).get().isEmpty());
- assertTrue(callbackFuture.get().isEmpty());
- }
-
- @Test
public void testGetSet() throws Exception {
testGetSet(false);
}
@@ -144,7 +128,7 @@ public class PulsarOffsetBackingStoreTest extends ProducerConsumerBase {
}
Map<ByteBuffer, ByteBuffer> result =
- offsetBackingStore.get(keys, null).get();
+ offsetBackingStore.get(keys).get();
assertEquals(numKeys, result.size());
AtomicInteger count = new AtomicInteger();
new TreeMap<>(result).forEach((key, value) -> {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java
index 14f0bd4..f50d3be 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java
@@ -58,6 +58,7 @@ public abstract class SourceTester<ServiceContainerT extends GenericContainer> {
add("source");
add("op");
add("ts_ms");
+ add("transaction");
}};
protected SourceTester(String sourceType) {
@@ -144,7 +145,10 @@ public abstract class SourceTester<ServiceContainerT extends GenericContainer> {
GenericRecord valueRecord = msg.getValue().getValue();
Assert.assertNotNull(valueRecord.getFields());
Assert.assertTrue(valueRecord.getFields().size() > 0);
+
+ log.info("Received message: key = {}, value = {}.", keyRecord.getNativeObject(), valueRecord.getNativeObject());
for (Field field : valueRecord.getFields()) {
+ log.info("validating field {}", field.getName());
Assert.assertTrue(DEBEZIUM_FIELD_SET.contains(field.getName()));
}