You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/07/28 06:36:39 UTC
[rocketmq-connect] branch master updated: [ISSUE #207]RecordConverter support convert record key (#213)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 9bd9f89 [ISSUE #207]RecordConverter support convert record key (#213)
9bd9f89 is described below
commit 9bd9f8991708609cd40f2a9dbfe9a5da2e7d3b56
Author: xiaoyi <su...@163.com>
AuthorDate: Thu Jul 28 14:36:34 2022 +0800
[ISSUE #207]RecordConverter support convert record key (#213)
* Optimize WorkerSourceTask and WorkerSinkTask #183
* Optimize WorkerSourceTask and WorkerSinkTask #183
* Optimize worker #183
* fixed
* fixed
* format code
* fixed
* fixed WorkerTask #183
* Fix debezium demecial type conversion problem #190
* Upgrade rocketmq-replicator API to v0.1.3 #189
* Encountered change event for table databasename.tablename whose schema isn`t known to this connector #191
* Debezium mysql source connector delete event causes null pointer #196
* remove local config
* Debezium mysql source connector delete event causes null pointer #196
* Ensure the orderly submission of offset #183
* fixed
* Optimize WorkerDirectTask #183
* fixed #183
* test offset commit #183
* fixed #183
* fixed task id #183
* fixed
* upgrade fastjson
* reformat code #183
* Whether the same task is assigned to different worker nodes with different connectorTaskId [fixed]
* fixed
* Rocketmq replicator running null pointer #205
* RecordConverter support convert record key #207
* debezium record add key #207
* jdbc plugin support record key #207
* Jdbc connector no longer needs to serialize and deserialize ConnecRecord objects #209
* support record delete #209
* Fix test error
* test convert record key #207
* fixed
* fixed
* Update openmessaging connector version to 0.1.4
---
.../kafka-connect-adaptor/pom.xml | 4 -
.../kafka/connect/adaptor/schema/Converters.java | 52 ++++++++---
.../adaptor/schema/KafkaSinkValueConverter.java | 6 ++
.../schema/RocketMQSourceValueConverter.java | 6 ++
connectors/rocketmq-connect-debezium/pom.xml | 7 +-
.../rocketmq-connect-debezium-core/pom.xml | 4 -
connectors/rocketmq-connect-jdbc/pom.xml | 46 +++++-----
.../connect/jdbc/config/AbstractConfig.java | 2 +-
.../connect/jdbc/connector/JdbcSinkConfig.java | 10 ++-
.../jdbc/connector/JdbcSourceConnector.java | 2 +-
.../jdbc/dialect/PreparedStatementBinder.java | 23 +++--
.../connect/jdbc/sink/BufferedRecords.java | 31 ++++++-
.../connect/jdbc/sink/RecordValidator.java | 100 +++++++++++++++++++++
.../connect/jdbc/sink/metadata/FieldsMetadata.java | 64 +++++++++++++
.../connect/jdbc/sink/metadata/SchemaPair.java | 7 +-
.../jdbc/source/offset/SourceOffsetCompute.java | 3 +-
pom.xml | 2 +-
.../connect/runtime/DistributedConnectStartup.java | 2 +-
.../connect/runtime/StandaloneConnectStartup.java | 7 +-
.../connect/runtime/common/ConnectKeyValue.java | 22 +++++
.../runtime/config/RuntimeConfigDefine.java | 5 +-
.../runtime/connectorwrapper/TransformChain.java | 4 +-
.../connect/runtime/connectorwrapper/Worker.java | 72 ++++++++-------
.../runtime/connectorwrapper/WorkerDirectTask.java | 1 +
.../runtime/connectorwrapper/WorkerSinkTask.java | 52 +++++------
.../runtime/connectorwrapper/WorkerSourceTask.java | 37 ++++----
.../controller/AbstractConnectController.java | 2 +-
.../distributed/DistributedConnectController.java | 2 +-
.../{utils => controller/isolation}/Plugin.java | 21 ++++-
.../isolation}/PluginClassLoader.java | 2 +-
.../isolation}/PluginUtils.java | 2 +-
.../isolation}/PluginWrapper.java | 2 +-
.../standalone/StandaloneConnectController.java | 2 +-
.../runtime/converter/record/ConverterConfig.java | 36 ++++++++
.../runtime/converter/record/ConverterType.java | 60 +++++++++++++
.../runtime/service/ConfigManagementService.java | 2 +-
.../service/ConfigManagementServiceImpl.java | 6 +-
.../memory/MemoryConfigManagementServiceImpl.java | 5 +-
.../rocketmq/connect/runtime/utils/Base64Util.java | 52 +++++++++++
.../runtime/connectorwrapper/WorkerTest.java | 3 +-
.../connect/runtime/rest/RestHandlerTest.java | 6 +-
.../service/ConfigManagementServiceImplTest.java | 2 +-
42 files changed, 603 insertions(+), 173 deletions(-)
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/pom.xml b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/pom.xml
index b79d77b..4ee490b 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/pom.xml
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/pom.xml
@@ -39,10 +39,6 @@
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-remoting</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-openmessaging</artifactId>
- </dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java
index 9c0c529..1311554 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java
@@ -40,15 +40,25 @@ public class Converters {
public static ConnectRecord fromSourceRecord(SourceRecord record) {
// sourceRecord convert connect Record
- RocketMQSourceSchemaConverter rocketMQSourceSchemaConverter = new RocketMQSourceSchemaConverter(record.valueSchema());
- io.openmessaging.connector.api.data.Schema schema = rocketMQSourceSchemaConverter.schema();
+ RocketMQSourceSchemaConverter valueSchemaConverter = new RocketMQSourceSchemaConverter(record.valueSchema());
+ io.openmessaging.connector.api.data.Schema valueSchema = valueSchemaConverter.schema();
+
+ io.openmessaging.connector.api.data.Schema keySchema = null;
+ if (record.keySchema() != null) {
+ RocketMQSourceSchemaConverter keySchemaConverter = new RocketMQSourceSchemaConverter(record.keySchema());
+ keySchema = keySchemaConverter.schema();
+ }
+
+
RocketMQSourceValueConverter rocketMQSourceValueConverter = new RocketMQSourceValueConverter();
ConnectRecord connectRecord = new ConnectRecord(
new RecordPartition(record.sourcePartition()),
new RecordOffset(record.sourceOffset()),
record.timestamp(),
- schema,
- rocketMQSourceValueConverter.value(schema, record.value()));
+ keySchema,
+ record.key() == null ? null : rocketMQSourceValueConverter.value(keySchema, record.key()),
+ valueSchema,
+ record.value() == null ? null : rocketMQSourceValueConverter.value(valueSchema, record.value()));
Iterator<Header> headers = record.headers().iterator();
while (headers.hasNext()) {
Header header = headers.next();
@@ -60,15 +70,24 @@ public class Converters {
public static ConnectRecord fromSinkRecord(SinkRecord record) {
// sourceRecord convert connect Record
- RocketMQSourceSchemaConverter rocketMQSourceSchemaConverter = new RocketMQSourceSchemaConverter(record.valueSchema());
- io.openmessaging.connector.api.data.Schema schema = rocketMQSourceSchemaConverter.schema();
+ RocketMQSourceSchemaConverter valueSchemaConverter = new RocketMQSourceSchemaConverter(record.valueSchema());
+ io.openmessaging.connector.api.data.Schema valueSchema = valueSchemaConverter.schema();
+
+ io.openmessaging.connector.api.data.Schema keySchema = null;
+ if (record.keySchema() != null) {
+ RocketMQSourceSchemaConverter keySchemaConverter = new RocketMQSourceSchemaConverter(record.keySchema());
+ keySchema = keySchemaConverter.schema();
+ }
+
RocketMQSourceValueConverter rocketMQSourceValueConverter = new RocketMQSourceValueConverter();
ConnectRecord connectRecord = new ConnectRecord(
toRecordPartition(record),
toRecordOffset(record),
record.timestamp(),
- schema,
- rocketMQSourceValueConverter.value(schema, record.value()));
+ keySchema,
+ record.key() == null ? null : rocketMQSourceValueConverter.value(keySchema, record.key()),
+ valueSchema,
+ record.value() == null ? null :rocketMQSourceValueConverter.value(valueSchema, record.value()));
Iterator<Header> headers = record.headers().iterator();
while (headers.hasNext()) {
Header header = headers.next();
@@ -86,8 +105,15 @@ public class Converters {
*/
public static SinkRecord fromConnectRecord(ConnectRecord record) {
// connect record convert kafka sink record
- KafkaSinkSchemaConverter kafkaSinkSchemaConverter = new KafkaSinkSchemaConverter(record.getSchema());
- Schema schema = kafkaSinkSchemaConverter.schema();
+ KafkaSinkSchemaConverter valueSchemaConverter = new KafkaSinkSchemaConverter(record.getSchema());
+ Schema schema = valueSchemaConverter.schema();
+ // key converter
+ Schema keySchema = null;
+ if (record.getKeySchema() != null){
+ KafkaSinkSchemaConverter keySchemaConverter = new KafkaSinkSchemaConverter(record.getKeySchema());
+ keySchema = keySchemaConverter.schema();
+ }
+
KafkaSinkValueConverter sinkValueConverter = new KafkaSinkValueConverter();
// add headers
Headers headers = new ConnectHeaders();
@@ -100,10 +126,10 @@ public class Converters {
SinkRecord sinkRecord = new SinkRecord(
topic(record.getPosition().getPartition()),
partition(record.getPosition().getPartition()),
- null,
- null,
+ keySchema,
+ record.getKey() == null ? null : sinkValueConverter.value(keySchema, record.getKey()),
schema,
- sinkValueConverter.value(schema, record.getData()),
+ record.getData() == null ? null : sinkValueConverter.value(schema, record.getData()),
offset(record.getPosition().getOffset()),
record.getTimestamp(),
TimestampType.NO_TIMESTAMP_TYPE,
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkValueConverter.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkValueConverter.java
index 7b37f59..de986bd 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkValueConverter.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkValueConverter.java
@@ -47,6 +47,12 @@ public class KafkaSinkValueConverter {
* @return
*/
private Object convertKafkaValue(Schema targetSchema, Object originalValue) {
+ if (targetSchema == null) {
+ if (originalValue == null) {
+ return null;
+ }
+ return originalValue;
+ }
switch (targetSchema.type()) {
case INT8:
case INT16:
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceValueConverter.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceValueConverter.java
index 40d6178..2934564 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceValueConverter.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceValueConverter.java
@@ -49,6 +49,12 @@ public class RocketMQSourceValueConverter {
* @return
*/
private Object convertKafkaValue(Schema targetSchema, Object originalValue) {
+ if (targetSchema == null) {
+ if (originalValue == null) {
+ return null;
+ }
+ return originalValue;
+ }
switch (targetSchema.getFieldType()) {
case INT8:
case INT16:
diff --git a/connectors/rocketmq-connect-debezium/pom.xml b/connectors/rocketmq-connect-debezium/pom.xml
index 0715731..5f0e2ec 100644
--- a/connectors/rocketmq-connect-debezium/pom.xml
+++ b/connectors/rocketmq-connect-debezium/pom.xml
@@ -184,7 +184,7 @@
<rocketmq-openmessaging.version>4.3.2</rocketmq-openmessaging.version>
<!--rocketmq connect version-->
- <openmessaging-connector.version>0.1.3</openmessaging-connector.version>
+ <openmessaging-connector.version>0.1.4</openmessaging-connector.version>
<openmessaging-api.version>0.3.1-alpha</openmessaging-api.version>
</properties>
@@ -231,11 +231,6 @@
<artifactId>rocketmq-remoting</artifactId>
<version>${rocketmq.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-openmessaging</artifactId>
- <version>${rocketmq-openmessaging.version}</version>
- </dependency>
<!--kafka && kafka connect-->
<dependency>
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/pom.xml b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/pom.xml
index 121e611..50cb6b9 100644
--- a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/pom.xml
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/pom.xml
@@ -68,10 +68,6 @@
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-remoting</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-openmessaging</artifactId>
- </dependency>
<dependency>
<groupId>io.debezium</groupId>
diff --git a/connectors/rocketmq-connect-jdbc/pom.xml b/connectors/rocketmq-connect-jdbc/pom.xml
index 7617043..097bb3a 100644
--- a/connectors/rocketmq-connect-jdbc/pom.xml
+++ b/connectors/rocketmq-connect-jdbc/pom.xml
@@ -47,7 +47,7 @@
<assertj.version>2.6.0</assertj.version>
<mockito.version>2.6.3</mockito.version>
<!--rocket connect api-->
- <openmessaging-connector.version>0.1.3</openmessaging-connector.version>
+ <openmessaging-connector.version>0.1.4</openmessaging-connector.version>
<openmessaging-api.version>0.3.1-alpha</openmessaging-api.version>
<!--fast json-->
@@ -58,28 +58,34 @@
<dependencies>
- <!---->
<dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-client</artifactId>
- <version>${rocketmq.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-tools</artifactId>
- <version>${rocketmq.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-remoting</artifactId>
- <version>${rocketmq.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-openmessaging</artifactId>
- <version>${rocketmq.version}</version>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.12.0</version>
</dependency>
+<!-- <dependency>-->
+<!-- <groupId>org.apache.rocketmq</groupId>-->
+<!-- <artifactId>rocketmq-client</artifactId>-->
+<!-- <version>${rocketmq.version}</version>-->
+<!-- </dependency>-->
+<!-- <dependency>-->
+<!-- <groupId>org.apache.rocketmq</groupId>-->
+<!-- <artifactId>rocketmq-tools</artifactId>-->
+<!-- <version>${rocketmq.version}</version>-->
+<!-- </dependency>-->
+<!-- <dependency>-->
+<!-- <groupId>org.apache.rocketmq</groupId>-->
+<!-- <artifactId>rocketmq-remoting</artifactId>-->
+<!-- <version>${rocketmq.version}</version>-->
+<!-- </dependency>-->
+<!-- <dependency>-->
+<!-- <groupId>org.apache.rocketmq</groupId>-->
+<!-- <artifactId>rocketmq-openmessaging</artifactId>-->
+<!-- <version>${rocketmq.version}</version>-->
+<!-- </dependency>-->
+
+
<!--rocketmq connect api-->
<dependency>
<groupId>io.openmessaging</groupId>
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/config/AbstractConfig.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/config/AbstractConfig.java
index 9d53fdc..3d7c52d 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/config/AbstractConfig.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/config/AbstractConfig.java
@@ -16,7 +16,7 @@
*/
package org.apache.rocketmq.connect.jdbc.config;
-import com.beust.jcommander.internal.Lists;
+import com.google.common.collect.Lists;
import io.openmessaging.KeyValue;
import org.apache.rocketmq.connect.jdbc.util.QuoteMethod;
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConfig.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConfig.java
index 7c1f0b3..481bd9a 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConfig.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConfig.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.connect.jdbc.connector;
import io.openmessaging.KeyValue;
import org.apache.rocketmq.connect.jdbc.config.AbstractConfig;
import org.apache.rocketmq.connect.jdbc.dialect.DatabaseDialect;
+import org.apache.rocketmq.connect.jdbc.exception.ConfigException;
import org.apache.rocketmq.connect.jdbc.schema.table.TableId;
import org.apache.rocketmq.connect.jdbc.util.TableType;
@@ -187,7 +188,7 @@ public class JdbcSinkConfig extends AbstractConfig {
tableNameFormat = config.getString(TABLE_NAME_FORMAT, TABLE_NAME_FORMAT_DEFAULT).trim();
tableFromHeader = getBoolean(config, TABLE_NAME_FROM_HEADER, false);
batchSize = config.getInt(BATCH_SIZE, BATCH_SIZE_DEFAULT);
- deleteEnabled = getBoolean(config, DELETE_ENABLED, DELETE_ENABLED_DEFAULT);
+
maxRetries = config.getInt(MAX_RETRIES, MAX_RETRIES_DEFAULT);
retryBackoffMs = config.getInt(RETRY_BACKOFF_MS, RETRY_BACKOFF_MS_DEFAULT);
autoCreate = getBoolean(config, AUTO_CREATE, AUTO_CREATE_DEFAULT);
@@ -195,9 +196,13 @@ public class JdbcSinkConfig extends AbstractConfig {
if (Objects.nonNull(config.getString(INSERT_MODE))) {
insertMode = InsertMode.valueOf(config.getString(INSERT_MODE, INSERT_MODE_DEFAULT).toUpperCase());
}
-
+ deleteEnabled = getBoolean(config, DELETE_ENABLED, DELETE_ENABLED_DEFAULT);
pkMode = PrimaryKeyMode.valueOf(config.getString(PK_MODE, PK_MODE_DEFAULT).toUpperCase());
pkFields = getList(config, PK_FIELDS);
+ if (deleteEnabled && pkMode != PrimaryKeyMode.RECORD_KEY) {
+ throw new ConfigException(
+ "Primary key mode must be 'record_key' when delete support is enabled");
+ }
dialectName = config.getString(DIALECT_NAME_CONFIG);
fieldsWhitelist = new HashSet<>(getList(config, FIELDS_WHITELIST));
// table white list
@@ -206,6 +211,7 @@ public class JdbcSinkConfig extends AbstractConfig {
timeZone = TimeZone.getTimeZone(ZoneId.of(dbTimeZone));
tableTypes = TableType.parse(getList(config, TABLE_TYPES_CONFIG, TABLE_TYPES_DEFAULT));
+
}
public String getTableNameFormat() {
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
index 4ad9e4f..4a2efde 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
@@ -19,7 +19,7 @@ package org.apache.rocketmq.connect.jdbc.connector;
import java.util.List;
-import com.beust.jcommander.internal.Lists;
+import com.google.common.collect.Lists;
import io.openmessaging.connector.api.component.task.Task;
import io.openmessaging.connector.api.component.task.source.SourceConnector;
import io.openmessaging.internal.DefaultKeyValue;
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/PreparedStatementBinder.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/PreparedStatementBinder.java
index ddd5532..0443756 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/PreparedStatementBinder.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/PreparedStatementBinder.java
@@ -16,8 +16,6 @@
*/
package org.apache.rocketmq.connect.jdbc.dialect;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.data.Field;
import io.openmessaging.connector.api.data.Schema;
@@ -98,10 +96,21 @@ public class PreparedStatementBinder implements DatabaseDialect.StatementBinder
throw new AssertionError();
}
break;
+ case RECORD_KEY: {
+ if (schemaPair.keySchema.getFieldType().isPrimitive()) {
+ assert fieldsMetadata.keyFieldNames.size() == 1;
+ bindField(index++, schemaPair.keySchema, record.getKey(),
+ fieldsMetadata.keyFieldNames.iterator().next());
+ } else {
+ for (String fieldName : fieldsMetadata.keyFieldNames) {
+ final Field field = schemaPair.keySchema.getField(fieldName);
+ bindField(index++, field.getSchema(), ((Struct) record.getKey()).get(field), fieldName);
+ }
+ }
+ }
+ break;
case RECORD_VALUE: {
- String jsonData = JSON.toJSONString(record.getData(), SerializerFeature.DisableCircularReferenceDetect);
- Struct struct = JSON.parseObject(jsonData, Struct.class);
- struct.setValues(JSON.parseObject(jsonData).getJSONArray("values").toArray());
+ Struct struct = (Struct) record.getData();
for (String fieldName : fieldsMetadata.keyFieldNames) {
final Field field = schemaPair.schema.getField(fieldName);
bindField(index++, field.getSchema(), struct.get(fieldName), fieldName);
@@ -118,9 +127,7 @@ public class PreparedStatementBinder implements DatabaseDialect.StatementBinder
ConnectRecord record,
int index
) throws SQLException {
- String jsonData = JSON.toJSONString(record.getData(), SerializerFeature.DisableCircularReferenceDetect);
- Struct struct = JSON.parseObject(jsonData, Struct.class);
- struct.setValues(JSON.parseObject(jsonData).getJSONArray("values").toArray());
+ Struct struct = (Struct) record.getData();
for (final String fieldName : fieldsMetadata.nonKeyFieldNames) {
final Field field = record.getSchema().getField(fieldName);
bindField(index++, field.getSchema(), struct.get(fieldName), fieldName);
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/BufferedRecords.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/BufferedRecords.java
index 3ad19be..88e058f 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/BufferedRecords.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/BufferedRecords.java
@@ -57,12 +57,15 @@ public class BufferedRecords {
private final Connection connection;
private List<ConnectRecord> records = new ArrayList<>();
+ private Schema keySchema;
private Schema schema;
private FieldsMetadata fieldsMetadata;
+ private RecordValidator recordValidator;
private PreparedStatement updatePreparedStatement;
private PreparedStatement deletePreparedStatement;
private DatabaseDialect.StatementBinder updateStatementBinder;
private DatabaseDialect.StatementBinder deleteStatementBinder;
+ private boolean deletesInBatch = false;
public BufferedRecords(
JdbcSinkConfig config,
@@ -76,6 +79,7 @@ public class BufferedRecords {
this.dbDialect = dbDialect;
this.dbStructure = dbStructure;
this.connection = connection;
+ this.recordValidator = RecordValidator.create(config);
}
/**
@@ -86,10 +90,27 @@ public class BufferedRecords {
* @throws SQLException
*/
public List<ConnectRecord> add(ConnectRecord record) throws SQLException {
+ recordValidator.validate(record);
final List<ConnectRecord> flushed = new ArrayList<>();
boolean schemaChanged = false;
- // check and update schema
- if (!Objects.equals(schema, record.getSchema())) {
+ if (!Objects.equals(keySchema, record.getKeySchema())) {
+ keySchema = record.getKeySchema();
+ schemaChanged = true;
+ }
+ if (isNull(record.getSchema())) {
+ // For deletes, value and optionally value schema come in as null.
+ // We don't want to treat this as a schema change if key schemas is the same
+ // otherwise we flush unnecessarily.
+ if (config.isDeleteEnabled()) {
+ deletesInBatch = true;
+ }
+ } else if (Objects.equals(schema, record.getSchema())) {
+ if (config.isDeleteEnabled() && deletesInBatch) {
+ // flush so an insert after a delete of same record isn't lost
+ flushed.addAll(flush());
+ }
+ } else {
+ // value schema is not null and has changed. This is a real schema change.
schema = record.getSchema();
schemaChanged = true;
}
@@ -99,6 +120,7 @@ public class BufferedRecords {
flushed.addAll(flush());
// re-initialize everything that depends on the record schema
final SchemaPair schemaPair = new SchemaPair(
+ record.getKeySchema(),
record.getSchema(),
record.getExtensions()
);
@@ -149,6 +171,11 @@ public class BufferedRecords {
}
}
+ // set deletesInBatch if schema value is not null
+ if (isNull(record.getData()) && config.isDeleteEnabled()) {
+ deletesInBatch = true;
+ }
+
records.add(record);
if (records.size() >= config.getBatchSize()) {
flushed.addAll(flush());
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/RecordValidator.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/RecordValidator.java
new file mode 100644
index 0000000..03c9f55
--- /dev/null
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/RecordValidator.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.jdbc.sink;
+
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.FieldType;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConfig;
+
+@FunctionalInterface
+public interface RecordValidator {
+
+ RecordValidator NO_OP = (record) -> { };
+
+ void validate(ConnectRecord record);
+
+ default RecordValidator and(RecordValidator other) {
+ if (other == null || other == NO_OP || other == this) {
+ return this;
+ }
+ if (this == NO_OP) {
+ return other;
+ }
+ RecordValidator thisValidator = this;
+ return (record) -> {
+ thisValidator.validate(record);
+ other.validate(record);
+ };
+ }
+
+ static RecordValidator create(JdbcSinkConfig config) {
+ RecordValidator requiresKey = requiresKey(config);
+ RecordValidator requiresValue = requiresValue(config);
+
+ RecordValidator keyValidator = NO_OP;
+ RecordValidator valueValidator = NO_OP;
+ switch (config.pkMode) {
+ case RECORD_KEY:
+ keyValidator = keyValidator.and(requiresKey);
+ break;
+ case RECORD_VALUE:
+ case NONE:
+ valueValidator = valueValidator.and(requiresValue);
+ break;
+ default:
+ // no primary key is required
+ break;
+ }
+
+ if (config.isDeleteEnabled()) {
+ // When delete is enabled, we need a key
+ keyValidator = keyValidator.and(requiresKey);
+ } else {
+ // When delete is disabled, we need non-tombstone values
+ valueValidator = valueValidator.and(requiresValue);
+ }
+
+ // Compose the validator that may or may be NO_OP
+ return keyValidator.and(valueValidator);
+ }
+
+ static RecordValidator requiresValue(JdbcSinkConfig config) {
+ return record -> {
+ Schema valueSchema = record.getSchema();
+ if (record.getData() != null
+ && valueSchema != null
+ && valueSchema.getFieldType() == FieldType.STRUCT) {
+ return;
+ }
+ throw new ConnectException(record.toString());
+ };
+ }
+
+ static RecordValidator requiresKey(JdbcSinkConfig config) {
+ return record -> {
+ Schema keySchema = record.getKeySchema();
+ if (record.getKey() != null
+ && keySchema != null
+ && (keySchema.getFieldType() == FieldType.STRUCT || keySchema.getFieldType().isPrimitive())) {
+ return;
+ }
+ throw new ConnectException(record.toString());
+ };
+ }
+}
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/metadata/FieldsMetadata.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/metadata/FieldsMetadata.java
index d185218..88d3810 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/metadata/FieldsMetadata.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/metadata/FieldsMetadata.java
@@ -74,6 +74,7 @@ public class FieldsMetadata {
pkMode,
configuredPkFields,
fieldsWhitelist,
+ schemaPair.keySchema,
schemaPair.schema,
schemaPair.extensions
);
@@ -95,6 +96,7 @@ public class FieldsMetadata {
final JdbcSinkConfig.PrimaryKeyMode pkMode,
final List<String> configuredPkFields,
final Set<String> fieldsWhitelist,
+ final Schema keySchema,
final Schema schema,
final KeyValue headers
) {
@@ -106,6 +108,9 @@ public class FieldsMetadata {
switch (pkMode) {
case NONE:
break;
+ case RECORD_KEY:
+ extractRecordKeyPk(tableName, configuredPkFields, keySchema, allFields, keyFieldNames);
+ break;
case RECORD_VALUE:
extractRecordValuePk(tableName, configuredPkFields, schema, headers, allFields, keyFieldNames);
break;
@@ -157,6 +162,65 @@ public class FieldsMetadata {
return new FieldsMetadata(keyFieldNames, nonKeyFieldNames, allFieldsOrdered);
}
+ private static void extractRecordKeyPk(
+ final String tableName,
+ final List<String> configuredPkFields,
+ final Schema keySchema,
+ final Map<String, SinkRecordField> allFields,
+ final Set<String> keyFieldNames
+ ) {
+ if (keySchema == null) {
+ throw new ConnectException(String.format(
+ "PK mode for table '%s' is %s, but record key schema is missing",
+ tableName,
+ JdbcSinkConfig.PrimaryKeyMode.RECORD_KEY
+ ));
+ }
+ final FieldType keySchemaType = keySchema.getFieldType();
+ switch (keySchemaType){
+ case STRUCT:
+ if (configuredPkFields.isEmpty()) {
+ keySchema.getFields().forEach(keyField->{
+ keyFieldNames.add(keyField.getName());
+ });
+ } else {
+ for (String fieldName : configuredPkFields) {
+ final Field keyField = keySchema.getField(fieldName);
+ if (keyField == null) {
+ throw new ConnectException(String.format(
+ "PK mode for table '%s' is %s with configured PK fields %s, but record key "
+ + "schema does not contain field: %s",
+ tableName, JdbcSinkConfig.PrimaryKeyMode.RECORD_KEY, configuredPkFields, fieldName
+ ));
+ }
+ }
+ keyFieldNames.addAll(configuredPkFields);
+ }
+ for (String fieldName : keyFieldNames) {
+ final Schema fieldSchema = keySchema.getField(fieldName).getSchema();
+ allFields.put(fieldName, new SinkRecordField(fieldSchema, fieldName, true));
+ }
+ break;
+ default:
+ // todo 基本数据类型,必须指定主键字段,因为基本类型不带schema
+ if (keySchemaType.isPrimitive()) {
+ if (configuredPkFields.size() != 1) {
+ throw new ConnectException(String.format(
+ "Need exactly one PK column defined since the key schema for records is a "
+ + "primitive type, defined columns are: %s",
+ configuredPkFields
+ ));
+ }
+ final String fieldName = configuredPkFields.get(0);
+ keyFieldNames.add(fieldName);
+ allFields.put(fieldName, new SinkRecordField(keySchema, fieldName, true));
+ } else {
+ throw new ConnectException(
+ "Key schema must be primitive type or Struct, but is of type: " + keySchemaType
+ );
+ }
+ }
+ }
/**
* record value
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/metadata/SchemaPair.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/metadata/SchemaPair.java
index 77955d7..f2196f4 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/metadata/SchemaPair.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/metadata/SchemaPair.java
@@ -25,15 +25,18 @@ import java.util.Objects;
* schema pair
*/
public class SchemaPair {
+ public final Schema keySchema;
public final Schema schema;
public final KeyValue extensions;
- public SchemaPair(Schema schema) {
+ public SchemaPair(Schema keySchema, Schema schema) {
+ this.keySchema = keySchema;
this.schema = schema;
this.extensions = null;
}
- public SchemaPair(Schema schema, KeyValue extensions) {
+ public SchemaPair(Schema keySchema, Schema schema, KeyValue extensions) {
+ this.keySchema = keySchema;
this.schema = schema;
this.extensions = extensions;
}
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/offset/SourceOffsetCompute.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/offset/SourceOffsetCompute.java
index 0ae2f2c..359e129 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/offset/SourceOffsetCompute.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/offset/SourceOffsetCompute.java
@@ -16,7 +16,8 @@
*/
package org.apache.rocketmq.connect.jdbc.source.offset;
-import com.beust.jcommander.internal.Maps;
+
+import com.google.common.collect.Maps;
import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.data.RecordPartition;
diff --git a/pom.xml b/pom.xml
index 64b719a..f4bd51b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,7 +43,7 @@
<assertj.version>2.6.0</assertj.version>
<mockito.version>3.2.4</mockito.version>
<httpclient.version>4.5.13</httpclient.version>
- <openmessaging.connector.version>0.1.4-SNAPSHOT</openmessaging.connector.version>
+ <openmessaging.connector.version>0.1.4</openmessaging.connector.version>
<fastjson.version>1.2.83</fastjson.version>
<javalin.version>2.8.0</javalin.version>
<slf4j.version>1.7.7</slf4j.version>
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/DistributedConnectStartup.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/DistributedConnectStartup.java
index 2f0dcda..3e958d5 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/DistributedConnectStartup.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/DistributedConnectStartup.java
@@ -40,7 +40,7 @@ import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
import org.apache.rocketmq.connect.runtime.service.StagingMode;
import org.apache.rocketmq.connect.runtime.utils.FileAndPropertyUtil;
-import org.apache.rocketmq.connect.runtime.utils.Plugin;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
import org.apache.rocketmq.connect.runtime.utils.ServerUtil;
import org.apache.rocketmq.connect.runtime.utils.ServiceProviderUtil;
import org.slf4j.Logger;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/StandaloneConnectStartup.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/StandaloneConnectStartup.java
index 2770265..c7adad2 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/StandaloneConnectStartup.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/StandaloneConnectStartup.java
@@ -17,8 +17,6 @@
package org.apache.rocketmq.connect.runtime;
-import ch.qos.logback.classic.LoggerContext;
-import ch.qos.logback.classic.joran.JoranConfigurator;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
@@ -26,6 +24,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
+
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.joran.JoranConfigurator;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
@@ -40,7 +41,7 @@ import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
import org.apache.rocketmq.connect.runtime.service.StagingMode;
import org.apache.rocketmq.connect.runtime.utils.FileAndPropertyUtil;
-import org.apache.rocketmq.connect.runtime.utils.Plugin;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
import org.apache.rocketmq.connect.runtime.utils.ServerUtil;
import org.apache.rocketmq.connect.runtime.utils.ServiceProviderUtil;
import org.slf4j.Logger;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/ConnectKeyValue.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/ConnectKeyValue.java
index dc8e962..f64b9e9 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/ConnectKeyValue.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/ConnectKeyValue.java
@@ -125,6 +125,26 @@ public class ConnectKeyValue implements KeyValue, Serializable {
this.properties = properties;
}
+ /**
+ * Gets all original settings with the given prefix.
+ * @param prefix the prefix to use as a filter
+ * @param strip strip the prefix before adding to the output if set true
+ * @return a Map containing the settings with the prefix
+ */
+ public Map<String, String> originalsWithPrefix(String prefix, boolean strip) {
+ Map<String, String> result = new ConcurrentHashMap<>();
+ for (Map.Entry<String, String> entry : this.properties.entrySet()) {
+ if (entry.getKey().startsWith(prefix) && entry.getKey().length() > prefix.length()) {
+ if (strip) {
+ result.put(entry.getKey().substring(prefix.length()), entry.getValue());
+ } else {
+ result.put(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+ return result;
+ }
+
@Override
public boolean equals(Object obj) {
@@ -145,4 +165,6 @@ public class ConnectKeyValue implements KeyValue, Serializable {
"properties=" + properties +
'}';
}
+
+
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/RuntimeConfigDefine.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/RuntimeConfigDefine.java
index d7a13e2..7110e6e 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/RuntimeConfigDefine.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/RuntimeConfigDefine.java
@@ -61,7 +61,10 @@ public class RuntimeConfigDefine {
/**
* The full class name of record converter. Which is used to parse {@link ConnectRecord} to/from byte[].
*/
- public static final String SOURCE_RECORD_CONVERTER = "source-record-converter";
+ public static final String SOURCE_RECORD_CONVERTER = "value-converter";
+ public static final String SOURCE_RECORD_CONVERTER_DEFAULT = "org.apache.rocketmq.connect.runtime.converter.record.StringConverter";
+ public static final String SOURCE_RECORD_KEY_CONVERTER = "key-converter";
+ public static final String SOURCE_RECORD_KEY_CONVERTER_DEFAULT = "org.apache.rocketmq.connect.runtime.converter.record.StringConverter";
public static final String NAMESRV_ADDR = "namesrv-addr";
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
index df09f36..ef3d264 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
@@ -29,8 +29,8 @@ import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
import org.apache.rocketmq.connect.runtime.errors.ErrorReporter;
import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
-import org.apache.rocketmq.connect.runtime.utils.Plugin;
-import org.apache.rocketmq.connect.runtime.utils.PluginClassLoader;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
+import org.apache.rocketmq.connect.runtime.controller.isolation.PluginClassLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
index dec3e88..cbfd271 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
@@ -27,7 +27,6 @@ import io.openmessaging.connector.api.component.task.source.SourceTask;
import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.data.RecordConverter;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
@@ -35,6 +34,8 @@ import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
import org.apache.rocketmq.connect.runtime.controller.AbstractConnectController;
+import org.apache.rocketmq.connect.runtime.converter.record.ConverterConfig;
+import org.apache.rocketmq.connect.runtime.converter.record.ConverterType;
import org.apache.rocketmq.connect.runtime.errors.ReporterManagerUtil;
import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
@@ -44,8 +45,8 @@ import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
-import org.apache.rocketmq.connect.runtime.utils.Plugin;
-import org.apache.rocketmq.connect.runtime.utils.PluginClassLoader;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
+import org.apache.rocketmq.connect.runtime.controller.isolation.PluginClassLoader;
import org.apache.rocketmq.connect.runtime.utils.ServiceThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,7 +66,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -553,32 +553,38 @@ public class Worker {
createDirectTask(id, keyValue);
continue;
}
+
+
+ ClassLoader savedLoader = plugin.currentThreadLoader();
try {
- String taskClass = keyValue.getString(RuntimeConfigDefine.TASK_CLASS);
- ClassLoader loader = plugin.getPluginClassLoader(taskClass);
- final ClassLoader currentThreadLoader = plugin.currentThreadLoader();
- Class taskClazz;
- boolean isolationFlag = false;
- if (loader instanceof PluginClassLoader) {
- taskClazz = ((PluginClassLoader) loader).loadClass(taskClass, false);
- isolationFlag = true;
- } else {
- taskClazz = Class.forName(taskClass);
- }
- final Task task = (Task) taskClazz.getDeclaredConstructor().newInstance();
- final String converterClazzName = keyValue.getString(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER);
- RecordConverter recordConverter = null;
- if (StringUtils.isNotEmpty(converterClazzName)) {
- recordConverter = Class.forName(converterClazzName)
- .asSubclass(RecordConverter.class)
- .getDeclaredConstructor()
- .newInstance();
- recordConverter.configure(keyValue.getProperties());
- }
- if (isolationFlag) {
- Plugin.compareAndSwapLoaders(loader);
- }
+ String connType = keyValue.getString(RuntimeConfigDefine.CONNECTOR_CLASS);
+ ClassLoader connectorLoader = plugin.getPluginClassLoader(connType);
+ savedLoader = Plugin.compareAndSwapLoaders(connectorLoader);
+
+ final Class<? extends Task> taskClass = plugin.currentThreadLoader().loadClass(keyValue.getString(RuntimeConfigDefine.TASK_CLASS)).asSubclass(Task.class);
+
+ final Task task = plugin.newTask(taskClass);
+
+ final String valueConverterClazzName = keyValue.getString(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER, RuntimeConfigDefine.SOURCE_RECORD_CONVERTER_DEFAULT);
+ final String keyConverterClazzName = keyValue.getString(RuntimeConfigDefine.SOURCE_RECORD_KEY_CONVERTER, RuntimeConfigDefine.SOURCE_RECORD_KEY_CONVERTER_DEFAULT);
+ // new stance
+ RecordConverter valueConverter = Class.forName(valueConverterClazzName).asSubclass(RecordConverter.class).getDeclaredConstructor().newInstance();
+ RecordConverter keyConverter = Class.forName(keyConverterClazzName).asSubclass(RecordConverter.class).getDeclaredConstructor().newInstance();
+
+ //value config
+ Map<String, String> valueConverterConfig = keyValue.originalsWithPrefix(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER,true);
+ valueConverterConfig.put(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName());
+ valueConverter.configure(valueConverterConfig);
+
+ //key config
+ Map<String, String> keyConverterConfig = keyValue.originalsWithPrefix(RuntimeConfigDefine.SOURCE_RECORD_KEY_CONVERTER, true);
+ keyConverterConfig.put(ConverterConfig.TYPE_CONFIG, ConverterType.KEY.getName());
+ keyConverter.configure(keyConverterConfig);
+
+// if (isolationFlag) {
+// Plugin.compareAndSwapLoaders(loader);
+// }
if (task instanceof SourceTask) {
DefaultMQProducer producer = ConnectUtil.initDefaultMQProducer(workerConfig);
TransformChain<ConnectRecord> transformChain = new TransformChain<>(keyValue, plugin);
@@ -587,8 +593,8 @@ public class Worker {
retryWithToleranceOperator.reporters(ReporterManagerUtil.sourceTaskReporters(connectorName, keyValue));
WorkerSourceTask workerSourceTask = new WorkerSourceTask(workerConfig, id,
- (SourceTask) task, loader, keyValue, positionManagementService, recordConverter, producer, workerState, connectStatsManager, connectStatsService, transformChain, retryWithToleranceOperator);
- Plugin.compareAndSwapLoaders(currentThreadLoader);
+ (SourceTask) task, savedLoader, keyValue, positionManagementService, keyConverter, valueConverter, producer, workerState, connectStatsManager, connectStatsService, transformChain, retryWithToleranceOperator);
+
Future future = taskExecutor.submit(workerSourceTask);
// schedule offset committer
sourceTaskOffsetCommitter.ifPresent(committer -> committer.schedule(id, workerSourceTask));
@@ -609,13 +615,13 @@ public class Worker {
retryWithToleranceOperator.reporters(ReporterManagerUtil.sinkTaskReporters(connectorName, keyValue, workerConfig));
WorkerSinkTask workerSinkTask = new WorkerSinkTask(workerConfig, id,
- (SinkTask) task, loader, keyValue, recordConverter, consumer, workerState, connectStatsManager, connectStatsService, transformChain,
- retryWithToleranceOperator, ReporterManagerUtil.createWorkerErrorRecordReporter(keyValue, retryWithToleranceOperator, recordConverter));
- Plugin.compareAndSwapLoaders(currentThreadLoader);
+ (SinkTask) task, savedLoader, keyValue, keyConverter, valueConverter, consumer, workerState, connectStatsManager, connectStatsService, transformChain,
+ retryWithToleranceOperator, ReporterManagerUtil.createWorkerErrorRecordReporter(keyValue, retryWithToleranceOperator, valueConverter));
Future future = taskExecutor.submit(workerSinkTask);
taskToFutureMap.put(workerSinkTask, future);
this.pendingTasks.put(workerSinkTask, System.currentTimeMillis());
}
+ Plugin.compareAndSwapLoaders(savedLoader);
} catch (Exception e) {
log.error("start worker task exception. config {}" + JSON.toJSONString(keyValue), e);
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
index ce56766..0648e90 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
@@ -84,6 +84,7 @@ public class WorkerDirectTask extends WorkerSourceTask {
positionManagementService,
null,
null,
+ null,
workerState,
connectStatsManager,
connectStatsService,
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
index 70fb7bc..cf286aa 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
@@ -50,13 +50,13 @@ import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.rocketmq.connect.runtime.errors.WorkerErrorRecordReporter;
import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
+import org.apache.rocketmq.connect.runtime.utils.Base64Util;
import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
import org.apache.rocketmq.connect.runtime.utils.Utils;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -71,6 +71,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+
/**
* A wrapper of {@link SinkTask} for runtime.
*/
@@ -90,7 +91,8 @@ public class WorkerSinkTask extends WorkerTask {
/**
* A converter to parse sink data entry to object.
*/
- private RecordConverter recordConverter;
+ private RecordConverter keyConverter;
+ private RecordConverter valueConverter;
private final ConcurrentHashMap<MessageQueue, Long> messageQueuesOffsetMap;
@@ -150,7 +152,8 @@ public class WorkerSinkTask extends WorkerTask {
SinkTask sinkTask,
ClassLoader classLoader,
ConnectKeyValue taskConfig,
- RecordConverter recordConverter,
+ RecordConverter keyConverter,
+ RecordConverter valueConverter,
DefaultMQPullConsumer consumer,
AtomicReference<WorkerState> workerState,
ConnectStatsManager connectStatsManager,
@@ -161,7 +164,8 @@ public class WorkerSinkTask extends WorkerTask {
super(workerConfig, id, classLoader, taskConfig, retryWithToleranceOperator, transformChain, workerState);
this.sinkTask = sinkTask;
this.consumer = consumer;
- this.recordConverter = recordConverter;
+ this.keyConverter = keyConverter;
+ this.valueConverter = valueConverter;
this.messageQueuesOffsetMap = new ConcurrentHashMap<>(256);
this.messageQueuesStateMap = new ConcurrentHashMap<>(256);
this.connectStatsManager = connectStatsManager;
@@ -437,28 +441,24 @@ public class WorkerSinkTask extends WorkerTask {
private ConnectRecord convertMessages(MessageExt message) {
Map<String, String> properties = message.getProperties();
- ConnectRecord record;
- // start convert
- if (recordConverter == null) {
- final byte[] messageBody = message.getBody();
- String s = new String(messageBody);
- record = JSON.parseObject(s, ConnectRecord.class);
- } else {
- // timestamp
- String connectTimestamp = properties.get(RuntimeConfigDefine.CONNECT_TIMESTAMP);
- Long timestamp = StringUtils.isNotEmpty(connectTimestamp) ? Long.valueOf(connectTimestamp) : null;
-
- // partition and offset
- RecordPartition recordPartition = ConnectUtil.convertToRecordPartition(message.getTopic(), message.getBrokerName(), message.getQueueId());
- RecordOffset recordOffset = ConnectUtil.convertToRecordOffset(message.getQueueOffset());
-
- // convert
- SchemaAndValue schemaAndValue = retryWithToleranceOperator.execute(() -> recordConverter.toConnectData(message.getTopic(), message.getBody()),
- ErrorReporter.Stage.CONVERTER, recordConverter.getClass());
- record = new ConnectRecord(recordPartition, recordOffset, timestamp, schemaAndValue.schema(), schemaAndValue.value());
- if (retryWithToleranceOperator.failed()) {
- return null;
- }
+ // timestamp
+ String connectTimestamp = properties.get(RuntimeConfigDefine.CONNECT_TIMESTAMP);
+ Long timestamp = StringUtils.isNotEmpty(connectTimestamp) ? Long.valueOf(connectTimestamp) : null;
+
+ // partition and offset
+ RecordPartition recordPartition = ConnectUtil.convertToRecordPartition(message.getTopic(), message.getBrokerName(), message.getQueueId());
+ RecordOffset recordOffset = ConnectUtil.convertToRecordOffset(message.getQueueOffset());
+
+
+ SchemaAndValue schemaAndKey = retryWithToleranceOperator.execute(() -> keyConverter.toConnectData(message.getTopic(), Base64Util.base64Decode(message.getKeys())),
+ ErrorReporter.Stage.CONVERTER, keyConverter.getClass());
+
+ // convert value
+ SchemaAndValue schemaAndValue = retryWithToleranceOperator.execute(() -> valueConverter.toConnectData(message.getTopic(), message.getBody()),
+ ErrorReporter.Stage.CONVERTER, valueConverter.getClass());
+ ConnectRecord record = new ConnectRecord(recordPartition, recordOffset, timestamp, schemaAndKey.schema(), schemaAndKey.value(), schemaAndValue.schema(), schemaAndValue.value());
+ if (retryWithToleranceOperator.failed()) {
+ return null;
}
// Apply the transformations
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index 746c871..0a26faa 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -19,7 +19,6 @@
package org.apache.rocketmq.connect.runtime.connectorwrapper;
import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.task.source.SourceTask;
import io.openmessaging.connector.api.data.ConnectRecord;
@@ -51,13 +50,13 @@ import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
import org.apache.rocketmq.connect.runtime.store.PositionStorageReaderImpl;
import org.apache.rocketmq.connect.runtime.store.PositionStorageWriter;
+import org.apache.rocketmq.connect.runtime.utils.Base64Util;
import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
import org.apache.rocketmq.connect.runtime.utils.Utils;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -105,7 +104,8 @@ public class WorkerSourceTask extends WorkerTask {
/**
* A converter to parse source data entry to byte[].
*/
- private final RecordConverter recordConverter;
+ private final RecordConverter keyConverter;
+ private final RecordConverter valueConverter;
/**
* stat connect
@@ -136,7 +136,8 @@ public class WorkerSourceTask extends WorkerTask {
ClassLoader classLoader,
ConnectKeyValue taskConfig,
PositionManagementService positionManagementService,
- RecordConverter recordConverter,
+ RecordConverter keyConverter,
+ RecordConverter valueConverter,
DefaultMQProducer producer,
AtomicReference<WorkerState> workerState,
ConnectStatsManager connectStatsManager,
@@ -149,7 +150,8 @@ public class WorkerSourceTask extends WorkerTask {
this.offsetStorageReader = new PositionStorageReaderImpl(id.connector(), positionManagementService);
this.positionStorageWriter = new PositionStorageWriter(id.connector(), positionManagementService);
this.producer = producer;
- this.recordConverter = recordConverter;
+ this.valueConverter = valueConverter;
+ this.keyConverter = keyConverter;
this.connectStatsManager = connectStatsManager;
this.connectStatsService = connectStatsService;
this.sourceTaskContext = new WorkerSourceTaskContext(offsetStorageReader, this, taskConfig);
@@ -356,22 +358,16 @@ public class WorkerSourceTask extends WorkerTask {
}
Message sourceMessage = new Message();
sourceMessage.setTopic(topic);
- // converter
- if (recordConverter == null) {
- final byte[] messageBody = JSON.toJSONString(record, SerializerFeature.DisableCircularReferenceDetect, SerializerFeature.WriteMapNullValue).getBytes();
- if (messageBody.length > RuntimeConfigDefine.MAX_MESSAGE_SIZE) {
- log.error("Send record, message size is greater than {} bytes, record: {}", RuntimeConfigDefine.MAX_MESSAGE_SIZE, JSON.toJSONString(record));
- }
- sourceMessage.setBody(messageBody);
- } else {
- byte[] messageBody = retryWithToleranceOperator.execute(() -> recordConverter.fromConnectData(topic, record.getSchema(), record.getData()),
- ErrorReporter.Stage.CONVERTER, recordConverter.getClass());
- if (messageBody.length > RuntimeConfigDefine.MAX_MESSAGE_SIZE) {
- log.error("Send record, message size is greater than {} bytes, record: {}", RuntimeConfigDefine.MAX_MESSAGE_SIZE, JSON.toJSONString(record));
- }
- sourceMessage.setBody(messageBody);
- }
+ byte[] key = retryWithToleranceOperator.execute(() -> keyConverter.fromConnectData(topic, record.getKeySchema(), record.getKey()),
+ ErrorReporter.Stage.CONVERTER, keyConverter.getClass());
+ byte[] value = retryWithToleranceOperator.execute(() -> valueConverter.fromConnectData(topic, record.getSchema(), record.getData()),
+ ErrorReporter.Stage.CONVERTER, valueConverter.getClass());
+ if (value.length > RuntimeConfigDefine.MAX_MESSAGE_SIZE) {
+ log.error("Send record, message size is greater than {} bytes, record: {}", RuntimeConfigDefine.MAX_MESSAGE_SIZE, JSON.toJSONString(record));
+ }
+ sourceMessage.setKeys(Base64Util.base64Encode(key));
+ sourceMessage.setBody(value);
if (retryWithToleranceOperator.failed()) {
return null;
}
@@ -588,7 +584,6 @@ public class WorkerSourceTask extends WorkerTask {
positionStorageWriter.cancelFlush();
return false;
}
-
long durationMillis = System.currentTimeMillis() - started;
log.debug("{} Finished commitOffsets successfully in {} ms",
this, durationMillis);
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java
index 84128ae..7ca5a43 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java
@@ -26,7 +26,7 @@ import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
-import org.apache.rocketmq.connect.runtime.utils.Plugin;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectController.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectController.java
index 403cf30..a6f9d47 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectController.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectController.java
@@ -26,7 +26,7 @@ import org.apache.rocketmq.connect.runtime.service.RebalanceImpl;
import org.apache.rocketmq.connect.runtime.service.RebalanceService;
import org.apache.rocketmq.connect.runtime.service.strategy.AllocateConnAndTaskStrategy;
import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
-import org.apache.rocketmq.connect.runtime.utils.Plugin;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/Plugin.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/Plugin.java
similarity index 88%
rename from rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/Plugin.java
rename to rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/Plugin.java
index 39ca220..66350da 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/Plugin.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/Plugin.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.connect.runtime.utils;
+package org.apache.rocketmq.connect.runtime.controller.isolation;
import io.openmessaging.connector.api.component.Transform;
import io.openmessaging.connector.api.component.connector.Connector;
@@ -34,6 +34,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+
+import io.openmessaging.connector.api.errors.ConnectException;
import org.reflections.Configuration;
import org.reflections.Reflections;
import org.reflections.ReflectionsException;
@@ -166,6 +168,23 @@ public class Plugin extends URLClassLoader {
return null;
}
+ public Task newTask(Class<? extends Task> taskClass) {
+ return newPlugin(taskClass);
+ }
+
+ protected static <T> T newPlugin(Class<T> klass) {
+ // KAFKA-8340: The thread classloader is used during static initialization and must be
+ // set to the plugin's classloader during instantiation
+ ClassLoader savedLoader = compareAndSwapLoaders(klass.getClassLoader());
+ try {
+ return klass.getDeclaredConstructor().newInstance();
+ } catch (Throwable t) {
+ throw new ConnectException("Instantiation error", t);
+ } finally {
+ compareAndSwapLoaders(savedLoader);
+ }
+ }
+
public ClassLoader currentThreadLoader() {
return Thread.currentThread().getContextClassLoader();
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/PluginClassLoader.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginClassLoader.java
similarity index 97%
rename from rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/PluginClassLoader.java
rename to rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginClassLoader.java
index 5f8153e..c5419b4 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/PluginClassLoader.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginClassLoader.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.connect.runtime.utils;
+package org.apache.rocketmq.connect.runtime.controller.isolation;
import java.net.URL;
import java.net.URLClassLoader;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/PluginUtils.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginUtils.java
similarity index 99%
rename from rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/PluginUtils.java
rename to rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginUtils.java
index b2bf34c..4a6492a 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/PluginUtils.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginUtils.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.connect.runtime.utils;
+package org.apache.rocketmq.connect.runtime.controller.isolation;
import java.io.IOException;
import java.nio.file.DirectoryStream;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/PluginWrapper.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginWrapper.java
similarity index 97%
rename from rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/PluginWrapper.java
rename to rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginWrapper.java
index 0173430..070f6ce 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/PluginWrapper.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginWrapper.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.connect.runtime.utils;
+package org.apache.rocketmq.connect.runtime.controller.isolation;
import io.openmessaging.connector.api.component.task.sink.SinkConnector;
import io.openmessaging.connector.api.component.task.source.SourceConnector;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConnectController.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConnectController.java
index feb2bb9..f056af0 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConnectController.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConnectController.java
@@ -25,7 +25,7 @@ import org.apache.rocketmq.connect.runtime.service.RebalanceImpl;
import org.apache.rocketmq.connect.runtime.service.memory.StandaloneRebalanceService;
import org.apache.rocketmq.connect.runtime.service.strategy.AllocateConnAndTaskStrategy;
import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
-import org.apache.rocketmq.connect.runtime.utils.Plugin;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
/**
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/ConverterConfig.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/ConverterConfig.java
new file mode 100644
index 0000000..16e1913
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/ConverterConfig.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.runtime.converter.record;
+
+import io.openmessaging.KeyValue;
+
+/**
+ * converter config
+ */
+public class ConverterConfig {
+
+ public static final String TYPE_CONFIG = "converter.type";
+ private static final String TYPE_DOC = "How this converter will be used.";
+
+ /**
+ * Get the type of converter as defined by the {@link #TYPE_CONFIG} configuration.
+ * @return the converter type; never null
+ */
+ public ConverterType type(KeyValue config) {
+ return ConverterType.withName(config.getString(TYPE_CONFIG));
+ }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/ConverterType.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/ConverterType.java
new file mode 100644
index 0000000..c7f427b
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/ConverterType.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.runtime.converter.record;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+
+
+/**
+ * converter type
+ */
+public enum ConverterType {
+ KEY,
+ VALUE;
+
+ private static final Map<String, ConverterType> NAME_TO_TYPE;
+
+ static {
+ ConverterType[] types = ConverterType.values();
+ Map<String, ConverterType> nameToType = new HashMap<>(types.length);
+ for (ConverterType type : types) {
+ nameToType.put(type.name, type);
+ }
+ NAME_TO_TYPE = Collections.unmodifiableMap(nameToType);
+ }
+
+
+ public static ConverterType withName(String name) {
+ if (name == null) {
+ return null;
+ }
+ return NAME_TO_TYPE.get(name.toLowerCase(Locale.getDefault()));
+ }
+
+ private String name;
+
+ ConverterType() {
+ this.name = this.name().toLowerCase(Locale.ROOT);
+ }
+
+ public String getName() {
+ return name;
+ }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementService.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementService.java
index 60343e8..bf1584d 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementService.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementService.java
@@ -24,7 +24,7 @@ import java.util.Map;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
-import org.apache.rocketmq.connect.runtime.utils.Plugin;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
/**
* Interface for config manager. Contains connector configs and task configs. All worker in a cluster should keep the
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
index 1461f5e..a37fea0 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
@@ -17,9 +17,8 @@
package org.apache.rocketmq.connect.runtime.service;
-import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.connector.Connector;
-import java.util.ArrayList;
+
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -31,7 +30,6 @@ import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
-import org.apache.rocketmq.connect.runtime.connectorwrapper.Worker;
import org.apache.rocketmq.connect.runtime.converter.ConnAndTaskConfigConverter;
import org.apache.rocketmq.connect.runtime.converter.JsonConverter;
import org.apache.rocketmq.connect.runtime.converter.ListConverter;
@@ -39,7 +37,7 @@ import org.apache.rocketmq.connect.runtime.store.FileBaseKeyValueStore;
import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
import org.apache.rocketmq.connect.runtime.utils.FilePathConfigUtil;
-import org.apache.rocketmq.connect.runtime.utils.Plugin;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
import org.apache.rocketmq.connect.runtime.utils.datasync.BrokerBasedLog;
import org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizer;
import org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizerCallback;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
index fa5425b..06256ad 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.connect.runtime.service.memory;
-import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.connector.Connector;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
@@ -27,14 +26,12 @@ import org.apache.rocketmq.connect.runtime.service.AbstractConfigManagementServi
import org.apache.rocketmq.connect.runtime.service.StagingMode;
import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
import org.apache.rocketmq.connect.runtime.store.MemoryBasedKeyValueStore;
-import org.apache.rocketmq.connect.runtime.utils.Plugin;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
-import java.util.ArrayList;
-import java.util.Set;
import java.util.Map;
/**
* memory config management service impl for standalone
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/Base64Util.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/Base64Util.java
new file mode 100644
index 0000000..bc9c876
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/Base64Util.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.utils;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Base64;
+
+/**
+ * base64 utils
+ */
+public class Base64Util {
+
+ /**
+ * encode
+ * @param in
+ * @return
+ */
+ public static String base64Encode(byte[] in) {
+ if (in == null) {
+ return null;
+ }
+ return Base64.getEncoder().encodeToString(in);
+ }
+
+ /**
+ * decode
+ * @param in
+ * @return
+ */
+ public static byte[] base64Decode(String in) {
+ if (StringUtils.isEmpty(in)) {
+ return null;
+ }
+ return Base64.getDecoder().decode(in);
+ }
+}
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
index 529d1ba..63e966b 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
@@ -46,7 +46,7 @@ import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
-import org.apache.rocketmq.connect.runtime.utils.Plugin;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
import org.apache.rocketmq.connect.runtime.utils.TestUtils;
import org.junit.After;
import org.junit.Before;
@@ -126,6 +126,7 @@ public class WorkerTest {
connectKeyValue,
new TestPositionManageServiceImpl(),
new JsonConverter(),
+ new JsonConverter(),
producer,
new AtomicReference(WorkerState.STARTED),
connectStatsManager, connectStatsService,
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
index f82fcb2..04c4934 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
@@ -58,7 +58,7 @@ import org.apache.rocketmq.connect.runtime.service.PositionManagementServiceImpl
import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
-import org.apache.rocketmq.connect.runtime.utils.Plugin;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -213,13 +213,13 @@ public class RestHandlerTest {
RetryWithToleranceOperator retryWithToleranceOperator = ReporterManagerUtil.createRetryWithToleranceOperator(connectKeyValue);
retryWithToleranceOperator.reporters(ReporterManagerUtil.sourceTaskReporters("testConnectorName1", connectKeyValue));
- WorkerSourceTask workerSourceTask1 = new WorkerSourceTask(new ConnectConfig(),new ConnectorTaskId("testConnectorName1", 1), sourceTask,null, connectKeyValue, positionManagementServiceImpl, converter, producer, workerState, connectStatsManager, connectStatsService, transformChain,retryWithToleranceOperator);
+ WorkerSourceTask workerSourceTask1 = new WorkerSourceTask(new ConnectConfig(),new ConnectorTaskId("testConnectorName1", 1), sourceTask,null, connectKeyValue, positionManagementServiceImpl, converter, converter, producer, workerState, connectStatsManager, connectStatsService, transformChain,retryWithToleranceOperator);
// create retry operator
RetryWithToleranceOperator retryWithToleranceOperator02 = ReporterManagerUtil.createRetryWithToleranceOperator(connectKeyValue);
retryWithToleranceOperator02.reporters(ReporterManagerUtil.sourceTaskReporters("testConnectorName2", connectKeyValue));
- WorkerSourceTask workerSourceTask2 = new WorkerSourceTask(new ConnectConfig(), new ConnectorTaskId("testConnectorName2", 1), sourceTask,null, connectKeyValue1, positionManagementServiceImpl, converter, producer, workerState, connectStatsManager, connectStatsService, transformChain,retryWithToleranceOperator02);
+ WorkerSourceTask workerSourceTask2 = new WorkerSourceTask(new ConnectConfig(), new ConnectorTaskId("testConnectorName2", 1), sourceTask,null, connectKeyValue1, positionManagementServiceImpl, converter, converter, producer, workerState, connectStatsManager, connectStatsService, transformChain,retryWithToleranceOperator02);
workerTasks = new HashSet<Runnable>() {
{
add(workerSourceTask1);
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImplTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImplTest.java
index 03b5e01..df974a4 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImplTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImplTest.java
@@ -33,7 +33,7 @@ import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
-import org.apache.rocketmq.connect.runtime.utils.Plugin;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
import org.apache.rocketmq.connect.runtime.utils.TestUtils;
import org.apache.rocketmq.connect.runtime.utils.datasync.BrokerBasedLog;
import org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizer;