You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/06/22 06:19:33 UTC
[rocketmq-connect] branch master updated: [ISSUE #153]upgrade rocketmq connect JDBC plug-in (#154)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 8074d49 [ISSUE #153]upgrade rocketmq connect JDBC plug-in (#154)
8074d49 is described below
commit 8074d49d60be757558f15a367047f48a0ea4e395
Author: xiaoyi <su...@163.com>
AuthorDate: Wed Jun 22 14:19:28 2022 +0800
[ISSUE #153]upgrade rocketmq connect JDBC plug-in (#154)
* upgrade rocketmq connect JDBC plug-in #153
* upgrade rocketmq jdbc plugin and runtime api to 0.1.3 #153
* fixed
* add Decimal logical
* optimize transform api
* upgrade api to 0.1.3
* fixed
* fixed
* update api version 0.1.3
* fixed
Co-authored-by: sunxiaojian <sunxiaojian926@163.com>
---
connectors/rocketmq-connect-jdbc/pom.xml | 267 +++++++++++----------
.../connect/jdbc/config/AbstractConfig.java | 2 +-
.../connect/jdbc/connector/JdbcSinkConnector.java | 28 +--
.../connect/jdbc/connector/JdbcSinkTask.java | 30 +--
.../connect/jdbc/connector/JdbcSourceConfig.java | 4 +-
.../jdbc/connector/JdbcSourceConnector.java | 23 +-
.../connect/jdbc/connector/JdbcSourceTask.java | 55 ++---
.../connect/jdbc/dialect/DatabaseDialect.java | 18 +-
.../jdbc/dialect/DatabaseDialectFactory.java | 1 +
.../jdbc/dialect/PreparedStatementBinder.java | 13 +-
.../jdbc/dialect/impl/GenericDatabaseDialect.java | 114 ++++-----
.../jdbc/dialect/impl/MySqlDatabaseDialect.java | 10 +-
.../jdbc/dialect/impl/OpenMLDBDatabaseDialect.java | 107 ++++++++-
.../connect/jdbc/sink/BufferedRecords.java | 7 +-
.../jdbc/source/TimestampIncrementingCriteria.java | 13 +-
.../jdbc/source/metadata/SchemaMapping.java | 17 +-
.../connect/jdbc/source/querier/BulkQuerier.java | 3 +-
.../connect/jdbc/source/querier/Querier.java | 2 +-
.../querier/TimestampIncrementingQuerier.java | 11 +-
.../rocketmq/connect/jdbc/util/BytesUtil.java | 6 +-
.../rocketmq/connect/jdbc/util/NumericMapping.java | 6 +-
.../connect/jdbc/connector/sink/JdbcSinkTest.java | 18 ++
.../jdbc/connector/sink/OpenMLDBJdbcSinkTest.java | 23 +-
pom.xml | 2 +-
.../runtime/connectorwrapper/TransformChain.java | 3 +-
.../connect/runtime/connectorwrapper/Worker.java | 3 +-
.../runtime/connectorwrapper/WorkerConnector.java | 4 +-
.../runtime/connectorwrapper/WorkerDirectTask.java | 31 ++-
.../runtime/connectorwrapper/WorkerSinkTask.java | 4 +-
.../connectorwrapper/WorkerSinkTaskContext.java | 11 +
.../runtime/connectorwrapper/WorkerSourceTask.java | 17 +-
.../service/ConfigManagementServiceImpl.java | 2 +-
.../memory/MemoryConfigManagementServiceImpl.java | 2 +-
.../connectorwrapper/testimpl/TestConnector.java | 23 +-
.../connectorwrapper/testimpl/TestSourceTask.java | 15 +-
.../connectorwrapper/testimpl/TestTask.java | 18 +-
.../rocketmq/connect/file/FileSinkConnector.java | 17 +-
.../apache/rocketmq/connect/file/FileSinkTask.java | 28 +--
.../rocketmq/connect/file/FileSourceConnector.java | 11 +-
.../rocketmq/connect/file/FileSourceTask.java | 24 +-
.../rocketmq/connect/file/FilterTransform.java | 23 +-
41 files changed, 537 insertions(+), 479 deletions(-)
diff --git a/connectors/rocketmq-connect-jdbc/pom.xml b/connectors/rocketmq-connect-jdbc/pom.xml
index eaf4336..59d7f4b 100644
--- a/connectors/rocketmq-connect-jdbc/pom.xml
+++ b/connectors/rocketmq-connect-jdbc/pom.xml
@@ -16,9 +16,10 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-connect-jdbc</artifactId>
+ <packaging>jar</packaging>
<version>0.0.1-SNAPSHOT</version>
- <name>connect-jdbc</name>
+ <name>rocketmq-connect-jdbc</name>
<url>https://github.com/apache/incubator-rocketmq-externals/tree/master/rocketmq-connect-jdbc</url>
<licenses>
@@ -36,21 +37,144 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-
<!-- Compiler settings properties -->
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<rocketmq.version>4.5.2</rocketmq.version>
+
+ <!--test jar-->
+ <junit.version>4.13.1</junit.version>
+ <assertj.version>2.6.0</assertj.version>
+ <mockito.version>2.6.3</mockito.version>
+
+ <!--rocket connect api-->
+ <openmessaging-connector.version>0.1.3</openmessaging-connector.version>
+ <openmessaging-api.version>0.3.1-alpha</openmessaging-api.version>
+
+ <!--fast json-->
+ <fastjson.version>1.2.83</fastjson.version>
+
</properties>
+ <dependencies>
+
+ <!---->
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-client</artifactId>
+ <version>${rocketmq.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-tools</artifactId>
+ <version>${rocketmq.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-remoting</artifactId>
+ <version>${rocketmq.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-openmessaging</artifactId>
+ <version>${rocketmq.version}</version>
+ </dependency>
+
+ <!--rocketmq connect api-->
+ <dependency>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-connector</artifactId>
+ <version>${openmessaging-connector.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-api</artifactId>
+ <version>${openmessaging-api.version}</version>
+ </dependency>
+
+ <!--junit -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <version>${assertj.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>${mockito.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!--fast json version-->
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
+ <version>${fastjson.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.7</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>1.2.0</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ <version>1.2.9</version>
+ </dependency>
+
+
+ <dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <version>1.12</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ <version>1.2</version>
+ </dependency>
+
+ <dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ <version>8.0.16</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.4paradigm.openmldb</groupId>
+ <artifactId>openmldb-native</artifactId>
+ <version>0.5.0-macos</version>
+ </dependency>
+ <dependency>
+ <groupId>com.4paradigm.openmldb</groupId>
+ <artifactId>openmldb-jdbc</artifactId>
+ <version>0.5.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.4paradigm.openmldb</groupId>
+ <artifactId>openmldb-native</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+
+
<build>
- <resources>
- <resource>
- <directory>src/main/resources</directory>
- <filtering>true</filtering>
- </resource>
- </resources>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
@@ -147,6 +271,11 @@
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
+ <archive>
+ <manifest>
+ <mainClass>org.apache.rocketmq.connect.redis.connector.RedisSourceConnector</mainClass>
+ </manifest>
+ </archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
@@ -184,126 +313,4 @@
</plugin>
</plugins>
</build>
-
- <dependencies>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.13.1</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.assertj</groupId>
- <artifactId>assertj-core</artifactId>
- <version>2.6.0</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- <version>2.6.3</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>commons-codec</groupId>
- <artifactId>commons-codec</artifactId>
- <version>1.12</version>
- </dependency>
- <dependency>
- <groupId>io.openmessaging</groupId>
- <artifactId>openmessaging-connector</artifactId>
- <version>0.1.2</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>io.openmessaging</groupId>
- <artifactId>openmessaging-api</artifactId>
- <version>0.3.1-alpha</version>
- </dependency>
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>fastjson</artifactId>
- <version>1.2.83</version>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>1.7.7</version>
- </dependency>
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- <version>1.2.0</version>
- </dependency>
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-core</artifactId>
- <version>1.2.9</version>
- </dependency>
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-client</artifactId>
- <version>${rocketmq.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-tools</artifactId>
- <version>${rocketmq.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-remoting</artifactId>
- <version>${rocketmq.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-openmessaging</artifactId>
- <version>4.3.2</version>
- </dependency>
-
- <dependency>
- <groupId>commons-cli</groupId>
- <artifactId>commons-cli</artifactId>
- <version>1.2</version>
- </dependency>
- <dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- <version>8.0.28</version>
- </dependency>
- <dependency>
- <groupId>io.javalin</groupId>
- <artifactId>javalin</artifactId>
- <version>1.3.0</version>
- </dependency>
-
- <dependency>
- <groupId>com.github.shyiko</groupId>
- <artifactId>mysql-binlog-connector-java</artifactId>
- <version>0.20.1</version>
- </dependency>
-
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>druid</artifactId>
- <version>1.1.22</version>
- </dependency>
-
- <dependency>
- <groupId>com.4paradigm.openmldb</groupId>
- <artifactId>openmldb-native</artifactId>
- <version>0.5.0-macos</version>
- </dependency>
- <dependency>
- <groupId>com.4paradigm.openmldb</groupId>
- <artifactId>openmldb-jdbc</artifactId>
- <version>0.5.0</version>
- <exclusions>
- <exclusion>
- <groupId>com.4paradigm.openmldb</groupId>
- <artifactId>openmldb-native</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- </dependencies>
</project>
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/config/AbstractConfig.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/config/AbstractConfig.java
index e4fd758..9d53fdc 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/config/AbstractConfig.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/config/AbstractConfig.java
@@ -129,7 +129,7 @@ public abstract class AbstractConfig {
}
protected Boolean getBoolean(KeyValue config, String key, Boolean defaultValue) {
- return config.containsKey(key) ? Boolean.getBoolean(config.getString(key)) : defaultValue;
+ return config.containsKey(key) ? Boolean.parseBoolean(config.getString(key)) : defaultValue;
}
}
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java
index aaf8560..4cfa328 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java
@@ -21,6 +21,7 @@ import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.connector.ConnectorContext;
import io.openmessaging.connector.api.component.task.Task;
import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,14 +32,13 @@ import java.util.List;
/**
* jdbc sink connector
*/
-public class JdbcSinkConnector extends SinkConnector {
+public class JdbcSinkConnector extends SinkConnector{
private static final Logger log = LoggerFactory.getLogger(JdbcSinkConnector.class);
private KeyValue connectConfig;
- private ConnectorContext context;
@Override
- public void start(ConnectorContext context) {
- this.context = context;
+ public void start(KeyValue config) {
+ this.connectConfig = config;
}
/**
@@ -52,26 +52,9 @@ public class JdbcSinkConnector extends SinkConnector {
// do validate config
}
- /**
- * Init the component
- *
- * @param config
- */
- @Override
- public void init(KeyValue config) {
- this.connectConfig = config;
- }
-
@Override
public void stop() {
- }
-
- @Override
- public void pause() {
- }
-
- @Override
- public void resume() {
+ this.connectConfig = null;
}
/**
@@ -95,5 +78,4 @@ public class JdbcSinkConnector extends SinkConnector {
public Class<? extends Task> taskClass() {
return JdbcSinkTask.class;
}
-
}
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkTask.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkTask.java
index d69c80a..35656eb 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkTask.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkTask.java
@@ -69,7 +69,7 @@ public class JdbcSinkTask extends SinkTask {
SQLException sqlAllMessagesException = getAllMessagesException(sqle);
if (remainingRetries > 0) {
updater.closeQuietly();
- init(originalConfig);
+ start(originalConfig);
remainingRetries--;
throw new RetriableException(sqlAllMessagesException);
}
@@ -87,30 +87,13 @@ public class JdbcSinkTask extends SinkTask {
return sqlAllMessagesException;
}
-
- @Override
- public void start(SinkTaskContext context) {
- this.context = context;
- }
-
/**
- * Should invoke before start the connector.
- *
- * @param config
- * @return error message
- */
- @Override
- public void validate(KeyValue config) {
- // to do nothing
- }
-
- /**
- * Init the component
+ * Start the component
*
* @param keyValue
*/
@Override
- public void init(KeyValue keyValue) {
+ public void start(KeyValue keyValue) {
originalConfig = keyValue;
config = new JdbcSinkConfig(keyValue);
remainingRetries = config.getMaxRetries();
@@ -142,12 +125,5 @@ public class JdbcSinkTask extends SinkTask {
}
}
- @Override
- public void pause() {
- }
-
- @Override
- public void resume() {
- }
}
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConfig.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConfig.java
index 20f3aa7..99178bf 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConfig.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConfig.java
@@ -232,8 +232,8 @@ public class JdbcSourceConfig extends AbstractConfig {
this.timestampColumnNames = getList(config, TIMESTAMP_COLUMN_NAME_CONFIG);
timestampDelayIntervalMs = config.getLong(TIMESTAMP_DELAY_INTERVAL_MS_CONFIG);
// this.timestampInitial=config.getLong(TIMESTAMP_INITIAL_CONFIG,TIMESTAMP_INITIAL_DEFAULT);
- if (config.containsKey(TIMESTAMP_INITIAL_CONFIG)){
- this.timestampInitial=config.getLong(TIMESTAMP_INITIAL_CONFIG);
+ if (config.containsKey(TIMESTAMP_INITIAL_CONFIG)) {
+ this.timestampInitial = config.getLong(TIMESTAMP_INITIAL_CONFIG);
}
this.tableWhitelist = new HashSet<>(getList(config, TABLE_WHITELIST_CONFIG));
this.tableBlacklist = new HashSet<>(getList(config, TABLE_BLACKLIST_CONFIG));
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
index c48da71..4ad9e4f 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
@@ -30,6 +30,9 @@ import org.slf4j.LoggerFactory;
import io.openmessaging.KeyValue;
+/**
+ * jdbc source connector
+ */
public class JdbcSourceConnector extends SourceConnector {
private static final Logger log = LoggerFactory.getLogger(JdbcSourceConnector.class);
private JdbcSourceConfig jdbcSourceConfig;
@@ -49,28 +52,20 @@ public class JdbcSourceConnector extends SourceConnector {
}
/**
- * Init the component
+ * Start the component
*
- * @param config
+ * @param config component context
*/
@Override
- public void init(KeyValue config) {
- if (config.containsKey("connect-topicname")) {
- config.put("connect-topicname", "");
- }
+ public void start(KeyValue config) {
originalConfig = config;
}
- @Override
- public void stop() {
- }
-
- @Override
- public void pause() {
- }
@Override
- public void resume() {
+ public void stop() {
+ this.originalConfig = null;
+ this.jdbcSourceConfig = null;
}
/**
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
index 80a7117..192f676 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.connect.jdbc.connector;
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.task.source.SourceTask;
-import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
import io.openmessaging.connector.api.data.ConnectRecord;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.connect.jdbc.dialect.DatabaseDialect;
@@ -157,13 +156,25 @@ public class JdbcSourceTask extends SourceTask {
/**
* start jdbc task
- *
- * @param context
*/
@Override
- public void start(SourceTaskContext context) {
+ public void start(KeyValue props) {
+ // init config
+ config = new JdbcSourceTaskConfig(props);
+ final String dialectName = config.getDialectName();
+ final String url = config.getConnectionDbUrl();
+ if (dialectName != null && !dialectName.trim().isEmpty()) {
+ dialect = DatabaseDialectFactory.create(dialectName, config);
+ } else {
+ dialect = DatabaseDialectFactory.findDialectFor(url, config);
+ }
+ final int maxConnAttempts = config.getAttempts();
+ final long retryBackoff = config.getBackoffMs();
+ cachedConnectionProvider = connectionProvider(maxConnAttempts, retryBackoff);
+ log.info("Using JDBC dialect {}", dialect.name());
+
// compute table offset
- Map<String, Map<String, Object>> offsetValues = SourceOffsetCompute.initOffset(config, context, dialect, cachedConnectionProvider);
+ Map<String, Map<String, Object>> offsetValues = SourceOffsetCompute.initOffset(config, sourceTaskContext, dialect, cachedConnectionProvider);
for (String tableOrQuery : offsetValues.keySet()) {
this.buildAndAddQuerier(
JdbcSourceConfig.TableLoadMode.findTableLoadModeByName(this.config.getMode()),
@@ -258,30 +269,6 @@ public class JdbcSourceTask extends SourceTask {
}
}
- /**
- * Init the component
- *
- * @param props
- */
- @Override
- public void init(KeyValue props) {
- try {
- config = new JdbcSourceTaskConfig(props);
- final String dialectName = config.getDialectName();
- final String url = config.getConnectionDbUrl();
- if (dialectName != null && !dialectName.trim().isEmpty()) {
- dialect = DatabaseDialectFactory.create(dialectName, config);
- } else {
- dialect = DatabaseDialectFactory.findDialectFor(url, config);
- }
- final int maxConnAttempts = config.getAttempts();
- final long retryBackoff = config.getBackoffMs();
- cachedConnectionProvider = connectionProvider(maxConnAttempts, retryBackoff);
- log.info("Using JDBC dialect {}", dialect.name());
- } catch (Exception e) {
- log.error("Cannot start Jdbc Source Task because of configuration error{}", e);
- }
- }
protected CachedConnectionProvider connectionProvider(int maxConnAttempts, long retryBackoff) {
return new CachedConnectionProvider(dialect, maxConnAttempts, retryBackoff) {
@@ -321,14 +308,4 @@ public class JdbcSourceTask extends SourceTask {
}
}
}
-
- @Override
- public void pause() {
- // do nothing
- }
-
- @Override
- public void resume() {
- // do nothing
- }
}
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/DatabaseDialect.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/DatabaseDialect.java
index 61c9d6e..14728af 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/DatabaseDialect.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/DatabaseDialect.java
@@ -18,17 +18,18 @@ package org.apache.rocketmq.connect.jdbc.dialect;
import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
import org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConfig;
+import org.apache.rocketmq.connect.jdbc.dialect.provider.ConnectionProvider;
import org.apache.rocketmq.connect.jdbc.schema.column.ColumnDefinition;
import org.apache.rocketmq.connect.jdbc.schema.column.ColumnId;
import org.apache.rocketmq.connect.jdbc.schema.table.TableDefinition;
import org.apache.rocketmq.connect.jdbc.schema.table.TableId;
-import org.apache.rocketmq.connect.jdbc.source.metadata.ColumnMapping;
import org.apache.rocketmq.connect.jdbc.sink.metadata.FieldsMetadata;
import org.apache.rocketmq.connect.jdbc.sink.metadata.SchemaPair;
import org.apache.rocketmq.connect.jdbc.sink.metadata.SinkRecordField;
-import org.apache.rocketmq.connect.jdbc.dialect.provider.ConnectionProvider;
import org.apache.rocketmq.connect.jdbc.source.TimestampIncrementingCriteria;
+import org.apache.rocketmq.connect.jdbc.source.metadata.ColumnMapping;
import org.apache.rocketmq.connect.jdbc.util.ExpressionBuilder;
import org.apache.rocketmq.connect.jdbc.util.IdentifierRules;
@@ -56,6 +57,15 @@ public interface DatabaseDialect extends ConnectionProvider {
*/
String name();
+ /**
+ * get dialect class
+ *
+ * @return
+ */
+ default Class getDialectClass() {
+ return this.getClass();
+ }
+
/**
* create jdbc prepared statement
*
@@ -149,11 +159,9 @@ public interface DatabaseDialect extends ConnectionProvider {
* add field to schema
*
* @param column
- * @param schema
- * @param index
* @return
*/
- String addFieldToSchema(ColumnDefinition column, Schema schema, int index);
+ String addFieldToSchema(ColumnDefinition column, SchemaBuilder schemaBuilder);
/**
* Apply the supplied DDL statements using the given connection. This gives the dialect the
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/DatabaseDialectFactory.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/DatabaseDialectFactory.java
index 0938684..b93d580 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/DatabaseDialectFactory.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/DatabaseDialectFactory.java
@@ -137,6 +137,7 @@ public class DatabaseDialectFactory {
return provider.create(config);
}
}
+
throw new ConnectException(
"Unable to find dialect with name '" + dialectName + "' in the available dialects: "
+ dialectNames
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/PreparedStatementBinder.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/PreparedStatementBinder.java
index dc9c0ae..75155c4 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/PreparedStatementBinder.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/PreparedStatementBinder.java
@@ -18,9 +18,11 @@ package org.apache.rocketmq.connect.jdbc.dialect;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.serializer.SerializerFeature;
import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.data.Field;
import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.Struct;
import io.openmessaging.connector.api.errors.ConnectException;
import org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConfig;
import org.apache.rocketmq.connect.jdbc.sink.metadata.FieldsMetadata;
@@ -98,10 +100,12 @@ public class PreparedStatementBinder implements DatabaseDialect.StatementBinder
}
break;
case RECORD_VALUE: {
- Object[] data = JSONArray.parseArray(JSON.toJSONString(record.getData())).stream().toArray();
+ String jsonData = JSON.toJSONString(record.getData(), SerializerFeature.DisableCircularReferenceDetect);
+ Struct struct = JSON.parseObject(jsonData, Struct.class);
+ struct.setValues(JSON.parseObject(jsonData).getJSONArray("values").toArray());
for (String fieldName : fieldsMetadata.keyFieldNames) {
final Field field = schemaPair.schema.getField(fieldName);
- bindField(index++, field.getSchema(), data[field.getIndex()], fieldName);
+ bindField(index++, field.getSchema(), struct.get(fieldName), fieldName);
}
}
break;
@@ -115,9 +119,12 @@ public class PreparedStatementBinder implements DatabaseDialect.StatementBinder
ConnectRecord record,
int index
) throws SQLException {
+ String jsonData = JSON.toJSONString(record.getData(), SerializerFeature.DisableCircularReferenceDetect);
+ Struct struct = JSON.parseObject(jsonData, Struct.class);
+ struct.setValues(JSON.parseObject(jsonData).getJSONArray("values").toArray());
for (final String fieldName : fieldsMetadata.nonKeyFieldNames) {
final Field field = record.getSchema().getField(fieldName);
- bindField(index++, field.getSchema(), ((Object[]) record.getData())[field.getIndex()], fieldName);
+ bindField(index++, field.getSchema(), struct.get(fieldName), fieldName);
}
return index;
}
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/GenericDatabaseDialect.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/GenericDatabaseDialect.java
index d9e8dee..089d10d 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/GenericDatabaseDialect.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/GenericDatabaseDialect.java
@@ -16,11 +16,12 @@
*/
package org.apache.rocketmq.connect.jdbc.dialect.impl;
-import io.openmessaging.connector.api.data.Field;
import io.openmessaging.connector.api.data.FieldType;
import io.openmessaging.connector.api.data.Schema;
import io.openmessaging.connector.api.data.SchemaBuilder;
-import lombok.SneakyThrows;
+import io.openmessaging.connector.api.data.logical.Date;
+import io.openmessaging.connector.api.data.logical.Decimal;
+import io.openmessaging.connector.api.data.logical.Time;
import org.apache.rocketmq.connect.jdbc.config.AbstractConfig;
import org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConfig;
import org.apache.rocketmq.connect.jdbc.connector.JdbcSourceConfig;
@@ -28,21 +29,18 @@ import org.apache.rocketmq.connect.jdbc.dialect.DatabaseDialect;
import org.apache.rocketmq.connect.jdbc.dialect.DatabaseDialectFactory;
import org.apache.rocketmq.connect.jdbc.dialect.DropOptions;
import org.apache.rocketmq.connect.jdbc.dialect.PreparedStatementBinder;
+import org.apache.rocketmq.connect.jdbc.dialect.provider.DatabaseDialectProvider;
+import org.apache.rocketmq.connect.jdbc.dialect.provider.JdbcUrlInfo;
import org.apache.rocketmq.connect.jdbc.schema.column.ColumnDefAdjuster;
import org.apache.rocketmq.connect.jdbc.schema.column.ColumnDefinition;
import org.apache.rocketmq.connect.jdbc.schema.column.ColumnId;
import org.apache.rocketmq.connect.jdbc.schema.table.TableDefinition;
import org.apache.rocketmq.connect.jdbc.schema.table.TableId;
-import org.apache.rocketmq.connect.jdbc.source.metadata.ColumnMapping;
import org.apache.rocketmq.connect.jdbc.sink.metadata.FieldsMetadata;
import org.apache.rocketmq.connect.jdbc.sink.metadata.SchemaPair;
import org.apache.rocketmq.connect.jdbc.sink.metadata.SinkRecordField;
-import org.apache.rocketmq.connect.jdbc.dialect.provider.DatabaseDialectProvider;
-import org.apache.rocketmq.connect.jdbc.dialect.provider.JdbcUrlInfo;
-import org.apache.rocketmq.connect.jdbc.schema.column.parser.DateColumnParser;
-import org.apache.rocketmq.connect.jdbc.schema.column.parser.TimeColumnParser;
-import org.apache.rocketmq.connect.jdbc.schema.column.parser.TimestampColumnParser;
import org.apache.rocketmq.connect.jdbc.source.TimestampIncrementingCriteria;
+import org.apache.rocketmq.connect.jdbc.source.metadata.ColumnMapping;
import org.apache.rocketmq.connect.jdbc.util.DateTimeUtils;
import org.apache.rocketmq.connect.jdbc.util.ExpressionBuilder;
import org.apache.rocketmq.connect.jdbc.util.IdentifierRules;
@@ -54,6 +52,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.math.BigDecimal;
import java.net.URL;
import java.nio.ByteBuffer;
import java.sql.Blob;
@@ -125,7 +124,7 @@ public class GenericDatabaseDialect implements DatabaseDialect {
private static final Logger log = LoggerFactory.getLogger(GenericDatabaseDialect.class);
-// @Deprecated
+ // @Deprecated
// protected final Logger log = LoggerFactory.getLogger(GenericDatabaseDialect.class);
protected AbstractConfig config;
/**
@@ -190,7 +189,6 @@ public class GenericDatabaseDialect implements DatabaseDialect {
* @return
* @throws SQLException
*/
- @SneakyThrows
@Override
public Connection getConnection() throws SQLException {
// These config names are the same for both source and sink configs ...
@@ -922,10 +920,9 @@ public class GenericDatabaseDialect implements DatabaseDialect {
@Override
public String addFieldToSchema(
ColumnDefinition columnDefn,
- Schema schema,
- int index
+ SchemaBuilder schemaBuilder
) {
- return addFieldToSchema(columnDefn, schema, fieldNameFor(columnDefn), index, columnDefn.type(),
+ return addFieldToSchema(columnDefn, schemaBuilder, fieldNameFor(columnDefn), columnDefn.type(),
columnDefn.isOptional()
);
}
@@ -936,7 +933,6 @@ public class GenericDatabaseDialect implements DatabaseDialect {
* @param columnDefn
* @param builder
* @param fieldName
- * @param index
* @param sqlType
* @param optional
* @return
@@ -944,9 +940,8 @@ public class GenericDatabaseDialect implements DatabaseDialect {
@SuppressWarnings("fallthrough")
protected String addFieldToSchema(
final ColumnDefinition columnDefn,
- final Schema builder,
+ final SchemaBuilder builder,
final String fieldName,
- final int index,
final int sqlType,
final boolean optional
) {
@@ -958,50 +953,50 @@ public class GenericDatabaseDialect implements DatabaseDialect {
return null;
}
case Types.BOOLEAN: {
- builder.addField(new Field(index, fieldName, SchemaBuilder.bool().build()));
+ builder.field(fieldName, SchemaBuilder.bool().build());
break;
}
// ints <= 8 bits
case Types.BIT: {
- builder.addField(new Field(index, fieldName, SchemaBuilder.int8().build()));
+ builder.field(fieldName, SchemaBuilder.int8().build());
break;
}
case Types.TINYINT: {
if (columnDefn.isSignedNumber()) {
- builder.addField(new Field(index, fieldName, SchemaBuilder.int8().build()));
+ builder.field(fieldName, SchemaBuilder.int8().build());
} else {
- builder.addField(new Field(index, fieldName, SchemaBuilder.int32().build()));
+ builder.field(fieldName, SchemaBuilder.int32().build());
}
break;
}
// 16 bit ints
case Types.SMALLINT: {
- builder.addField(new Field(index, fieldName, SchemaBuilder.int32().build()));
+ builder.field(fieldName, SchemaBuilder.int32().build());
break;
}
// 32 bit ints
case Types.INTEGER: {
if (columnDefn.isSignedNumber()) {
- builder.addField(new Field(index, fieldName, SchemaBuilder.int32().build()));
+ builder.field(fieldName, SchemaBuilder.int32().build());
} else {
- builder.addField(new Field(index, fieldName, SchemaBuilder.int64().build()));
+ builder.field(fieldName, SchemaBuilder.int64().build());
}
break;
}
// 64 bit ints
case Types.BIGINT: {
- builder.addField(new Field(index, fieldName, SchemaBuilder.int64().build()));
+ builder.field(fieldName, SchemaBuilder.int64().build());
break;
}
// REAL is a single precision floating point value, i.e. a Java float
case Types.REAL: {
- builder.addField(new Field(index, fieldName, SchemaBuilder.float32().build()));
+ builder.field(fieldName, SchemaBuilder.float32().build());
break;
}
@@ -1009,8 +1004,15 @@ public class GenericDatabaseDialect implements DatabaseDialect {
// for single precision
case Types.FLOAT:
case Types.DOUBLE:
+ builder.field(fieldName, SchemaBuilder.float64().build());
+ break;
case Types.DECIMAL:
- builder.addField(new Field(index, fieldName, SchemaBuilder.float64().build()));
+ scale = decimalScale(columnDefn);
+ SchemaBuilder fieldBuilder = Decimal.builder(scale);
+ if (optional) {
+ fieldBuilder.optional();
+ }
+ builder.field(fieldName, fieldBuilder.build());
break;
/**
@@ -1020,17 +1022,17 @@ public class GenericDatabaseDialect implements DatabaseDialect {
if (mapNumerics == NumericMapping.PRECISION_ONLY) {
log.debug("NUMERIC with precision: '{}' and scale: '{}'", precision, scale);
if (scale == 0 && precision <= MAX_INTEGER_TYPE_PRECISION) { // integer
- builder.addField(new Field(index, fieldName, integerSchema(optional, precision)));
+ builder.field(fieldName, integerSchema(optional, precision));
break;
}
} else if (mapNumerics == NumericMapping.BEST_FIT) {
log.debug("NUMERIC with precision: '{}' and scale: '{}'", precision, scale);
if (precision <= MAX_INTEGER_TYPE_PRECISION) { // fits in primitive data types.
if (scale < 1 && scale >= NUMERIC_TYPE_SCALE_LOW) { // integer
- builder.addField(new Field(index, fieldName, integerSchema(optional, precision)));
+ builder.field(fieldName, integerSchema(optional, precision));
break;
} else if (scale > 0) { // floating point - use double in all cases
- builder.addField(new Field(index, fieldName, SchemaBuilder.float64().build()));
+ builder.field(fieldName, SchemaBuilder.float64().build());
break;
}
}
@@ -1038,11 +1040,11 @@ public class GenericDatabaseDialect implements DatabaseDialect {
log.debug("NUMERIC with precision: '{}' and scale: '{}'", precision, scale);
if (scale < 1 && scale >= NUMERIC_TYPE_SCALE_LOW) { // integer
if (precision <= MAX_INTEGER_TYPE_PRECISION) { // fits in primitive data types.
- builder.addField(new Field(index, fieldName, integerSchema(optional, precision)));
+ builder.field(fieldName, integerSchema(optional, precision));
break;
}
} else if (scale > 0) { // floating point - use double in all cases
- builder.addField(new Field(index, fieldName, SchemaBuilder.float64().build()));
+ builder.field(fieldName, SchemaBuilder.float64().build());
break;
}
}
@@ -1059,7 +1061,7 @@ public class GenericDatabaseDialect implements DatabaseDialect {
case Types.SQLXML: {
// Some of these types will have fixed size, but we drop this from the schema conversion
// since only fixed byte arrays can have a fixed size
- builder.addField(new Field(index, fieldName, SchemaBuilder.string().build()));
+ builder.field(fieldName, SchemaBuilder.string().build());
break;
}
@@ -1069,28 +1071,28 @@ public class GenericDatabaseDialect implements DatabaseDialect {
case Types.BLOB:
case Types.VARBINARY:
case Types.LONGVARBINARY: {
- builder.addField(new Field(index, fieldName, SchemaBuilder.bytes().build()));
+ builder.field(fieldName, SchemaBuilder.bytes().build());
break;
}
// Date is day + month + year
case Types.DATE: {
- SchemaBuilder dateSchemaBuilder = DateColumnParser.builder();
- builder.addField(new Field(index, fieldName, dateSchemaBuilder.build()));
+ SchemaBuilder dateSchemaBuilder = Date.builder();
+ builder.field(fieldName, dateSchemaBuilder.build());
break;
}
// Time is a time of day -- hour, minute, seconds, nanoseconds
case Types.TIME: {
- SchemaBuilder timeSchemaBuilder = TimestampColumnParser.builder();
- builder.addField(new Field(index, fieldName, timeSchemaBuilder.build()));
+ SchemaBuilder timeSchemaBuilder = Time.builder();
+ builder.field(fieldName, timeSchemaBuilder.build());
break;
}
// Timestamp is a date + time
case Types.TIMESTAMP: {
- SchemaBuilder tsSchemaBuilder = TimestampColumnParser.builder();
- builder.addField(new Field(index, fieldName, tsSchemaBuilder.build()));
+ SchemaBuilder tsSchemaBuilder = io.openmessaging.connector.api.data.logical.Timestamp.builder();
+ builder.field(fieldName, tsSchemaBuilder.build());
break;
}
@@ -1486,7 +1488,7 @@ public class GenericDatabaseDialect implements DatabaseDialect {
}
@Override
- public final String buildDeleteStatement(
+ public String buildDeleteStatement(
TableId table,
Collection<ColumnId> keyColumns
) {
@@ -1552,7 +1554,7 @@ public class GenericDatabaseDialect implements DatabaseDialect {
statement.setObject(index, null);
}
} else {
- boolean bound = maybeBindLogical(statement, index, schema, value, null);
+ boolean bound = maybeBindLogical(statement, index, schema, value);
if (!bound) {
bound = maybeBindPrimitive(statement, index, schema, value);
}
@@ -1566,12 +1568,14 @@ public class GenericDatabaseDialect implements DatabaseDialect {
PreparedStatement statement,
int index,
Schema schema,
- Object value,
- ColumnDefinition colDef
+ Object value
) throws SQLException {
if (schema.getName() != null) {
switch (schema.getName()) {
- case DateColumnParser.LOGICAL_NAME:
+ case Decimal.LOGICAL_NAME:
+ statement.setBigDecimal(index, (BigDecimal) value);
+ return true;
+ case Date.LOGICAL_NAME:
java.sql.Date date;
if (value instanceof java.util.Date) {
date = new java.sql.Date(((java.util.Date) value).getTime());
@@ -1583,7 +1587,7 @@ public class GenericDatabaseDialect implements DatabaseDialect {
DateTimeUtils.getTimeZoneCalendar(timeZone)
);
return true;
- case TimeColumnParser.LOGICAL_NAME:
+ case Time.LOGICAL_NAME:
java.sql.Time time;
if (value instanceof java.util.Date) {
time = new java.sql.Time(((java.util.Date) value).getTime());
@@ -1595,12 +1599,12 @@ public class GenericDatabaseDialect implements DatabaseDialect {
DateTimeUtils.getTimeZoneCalendar(timeZone)
);
return true;
- case TimestampColumnParser.LOGICAL_NAME:
- java.sql.Timestamp timestamp;
+ case io.openmessaging.connector.api.data.logical.Timestamp.LOGICAL_NAME:
+ Timestamp timestamp;
if (value instanceof java.util.Date) {
- timestamp = new java.sql.Timestamp(((java.util.Date) value).getTime());
+ timestamp = new Timestamp(((java.util.Date) value).getTime());
} else {
- timestamp = new java.sql.Timestamp((long) value);
+ timestamp = new Timestamp((long) value);
}
statement.setTimestamp(
index, timestamp,
@@ -1674,22 +1678,22 @@ public class GenericDatabaseDialect implements DatabaseDialect {
) throws SQLException {
switch (schema.getFieldType()) {
case INT8:
- statement.setByte(index, (Byte) value);
+ statement.setByte(index, Byte.parseByte(value.toString()));
break;
case INT32:
- statement.setInt(index, (Integer) value);
+ statement.setInt(index, Integer.parseInt(value.toString()));
break;
case INT64:
- statement.setLong(index, (Long) value);
+ statement.setLong(index, Long.parseLong(value.toString()));
break;
case FLOAT32:
- statement.setFloat(index, (Float) value);
+ statement.setFloat(index, Float.parseFloat(value.toString()));
break;
case FLOAT64:
- statement.setDouble(index, (Double) value);
+ statement.setDouble(index, Double.parseDouble(value.toString()));
break;
case BOOLEAN:
- statement.setBoolean(index, (Boolean) value);
+ statement.setBoolean(index, Boolean.parseBoolean(value.toString()));
break;
case STRING:
statement.setString(index, (String) value);
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/MySqlDatabaseDialect.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/MySqlDatabaseDialect.java
index 0eb8724..ceb3fd0 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/MySqlDatabaseDialect.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/MySqlDatabaseDialect.java
@@ -16,15 +16,14 @@
*/
package org.apache.rocketmq.connect.jdbc.dialect.impl;
-import lombok.SneakyThrows;
import org.apache.rocketmq.connect.jdbc.config.AbstractConfig;
import org.apache.rocketmq.connect.jdbc.dialect.DatabaseDialect;
-import org.apache.rocketmq.connect.jdbc.sink.metadata.SinkRecordField;
import org.apache.rocketmq.connect.jdbc.dialect.provider.DatabaseDialectProvider;
import org.apache.rocketmq.connect.jdbc.schema.column.ColumnId;
+import org.apache.rocketmq.connect.jdbc.schema.table.TableId;
+import org.apache.rocketmq.connect.jdbc.sink.metadata.SinkRecordField;
import org.apache.rocketmq.connect.jdbc.util.ExpressionBuilder;
import org.apache.rocketmq.connect.jdbc.util.IdentifierRules;
-import org.apache.rocketmq.connect.jdbc.schema.table.TableId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,14 +37,13 @@ import java.util.Collection;
*/
public class MySqlDatabaseDialect extends GenericDatabaseDialect {
- private final Logger log = LoggerFactory.getLogger(MySqlDatabaseDialect.class);
+ private final static Logger log = LoggerFactory.getLogger(MySqlDatabaseDialect.class);
/**
* The provider for {@link MySqlDatabaseDialect}.
*/
public static class Provider extends DatabaseDialectProvider {
- @SneakyThrows
- public Provider() {
+ public Provider() throws ClassNotFoundException {
super(MySqlDatabaseDialect.class.getSimpleName(), "mysql");
Class.forName("com.mysql.cj.jdbc.Driver");
}
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/OpenMLDBDatabaseDialect.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/OpenMLDBDatabaseDialect.java
index 0c70626..63198b2 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/OpenMLDBDatabaseDialect.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/OpenMLDBDatabaseDialect.java
@@ -16,14 +16,25 @@
*/
package org.apache.rocketmq.connect.jdbc.dialect.impl;
-import lombok.SneakyThrows;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.logical.Date;
+import io.openmessaging.connector.api.data.logical.Time;
+import io.openmessaging.connector.api.data.logical.Timestamp;
import org.apache.rocketmq.connect.jdbc.config.AbstractConfig;
import org.apache.rocketmq.connect.jdbc.dialect.DatabaseDialect;
+import org.apache.rocketmq.connect.jdbc.dialect.DropOptions;
import org.apache.rocketmq.connect.jdbc.dialect.provider.DatabaseDialectProvider;
+import org.apache.rocketmq.connect.jdbc.schema.column.ColumnId;
+import org.apache.rocketmq.connect.jdbc.schema.table.TableId;
+import org.apache.rocketmq.connect.jdbc.sink.metadata.SinkRecordField;
+import org.apache.rocketmq.connect.jdbc.util.ExpressionBuilder;
import org.apache.rocketmq.connect.jdbc.util.IdentifierRules;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collection;
+import java.util.List;
+
/**
* openmldb database dialect
@@ -36,8 +47,7 @@ public class OpenMLDBDatabaseDialect extends GenericDatabaseDialect {
* The provider for {@link OpenMLDBDatabaseDialect}.
*/
public static class Provider extends DatabaseDialectProvider {
- @SneakyThrows
- public Provider() {
+ public Provider() throws ClassNotFoundException {
super(OpenMLDBDatabaseDialect.class.getSimpleName(), "openmldb");
Class.forName("com._4paradigm.openmldb.jdbc.SQLDriver");
}
@@ -58,6 +68,97 @@ public class OpenMLDBDatabaseDialect extends GenericDatabaseDialect {
}
+ @Override
+ protected String currentTimestampDatabaseQuery() {
+ return null;
+ }
+
+ @Override
+ protected String getSqlType(SinkRecordField field) {
+ if (field.schemaName() != null) {
+ String schema = field.schemaName();
+ switch (schema) {
+ case Timestamp.LOGICAL_NAME:
+ return "TIMESTAMP";
+
+ case Date.LOGICAL_NAME:
+ case Time.LOGICAL_NAME:
+ return "DATE";
+ }
+ }
+
+ switch (field.schemaType()) {
+ case INT32:
+ return "INT";
+ case INT64:
+ return "BIGINT";
+ case FLOAT32:
+ return "FLOAT";
+ case FLOAT64:
+ return "DOUBLE";
+ case BOOLEAN:
+ return "BOOL";
+ case STRING:
+ return "VARCHAR";
+ default:
+ return super.getSqlType(field);
+ }
+ }
+
+ @Override
+ public String buildCreateTableStatement(TableId table, Collection<SinkRecordField> fields) {
+ List<String> pkFieldNames = this.extractPrimaryKeyFieldNames(fields);
+ if (!pkFieldNames.isEmpty()) {
+ throw new UnsupportedOperationException("pk is unsupported in openmldb");
+ } else {
+ return super.buildCreateTableStatement(table, fields);
+ }
+ }
+
+ @Override
+ protected void writeColumnSpec(ExpressionBuilder builder, SinkRecordField f) {
+ builder.appendColumnName(f.name());
+ builder.append(" ");
+ String sqlType = this.getSqlType(f);
+ builder.append(sqlType);
+ if (f.defaultValue() != null) {
+ builder.append(" DEFAULT ");
+ this.formatColumnValue(builder, f.schemaType(), f.defaultValue());
+ } else if (!this.isColumnOptional(f)) {
+ builder.append(" NOT NULL");
+ }
+
+ }
+
+ @Override
+ public String buildDropTableStatement(TableId table, DropOptions options) {
+ ExpressionBuilder builder = this.expressionBuilder();
+ builder.append("DROP TABLE ");
+ builder.append(table);
+ return builder.toString();
+ }
+
+ @Override
+ public List<String> buildAlterTable(TableId table, Collection<SinkRecordField> fields) {
+ throw new UnsupportedOperationException("alter is unsupported");
+ }
+
+ @Override
+ public String buildUpdateStatement(TableId table, Collection<ColumnId> keyColumns, Collection<ColumnId> nonKeyColumns) {
+ throw new UnsupportedOperationException("update is unsupported");
+ }
+
+ @Override
+ public String buildDeleteStatement(TableId table, Collection<ColumnId> keyColumns) {
+ throw new UnsupportedOperationException("delete is unsupported");
+ }
+
+ @Override
+ protected Integer getSqlTypeForSchema(Schema schema) {
+ return 0;
+ }
+
+
@Override
protected String sanitizedUrl(String url) {
// MySQL can also have "username:password@" at the beginning of the host list and
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/BufferedRecords.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/BufferedRecords.java
index 87773fb..3ad19be 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/BufferedRecords.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/BufferedRecords.java
@@ -22,12 +22,11 @@ import io.openmessaging.connector.api.errors.ConnectException;
import org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConfig;
import org.apache.rocketmq.connect.jdbc.dialect.DatabaseDialect;
import org.apache.rocketmq.connect.jdbc.dialect.impl.GenericDatabaseDialect;
-import org.apache.rocketmq.connect.jdbc.dialect.impl.OpenMLDBDatabaseDialect;
+import org.apache.rocketmq.connect.jdbc.schema.column.ColumnId;
import org.apache.rocketmq.connect.jdbc.schema.db.DbStructure;
+import org.apache.rocketmq.connect.jdbc.schema.table.TableId;
import org.apache.rocketmq.connect.jdbc.sink.metadata.FieldsMetadata;
import org.apache.rocketmq.connect.jdbc.sink.metadata.SchemaPair;
-import org.apache.rocketmq.connect.jdbc.schema.column.ColumnId;
-import org.apache.rocketmq.connect.jdbc.schema.table.TableId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -181,7 +180,7 @@ public class BufferedRecords {
);
if (totalUpdateCount.filter(total -> total != expectedCount).isPresent()
&& config.getInsertMode() == JdbcSinkConfig.InsertMode.INSERT) {
- if (dbDialect.name().equals(GenericDatabaseDialect.DialectName.generateDialectName(OpenMLDBDatabaseDialect.class)) && totalUpdateCount.get() == 0) {
+ if (dbDialect.name().equals(GenericDatabaseDialect.DialectName.generateDialectName(dbDialect.getDialectClass())) && totalUpdateCount.get() == 0) {
// OpenMLDB reports an update count of 0 even when the statement succeeds; do nothing
} else {
throw new ConnectException(
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingCriteria.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingCriteria.java
index fcd612b..8b8eaa9 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingCriteria.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingCriteria.java
@@ -18,9 +18,10 @@ package org.apache.rocketmq.connect.jdbc.source;
import io.openmessaging.connector.api.data.Field;
import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.Struct;
import io.openmessaging.connector.api.errors.ConnectException;
-import org.apache.rocketmq.connect.jdbc.source.offset.TimestampIncrementingOffset;
import org.apache.rocketmq.connect.jdbc.schema.column.ColumnId;
+import org.apache.rocketmq.connect.jdbc.source.offset.TimestampIncrementingOffset;
import org.apache.rocketmq.connect.jdbc.util.DateTimeUtils;
import org.apache.rocketmq.connect.jdbc.util.ExpressionBuilder;
import org.slf4j.Logger;
@@ -180,7 +181,7 @@ public class TimestampIncrementingCriteria {
*/
public TimestampIncrementingOffset extractValues(
Schema schema,
- Object[] record,
+ Struct record,
TimestampIncrementingOffset previousOffset
) {
Timestamp extractedTimestamp = null;
@@ -211,11 +212,11 @@ public class TimestampIncrementingCriteria {
*/
protected Timestamp extractOffsetTimestamp(
Schema schema,
- Object[] record
+ Struct record
) {
for (ColumnId timestampColumn : timestampColumns) {
Field field = schema.getField(timestampColumn.name());
- Timestamp ts = (Timestamp) record[field.getIndex()];
+ Timestamp ts = (Timestamp) record.get(field);
if (ts != null) {
return ts;
}
@@ -232,7 +233,7 @@ public class TimestampIncrementingCriteria {
*/
protected Long extractOffsetIncrementedId(
Schema schema,
- Object[] record
+ Struct record
) {
final Long extractedId;
final Field field = schema.getField(incrementingColumn.name());
@@ -242,7 +243,7 @@ public class TimestampIncrementingCriteria {
}
final Schema incrementingColumnSchema = field.getSchema();
- final Object incrementingColumnValue = record[field.getIndex()];
+ final Object incrementingColumnValue = record.get(field);
if (incrementingColumnValue == null) {
throw new ConnectException(
"Null value for incrementing column of type: " + incrementingColumnSchema.getFieldType());
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/metadata/SchemaMapping.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/metadata/SchemaMapping.java
index e54cdc8..9affae6 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/metadata/SchemaMapping.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/metadata/SchemaMapping.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.connect.jdbc.source.metadata;
import io.openmessaging.connector.api.data.Field;
import io.openmessaging.connector.api.data.Schema;
import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
import org.apache.rocketmq.connect.jdbc.dialect.DatabaseDialect;
import org.apache.rocketmq.connect.jdbc.schema.column.ColumnDefinition;
import org.apache.rocketmq.connect.jdbc.schema.column.ColumnId;
@@ -54,21 +55,21 @@ public final class SchemaMapping {
// describe columns
Map<ColumnId, ColumnDefinition> colDefins = dialect.describeColumns(conn, tableId, metadata);
Map<String, DatabaseDialect.ColumnConverter> colConvertersByFieldName = new LinkedHashMap<>();
- Schema builder = SchemaBuilder.struct().name(schemaName).build();
- builder.setFields(new ArrayList<>());
+ SchemaBuilder builder = SchemaBuilder.struct().name(schemaName);
int columnNumber = 0;
for (ColumnDefinition colDefn : colDefins.values()) {
- String fieldName = dialect.addFieldToSchema(colDefn, builder, columnNumber);
+ ++columnNumber;
+ String fieldName = dialect.addFieldToSchema(colDefn, builder);
if (fieldName == null) {
continue;
}
- Field field = builder.getField(fieldName);
- ColumnMapping mapping = new ColumnMapping(colDefn, ++columnNumber, field);
+ Field field = builder.field(fieldName);
+ ColumnMapping mapping = new ColumnMapping(colDefn, columnNumber, field);
DatabaseDialect.ColumnConverter converter = dialect.createColumnConverter(mapping);
colConvertersByFieldName.put(fieldName, converter);
}
- return new SchemaMapping(builder, colConvertersByFieldName);
+ return new SchemaMapping(builder.build(), colConvertersByFieldName);
}
private final Schema schema;
@@ -146,11 +147,11 @@ public final class SchemaMapping {
* @throws IOException
*/
public void setField(
- Object[] payload,
+ Struct payload,
ResultSet resultSet
) throws SQLException, IOException {
Object value = this.converter.convert(resultSet);
- payload[field.getIndex()] = value;
+ payload.put(field, value);
}
@Override
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/querier/BulkQuerier.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/querier/BulkQuerier.java
index fff48a2..9afc6ed 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/querier/BulkQuerier.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/querier/BulkQuerier.java
@@ -20,6 +20,7 @@ import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.data.RecordPartition;
import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.Struct;
import io.openmessaging.connector.api.errors.ConnectException;
import org.apache.rocketmq.connect.jdbc.common.JdbcSourceConfigConstants;
import org.apache.rocketmq.connect.jdbc.dialect.DatabaseDialect;
@@ -85,7 +86,7 @@ public class BulkQuerier extends Querier {
@Override
public ConnectRecord extractRecord() throws SQLException {
Schema schema = schemaMapping.schema();
- Object[] payload = new Object[schema.getFields().size()];
+ Struct payload = new Struct(schema);
for (SchemaMapping.FieldSetter setter : schemaMapping.fieldSetters()) {
try {
setter.setField(payload, resultSet);
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/querier/Querier.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/querier/Querier.java
index b97c751..8052ec2 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/querier/Querier.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/querier/Querier.java
@@ -19,8 +19,8 @@ package org.apache.rocketmq.connect.jdbc.source.querier;
import io.openmessaging.connector.api.data.ConnectRecord;
import org.apache.rocketmq.connect.jdbc.dialect.DatabaseDialect;
import org.apache.rocketmq.connect.jdbc.dialect.provider.CachedConnectionProvider;
-import org.apache.rocketmq.connect.jdbc.source.metadata.SchemaMapping;
import org.apache.rocketmq.connect.jdbc.schema.table.TableId;
+import org.apache.rocketmq.connect.jdbc.source.metadata.SchemaMapping;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/querier/TimestampIncrementingQuerier.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/querier/TimestampIncrementingQuerier.java
index 7fbbb17..dfb6020 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/querier/TimestampIncrementingQuerier.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/querier/TimestampIncrementingQuerier.java
@@ -20,15 +20,16 @@ import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.data.RecordPartition;
import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.Struct;
import io.openmessaging.connector.api.errors.ConnectException;
import org.apache.rocketmq.connect.jdbc.dialect.DatabaseDialect;
import org.apache.rocketmq.connect.jdbc.dialect.provider.CachedConnectionProvider;
-import org.apache.rocketmq.connect.jdbc.source.offset.SourceOffsetCompute;
-import org.apache.rocketmq.connect.jdbc.source.metadata.SchemaMapping;
-import org.apache.rocketmq.connect.jdbc.source.TimestampIncrementingCriteria;
-import org.apache.rocketmq.connect.jdbc.source.offset.TimestampIncrementingOffset;
import org.apache.rocketmq.connect.jdbc.schema.column.ColumnDefinition;
import org.apache.rocketmq.connect.jdbc.schema.column.ColumnId;
+import org.apache.rocketmq.connect.jdbc.source.TimestampIncrementingCriteria;
+import org.apache.rocketmq.connect.jdbc.source.metadata.SchemaMapping;
+import org.apache.rocketmq.connect.jdbc.source.offset.SourceOffsetCompute;
+import org.apache.rocketmq.connect.jdbc.source.offset.TimestampIncrementingOffset;
import org.apache.rocketmq.connect.jdbc.util.DateTimeUtils;
import org.apache.rocketmq.connect.jdbc.util.ExpressionBuilder;
import org.slf4j.Logger;
@@ -184,7 +185,7 @@ public class TimestampIncrementingQuerier extends Querier implements TimestampIn
@Override
public ConnectRecord extractRecord() throws SQLException {
Schema schema = schemaMapping.schema();
- Object[] payload = new Object[schema.getFields().size()];
+ Struct payload = new Struct(schema);
for (SchemaMapping.FieldSetter setter : schemaMapping.fieldSetters()) {
try {
setter.setField(payload, resultSet);
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/util/BytesUtil.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/util/BytesUtil.java
index b2d6667..c183a47 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/util/BytesUtil.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/util/BytesUtil.java
@@ -18,12 +18,12 @@ package org.apache.rocketmq.connect.jdbc.util;
public class BytesUtil {
- private static final char[] hexCode = "0123456789ABCDEF".toCharArray();
+ private static final char[] HEX_CODE = "0123456789ABCDEF".toCharArray();
public static String toHex(byte[] data) {
StringBuilder r = new StringBuilder(data.length * 2);
for (byte b : data) {
- r.append(hexCode[(b >> 4) & 0xF]);
- r.append(hexCode[b & 0xF]);
+ r.append(HEX_CODE[(b >> 4) & 0xF]);
+ r.append(HEX_CODE[b & 0xF]);
}
return r.toString();
}
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/util/NumericMapping.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/util/NumericMapping.java
index e060a5c..3bd1a6d 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/util/NumericMapping.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/util/NumericMapping.java
@@ -28,17 +28,17 @@ public enum NumericMapping {
BEST_FIT,
BEST_FIT_EAGER_DOUBLE;
- private static final Map<String, NumericMapping> reverse = new HashMap<>(values().length);
+ private static final Map<String, NumericMapping> REVERSE = new HashMap<>(values().length);
static {
for (NumericMapping val : values()) {
- reverse.put(val.name().toLowerCase(Locale.ROOT), val);
+ REVERSE.put(val.name().toLowerCase(Locale.ROOT), val);
}
}
public static NumericMapping get(String prop) {
// not adding a check for null value because the recommender/validator should catch those.
- return reverse.get(prop.toLowerCase(Locale.ROOT));
+ return REVERSE.get(prop.toLowerCase(Locale.ROOT));
}
public static NumericMapping get(JdbcSourceConfig config) {
diff --git a/connectors/rocketmq-connect-jdbc/src/test/java/org/apache/rocketmq/connect/jdbc/connector/sink/JdbcSinkTest.java b/connectors/rocketmq-connect-jdbc/src/test/java/org/apache/rocketmq/connect/jdbc/connector/sink/JdbcSinkTest.java
new file mode 100644
index 0000000..ebba6b8
--- /dev/null
+++ b/connectors/rocketmq-connect-jdbc/src/test/java/org/apache/rocketmq/connect/jdbc/connector/sink/JdbcSinkTest.java
@@ -0,0 +1,18 @@
+package org.apache.rocketmq.connect.jdbc.connector.sink;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.connector.api.data.Struct;
+
+public class JdbcSinkTest {
+ static String test="{\"schema\":{\"fieldsByName\":{\"money\":{\"schema\":{\"optional\":true,\"fieldType\":\"FLOAT64\"},\"name\":\"money\",\"index\":5},\"name\":{\"schema\":{\"optional\":true,\"fieldType\":\"STRING\"},\"name\":\"name\",\"index\":1},\"begin_time\":{\"schema\":{\"name\":\"io.openmessaging.connector.api.data.logical.Timestamp\",\"optional\":true,\"fieldType\":\"INT64\",\"version\":1},\"name\":\"begin_time\",\"index\":6},\"company\":{\"schema\":{\"optional\":true,\"fieldT [...]
+
+ public static void main(String[] args) {
+ Struct struct = JSON.parseObject(test, Struct.class);
+ JSONObject object = JSON.parseObject(test);
+ Object[] values=object.getJSONArray("values").toArray();
+ struct.setValues(values);
+ System.out.println(object);
+ }
+
+}
diff --git a/connectors/rocketmq-connect-jdbc/src/test/java/org/apache/rocketmq/connect/jdbc/connector/sink/OpenMLDBJdbcSinkTest.java b/connectors/rocketmq-connect-jdbc/src/test/java/org/apache/rocketmq/connect/jdbc/connector/sink/OpenMLDBJdbcSinkTest.java
index 6d3a9d1..274db7f 100644
--- a/connectors/rocketmq-connect-jdbc/src/test/java/org/apache/rocketmq/connect/jdbc/connector/sink/OpenMLDBJdbcSinkTest.java
+++ b/connectors/rocketmq-connect-jdbc/src/test/java/org/apache/rocketmq/connect/jdbc/connector/sink/OpenMLDBJdbcSinkTest.java
@@ -23,6 +23,7 @@ import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.data.RecordPartition;
import io.openmessaging.connector.api.data.Schema;
import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
import io.openmessaging.internal.DefaultKeyValue;
import org.apache.rocketmq.connect.jdbc.connector.JdbcSinkTask;
import org.junit.After;
@@ -73,7 +74,7 @@ public class OpenMLDBJdbcSinkTest {
}
assertNotNull(connection);
KeyValue config = buildConfig(jdbcUrlDB);
- openJDBCSinkTask.init(config);
+ openJDBCSinkTask.start(config);
}
@After
@@ -95,6 +96,7 @@ public class OpenMLDBJdbcSinkTest {
config.put("insert.mode", "INSERT");
config.put("db.timezone", "UTC");
config.put("table.types", "TABLE");
+ config.put("auto.create", "true");
config.put("source-record-converter", "org.apache.rocketmq.connect.runtime.converter.JsonConverter");
return config;
}
@@ -106,16 +108,17 @@ public class OpenMLDBJdbcSinkTest {
public void testOpenMLDBJdbcSinkWriterTest() throws SQLException {
List<ConnectRecord> records = new ArrayList<>();
// build schema
- Schema schema = SchemaBuilder.struct().name(
- tableName
- ).build();
- schema.addField(new Field(0, "c1", SchemaBuilder.int32().build()));
- schema.addField(new Field(1, "c2", SchemaBuilder.string().build()));
+ Schema schema = SchemaBuilder.struct()
+ .name(tableName)
+ .field("c1",SchemaBuilder.int32().build())
+ .field("c2", SchemaBuilder.string().build())
+ .build();
// build record
- Object[] payload = new Object[2];
int param0 = 1001;
- payload[0] = param0;
- payload[1] = String.format("test-data-%s", param0);
+ Struct struct= new Struct(schema);
+ struct.put("c1",param0);
+ struct.put("c2",String.format("test-data-%s", param0));
+
ConnectRecord record = new ConnectRecord(
// offset partition
// offset partition"
@@ -123,7 +126,7 @@ public class OpenMLDBJdbcSinkTest {
new RecordOffset(new HashMap<>()),
System.currentTimeMillis(),
schema,
- payload
+ struct
);
records.add(record);
openJDBCSinkTask.put(records);
diff --git a/pom.xml b/pom.xml
index 0212f88..94a7e52 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,7 +43,7 @@
<assertj.version>2.6.0</assertj.version>
<mockito.version>3.2.4</mockito.version>
<httpclient.version>4.5.13</httpclient.version>
- <openmessaging.connector.version>0.1.2</openmessaging.connector.version>
+ <openmessaging.connector.version>0.1.3-SNAPSHOT</openmessaging.connector.version>
<fastjson.version>1.2.83</fastjson.version>
<javalin.version>2.8.0</javalin.version>
<slf4j.version>1.7.7</slf4j.version>
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
index 86ce305..63cf7d1 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
@@ -80,8 +80,7 @@ public class TransformChain<R extends ConnectRecord> implements AutoCloseable {
transformConfig.put(originKey, config.getString(key));
}
}
- transform.validate(transformConfig);
- transform.init(transformConfig);
+ transform.start(transformConfig);
this.transformList.add(transform);
} catch (Exception e) {
log.error("transform new instance error", e);
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
index 48bb04d..58c98ce 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
@@ -526,8 +526,9 @@ public class Worker {
}
} catch (ExecutionException e) {
Throwable t = e.getCause();
+ log.error("Execution exception , {}", e);
} catch (CancellationException | TimeoutException | InterruptedException e) {
-
+ log.error("error, {}", e);
} finally {
future.cancel(true);
workerTask.cleanup();
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerConnector.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerConnector.java
index f0433c2..2b037e0 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerConnector.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerConnector.java
@@ -53,12 +53,12 @@ public class WorkerConnector {
}
public void initialize() {
- connector.init(keyValue);
+ connector.init(context);
}
public void start() {
connector.validate(keyValue);
- connector.start(context);
+ connector.start(keyValue);
}
public void stop() {
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
index 2a0e6c7..67e644a 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
@@ -156,37 +156,42 @@ public class WorkerDirectTask implements WorkerTask {
}
private void starkSinkTask() {
- sinkTask.init(taskConfig);
- sinkTask.start(new SinkTaskContext() {
+ sinkTask.init(new SinkTaskContext() {
- @Override public String getConnectorName() {
+ @Override
+ public String getConnectorName() {
return taskConfig.getString(RuntimeConfigDefine.CONNECTOR_ID);
}
- @Override public String getTaskName() {
+ @Override
+ public String getTaskName() {
return taskConfig.getString(RuntimeConfigDefine.TASK_ID);
}
- @Override public void resetOffset(RecordPartition recordPartition, RecordOffset recordOffset) {
+ @Override
+ public void resetOffset(RecordPartition recordPartition, RecordOffset recordOffset) {
}
-
- @Override public void resetOffset(Map<RecordPartition, RecordOffset> offsets) {
+ @Override
+ public void resetOffset(Map<RecordPartition, RecordOffset> offsets) {
}
- @Override public void pause(List<RecordPartition> partitions) {
+ @Override
+ public void pause(List<RecordPartition> partitions) {
}
- @Override public void resume(List<RecordPartition> partitions) {
+ @Override
+ public void resume(List<RecordPartition> partitions) {
}
-
- @Override public Set<RecordPartition> assignment() {
+ @Override
+ public Set<RecordPartition> assignment() {
return null;
}
});
+ sinkTask.start(taskConfig);
log.info("Sink task start, config:{}", JSON.toJSONString(taskConfig));
}
@@ -197,8 +202,7 @@ public class WorkerDirectTask implements WorkerTask {
private void startSourceTask() {
state.compareAndSet(WorkerTaskState.NEW, WorkerTaskState.PENDING);
- sourceTask.init(taskConfig);
- sourceTask.start(new SourceTaskContext() {
+ sourceTask.init(new SourceTaskContext() {
@Override public OffsetStorageReader offsetStorageReader() {
return positionStorageReader;
}
@@ -211,6 +215,7 @@ public class WorkerDirectTask implements WorkerTask {
return null;
}
});
+ sourceTask.start(taskConfig);
state.compareAndSet(WorkerTaskState.PENDING, WorkerTaskState.RUNNING);
log.info("Source task start, config:{}", JSON.toJSONString(taskConfig));
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
index 0587875..62adc5b 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
@@ -209,9 +209,9 @@ public class WorkerSinkTask implements WorkerTask {
consumer.start();
log.info("Sink task consumer start. taskConfig {}", JSON.toJSONString(taskConfig));
state.compareAndSet(WorkerTaskState.NEW, WorkerTaskState.PENDING);
- sinkTask.init(taskConfig);
this.sinkTaskContext = new WorkerSinkTaskContext(taskConfig, this, consumer);
- sinkTask.start(sinkTaskContext);
+ sinkTask.init(sinkTaskContext);
+ sinkTask.start(taskConfig);
// we assume executed here means we are safe
log.info("Sink task start, config:{}", JSON.toJSONString(taskConfig));
state.compareAndSet(WorkerTaskState.PENDING, WorkerTaskState.RUNNING);
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContext.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContext.java
index f2f67a0..b17cde3 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContext.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContext.java
@@ -19,6 +19,7 @@
package org.apache.rocketmq.connect.runtime.connectorwrapper;
import com.alibaba.fastjson.JSON;
+import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.data.RecordPartition;
@@ -183,6 +184,16 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
return taskConfig.getString("taskId");
}
+ /**
+ * Get the configurations of current task.
+ *
+ * @return the configuration of current task.
+ */
+ @Override
+ public KeyValue configs() {
+ return taskConfig;
+ }
+
public Map<MessageQueue, Long> queuesOffsets() {
return this.messageQueuesOffsetMap;
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index da9891e..0de1f6f 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -19,6 +19,7 @@
package org.apache.rocketmq.connect.runtime.connectorwrapper;
import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.task.source.SourceTask;
import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
@@ -161,8 +162,7 @@ public class WorkerSourceTask implements WorkerTask {
producer.start();
log.info("Source task producer start.");
state.compareAndSet(WorkerTaskState.NEW, WorkerTaskState.PENDING);
- sourceTask.init(taskConfig);
- sourceTask.start(new SourceTaskContext() {
+ sourceTask.init(new SourceTaskContext() {
@Override
public OffsetStorageReader offsetStorageReader() {
@@ -178,7 +178,18 @@ public class WorkerSourceTask implements WorkerTask {
public String getTaskName() {
return taskConfig.getString(RuntimeConfigDefine.TASK_ID);
}
+
+ /**
+ * Get the configurations of current task.
+ *
+ * @return the configuration of current task.
+ */
+ @Override
+ public KeyValue configs() {
+ return taskConfig;
+ }
});
+ sourceTask.start(taskConfig);
state.compareAndSet(WorkerTaskState.PENDING, WorkerTaskState.RUNNING);
log.info("Source task start, config:{}", JSON.toJSONString(taskConfig));
while (WorkerState.STARTED == workerState.get() && WorkerTaskState.RUNNING == state.get()) {
@@ -316,7 +327,7 @@ public class WorkerSourceTask implements WorkerTask {
sourceMessage.setBody(messageBody);
}
} else {
- final byte[] messageBody = JSON.toJSONString(sourceDataEntry).getBytes();
+ final byte[] messageBody = JSON.toJSONString(sourceDataEntry, SerializerFeature.DisableCircularReferenceDetect).getBytes();
if (messageBody.length > RuntimeConfigDefine.MAX_MESSAGE_SIZE) {
log.error("Send record, message size is greater than {} bytes, sourceDataEntry: {}", RuntimeConfigDefine.MAX_MESSAGE_SIZE, JSON.toJSONString(sourceDataEntry));
continue;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
index adaa5ce..1134e1d 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
@@ -185,7 +185,7 @@ public class ConfigManagementServiceImpl implements ConfigManagementService {
}
final Connector connector = (Connector) clazz.getDeclaredConstructor().newInstance();
connector.validate(configs);
- connector.init(configs);
+ connector.start(configs);
connectorKeyValueStore.put(connectorName, configs);
recomputeTaskConfigs(connectorName, connector, currentTimestamp, configs);
return "";
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
index f0736c4..5e8bb5b 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
@@ -154,7 +154,7 @@ public class MemoryConfigManagementServiceImpl implements ConfigManagementServic
}
final Connector connector = (Connector) clazz.getDeclaredConstructor().newInstance();
connector.validate(configs);
- connector.init(configs);
+ connector.start(configs);
connectorKeyValueStore.put(connectorName, configs);
recomputeTaskConfigs(connectorName, connector, currentTimestamp, configs);
return "";
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestConnector.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestConnector.java
index aede927..80ab134 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestConnector.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestConnector.java
@@ -33,30 +33,21 @@ public class TestConnector extends Connector {
}
- @Override public void init(KeyValue config) {
+ /**
+ * Start the component
+ *
+ * @param config component context
+ */
+ @Override
+ public void start(KeyValue config) {
this.config = config;
}
- @Override public void start(ConnectorContext componentContext) {
-
- }
-
@Override
public void stop() {
}
-
- @Override
- public void pause() {
-
- }
-
- @Override
- public void resume() {
-
- }
-
@Override public List<KeyValue> taskConfigs(int maxTasks) {
List<KeyValue> configs = new ArrayList<>();
configs.add(this.config);
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestSourceTask.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestSourceTask.java
index 2c6a9ad..a131aef 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestSourceTask.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestSourceTask.java
@@ -49,11 +49,7 @@ public class TestSourceTask extends SourceTask {
}
- @Override public void init(KeyValue config) {
-
- }
-
- @Override public void start(SourceTaskContext sourceTaskContext) {
+ @Override public void start(KeyValue config) {
}
@@ -62,13 +58,4 @@ public class TestSourceTask extends SourceTask {
}
- @Override
- public void pause() {
-
- }
-
- @Override
- public void resume() {
-
- }
}
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestTask.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestTask.java
index 64e4582..6742325 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestTask.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestTask.java
@@ -28,23 +28,23 @@ class TestTask implements Task {
}
- @Override public void init(KeyValue config) {
+ /**
+ * Init the component
+ *
+ * @param context component context
+ */
+ @Override
+ public void init(ComponentContext context) {
}
- @Override public void start(ComponentContext componentContext) {
- }
-
- @Override public void stop() {
+ @Override public void start(KeyValue config) {
}
- @Override public void pause() {
+ @Override public void stop() {
}
- @Override public void resume() {
-
- }
}
diff --git a/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSinkConnector.java b/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSinkConnector.java
index 23de734..35a751d 100644
--- a/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSinkConnector.java
+++ b/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSinkConnector.java
@@ -37,21 +37,20 @@ public class FileSinkConnector extends SinkConnector {
}
}
- @Override public void init(KeyValue config) {
+ /**
+ * Start the component
+ *
+ * @param config component context
+ */
+ @Override
+ public void start(KeyValue config) {
this.config = config;
}
@Override public void stop() {
-
- }
-
- @Override public void pause() {
-
+ this.config = null;
}
- @Override public void resume() {
-
- }
@Override public Class<? extends Task> taskClass() {
return FileSinkTask.class;
diff --git a/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSinkTask.java b/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSinkTask.java
index 08a8964..28dffd2 100644
--- a/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSinkTask.java
+++ b/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSinkTask.java
@@ -60,7 +60,12 @@ public class FileSinkTask extends SinkTask {
outputStream.flush();
}
- @Override public void start(SinkTaskContext sinkTaskContext) {
+ @Override public void validate(KeyValue config) {
+
+ }
+
+ @Override public void start(KeyValue config) {
+ this.config = config;
fileConfig = new FileConfig();
fileConfig.load(config);
if (fileConfig.getFilename() == null || fileConfig.getFilename().isEmpty()) {
@@ -68,37 +73,22 @@ public class FileSinkTask extends SinkTask {
} else {
try {
outputStream = new PrintStream(
- Files.newOutputStream(Paths.get(fileConfig.getFilename()), StandardOpenOption.CREATE, StandardOpenOption.APPEND),
- false,
- StandardCharsets.UTF_8.name());
+ Files.newOutputStream(Paths.get(fileConfig.getFilename()), StandardOpenOption.CREATE, StandardOpenOption.APPEND),
+ false,
+ StandardCharsets.UTF_8.name());
} catch (IOException e) {
throw new ConnectException("Couldn't find or create file '" + fileConfig.getFilename() + "' for FileStreamSinkTask", e);
}
}
- }
-
- @Override public void validate(KeyValue config) {
}
- @Override public void init(KeyValue config) {
- this.config = config;
- }
-
@Override public void stop() {
if (outputStream != null && outputStream != System.out) {
outputStream.close();
}
}
- @Override public void pause() {
-
- }
-
- @Override public void resume() {
-
- }
-
private String logFilename() {
return fileConfig.getFilename() == null ? "stdout" : fileConfig.getFilename();
}
diff --git a/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSourceConnector.java b/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSourceConnector.java
index 9a96a0f..58dd74a 100644
--- a/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSourceConnector.java
+++ b/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSourceConnector.java
@@ -37,21 +37,14 @@ public class FileSourceConnector extends SourceConnector {
}
}
- @Override public void init(KeyValue config) {
+ @Override public void start(KeyValue config) {
this.config = config;
}
@Override public void stop() {
-
- }
-
- @Override public void pause() {
-
+ this.config = null;
}
- @Override public void resume() {
-
- }
@Override public List<KeyValue> taskConfigs(int maxTasks) {
List<KeyValue> config = new ArrayList<>();
diff --git a/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSourceTask.java b/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSourceTask.java
index 8309833..420f5ac 100644
--- a/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSourceTask.java
+++ b/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSourceTask.java
@@ -18,7 +18,6 @@ package org.apache.rocketmq.connect.file;
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.task.source.SourceTask;
-import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.data.Field;
import io.openmessaging.connector.api.data.FieldType;
@@ -202,8 +201,11 @@ public class FileSourceTask extends SourceTask {
}
}
- @Override public void start(SourceTaskContext sourceTaskContext) {
- this.sourceTaskContext = sourceTaskContext;
+
+ @Override public void validate(KeyValue config) { }
+
+ @Override public void start(KeyValue config) {
+ this.config = config;
fileConfig = new FileConfig();
fileConfig.load(config);
log.info("fileName is:{}", fileConfig.getFilename());
@@ -214,14 +216,6 @@ public class FileSourceTask extends SourceTask {
}
}
- @Override public void validate(KeyValue config) {
-
- }
-
- @Override public void init(KeyValue config) {
- this.config = config;
- }
-
@Override public void stop() {
log.trace("Stopping");
synchronized (this) {
@@ -237,14 +231,6 @@ public class FileSourceTask extends SourceTask {
}
}
- @Override public void pause() {
-
- }
-
- @Override public void resume() {
-
- }
-
private String logFilename() {
return fileConfig.getFilename() == null ? "stdin" : fileConfig.getFilename();
}
diff --git a/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FilterTransform.java b/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FilterTransform.java
index 31f57a6..d0f8f02 100644
--- a/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FilterTransform.java
+++ b/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FilterTransform.java
@@ -23,6 +23,8 @@ import io.openmessaging.connector.api.data.ConnectRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.Closeable;
+
public class FilterTransform implements Transform<ConnectRecord> {
private static final Logger log = LoggerFactory.getLogger(LoggerName.FILE_CONNECTOR);
@@ -37,20 +39,23 @@ public class FilterTransform implements Transform<ConnectRecord> {
return record;
}
- @Override public void validate(KeyValue config) {
-
- }
- @Override public void init(KeyValue config) {
+ /**
+ * Start the component
+ *
+ * @param config component context
+ */
+ @Override
+ public void start(KeyValue config) {
this.keyValue = config;
log.info("transform config {}", this.keyValue);
}
- @Override public void start(ComponentContext componentContext) {
-
- }
-
- @Override public void stop() {
+ /**
+ * Stop the component.
+ */
+ @Override
+ public void stop() {
}
}