You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/07/15 05:57:25 UTC
[rocketmq-connect] branch master updated: [ISSUE #191]Encountered change event for table databasename.tablename whose schema isn`t known to this connector (#195)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 0b6da66 [ISSUE #191]Encountered change event for table databasename.tablename whose schema isn`t known to this connector (#195)
0b6da66 is described below
commit 0b6da66e85b002c1c51b5e433ce9c49fe35aa61f
Author: xiaoyi <su...@163.com>
AuthorDate: Fri Jul 15 13:57:20 2022 +0800
[ISSUE #191]Encountered change event for table databasename.tablename whose schema isn`t known to this connector (#195)
* Fix debezium demecial type conversion problem #190
* Upgrade rocketmq-replicator API to v0.1.3 #189
* Encountered change event for table databasename.tablename whose schema isn`t known to this connector #191
---
.../kafka-connect-adaptor/pom.xml | 4 +-
.../connector/AbstractKafkaSinkConnector.java | 27 ++++-
.../connector/AbstractKafkaSourceConnector.java | 30 +++++-
.../adaptor/connector/ConnectorClassSetter.java | 32 ++++--
.../adaptor/connector/KafkaConnectorContext.java | 20 +++-
.../connector/KafkaSinkAdaptorConnector.java | 22 +++-
.../connector/KafkaSourceAdaptorConnector.java | 20 +++-
.../context/RocketMQKafkaErrantRecordReporter.java | 3 +-
.../context/RocketMQKafkaSinkTaskContext.java | 22 ++--
.../kafka/connect/adaptor/schema/Converters.java | 28 ++---
.../adaptor/schema/KafkaSinkSchemaConverter.java | 16 +--
.../adaptor/schema/KafkaSinkValueConverter.java | 3 +-
.../schema/RocketMQSourceSchemaConverter.java | 9 +-
.../schema/RocketMQSourceValueConverter.java | 1 -
.../adaptor/task/AbstractKafkaConnectSink.java | 12 +--
.../adaptor/task/AbstractKafkaConnectSource.java | 14 +--
.../adaptor/task/KafkaConnectAdaptorSink.java | 1 +
.../connect/adaptor/task/TaskClassSetter.java | 29 +++--
.../connect/adaptor/SourceRecordConverterTest.java | 4 +-
connectors/rocketmq-connect-debezium/pom.xml | 6 +-
.../rocketmq-connect-debezium-core/pom.xml | 4 +-
.../connect/debezium/DebeziumConnector.java | 10 +-
.../{ConnectUtil.java => RocketMQConnectUtil.java} | 119 +++++++++------------
.../connect/debezium/RocketMqConnectConfig.java | 32 +-----
.../connect/debezium/RocketMqDatabaseHistory.java | 105 +++++++-----------
.../rocketmq-connect-debezium-mysql/pom.xml | 4 +-
.../debezium/mysql/DebeziumMysqlConnector.java | 2 +-
.../resources/debezium-mysql-source-config.yaml | 8 +-
.../rocketmq-connect-debezium-oracle/pom.xml | 4 +-
.../debezium/oracle/DebeziumOracleConnector.java | 2 +-
.../connect/runtime/connectorwrapper/Worker.java | 4 +-
31 files changed, 330 insertions(+), 267 deletions(-)
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/pom.xml b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/pom.xml
index 64190bb..b79d77b 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/pom.xml
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/pom.xml
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rocketmq-connect-debezium</artifactId>
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/AbstractKafkaSinkConnector.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/AbstractKafkaSinkConnector.java
index 8e5391e..242acdf 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/AbstractKafkaSinkConnector.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/AbstractKafkaSinkConnector.java
@@ -1,8 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.rocketmq.connect.kafka.connect.adaptor.connector;
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.task.sink.SinkConnector;
-import io.openmessaging.connector.api.component.task.source.SourceConnector;
import io.openmessaging.connector.api.errors.ConnectException;
import io.openmessaging.internal.DefaultKeyValue;
import org.apache.kafka.connect.runtime.ConnectorConfig;
@@ -36,15 +52,17 @@ public abstract class AbstractKafkaSinkConnector extends SinkConnector implement
/**
* try override start and stop
+ *
* @return
*/
- protected org.apache.kafka.connect.sink.SinkConnector originalSinkConnector(){
+ protected org.apache.kafka.connect.sink.SinkConnector originalSinkConnector() {
return sinkConnector;
}
/**
* Returns a set of configurations for Tasks based on the current configuration,
* producing at most count configurations.
+ *
* @param maxTasks maximum number of configurations to generate
* @return configurations for Tasks
*/
@@ -54,7 +72,7 @@ public abstract class AbstractKafkaSinkConnector extends SinkConnector implement
List<KeyValue> configs = new ArrayList<>();
for (Map<String, String> configMaps : groupConnectors) {
KeyValue keyValue = new DefaultKeyValue();
- configMaps.forEach((k, v)->{
+ configMaps.forEach((k, v) -> {
keyValue.put(k, v);
});
configs.add(keyValue);
@@ -64,6 +82,7 @@ public abstract class AbstractKafkaSinkConnector extends SinkConnector implement
/**
* Start the component
+ *
* @param config component context
*/
@Override
@@ -90,7 +109,7 @@ public abstract class AbstractKafkaSinkConnector extends SinkConnector implement
*/
@Override
public void stop() {
- if (sinkConnector != null){
+ if (sinkConnector != null) {
sinkConnector = null;
configValue = null;
taskConfig = null;
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/AbstractKafkaSourceConnector.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/AbstractKafkaSourceConnector.java
index f9d5456..921455c 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/AbstractKafkaSourceConnector.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/AbstractKafkaSourceConnector.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.rocketmq.connect.kafka.connect.adaptor.connector;
import io.openmessaging.KeyValue;
@@ -36,15 +53,17 @@ public abstract class AbstractKafkaSourceConnector extends SourceConnector imple
/**
* try override start and stop
+ *
* @return
*/
- protected org.apache.kafka.connect.source.SourceConnector originalSinkConnector(){
+ protected org.apache.kafka.connect.source.SourceConnector originalSinkConnector() {
return sourceConnector;
}
/**
* Returns a set of configurations for Tasks based on the current configuration,
* producing at most count configurations.
+ *
* @param maxTasks maximum number of configurations to generate
* @return configurations for Tasks
*/
@@ -54,7 +73,7 @@ public abstract class AbstractKafkaSourceConnector extends SourceConnector imple
List<KeyValue> configs = new ArrayList<>();
for (Map<String, String> configMaps : groupConnectors) {
KeyValue keyValue = new DefaultKeyValue();
- configMaps.forEach((k, v)->{
+ configMaps.forEach((k, v) -> {
keyValue.put(k, v);
});
configs.add(keyValue);
@@ -64,6 +83,7 @@ public abstract class AbstractKafkaSourceConnector extends SourceConnector imple
/**
* Start the component
+ *
* @param config component context
*/
@Override
@@ -73,7 +93,7 @@ public abstract class AbstractKafkaSourceConnector extends SourceConnector imple
this.configValue.put(key, config.getString(key));
});
setConnectorClass(configValue);
- taskConfig = new HashMap<>(configValue.config());
+ taskConfig = new HashMap<>(configValue.config());
// get the source class name from config and create source task from reflection
try {
sourceConnector = Class.forName(taskConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG))
@@ -90,9 +110,9 @@ public abstract class AbstractKafkaSourceConnector extends SourceConnector imple
*/
@Override
public void stop() {
- if (sourceConnector != null){
+ if (sourceConnector != null) {
sourceConnector = null;
- configValue = null;
+ configValue = null;
this.taskConfig = null;
}
}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/ConnectorClassSetter.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/ConnectorClassSetter.java
index b0c0007..f323d9a 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/ConnectorClassSetter.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/ConnectorClassSetter.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.rocketmq.connect.kafka.connect.adaptor.connector;
import io.openmessaging.KeyValue;
@@ -8,15 +25,16 @@ import org.apache.kafka.connect.runtime.ConnectorConfig;
*/
public interface ConnectorClassSetter {
/**
- * set connector class
- * @param config
+ * get connector class
*/
- default void setConnectorClass(KeyValue config) {
- config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, getConnectorClass());
- }
+ String getConnectorClass();
/**
- * get connector class
+ * set connector class
+ *
+ * @param config
*/
- String getConnectorClass();
+ default void setConnectorClass(KeyValue config) {
+ config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, getConnectorClass());
+ }
}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaConnectorContext.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaConnectorContext.java
index a42c507..1054e8a 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaConnectorContext.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaConnectorContext.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.rocketmq.connect.kafka.connect.adaptor.connector;
import org.apache.kafka.connect.connector.ConnectorContext;
@@ -7,7 +24,8 @@ import org.apache.kafka.connect.connector.ConnectorContext;
*/
public class KafkaConnectorContext implements ConnectorContext {
io.openmessaging.connector.api.component.connector.ConnectorContext context;
- public KafkaConnectorContext(io.openmessaging.connector.api.component.connector.ConnectorContext context){
+
+ public KafkaConnectorContext(io.openmessaging.connector.api.component.connector.ConnectorContext context) {
this.context = context;
}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaSinkAdaptorConnector.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaSinkAdaptorConnector.java
index 90575bf..a4d6d10 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaSinkAdaptorConnector.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaSinkAdaptorConnector.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.rocketmq.connect.kafka.connect.adaptor.connector;
import io.openmessaging.KeyValue;
@@ -9,6 +26,7 @@ public abstract class KafkaSinkAdaptorConnector extends AbstractKafkaSinkConnect
/**
* Start the component
+ *
* @param config component context
*/
@Override
@@ -24,8 +42,8 @@ public abstract class KafkaSinkAdaptorConnector extends AbstractKafkaSinkConnect
*/
@Override
public void stop() {
- if (sinkConnector != null){
- sinkConnector.stop();
+ if (sinkConnector != null) {
+ sinkConnector.stop();
}
super.stop();
}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaSourceAdaptorConnector.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaSourceAdaptorConnector.java
index cea0a22..337c90b 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaSourceAdaptorConnector.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaSourceAdaptorConnector.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.rocketmq.connect.kafka.connect.adaptor.connector;
import io.openmessaging.KeyValue;
@@ -9,6 +26,7 @@ public abstract class KafkaSourceAdaptorConnector extends AbstractKafkaSourceCon
/**
* Start the component
+ *
* @param config component context
*/
@Override
@@ -24,7 +42,7 @@ public abstract class KafkaSourceAdaptorConnector extends AbstractKafkaSourceCon
*/
@Override
public void stop() {
- if (sourceConnector != null){
+ if (sourceConnector != null) {
sourceConnector.stop();
}
super.stop();
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/RocketMQKafkaErrantRecordReporter.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/RocketMQKafkaErrantRecordReporter.java
index 3b951c6..0cd3c2d 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/RocketMQKafkaErrantRecordReporter.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/RocketMQKafkaErrantRecordReporter.java
@@ -14,7 +14,8 @@ import java.util.concurrent.Future;
*/
public class RocketMQKafkaErrantRecordReporter implements ErrantRecordReporter {
final ErrorRecordReporter errorRecordReporter;
- RocketMQKafkaErrantRecordReporter(SinkTaskContext context){
+
+ RocketMQKafkaErrantRecordReporter(SinkTaskContext context) {
errorRecordReporter = context.errorRecordReporter();
}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/RocketMQKafkaSinkTaskContext.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/RocketMQKafkaSinkTaskContext.java
index 0fd48a8..63ce1c6 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/RocketMQKafkaSinkTaskContext.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/RocketMQKafkaSinkTaskContext.java
@@ -58,7 +58,7 @@ public class RocketMQKafkaSinkTaskContext implements SinkTaskContext {
@Override
public void offset(Map<TopicPartition, Long> map) {
Map<RecordPartition, RecordOffset> offsets = new HashMap<>();
- map.forEach((topicPartition, offset)->{
+ map.forEach((topicPartition, offset) -> {
offsets.put(convertToRecordPartition(topicPartition), convertToRecordOffset(offset));
});
sinkContext.resetOffset(offsets);
@@ -76,7 +76,7 @@ public class RocketMQKafkaSinkTaskContext implements SinkTaskContext {
@Override
public Set<TopicPartition> assignment() {
- Set<TopicPartition> topicPartitions = new HashSet<>();
+ Set<TopicPartition> topicPartitions = new HashSet<>();
Set<RecordPartition> recordPartitions = sinkContext.assignment();
recordPartitions.forEach(partition -> {
topicPartitions.add(convertToTopicPartition(partition.getPartition()));
@@ -87,9 +87,9 @@ public class RocketMQKafkaSinkTaskContext implements SinkTaskContext {
@Override
public void pause(TopicPartition... topicPartitions) {
List<RecordPartition> partitions = new ArrayList<>();
- for (TopicPartition topicPartition : topicPartitions){
+ for (TopicPartition topicPartition : topicPartitions) {
RecordPartition recordPartition = convertToRecordPartition(topicPartition);
- if (recordPartition != null){
+ if (recordPartition != null) {
partitions.add(recordPartition);
}
}
@@ -99,9 +99,9 @@ public class RocketMQKafkaSinkTaskContext implements SinkTaskContext {
@Override
public void resume(TopicPartition... topicPartitions) {
List<RecordPartition> partitions = new ArrayList<>();
- for (TopicPartition topicPartition : topicPartitions){
+ for (TopicPartition topicPartition : topicPartitions) {
RecordPartition recordPartition = convertToRecordPartition(topicPartition);
- if (recordPartition != null){
+ if (recordPartition != null) {
partitions.add(recordPartition);
}
}
@@ -121,11 +121,12 @@ public class RocketMQKafkaSinkTaskContext implements SinkTaskContext {
/**
* convert to kafka topic partition
+ *
* @param partitionMap
* @return
*/
public TopicPartition convertToTopicPartition(Map<String, ?> partitionMap) {
- if (partitionMap.containsKey(TOPIC) && partitionMap.containsKey(QUEUE_ID)){
+ if (partitionMap.containsKey(TOPIC) && partitionMap.containsKey(QUEUE_ID)) {
return new TopicPartition(partitionMap.get(TOPIC).toString(), Integer.valueOf(partitionMap.get(QUEUE_ID).toString()));
}
return null;
@@ -133,18 +134,19 @@ public class RocketMQKafkaSinkTaskContext implements SinkTaskContext {
/**
* convert to rocketmq record partition
+ *
* @param topicPartition
* @return
*/
public RecordPartition convertToRecordPartition(TopicPartition topicPartition) {
- if (topicPartition != null){
- return new RecordPartition(Collections.singletonMap(topicPartition.topic(),topicPartition.partition()));
+ if (topicPartition != null) {
+ return new RecordPartition(Collections.singletonMap(topicPartition.topic(), topicPartition.partition()));
}
return null;
}
private RecordOffset convertToRecordOffset(long l) {
- return new RecordOffset(Collections.singletonMap(QUEUE_OFFSET,l));
+ return new RecordOffset(Collections.singletonMap(QUEUE_OFFSET, l));
}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java
index abc2abf..9c0c529 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java
@@ -38,7 +38,7 @@ import java.util.Map;
public class Converters {
- public static ConnectRecord fromSourceRecord(SourceRecord record){
+ public static ConnectRecord fromSourceRecord(SourceRecord record) {
// sourceRecord convert connect Record
RocketMQSourceSchemaConverter rocketMQSourceSchemaConverter = new RocketMQSourceSchemaConverter(record.valueSchema());
io.openmessaging.connector.api.data.Schema schema = rocketMQSourceSchemaConverter.schema();
@@ -58,7 +58,7 @@ public class Converters {
}
- public static ConnectRecord fromSinkRecord(SinkRecord record){
+ public static ConnectRecord fromSinkRecord(SinkRecord record) {
// sourceRecord convert connect Record
RocketMQSourceSchemaConverter rocketMQSourceSchemaConverter = new RocketMQSourceSchemaConverter(record.valueSchema());
io.openmessaging.connector.api.data.Schema schema = rocketMQSourceSchemaConverter.schema();
@@ -78,13 +78,13 @@ public class Converters {
}
-
/**
* convert rocketmq connect record to sink record
+ *
* @param record
* @return
*/
- public static SinkRecord fromConnectRecord(ConnectRecord record){
+ public static SinkRecord fromConnectRecord(ConnectRecord record) {
// connect record convert kafka sink record
KafkaSinkSchemaConverter kafkaSinkSchemaConverter = new KafkaSinkSchemaConverter(record.getSchema());
Schema schema = kafkaSinkSchemaConverter.schema();
@@ -94,7 +94,7 @@ public class Converters {
Iterator extensions = record.getExtensions().keySet().iterator();
while (extensions.hasNext()) {
String key = String.valueOf(extensions.next());
- headers.add(key, record.getExtensions().getString(key) , null);
+ headers.add(key, record.getExtensions().getString(key), null);
}
SinkRecord sinkRecord = new SinkRecord(
@@ -112,7 +112,7 @@ public class Converters {
return sinkRecord;
}
- public static RecordPartition toRecordPartition(SinkRecord record){
+ public static RecordPartition toRecordPartition(SinkRecord record) {
Map<String, String> recordPartitionMap = new HashMap<>();
recordPartitionMap.put(RocketMQKafkaSinkTaskContext.TOPIC, record.topic());
@@ -120,7 +120,7 @@ public class Converters {
return new RecordPartition(recordPartitionMap);
}
- public static RecordOffset toRecordOffset(SinkRecord record){
+ public static RecordOffset toRecordOffset(SinkRecord record) {
Map<String, String> recordOffsetMap = new HashMap<>();
recordOffsetMap.put(RocketMQKafkaSinkTaskContext.QUEUE_OFFSET, record.kafkaOffset() + "");
return new RecordOffset(recordOffsetMap);
@@ -129,31 +129,35 @@ public class Converters {
/**
* get topic
+ *
* @param partition
* @return
*/
- public static String topic(RecordPartition partition){
+ public static String topic(RecordPartition partition) {
return partition.getPartition().get(RocketMQKafkaSinkTaskContext.TOPIC).toString();
}
/**
* get partition
+ *
* @param partition
* @return
*/
- public static int partition(RecordPartition partition){
- if (partition.getPartition().containsKey(RocketMQKafkaSinkTaskContext.QUEUE_ID)){
+ public static int partition(RecordPartition partition) {
+ if (partition.getPartition().containsKey(RocketMQKafkaSinkTaskContext.QUEUE_ID)) {
return Integer.valueOf(partition.getPartition().get(RocketMQKafkaSinkTaskContext.QUEUE_ID).toString());
}
return -1;
}
+
/**
* get offset
+ *
* @param offset
* @return
*/
- public static int offset(RecordOffset offset){
- if (offset.getOffset().containsKey(RocketMQKafkaSinkTaskContext.QUEUE_OFFSET)){
+ public static int offset(RecordOffset offset) {
+ if (offset.getOffset().containsKey(RocketMQKafkaSinkTaskContext.QUEUE_OFFSET)) {
return Integer.valueOf(offset.getOffset().get(RocketMQKafkaSinkTaskContext.QUEUE_OFFSET).toString());
}
return -1;
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkSchemaConverter.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkSchemaConverter.java
index 4f718b0..d6c7574 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkSchemaConverter.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkSchemaConverter.java
@@ -37,14 +37,16 @@ import java.util.Map;
public class KafkaSinkSchemaConverter {
private static Logger logger = LoggerFactory.getLogger(KafkaSinkSchemaConverter.class);
private static Map<String, String> logicalMapping = new HashMap<>();
+
static {
- logicalMapping.put(io.openmessaging.connector.api.data.logical.Decimal.LOGICAL_NAME,Decimal.LOGICAL_NAME);
+ logicalMapping.put(io.openmessaging.connector.api.data.logical.Decimal.LOGICAL_NAME, Decimal.LOGICAL_NAME);
logicalMapping.put(io.openmessaging.connector.api.data.logical.Date.LOGICAL_NAME, Date.LOGICAL_NAME);
logicalMapping.put(io.openmessaging.connector.api.data.logical.Time.LOGICAL_NAME, Time.LOGICAL_NAME);
logicalMapping.put(io.openmessaging.connector.api.data.logical.Timestamp.LOGICAL_NAME, Timestamp.LOGICAL_NAME);
}
private SchemaBuilder builder;
+
public KafkaSinkSchemaConverter(Schema schema) {
builder = convertKafkaSchema(schema);
}
@@ -55,6 +57,7 @@ public class KafkaSinkSchemaConverter {
/**
* convert kafka schema
+ *
* @param originalSchema
* @return
*/
@@ -151,8 +154,7 @@ public class KafkaSinkSchemaConverter {
.name(schemaName)
.doc(originalSchema.getDoc())
.defaultValue(originalSchema.getDefaultValue())
- .parameters(parameters)
- ;
+ .parameters(parameters);
convertStructSchema(schemaBuilder, originalSchema);
return schemaBuilder;
case ARRAY:
@@ -192,10 +194,10 @@ public class KafkaSinkSchemaConverter {
// schema
Schema schema = field.getSchema();
- String schemaName = convertSchemaName(field.getSchema().getName());
+ String schemaName = convertSchemaName(field.getSchema().getName());
// field name
- String fieldName = field.getName();
+ String fieldName = field.getName();
FieldType type = schema.getFieldType();
Map<String, String> parameters = schema.getParameters() == null ? new HashMap<>() : schema.getParameters();
@@ -337,8 +339,8 @@ public class KafkaSinkSchemaConverter {
}
- private String convertSchemaName(String schemaName){
- if (logicalMapping.containsKey(schemaName)){
+ private String convertSchemaName(String schemaName) {
+ if (logicalMapping.containsKey(schemaName)) {
return logicalMapping.get(schemaName);
}
return schemaName;
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkValueConverter.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkValueConverter.java
index 0cbc2f9..7b37f59 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkValueConverter.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkValueConverter.java
@@ -35,12 +35,13 @@ public class KafkaSinkValueConverter {
private static Logger logger = LoggerFactory.getLogger(KafkaSinkValueConverter.class);
- public Object value(Schema schema,Object data) {
+ public Object value(Schema schema, Object data) {
return convertKafkaValue(schema, data);
}
/**
* convert value
+ *
* @param targetSchema
* @param originalValue
* @return
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceSchemaConverter.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceSchemaConverter.java
index 5ca408e..cee3d94 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceSchemaConverter.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceSchemaConverter.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.connect.kafka.connect.adaptor.schema;
-import com.beust.jcommander.internal.Maps;
import io.openmessaging.connector.api.data.SchemaBuilder;
import io.openmessaging.connector.api.data.logical.Timestamp;
import io.openmessaging.connector.api.errors.ConnectException;
@@ -39,6 +38,7 @@ import java.util.Map;
public class RocketMQSourceSchemaConverter {
private static Logger logger = LoggerFactory.getLogger(RocketMQSourceSchemaConverter.class);
private static Map<String, String> logicalMapping = new HashMap<>();
+
static {
logicalMapping.put(Decimal.LOGICAL_NAME, io.openmessaging.connector.api.data.logical.Decimal.LOGICAL_NAME);
logicalMapping.put(Date.LOGICAL_NAME, io.openmessaging.connector.api.data.logical.Date.LOGICAL_NAME);
@@ -148,8 +148,7 @@ public class RocketMQSourceSchemaConverter {
.name(schemaName)
.doc(originalSchema.doc())
.defaultValue(originalSchema.defaultValue())
- .parameters(parameters)
- ;
+ .parameters(parameters);
convertStructSchema(schemaBuilder, originalSchema);
return schemaBuilder;
case ARRAY:
@@ -328,8 +327,8 @@ public class RocketMQSourceSchemaConverter {
}
- private String convertSchemaName(String schemaName){
- if (logicalMapping.containsKey(schemaName)){
+ private String convertSchemaName(String schemaName) {
+ if (logicalMapping.containsKey(schemaName)) {
return logicalMapping.get(schemaName);
}
return schemaName;
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceValueConverter.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceValueConverter.java
index 8801efb..40d6178 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceValueConverter.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceValueConverter.java
@@ -22,7 +22,6 @@ import io.openmessaging.connector.api.data.FieldType;
import io.openmessaging.connector.api.data.Schema;
import io.openmessaging.connector.api.data.Struct;
import io.openmessaging.connector.api.errors.ConnectException;
-import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/AbstractKafkaConnectSink.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/AbstractKafkaConnectSink.java
index 8677db9..0f0c426 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/AbstractKafkaConnectSink.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/AbstractKafkaConnectSink.java
@@ -37,28 +37,27 @@ import java.util.Map;
/**
* abstract kafka connect sink
*/
-public abstract class AbstractKafkaConnectSink extends SinkTask implements TaskClassSetter{
+public abstract class AbstractKafkaConnectSink extends SinkTask implements TaskClassSetter {
private static final Logger log = LoggerFactory.getLogger(AbstractKafkaConnectSink.class);
-
- private SinkTaskContext kafkaSinkTaskContext;
- private org.apache.kafka.connect.sink.SinkTask sinkTask;
-
protected TransformationWrapper transformationWrapper;
-
/**
* kafka connect init
*/
protected ConnectKeyValue configValue;
+ private SinkTaskContext kafkaSinkTaskContext;
+ private org.apache.kafka.connect.sink.SinkTask sinkTask;
/**
* convert by kafka sink transform
+ *
* @param record
*/
protected abstract SinkRecord transforms(SinkRecord record);
/**
* convert ConnectRecord to SinkRecord
+ *
* @param record
* @return
*/
@@ -67,6 +66,7 @@ public abstract class AbstractKafkaConnectSink extends SinkTask implements TaskC
/**
* Put the records to the sink
+ *
* @param records sink records
*/
@Override
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/AbstractKafkaConnectSource.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/AbstractKafkaConnectSource.java
index f062b3d..cf1595e 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/AbstractKafkaConnectSource.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/AbstractKafkaConnectSource.java
@@ -40,19 +40,18 @@ import java.util.Map;
/**
* abstract kafka connect
*/
-public abstract class AbstractKafkaConnectSource extends SourceTask implements TaskClassSetter{
+public abstract class AbstractKafkaConnectSource extends SourceTask implements TaskClassSetter {
private static final Logger log = LoggerFactory.getLogger(AbstractKafkaConnectSource.class);
protected TransformationWrapper transformationWrapper;
- private SourceTaskContext kafkaSourceTaskContext;
- private org.apache.kafka.connect.source.SourceTask sourceTask;
- private OffsetStorageReader offsetReader;
-
/**
* kafka connect init
*/
protected ConnectKeyValue configValue;
+ private SourceTaskContext kafkaSourceTaskContext;
+ private org.apache.kafka.connect.source.SourceTask sourceTask;
+ private OffsetStorageReader offsetReader;
@Override
public List<ConnectRecord> poll() throws InterruptedException {
@@ -74,6 +73,7 @@ public abstract class AbstractKafkaConnectSource extends SourceTask implements T
/**
* convert transform
+ *
* @param sourceRecord
*/
protected abstract SourceRecord transforms(SourceRecord sourceRecord);
@@ -103,7 +103,7 @@ public abstract class AbstractKafkaConnectSource extends SourceTask implements T
// get the source class name from config and create source task from reflection
try {
- sourceTask = Class.forName(taskConfig.get(TaskConfig.TASK_CLASS_CONFIG))
+ sourceTask = Class.forName(taskConfig.get(TaskConfig.TASK_CLASS_CONFIG), true, AbstractKafkaConnectSource.class.getClassLoader())
.asSubclass(org.apache.kafka.connect.source.SourceTask.class)
.getDeclaredConstructor()
.newInstance();
@@ -112,7 +112,7 @@ public abstract class AbstractKafkaConnectSource extends SourceTask implements T
}
offsetReader = new KafkaOffsetStorageReader(
- sourceTaskContext
+ sourceTaskContext
);
kafkaSourceTaskContext = new RocketMQKafkaSourceTaskContext(offsetReader, taskConfig);
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/KafkaConnectAdaptorSink.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/KafkaConnectAdaptorSink.java
index 439f733..2f3d364 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/KafkaConnectAdaptorSink.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/KafkaConnectAdaptorSink.java
@@ -55,6 +55,7 @@ public abstract class KafkaConnectAdaptorSink extends AbstractKafkaConnectSink {
/**
* convert ConnectRecord to SinkRecord
+ *
* @param record
* @return
*/
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/TaskClassSetter.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/TaskClassSetter.java
index 64c58c4..dffb8b8 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/TaskClassSetter.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/TaskClassSetter.java
@@ -1,23 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.rocketmq.connect.kafka.connect.adaptor.task;
import io.openmessaging.KeyValue;
-import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.TaskConfig;
/**
* task class setter
*/
public interface TaskClassSetter {
+ /**
+ * get connector class
+ */
+ String getTaskClass();
+
/**
* set connector class
+ *
* @param config
*/
default void setTaskClass(KeyValue config) {
config.put(TaskConfig.TASK_CLASS_CONFIG, getTaskClass());
}
-
- /**
- * get connector class
- */
- String getTaskClass();
}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/test/java/org/apache/rocketmq/connect/kafka/connect/adaptor/SourceRecordConverterTest.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/test/java/org/apache/rocketmq/connect/kafka/connect/adaptor/SourceRecordConverterTest.java
index 0264dcb..a6de571 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/test/java/org/apache/rocketmq/connect/kafka/connect/adaptor/SourceRecordConverterTest.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/test/java/org/apache/rocketmq/connect/kafka/connect/adaptor/SourceRecordConverterTest.java
@@ -181,7 +181,7 @@ public class SourceRecordConverterTest {
// sourceRecord convert connect Record
RocketMQSourceSchemaConverter rocketMQSourceSchemaConverter = new RocketMQSourceSchemaConverter(originalRecord.valueSchema());
- io.openmessaging.connector.api.data.Schema schema =rocketMQSourceSchemaConverter.schema();
+ io.openmessaging.connector.api.data.Schema schema = rocketMQSourceSchemaConverter.schema();
RocketMQSourceValueConverter rocketMQSourceValueConverter = new RocketMQSourceValueConverter();
ConnectRecord connectRecord = new ConnectRecord(
new RecordPartition(originalRecord.sourcePartition()),
@@ -198,6 +198,6 @@ public class SourceRecordConverterTest {
}
final byte[] messageBody = JSON.toJSONString(connectRecord).getBytes();
String bodyStr = new String(messageBody, StandardCharsets.UTF_8);
- ConnectRecord newConnectRecord= JSON.parseObject(bodyStr, ConnectRecord.class);
+ ConnectRecord newConnectRecord = JSON.parseObject(bodyStr, ConnectRecord.class);
}
}
diff --git a/connectors/rocketmq-connect-debezium/pom.xml b/connectors/rocketmq-connect-debezium/pom.xml
index 82994e9..b5617e7 100644
--- a/connectors/rocketmq-connect-debezium/pom.xml
+++ b/connectors/rocketmq-connect-debezium/pom.xml
@@ -10,8 +10,8 @@
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.rocketmq</groupId>
@@ -180,7 +180,7 @@
<debezium.version>1.7.2.Final</debezium.version>
<debezium.postgresql.version>42.3.3</debezium.postgresql.version>
<!--rocketmq version-->
- <rocketmq.version>4.5.2</rocketmq.version>
+ <rocketmq.version>4.7.0</rocketmq.version>
<rocketmq-openmessaging.version>4.3.2</rocketmq-openmessaging.version>
<!--rocketmq connect version-->
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/pom.xml b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/pom.xml
index 4862b5a..121e611 100644
--- a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/pom.xml
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/pom.xml
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rocketmq-connect-debezium</artifactId>
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/DebeziumConnector.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/DebeziumConnector.java
index 7a3f51a..1db1208 100644
--- a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/DebeziumConnector.java
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/DebeziumConnector.java
@@ -17,20 +17,12 @@
package org.apache.rocketmq.connect.debezium;
-import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.component.task.source.SourceConnector;
-import io.openmessaging.connector.api.errors.ConnectException;
import org.apache.rocketmq.connect.kafka.connect.adaptor.connector.KafkaSourceAdaptorConnector;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
/**
* debezium connector
*/
public abstract class DebeziumConnector extends KafkaSourceAdaptorConnector {
-
+
}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/ConnectUtil.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMQConnectUtil.java
similarity index 67%
rename from connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/ConnectUtil.java
rename to connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMQConnectUtil.java
index 439bcf5..4e33a93 100644
--- a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/ConnectUtil.java
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMQConnectUtil.java
@@ -18,10 +18,10 @@
package org.apache.rocketmq.connect.debezium;
import com.beust.jcommander.internal.Sets;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
-import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.TopicConfig;
@@ -38,10 +38,8 @@ import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
+import java.util.Objects;
import java.util.Set;
import java.util.UUID;
@@ -49,9 +47,7 @@ import java.util.UUID;
/**
* rocket connect util
*/
-public class ConnectUtil {
-
- private static final String SYS_TASK_CG_PREFIX = "connect-";
+public class RocketMQConnectUtil {
public static String createGroupName(String prefix) {
StringBuilder sb = new StringBuilder();
@@ -62,27 +58,47 @@ public class ConnectUtil {
return sb.toString().replace(".", "-");
}
- public static String createGroupName(String prefix, String postfix) {
- return new StringBuilder().append(prefix).append("-").append(postfix).toString();
+ public static String createUniqInstance(String prefix) {
+ return new StringBuffer(prefix).append("-").append(UUID.randomUUID()).toString();
}
- public static String createInstance(String servers) {
- String[] serversArray = servers.split(";");
- List<String> serversList = new ArrayList<String>();
- for (String server : serversArray) {
- if (!serversList.contains(server)) {
- serversList.add(server);
- }
- }
- Collections.sort(serversList);
- return String.valueOf(serversList.toString().hashCode());
+ private static RPCHook getAclRPCHook(String accessKey, String secretKey) {
+ return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
}
- public static String createUniqInstance(String prefix) {
- return new StringBuffer(prefix).append("-").append(UUID.randomUUID().toString()).toString();
+ /**
+ * init default lite pull consumer
+ *
+ * @param connectConfig
+ * @param topic
+ * @param autoCommit
+ * @return
+ * @throws MQClientException
+ */
+ public static DefaultLitePullConsumer initDefaultLitePullConsumer(RocketMqConnectConfig connectConfig, String topic, boolean autoCommit) throws MQClientException {
+ DefaultLitePullConsumer consumer = null;
+ if (Objects.isNull(consumer)) {
+ if (StringUtils.isBlank(connectConfig.getAccessKey()) && StringUtils.isBlank(connectConfig.getSecretKey())) {
+ consumer = new DefaultLitePullConsumer(
+ connectConfig.getRmqConsumerGroup()
+ );
+ } else {
+ consumer = new DefaultLitePullConsumer(
+ connectConfig.getRmqConsumerGroup(),
+ getAclRPCHook(connectConfig.getAccessKey(), connectConfig.getSecretKey())
+ );
+ }
+ }
+ consumer.setNamesrvAddr(connectConfig.getNamesrvAddr());
+ String uniqueName = Thread.currentThread().getName() + "-" + System.currentTimeMillis() % 1000;
+ consumer.setInstanceName(uniqueName);
+ consumer.setUnitName(uniqueName);
+ consumer.setAutoCommit(autoCommit);
+ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+ consumer.subscribe(topic, "*");
+ return consumer;
}
-
public static DefaultMQProducer initDefaultMQProducer(RocketMqConnectConfig connectConfig) {
RPCHook rpcHook = null;
if (connectConfig.isAclEnable()) {
@@ -91,55 +107,24 @@ public class ConnectUtil {
DefaultMQProducer producer = new DefaultMQProducer(rpcHook);
producer.setNamesrvAddr(connectConfig.getNamesrvAddr());
producer.setInstanceName(createUniqInstance(connectConfig.getNamesrvAddr()));
- producer.setProducerGroup(connectConfig.getRmqProducerGroup());
+ producer.setProducerGroup(connectConfig.getRmqConsumerGroup());
producer.setSendMsgTimeout(connectConfig.getOperationTimeout());
producer.setLanguage(LanguageCode.JAVA);
return producer;
}
- public static DefaultMQPullConsumer initDefaultMQPullConsumer(RocketMqConnectConfig connectConfig) {
- RPCHook rpcHook = null;
- if (connectConfig.isAclEnable()) {
- rpcHook = new AclClientRPCHook(new SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey()));
- }
- DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(rpcHook);
- consumer.setNamesrvAddr(connectConfig.getNamesrvAddr());
- consumer.setInstanceName(createUniqInstance(connectConfig.getNamesrvAddr()));
- consumer.setConsumerGroup(connectConfig.getRmqConsumerGroup());
- consumer.setMaxReconsumeTimes(connectConfig.getRmqMaxRedeliveryTimes());
- consumer.setConsumerPullTimeoutMillis((long) connectConfig.getRmqMessageConsumeTimeout());
- consumer.setLanguage(LanguageCode.JAVA);
- return consumer;
- }
-
- public static DefaultMQPushConsumer initDefaultMQPushConsumer(RocketMqConnectConfig connectConfig) {
- RPCHook rpcHook = null;
- if (connectConfig.isAclEnable()) {
- rpcHook = new AclClientRPCHook(new SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey()));
- }
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(rpcHook);
- consumer.setNamesrvAddr(connectConfig.getNamesrvAddr());
- consumer.setInstanceName(createUniqInstance(connectConfig.getNamesrvAddr()));
- consumer.setConsumerGroup(createGroupName(connectConfig.getRmqConsumerGroup()));
- consumer.setMaxReconsumeTimes(connectConfig.getRmqMaxRedeliveryTimes());
- consumer.setConsumeTimeout((long) connectConfig.getRmqMessageConsumeTimeout());
- consumer.setConsumeThreadMin(connectConfig.getRmqMinConsumeThreadNums());
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
- consumer.setLanguage(LanguageCode.JAVA);
- return consumer;
- }
-
public static DefaultMQAdminExt startMQAdminTool(RocketMqConnectConfig connectConfig) throws MQClientException {
- RPCHook rpcHook = null;
+ DefaultMQAdminExt admin;
if (connectConfig.isAclEnable()) {
- rpcHook = new AclClientRPCHook(new SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey()));
+ admin = new DefaultMQAdminExt(new AclClientRPCHook(new SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey())));
+ } else {
+ admin = new DefaultMQAdminExt();
}
- DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
- defaultMQAdminExt.setNamesrvAddr(connectConfig.getNamesrvAddr());
- defaultMQAdminExt.setAdminExtGroup("connector-admin-group");
- defaultMQAdminExt.setInstanceName(ConnectUtil.createUniqInstance(connectConfig.getNamesrvAddr()));
- defaultMQAdminExt.start();
- return defaultMQAdminExt;
+ admin.setNamesrvAddr(connectConfig.getNamesrvAddr());
+ admin.setAdminExtGroup(connectConfig.getRmqConsumerGroup());
+ admin.setInstanceName(RocketMQConnectUtil.createUniqInstance(connectConfig.getNamesrvAddr()));
+ admin.start();
+ return admin;
}
@@ -165,7 +150,7 @@ public class ConnectUtil {
}
}
- public static boolean isTopicExist(RocketMqConnectConfig connectConfig, String topic) {
+ public static boolean topicExist(RocketMqConnectConfig connectConfig, String topic) {
DefaultMQAdminExt defaultMQAdminExt = null;
boolean foundTopicRouteInfo = false;
try {
@@ -184,7 +169,7 @@ public class ConnectUtil {
return foundTopicRouteInfo;
}
- public static Set<String> fetchAllConsumerGroupList(RocketMqConnectConfig connectConfig) {
+ public static Set<String> fetchAllConsumerGroup(RocketMqConnectConfig connectConfig) {
Set<String> consumerGroupSet = Sets.newHashSet();
DefaultMQAdminExt defaultMQAdminExt = null;
try {
@@ -230,3 +215,5 @@ public class ConnectUtil {
return subGroup;
}
}
+
+
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqConnectConfig.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqConnectConfig.java
index 18397a6..580cb36 100644
--- a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqConnectConfig.java
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqConnectConfig.java
@@ -29,8 +29,6 @@ public class RocketMqConnectConfig {
private String namesrvAddr;
- private String rmqProducerGroup;
-
private int operationTimeout = 3000;
private String rmqConsumerGroup;
@@ -43,24 +41,18 @@ public class RocketMqConnectConfig {
private int rmqMinConsumeThreadNums = 1;
- private String adminExtGroup;
- // set acl config
+ /** set acl config **/
private boolean aclEnable;
private String accessKey;
private String secretKey;
- public RocketMqConnectConfig() {
- }
+ public RocketMqConnectConfig() {}
public RocketMqConnectConfig(Configuration config, String dbHistoryName) {
this.dbHistoryName = dbHistoryName;
- // init config
- this.rmqProducerGroup = this.dbHistoryName.concat("-producer-group");
- this.rmqConsumerGroup = this.dbHistoryName.concat("-consumer-group");
- this.adminExtGroup = this.dbHistoryName.concat("-admin-group");
-
+ this.rmqConsumerGroup = this.dbHistoryName.concat("-group");
// init rocketmq connection
this.namesrvAddr = config.getString(RocketMqDatabaseHistory.NAME_SRV_ADDR);
this.aclEnable = config.getBoolean(RocketMqDatabaseHistory.ROCKETMQ_ACL_ENABLE);
@@ -85,14 +77,6 @@ public class RocketMqConnectConfig {
this.namesrvAddr = namesrvAddr;
}
- public String getRmqProducerGroup() {
- return rmqProducerGroup;
- }
-
- public void setRmqProducerGroup(String rmqProducerGroup) {
- this.rmqProducerGroup = rmqProducerGroup;
- }
-
public int getOperationTimeout() {
return operationTimeout;
}
@@ -165,27 +149,17 @@ public class RocketMqConnectConfig {
this.secretKey = secretKey;
}
- public String getAdminExtGroup() {
- return adminExtGroup;
- }
-
- public void setAdminExtGroup(String adminExtGroup) {
- this.adminExtGroup = adminExtGroup;
- }
-
@Override
public String toString() {
return "RocketMqConnectConfig{" +
"dbHistoryName='" + dbHistoryName + '\'' +
", namesrvAddr='" + namesrvAddr + '\'' +
- ", rmqProducerGroup='" + rmqProducerGroup + '\'' +
", operationTimeout=" + operationTimeout +
", rmqConsumerGroup='" + rmqConsumerGroup + '\'' +
", rmqMaxRedeliveryTimes=" + rmqMaxRedeliveryTimes +
", rmqMessageConsumeTimeout=" + rmqMessageConsumeTimeout +
", rmqMaxConsumeThreadNums=" + rmqMaxConsumeThreadNums +
", rmqMinConsumeThreadNums=" + rmqMinConsumeThreadNums +
- ", adminExtGroup='" + adminExtGroup + '\'' +
", aclEnable=" + aclEnable +
", accessKey='" + accessKey + '\'' +
", secretKey='" + secretKey + '\'' +
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqDatabaseHistory.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqDatabaseHistory.java
index 11814b9..a8fb38d 100644
--- a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqDatabaseHistory.java
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqDatabaseHistory.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.connect.debezium;
-import com.alibaba.fastjson.JSON;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.document.DocumentReader;
@@ -28,10 +27,7 @@ import io.debezium.relational.history.DatabaseHistoryListener;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecordComparator;
import org.apache.kafka.common.config.ConfigDef;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.TopicConfig;
@@ -45,6 +41,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
@@ -53,7 +50,6 @@ import java.util.function.Consumer;
*/
public final class RocketMqDatabaseHistory extends AbstractDatabaseHistory {
- private static final Logger log = LoggerFactory.getLogger(AbstractKafkaConnectSource.class);
public static final Field TOPIC = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "rocketmq.topic")
.withDisplayName("Database history topic name")
.withType(ConfigDef.Type.STRING)
@@ -61,7 +57,6 @@ public final class RocketMqDatabaseHistory extends AbstractDatabaseHistory {
.withImportance(ConfigDef.Importance.HIGH)
.withDescription("The name of the topic for the database schema history")
.withValidation(Field::isRequired);
-
/**
* rocketmq name srv addr
*/
@@ -72,7 +67,6 @@ public final class RocketMqDatabaseHistory extends AbstractDatabaseHistory {
.withImportance(ConfigDef.Importance.HIGH)
.withDescription("Rocketmq name srv addr")
.withValidation(Field::isRequired);
-
public static final Field ROCKETMQ_ACL_ENABLE = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "acl.enabled")
.withDisplayName("Rocketmq acl enabled")
.withType(ConfigDef.Type.BOOLEAN)
@@ -80,37 +74,27 @@ public final class RocketMqDatabaseHistory extends AbstractDatabaseHistory {
.withWidth(ConfigDef.Width.LONG)
.withImportance(ConfigDef.Importance.HIGH)
.withDescription("Rocketmq acl enabled");
-
public static final Field ROCKETMQ_ACCESS_KEY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "access.key")
.withDisplayName("Rocketmq access key")
.withType(ConfigDef.Type.STRING)
.withWidth(ConfigDef.Width.LONG)
.withImportance(ConfigDef.Importance.HIGH)
.withDescription("Rocketmq access key");
-
public static final Field ROCKETMQ_SECRET_KEY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "secret.key")
.withDisplayName("Rocketmq secret key")
.withType(ConfigDef.Type.STRING)
.withWidth(ConfigDef.Width.LONG)
.withImportance(ConfigDef.Importance.HIGH)
.withDescription("Rocketmq secret key");
-
- public static final Field CONNECTOR_NAME = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "connector.name")
- .withDisplayName("Connector name")
- .withType(ConfigDef.Type.STRING)
- .withWidth(ConfigDef.Width.LONG)
- .withImportance(ConfigDef.Importance.HIGH)
- .withDescription("connector name")
- .withValidation(Field::isRequired);
-
-
+ private static final Logger log = LoggerFactory.getLogger(AbstractKafkaConnectSource.class);
private final DocumentReader reader = DocumentReader.defaultReader();
private String topicName;
private String dbHistoryName;
private RocketMqConnectConfig connectConfig;
- private DefaultMQPushConsumer consumer;
private DefaultMQProducer producer;
+
+
@Override
public void configure(
Configuration config,
@@ -119,7 +103,6 @@ public final class RocketMqDatabaseHistory extends AbstractDatabaseHistory {
boolean useCatalogBeforeSchema) {
super.configure(config, comparator, listener, useCatalogBeforeSchema);
this.topicName = config.getString(TOPIC);
-
this.dbHistoryName = config.getString(DatabaseHistory.NAME, UUID.randomUUID().toString());
log.info("Configure to store the debezium database history {} to rocketmq topic {}",
dbHistoryName, topicName);
@@ -132,33 +115,24 @@ public final class RocketMqDatabaseHistory extends AbstractDatabaseHistory {
super.initializeStorage();
log.info("try to create history topic: {}!", this.topicName);
TopicConfig topicConfig = new TopicConfig(this.topicName);
- ConnectUtil.createTopic(connectConfig, topicConfig);
+ RocketMQConnectUtil.createTopic(connectConfig, topicConfig);
}
@Override
public void start() {
super.start();
try {
- // init consumer
- this.consumer = ConnectUtil.initDefaultMQPushConsumer(connectConfig);
- Set<String> consumerGroupSet = ConnectUtil.fetchAllConsumerGroupList(connectConfig);
- if (!consumerGroupSet.contains(consumer.getConsumerGroup())) {
- ConnectUtil.createSubGroup(connectConfig, consumer.getConsumerGroup());
+ Set<String> consumerGroupSet = RocketMQConnectUtil.fetchAllConsumerGroup(connectConfig);
+ if (!consumerGroupSet.contains(connectConfig.getRmqConsumerGroup())) {
+ RocketMQConnectUtil.createSubGroup(connectConfig, connectConfig.getRmqConsumerGroup());
}
- this.consumer.subscribe(this.topicName, "*");
- } catch (MQClientException e) {
- throw new DatabaseHistoryException(e);
- }
-
- this.producer = ConnectUtil.initDefaultMQProducer(connectConfig);
- try {
+ this.producer = RocketMQConnectUtil.initDefaultMQProducer(connectConfig);
this.producer.start();
} catch (MQClientException e) {
throw new DatabaseHistoryException(e);
}
}
-
@Override
public void stop() {
try {
@@ -166,10 +140,6 @@ public final class RocketMqDatabaseHistory extends AbstractDatabaseHistory {
producer.shutdown();
this.producer = null;
}
- if (this.consumer != null) {
- this.consumer.shutdown();
- this.consumer = null;
- }
} catch (Exception pe) {
log.warn("Failed to closing rocketmq client", pe);
}
@@ -182,34 +152,37 @@ public final class RocketMqDatabaseHistory extends AbstractDatabaseHistory {
*/
@Override
protected void recoverRecords(Consumer<HistoryRecord> records) {
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
- msgs.forEach(message -> {
- try {
- HistoryRecord recordObj = new HistoryRecord(reader.read(message.getBody()));
- log.trace("Recovering database history: {}", recordObj);
- if (recordObj == null || !recordObj.isValid()) {
- log.warn("Skipping invalid database history record '{}'. " +
- "This is often not an issue, but if it happens repeatedly please check the '{}' topic.",
- recordObj, topicName);
- } else {
- records.accept(recordObj);
- log.trace("Recovered database history: {}", recordObj);
- }
- } catch (IOException e) {
- throw new DatabaseHistoryException(e);
- }
- });
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
+ DefaultLitePullConsumer consumer = null;
try {
+ consumer = RocketMQConnectUtil.initDefaultLitePullConsumer(connectConfig, topicName, false);
consumer.start();
- } catch (MQClientException e) {
+ while (true) {
+ List<MessageExt> result = consumer.poll(10000);
+ if (result == null || result.isEmpty()) {
+ break;
+ }
+ for (MessageExt message : result) {
+ HistoryRecord recordObj = new HistoryRecord(reader.read(message.getBody()));
+ log.trace("Recovering database history: {}", recordObj);
+ if (recordObj == null || !recordObj.isValid()) {
+ log.warn("Skipping invalid database history record '{}'. " +
+ "This is often not an issue, but if it happens repeatedly please check the '{}' topic.",
+ recordObj, topicName);
+ } else {
+ records.accept(recordObj);
+ log.trace("Recovered database history: {}", recordObj);
+ }
+ }
+ }
+ } catch (MQClientException ce) {
+ throw new DatabaseHistoryException(ce);
+ } catch (IOException e) {
throw new DatabaseHistoryException(e);
+ } finally {
+ if (consumer != null) {
+ consumer.shutdown();
+ }
}
- log.info("Consumer started.");
}
@Override
@@ -225,7 +198,7 @@ public final class RocketMqDatabaseHistory extends AbstractDatabaseHistory {
try {
Message sourceMessage = new Message();
sourceMessage.setTopic(this.topicName);
- final byte[] messageBody = JSON.toJSONString(record).getBytes();
+ final byte[] messageBody = record.toString().getBytes();
sourceMessage.setBody(messageBody);
producer.send(sourceMessage);
} catch (Exception e) {
@@ -242,7 +215,7 @@ public final class RocketMqDatabaseHistory extends AbstractDatabaseHistory {
@Override
public boolean storageExists() {
// check topic is exist
- return ConnectUtil.isTopicExist(connectConfig, this.topicName);
+ return RocketMQConnectUtil.topicExist(connectConfig, this.topicName);
}
@Override
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/pom.xml b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/pom.xml
index 0c027d0..a3ee101 100644
--- a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/pom.xml
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/pom.xml
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rocketmq-connect-debezium</artifactId>
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/java/org/apache/rocketmq/connect/debezium/mysql/DebeziumMysqlConnector.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/java/org/apache/rocketmq/connect/debezium/mysql/DebeziumMysqlConnector.java
index 074146c..3b04344 100644
--- a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/java/org/apache/rocketmq/connect/debezium/mysql/DebeziumMysqlConnector.java
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/java/org/apache/rocketmq/connect/debezium/mysql/DebeziumMysqlConnector.java
@@ -21,7 +21,6 @@ import io.openmessaging.connector.api.component.task.Task;
import org.apache.rocketmq.connect.debezium.DebeziumConnector;
-
/**
* debezium mysql connector
*/
@@ -30,6 +29,7 @@ public class DebeziumMysqlConnector extends DebeziumConnector {
/**
* Return the current connector class
+ *
* @return task implement class
*/
@Override
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/resources/debezium-mysql-source-config.yaml b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/resources/debezium-mysql-source-config.yaml
index a371d4d..5dac1dc 100644
--- a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/resources/debezium-mysql-source-config.yaml
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/resources/debezium-mysql-source-config.yaml
@@ -18,9 +18,9 @@
#
{
- "connector-class":"org.apache.rocketmq.connect.debezium.mysql.DebeziumMysqlConnector",
- "max-task":"1",
- "connect-topicname":"debezium-mysql-source",
+ "connector-class": "org.apache.rocketmq.connect.debezium.mysql.DebeziumMysqlConnector",
+ "max-task": "1",
+ "connect-topicname": "debezium-mysql-source",
"debezium.transforms": "Unwrap",
"debezium.transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"debezium.transforms.Unwrap.delete.handling.mode": "drop",
@@ -42,5 +42,5 @@
"database.include.list": "db-01",
"snapshot.mode": "when_needed",
- "source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
+ "source-record-converter": "org.apache.rocketmq.connect.runtime.converter.JsonConverter"
}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/pom.xml b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/pom.xml
index cd18267..c504adf 100644
--- a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/pom.xml
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/pom.xml
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rocketmq-connect-debezium</artifactId>
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/src/main/java/org/apache/rocketmq/connect/debezium/oracle/DebeziumOracleConnector.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/src/main/java/org/apache/rocketmq/connect/debezium/oracle/DebeziumOracleConnector.java
index ad051d4..a489df2 100644
--- a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/src/main/java/org/apache/rocketmq/connect/debezium/oracle/DebeziumOracleConnector.java
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/src/main/java/org/apache/rocketmq/connect/debezium/oracle/DebeziumOracleConnector.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.connect.debezium.oracle;
-import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.task.Task;
import org.apache.rocketmq.connect.debezium.DebeziumConnector;
@@ -30,6 +29,7 @@ public class DebeziumOracleConnector extends DebeziumConnector {
/**
* Return the current connector class
+ *
* @return task implement class
*/
@Override
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
index 319332a..f8986d1 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
@@ -119,7 +119,7 @@ public class Worker {
private final Plugin plugin;
- private static final int MAX_START_TIMEOUT_MILLS = 5000;
+ private static final int MAX_START_TIMEOUT_MILLS = 1000*60;
private static final long MAX_STOP_TIMEOUT_MILLS = 20000;
@@ -541,7 +541,7 @@ public class Worker {
try {
if (null != future) {
- future.get(1000, TimeUnit.MILLISECONDS);
+ future.get(1000 * 60, TimeUnit.MILLISECONDS);
} else {
log.error("[BUG] errorTasks reference not found in taskFutureMap");
}