You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/11/08 03:55:02 UTC
[inlong] branch master updated: [INLONG-6327][Agent] Support collect data from SQLServer by Debezium (#6338)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d07eb5165 [INLONG-6327][Agent] Support collect data from SQLServer by Debezium (#6338)
d07eb5165 is described below
commit d07eb51652f9e808c33580a890c487b8dcbbd47f
Author: haibo.duan <dh...@live.cn>
AuthorDate: Tue Nov 8 11:54:57 2022 +0800
[INLONG-6327][Agent] Support collect data from SQLServer by Debezium (#6338)
---
.../inlong/agent/constant/SqlServerConstants.java | 39 +++
.../apache/inlong/agent/pojo/JobProfileDto.java | 39 +++
.../org/apache/inlong/agent/pojo/SqlServerJob.java | 76 +++++
inlong-agent/agent-plugins/pom.xml | 5 +
.../agent/plugin/sources/SQLServerSource.java | 34 +-
.../plugin/sources/reader/SQLServerReader.java | 344 ++++++++++++++-------
.../sources/snapshot/SqlServerSnapshotBase.java | 52 ++++
.../agent/plugin/sources/TestSQLServerConnect.java | 48 +--
.../agent/plugin/sources/TestSQLServerReader.java | 156 +++++-----
.../agent/plugin/sources/TestSQLServerSource.java | 13 -
pom.xml | 6 +
11 files changed, 556 insertions(+), 256 deletions(-)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/SqlServerConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/SqlServerConstants.java
new file mode 100644
index 000000000..ffb4d7ae8
--- /dev/null
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/SqlServerConstants.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.constant;
+
+/**
+ * Snapshot mode values for the SQLServer source.
+ * SQLServerReader falls back to {@link #INITIAL} when no snapshot mode is configured.
+ */
+public class SqlServerConstants {
+
+ /**
+ * Takes a snapshot of structure and data of captured tables;
+ * useful if topics should be populated with a complete representation of the data from the captured tables.
+ */
+ public static final String INITIAL = "initial";
+
+ /**
+ * Takes a snapshot of structure and data like {@code initial},
+ * but does not transition into streaming changes once the snapshot has completed.
+ */
+ public static final String INITIAL_ONLY = "initial_only";
+
+ /**
+ * Takes a snapshot of the structure of captured tables only;
+ * useful if only changes happening from now onwards should be propagated to topics.
+ */
+ public static final String SCHEMA_ONLY = "schema_only";
+}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index d57b2c341..810a42d2b 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -58,6 +58,10 @@ public class JobProfileDto {
* mqtt source
*/
public static final String MQTT_SOURCE = "org.apache.inlong.agent.plugin.sources.MqttSource";
+ /**
+ * sqlserver source; must match the case-sensitive class name of SQLServerSource
+ */
+ public static final String SQLSERVER_SOURCE = "org.apache.inlong.agent.plugin.sources.SQLServerSource";
private static final Gson GSON = new Gson();
@@ -226,6 +230,34 @@ public class JobProfileDto {
return mongoJob;
}
+ /**
+ * Build the SQLServer job description from the ext params of the given data config.
+ * The ext params JSON is parsed into a flat SqlserverJobConfig and then regrouped into
+ * connection settings plus the nested offset, snapshot and history sections.
+ *
+ * @param dataConfigs data config whose ext params hold the SQLServer job settings
+ * @return the populated SqlServerJob
+ */
+ private static SqlServerJob getSqlServerJob(DataConfig dataConfigs) {
+ SqlServerJob.SqlserverJobConfig config = GSON.fromJson(dataConfigs.getExtParams(),
+ SqlServerJob.SqlserverJobConfig.class);
+ SqlServerJob sqlServerJob = new SqlServerJob();
+ sqlServerJob.setUser(config.getUser());
+ sqlServerJob.setHostname(config.getHostname());
+ sqlServerJob.setPassword(config.getPassword());
+ sqlServerJob.setPort(config.getPort());
+ sqlServerJob.setServerName(config.getServerName());
+ sqlServerJob.setDbname(config.getDbname());
+
+ SqlServerJob.Offset offset = new SqlServerJob.Offset();
+ // carry the configured flush interval over as well; it was previously dropped here
+ offset.setIntervalMs(config.getIntervalMs());
+ offset.setFilename(config.getOffsetFilename());
+ offset.setSpecificOffsetFile(config.getSpecificOffsetFile());
+ offset.setSpecificOffsetPos(config.getSpecificOffsetPos());
+ sqlServerJob.setOffset(offset);
+
+ SqlServerJob.Snapshot snapshot = new SqlServerJob.Snapshot();
+ snapshot.setMode(config.getSnapshotMode());
+ sqlServerJob.setSnapshot(snapshot);
+
+ SqlServerJob.History history = new SqlServerJob.History();
+ history.setFilename(config.getHistoryFilename());
+ sqlServerJob.setHistory(history);
+
+ return sqlServerJob;
+ }
+
public static MqttJob getMqttJob(DataConfig dataConfigs) {
MqttJob.MqttJobConfig config = GSON.fromJson(dataConfigs.getExtParams(),
MqttJob.MqttJobConfig.class);
@@ -309,6 +341,12 @@ public class JobProfileDto {
job.setSource(KAFKA_SOURCE);
profileDto.setJob(job);
break;
+ case SQLSERVER:
+ SqlServerJob sqlserverJob = getSqlServerJob(dataConfig);
+ job.setSqlserverJob(sqlserverJob);
+ job.setSource(SQLSERVER_SOURCE);
+ profileDto.setJob(job);
+ break;
case MONGODB:
MongoJob mongoJob = getMongoJob(dataConfig);
job.setMongoJob(mongoJob);
@@ -349,6 +387,7 @@ public class JobProfileDto {
private KafkaJob kafkaJob;
private MongoJob mongoJob;
private MqttJob mqttJob;
+ private SqlServerJob sqlserverJob;
}
@Data
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/SqlServerJob.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/SqlServerJob.java
new file mode 100644
index 000000000..735c745bb
--- /dev/null
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/SqlServerJob.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.pojo;
+
+import lombok.Data;
+
+/**
+ * Job settings for the SQLServer source: connection parameters plus nested
+ * snapshot, offset and history sections. Accessors are generated by Lombok.
+ */
+@Data
+public class SqlServerJob {
+
+ private String hostname;
+ private String user;
+ private String password;
+ private String port;
+ private String serverName;
+ private String dbname;
+
+ private SqlServerJob.Snapshot snapshot;
+ private SqlServerJob.Offset offset;
+ private SqlServerJob.History history;
+
+ /**
+ * Offset storage settings.
+ */
+ @Data
+ public static class Offset {
+
+ private String intervalMs;
+ private String filename;
+ private String specificOffsetFile;
+ private String specificOffsetPos;
+ }
+
+ /**
+ * Snapshot settings; mode is expected to be one of the SqlServerConstants values - TODO confirm.
+ */
+ @Data
+ public static class Snapshot {
+
+ private String mode;
+ }
+
+ /**
+ * Database history storage settings.
+ */
+ @Data
+ public static class History {
+
+ private String filename;
+ }
+
+ /**
+ * Flat form of the job settings, as parsed from the data config ext params by JobProfileDto.
+ */
+ @Data
+ public static class SqlserverJobConfig {
+
+ private String hostname;
+ private String user;
+ private String password;
+ private String port;
+ private String dbname;
+ private String serverName;
+
+ private String snapshotMode;
+ private String intervalMs;
+ private String offsetFilename;
+ private String historyFilename;
+
+ private String specificOffsetFile;
+ private String specificOffsetPos;
+ }
+
+}
diff --git a/inlong-agent/agent-plugins/pom.xml b/inlong-agent/agent-plugins/pom.xml
index b51fd1963..fa8e03a41 100644
--- a/inlong-agent/agent-plugins/pom.xml
+++ b/inlong-agent/agent-plugins/pom.xml
@@ -85,6 +85,11 @@
</exclusions>
</dependency>
+ <dependency>
+ <groupId>io.debezium</groupId>
+ <artifactId>debezium-connector-sqlserver</artifactId>
+ </dependency>
+
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mongodb</artifactId>
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java
index af6d04a93..a65006152 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java
@@ -17,56 +17,32 @@
package org.apache.inlong.agent.plugin.sources;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.plugin.Reader;
import org.apache.inlong.agent.plugin.sources.reader.SQLServerReader;
-import org.apache.inlong.agent.utils.AgentDbUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
-import java.util.Objects;
/**
- * SQLServer SQL source, split SQLServer SQL source job into multi readers
+ * SQLServer source
*/
public class SQLServerSource extends AbstractSource {
private static final Logger logger = LoggerFactory.getLogger(SQLServerSource.class);
- public static final String JOB_DATABASE_SQL = "job.sql.command";
-
public SQLServerSource() {
}
- private List<Reader> splitSqlJob(String sqlPattern) {
- final List<Reader> result = new ArrayList<>();
- String[] sqlList = AgentDbUtils.replaceDynamicSeq(sqlPattern);
- if (Objects.nonNull(sqlList)) {
- Arrays.stream(sqlList).forEach(sql -> {
- result.add(new SQLServerReader(sql));
- });
- }
- return result;
- }
-
+ /**
+ * Creates exactly one SQLServerReader for the job and counts the split as a success.
+ *
+ * @param conf job profile used to initialise the source
+ * @return a list holding the single reader
+ */
@Override
public List<Reader> split(JobProfile conf) {
super.init(conf);
- String sqlPattern = conf.get(JOB_DATABASE_SQL, StringUtils.EMPTY).toLowerCase();
- List<Reader> readerList = null;
- if (StringUtils.isNotEmpty(sqlPattern)) {
- readerList = splitSqlJob(sqlPattern);
- }
- if (CollectionUtils.isNotEmpty(readerList)) {
- sourceMetric.sourceSuccessCount.incrementAndGet();
- } else {
- sourceMetric.sourceFailCount.incrementAndGet();
- }
+ Reader sqlServerReader = new SQLServerReader();
+ List<Reader> readerList = new ArrayList<>();
+ readerList.add(sqlServerReader);
+ sourceMetric.sourceSuccessCount.incrementAndGet();
return readerList;
}
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java
index c4daa8560..079d9af07 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java
@@ -17,114 +17,129 @@
package org.apache.inlong.agent.plugin.sources.reader;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.lang3.CharUtils;
+import com.google.common.base.Preconditions;
+import com.google.gson.Gson;
+import io.debezium.connector.sqlserver.SqlServerConnector;
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.relational.history.FileDatabaseHistory;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.constant.SnapshotModeConstants;
+import org.apache.inlong.agent.constant.SqlServerConstants;
import org.apache.inlong.agent.message.DefaultMessage;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.utils.AgentDbUtils;
+import org.apache.inlong.agent.plugin.sources.snapshot.SqlServerSnapshotBase;
+import org.apache.inlong.agent.plugin.utils.InLongDatabaseHistory;
+import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore;
+import org.apache.inlong.agent.pojo.DebeziumFormat;
+import org.apache.inlong.agent.pojo.DebeziumOffset;
import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.DebeziumOffsetSerializer;
+import org.apache.inlong.agent.utils.GsonUtil;
+import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.nio.charset.StandardCharsets;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
-import static java.sql.Types.BINARY;
-import static java.sql.Types.BLOB;
-import static java.sql.Types.LONGVARBINARY;
-import static java.sql.Types.VARBINARY;
+import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_MAP_CAPACITY;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
/**
- * Read data from SQLServer database by SQL
+ * Read data from SQLServer database by Debezium
*/
public class SQLServerReader extends AbstractReader {
public static final String SQLSERVER_READER_TAG_NAME = "AgentSQLServerMetric";
- public static final String JOB_DATABASE_USER = "job.sqlserverJob.user";
- public static final String JOB_DATABASE_PASSWORD = "job.sqlserverJob.password";
public static final String JOB_DATABASE_HOSTNAME = "job.sqlserverJob.hostname";
public static final String JOB_DATABASE_PORT = "job.sqlserverJob.port";
+ public static final String JOB_DATABASE_USER = "job.sqlserverJob.user";
+ public static final String JOB_DATABASE_PASSWORD = "job.sqlserverJob.password";
public static final String JOB_DATABASE_DBNAME = "job.sqlserverJob.dbname";
- public static final String JOB_DATABASE_BATCH_SIZE = "job.sqlserverJob.batchSize";
- public static final int DEFAULT_JOB_DATABASE_BATCH_SIZE = 1000;
- public static final String JOB_DATABASE_DRIVER_CLASS = "job.database.driverClass";
- public static final String DEFAULT_JOB_DATABASE_DRIVER_CLASS = "com.microsoft.sqlserver.jdbc.SQLServerDriver";
- public static final String STD_FIELD_SEPARATOR_SHORT = "\001";
- public static final String JOB_DATABASE_SEPARATOR = "job.sql.separator";
- // pre-set sql lines, commands like "set xxx=xx;"
- public static final String JOB_DATABASE_TYPE = "job.database.type";
- public static final String SQLSERVER = "sqlserver";
+ public static final String JOB_DATABASE_SNAPSHOT_MODE = "job.sqlserverJob.snapshot.mode";
+ public static final String JOB_DATABASE_QUEUE_SIZE = "job.sqlserverJob.queueSize";
+ public static final String JOB_DATABASE_OFFSETS = "job.sqlserverJob.offsets";
+ public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE = "job.sqlserverJob.offset.specificOffsetFile";
+ public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS = "job.sqlserverJob.offset.specificOffsetPos";
+
+ public static final String JOB_DATABASE_SERVER_NAME = "job.sqlserverJob.serverName";
+
+ public static final String JOB_DATABASE_STORE_OFFSET_INTERVAL_MS = "job.sqlserverJob.offset.intervalMs";
+ public static final String JOB_DATABASE_STORE_HISTORY_FILENAME = "job.sqlserverJob.history.filename";
+
- private static final Logger LOGGER = LoggerFactory.getLogger(SqlReader.class);
+ // log under this reader's own class so its output is not attributed to SqlReader
+ private static final Logger LOGGER = LoggerFactory.getLogger(SQLServerReader.class);
- private static final String[] NEW_LINE_CHARS = new String[]{String.valueOf(CharUtils.CR),
- String.valueOf(CharUtils.LF)};
- private static final String[] EMPTY_CHARS = new String[]{StringUtils.EMPTY, StringUtils.EMPTY};
+ private final AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
- private final String sql;
+ private static final Gson GSON = new Gson();
- private PreparedStatement preparedStatement;
- private Connection conn;
- private ResultSet resultSet;
- private int columnCount;
+ private String databaseStoreHistoryName;
+ private String instanceId;
+ private String dbName;
+ private String serverName;
+ private String userName;
+ private String password;
+ private String hostName;
+ private String port;
+ private String offsetFlushIntervalMs;
+ private String offsetStoreFileName;
+ private String snapshotMode;
+ private String offset;
+ private String specificOffsetFile;
+ private String specificOffsetPos;
- // column types
- private String[] columnTypeNames;
- private int[] columnTypeCodes;
+ private ExecutorService executor;
+ private SqlServerSnapshotBase sqlServerSnapshot;
private boolean finished = false;
- private String separator;
- public SQLServerReader(String sql) {
- this.sql = sql;
+ private LinkedBlockingQueue<Pair<String, String>> sqlServerMessageQueue;
+ private JobProfile jobProfile;
+ private boolean destroyed = false;
+
+ public SQLServerReader() {
+
}
+ /**
+ * Returns the next buffered change event as a message, or null when the queue is currently empty.
+ */
@Override
public Message read() {
- try {
- if (!resultSet.next()) {
- finished = true;
- return null;
- }
- final List<String> lineColumns = new ArrayList<>();
- for (int i = 1; i <= columnCount; i++) {
- final String dataValue;
- /* handle special blob value, encode with base64, BLOB=2004 */
- final int typeCode = columnTypeCodes[i - 1];
- final String typeName = columnTypeNames[i - 1];
-
- // binary type
- if (typeCode == BLOB || typeCode == BINARY || typeCode == VARBINARY
- || typeCode == LONGVARBINARY || typeName.contains("BLOB")) {
- final byte[] data = resultSet.getBytes(i);
- dataValue = new String(Base64.encodeBase64(data, false), StandardCharsets.UTF_8);
- } else {
- // non-binary type
- dataValue = StringUtils.replaceEachRepeatedly(resultSet.getString(i),
- NEW_LINE_CHARS, EMPTY_CHARS);
- }
- lineColumns.add(dataValue);
- }
- long dataSize = lineColumns.stream().mapToLong(column -> column.length()).sum();
- AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId,
- System.currentTimeMillis(), 1, dataSize);
- readerMetric.pluginReadSuccessCount.incrementAndGet();
- readerMetric.pluginReadCount.incrementAndGet();
- return generateMessage(lineColumns);
- } catch (Exception ex) {
- LOGGER.error("error while reading data", ex);
- readerMetric.pluginReadFailCount.incrementAndGet();
- readerMetric.pluginReadCount.incrementAndGet();
- throw new RuntimeException(ex);
+ if (!sqlServerMessageQueue.isEmpty()) {
+ return getSqlServerMessage();
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Poll one change event from the in-memory queue and wrap it as a message.
+ * The pair key (the source table name of the event) is placed in the header under PROXY_KEY_DATA;
+ * the pair value (the raw event value) is passed through GsonUtil.toJson and UTF-8 encoded as the body.
+ *
+ * @return the next message, or null if the queue is empty
+ */
+ private DefaultMessage getSqlServerMessage() {
+ // Retrieves and removes the head of this queue,
+ // or returns null if this queue is empty.
+ Pair<String, String> message = sqlServerMessageQueue.poll();
+ if (Objects.isNull(message)) {
+ return null;
}
+ Map<String, String> header = new HashMap<>(DEFAULT_MAP_CAPACITY);
+ header.put(PROXY_KEY_DATA, message.getKey());
+ // NOTE(review): the value is already a JSON string; confirm that running it through
+ // GsonUtil.toJson again (which may quote/escape it) is what downstream consumers expect.
+ return new DefaultMessage(GsonUtil.toJson(message.getValue()).getBytes(StandardCharsets.UTF_8), header);
}
- private Message generateMessage(List<String> lineColumns) {
- return new DefaultMessage(StringUtils.join(lineColumns, separator).getBytes(StandardCharsets.UTF_8));
+ /**
+ * @return true once destroy() has run for this reader
+ */
+ public boolean isDestroyed() {
+ return destroyed;
}
@Override
@@ -134,7 +149,7 @@ public class SQLServerReader extends AbstractReader {
+ /**
+ * @return the job instance id, which identifies what this reader reads from
+ */
@Override
public String getReadSource() {
- return sql;
+ return instanceId;
}
@Override
@@ -149,12 +164,16 @@ public class SQLServerReader extends AbstractReader {
+ /**
+ * @return the snapshot reported by the snapshot helper, or an empty string before init() created it
+ */
@Override
public String getSnapshot() {
- return StringUtils.EMPTY;
+ if (sqlServerSnapshot == null) {
+ return StringUtils.EMPTY;
+ }
+ return sqlServerSnapshot.getSnapshot();
}
@Override
public void finishRead() {
- destroy();
+ this.finished = true;
}
@Override
@@ -162,59 +181,146 @@ public class SQLServerReader extends AbstractReader {
return true;
}
+ /**
+ * Resolve the agent history directory through AgentUtils.makeDirsIfNotExist,
+ * using the configured history path and agent home (or their defaults).
+ *
+ * @return absolute path of the history directory
+ */
+ private String tryToInitAndGetHistoryPath() {
+ final String historyDir = agentConf.get(
+ AgentConstants.AGENT_HISTORY_PATH, AgentConstants.DEFAULT_AGENT_HISTORY_PATH);
+ final String agentHome = agentConf.get(
+ AgentConstants.AGENT_HOME, AgentConstants.DEFAULT_AGENT_HOME);
+ return AgentUtils
+ .makeDirsIfNotExist(historyDir, agentHome)
+ .getAbsolutePath();
+ }
+
+ /**
+ * Read the job settings, prepare the offset/history files and start a Debezium engine
+ * on a single-thread executor. Every change event is queued as a (table name, raw event value)
+ * pair for read() to pick up.
+ *
+ * @param jobConf job profile carrying the job.sqlserverJob.* settings
+ */
@Override
public void init(JobProfile jobConf) {
super.init(jobConf);
- int batchSize = jobConf.getInt(JOB_DATABASE_BATCH_SIZE, DEFAULT_JOB_DATABASE_BATCH_SIZE);
- String userName = jobConf.get(JOB_DATABASE_USER);
- String password = jobConf.get(JOB_DATABASE_PASSWORD);
- String hostName = jobConf.get(JOB_DATABASE_HOSTNAME);
- String dbname = jobConf.get(JOB_DATABASE_DBNAME);
- int port = jobConf.getInt(JOB_DATABASE_PORT);
-
- String driverClass = jobConf.get(JOB_DATABASE_DRIVER_CLASS,
- DEFAULT_JOB_DATABASE_DRIVER_CLASS);
- separator = jobConf.get(JOB_DATABASE_SEPARATOR, STD_FIELD_SEPARATOR_SHORT);
+ jobProfile = jobConf;
+ // log the instance id only: the serialized job configuration contains the database password
+ LOGGER.info("init SqlServer reader for job {}", jobConf.getInstanceId());
+ userName = jobConf.get(JOB_DATABASE_USER);
+ password = jobConf.get(JOB_DATABASE_PASSWORD);
+ hostName = jobConf.get(JOB_DATABASE_HOSTNAME);
+ port = jobConf.get(JOB_DATABASE_PORT);
+ dbName = jobConf.get(JOB_DATABASE_DBNAME);
+ serverName = jobConf.get(JOB_DATABASE_SERVER_NAME);
+ instanceId = jobConf.getInstanceId();
+ offsetFlushIntervalMs = jobConf.get(JOB_DATABASE_STORE_OFFSET_INTERVAL_MS, "100000");
+ // NOTE(review): the offset file is derived from the *history* filename key as its base path;
+ // confirm whether a dedicated offset filename key was intended.
+ offsetStoreFileName = jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME,
+ tryToInitAndGetHistoryPath()) + "/offset.dat" + instanceId;
+ snapshotMode = jobConf.get(JOB_DATABASE_SNAPSHOT_MODE, SqlServerConstants.INITIAL);
+ sqlServerMessageQueue = new LinkedBlockingQueue<>(jobConf.getInt(JOB_DATABASE_QUEUE_SIZE, 1000));
finished = false;
+
+ databaseStoreHistoryName = jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME,
+ tryToInitAndGetHistoryPath()) + "/history.dat" + jobConf.getInstanceId();
+ offset = jobConf.get(JOB_DATABASE_OFFSETS, "");
+ specificOffsetFile = jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, "");
+ specificOffsetPos = jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS, "-1");
+
+ sqlServerSnapshot = new SqlServerSnapshotBase(offsetStoreFileName);
+ sqlServerSnapshot.save(offset, sqlServerSnapshot.getFile());
+
+ Properties props = getEngineProps();
+
+ DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(
+ io.debezium.engine.format.Json.class)
+ .using(props)
+ .notifying((records, committer) -> {
+ try {
+ for (ChangeEvent<String, String> record : records) {
+ DebeziumFormat debeziumFormat = GSON
+ .fromJson(record.value(), DebeziumFormat.class);
+ // put() blocks while the queue is full, which throttles the engine thread
+ sqlServerMessageQueue.put(Pair.of(debeziumFormat.getSource().getTable(), record.value()));
+ committer.markProcessed(record);
+ }
+ committer.markBatchFinished();
+ long dataSize = records.stream().mapToLong(c -> c.value().length()).sum();
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId,
+ System.currentTimeMillis(), records.size(), dataSize);
+ readerMetric.pluginReadSuccessCount.addAndGet(records.size());
+ readerMetric.pluginReadCount.addAndGet(records.size());
+ } catch (Exception e) {
+ if (e instanceof InterruptedException) {
+ // keep the interrupt visible to the engine thread (e.g. after executor.shutdownNow())
+ Thread.currentThread().interrupt();
+ }
+ readerMetric.pluginReadFailCount.addAndGet(records.size());
+ readerMetric.pluginReadCount.addAndGet(records.size());
+ LOGGER.error("parse SqlServer message error", e);
+ }
+ })
+ .using((success, message, error) -> {
+ if (!success) {
+ LOGGER.error("SqlServer job with jobConf {} has error {}", instanceId, message, error);
+ }
+ }).build();
+
+ executor = Executors.newSingleThreadExecutor();
+ executor.execute(engine);
+
+ LOGGER.info("get initial snapshot of job {}, snapshot {}", instanceId, getSnapshot());
+ }
+
+ /**
+ * Build a Debezium offset from the configured specific offset file/position and serialize it.
+ *
+ * @return the serialized offset as a UTF-8 string; empty if serialization fails (the error is logged)
+ */
+ private String serializeOffset() {
+ Map<String, Object> sourceOffset = new HashMap<>();
+ // validate the configured values, not the constant key names (which can never be null)
+ Preconditions.checkNotNull(specificOffsetFile,
+ JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + " shouldn't be null");
+ sourceOffset.put("file", specificOffsetFile);
+ Preconditions.checkNotNull(specificOffsetPos,
+ JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " shouldn't be null");
+ sourceOffset.put("pos", specificOffsetPos);
+ DebeziumOffset specificOffset = new DebeziumOffset();
+ specificOffset.setSourceOffset(sourceOffset);
+ Map<String, String> sourcePartition = new HashMap<>();
+ // NOTE(review): the partition uses instanceId while "database.server.name" is set to serverName;
+ // confirm that these are meant to differ.
+ sourcePartition.put("server", instanceId);
+ specificOffset.setSourcePartition(sourcePartition);
+ byte[] serializedOffset = new byte[0];
try {
- String databaseType = jobConf.get(JOB_DATABASE_TYPE, SQLSERVER);
- String url = String.format("jdbc:%s://%s:%d;databaseName=%s;", databaseType, hostName, port, dbname);
- conn = AgentDbUtils.getConnectionFailover(driverClass, url, userName, password);
- preparedStatement = conn.prepareStatement(sql);
- preparedStatement.setFetchSize(batchSize);
- resultSet = preparedStatement.executeQuery();
-
- initColumnMeta();
- } catch (Exception ex) {
- LOGGER.error("error create statement", ex);
- destroy();
- throw new RuntimeException(ex);
+ serializedOffset = DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
+ } catch (IOException e) {
+ LOGGER.error("serialize offset message error", e);
}
+ return new String(serializedOffset, StandardCharsets.UTF_8);
}
- /**
- * Init column meta data.
- *
- * @throws Exception - sql exception
- */
- private void initColumnMeta() throws Exception {
- columnCount = resultSet.getMetaData().getColumnCount();
- columnTypeNames = new String[columnCount];
- columnTypeCodes = new int[columnCount];
- for (int i = 0; i < columnCount; i++) {
- columnTypeCodes[i] = resultSet.getMetaData().getColumnType(i + 1);
- String t = resultSet.getMetaData().getColumnTypeName(i + 1);
- if (t != null) {
- columnTypeNames[i] = t.toUpperCase();
- }
+ /**
+ * Assemble the Debezium engine properties from the fields populated in init().
+ * When the snapshot mode equals SnapshotModeConstants.SPECIFIC_OFFSETS the InLong offset store
+ * and database history classes are used; otherwise the file-based ones are used.
+ *
+ * @return engine properties
+ */
+ private Properties getEngineProps() {
+ Properties props = new Properties();
+ props.setProperty("name", "engine" + instanceId);
+ props.setProperty("connector.class", SqlServerConnector.class.getCanonicalName());
+ props.setProperty("database.hostname", hostName);
+ props.setProperty("database.port", port);
+ props.setProperty("database.user", userName);
+ props.setProperty("database.password", password);
+ props.setProperty("database.dbname", dbName);
+ props.setProperty("database.server.name", serverName);
+ props.setProperty("offset.flush.interval.ms", offsetFlushIntervalMs);
+ // NOTE(review): "snapshot.mode" is set below as well; this "database."-prefixed copy
+ // looks redundant - confirm before removing.
+ props.setProperty("database.snapshot.mode", snapshotMode);
+ props.setProperty("key.converter.schemas.enable", "false");
+ props.setProperty("value.converter.schemas.enable", "false");
+ props.setProperty("snapshot.mode", snapshotMode);
+ props.setProperty("offset.storage.file.filename", offsetStoreFileName);
+ props.setProperty("database.history.file.filename", databaseStoreHistoryName);
+ if (SnapshotModeConstants.SPECIFIC_OFFSETS.equals(snapshotMode)) {
+ props.setProperty("offset.storage", InLongFileOffsetBackingStore.class.getCanonicalName());
+ props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, serializeOffset());
+ props.setProperty("database.history", InLongDatabaseHistory.class.getCanonicalName());
+ } else {
+ props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName());
+ props.setProperty("database.history", FileDatabaseHistory.class.getCanonicalName());
}
+ props.setProperty("tombstones.on.delete", "false");
+ props.setProperty("converters", "datetime");
+ props.setProperty("datetime.type", "org.apache.inlong.agent.plugin.utils.BinlogTimeConverter");
+ props.setProperty("datetime.format.date", "yyyy-MM-dd");
+ props.setProperty("datetime.format.time", "HH:mm:ss");
+ props.setProperty("datetime.format.datetime", "yyyy-MM-dd HH:mm:ss");
+ props.setProperty("datetime.format.timestamp", "yyyy-MM-dd HH:mm:ss");
+
+ // log a copy with the password masked so the credential never reaches the log file
+ Properties loggableProps = new Properties();
+ loggableProps.putAll(props);
+ loggableProps.setProperty("database.password", "******");
+ LOGGER.info("SqlServer job {} start with props {}", jobProfile.getInstanceId(), loggableProps);
+ return props;
}
+ /**
+ * Stop the engine executor and close the snapshot helper. Only the first call does any work.
+ */
@Override
public void destroy() {
- finished = true;
- AgentUtils.finallyClose(resultSet);
- AgentUtils.finallyClose(preparedStatement);
- AgentUtils.finallyClose(conn);
+ synchronized (this) {
+ if (!destroyed) {
+ // init() may never have run, in which case there is nothing to stop or close
+ if (this.executor != null) {
+ this.executor.shutdownNow();
+ }
+ if (this.sqlServerSnapshot != null) {
+ this.sqlServerSnapshot.close();
+ }
+ this.destroyed = true;
+ }
+ }
}
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/SqlServerSnapshotBase.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/SqlServerSnapshotBase.java
new file mode 100644
index 000000000..62ee38618
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/SqlServerSnapshotBase.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources.snapshot;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+/**
+ * SqlServer snapshot backed by a single file; SQLServerReader passes its offset store file here.
+ */
+public class SqlServerSnapshotBase extends AbstractSnapshot {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerSnapshotBase.class);
+ private final File file;
+
+ /**
+ * @param filePath path of the file this snapshot reads from
+ */
+ public SqlServerSnapshotBase(String filePath) {
+ file = new File(filePath);
+ }
+
+ /**
+ * Loads the content of the backing file through the inherited load() and
+ * returns it encoded with the inherited ENCODER (presumably Base64 - TODO confirm).
+ */
+ @Override
+ public String getSnapshot() {
+ byte[] offset = this.load(this.file);
+ return ENCODER.encodeToString(offset);
+ }
+
+ /**
+ * Nothing to release; intentionally empty.
+ */
+ @Override
+ public void close() {
+
+ }
+
+ /**
+ * @return the backing file of this snapshot
+ */
+ public File getFile() {
+ return file;
+ }
+
+}
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerConnect.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerConnect.java
index b278abb73..58f83e2f4 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerConnect.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerConnect.java
@@ -18,43 +18,47 @@
package org.apache.inlong.agent.plugin.sources;
import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.constant.JobConstants;
import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.plugin.Reader;
import org.apache.inlong.agent.plugin.sources.reader.SQLServerReader;
import org.junit.Ignore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.util.List;
-import java.util.Objects;
+import java.util.UUID;
-import static org.junit.Assert.assertNotNull;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
/**
* Test cases for {@link SQLServerReader}.
*/
public class TestSQLServerConnect {
+ private static final Logger LOGGER = LoggerFactory.getLogger(TestSQLServerConnect.class);
+
/**
* Just using in local test.
*/
+
@Ignore
- public void testSQLServerReader() {
- JobProfile jobProfile = JobProfile.parseJsonStr("{}");
- jobProfile.set(SQLServerReader.JOB_DATABASE_USER, "sa");
- jobProfile.set(SQLServerReader.JOB_DATABASE_PASSWORD, "123456");
- jobProfile.set(SQLServerReader.JOB_DATABASE_HOSTNAME, "127.0.0.1");
- jobProfile.set(SQLServerReader.JOB_DATABASE_PORT, "1434");
- jobProfile.set(SQLServerReader.JOB_DATABASE_DBNAME, "inlong");
- final String sql = "select * from dbo.test01";
- jobProfile.set(SQLServerSource.JOB_DATABASE_SQL, sql);
- final SQLServerSource source = new SQLServerSource();
- List<Reader> readers = source.split(jobProfile);
- for (Reader reader : readers) {
- reader.init(jobProfile);
- while (!reader.isFinished()) {
- Message message = reader.read();
- if (Objects.nonNull(message)) {
- assertNotNull(message.getBody());
- }
+ public void testSqlServer() {
+ JobProfile jobProfile = new JobProfile();
+ jobProfile.set("job.sqlserverJob.hostname", "localhost");
+ jobProfile.set("job.sqlserverJob.port", "1434");
+ jobProfile.set("job.sqlserverJob.user", "sa");
+ jobProfile.set("job.sqlserverJob.password", "123456");
+ jobProfile.set("job.sqlserverJob.dbname", "inlong");
+ jobProfile.set("job.sqlserverJob.serverName", "fullfillment");
+ jobProfile.set(JobConstants.JOB_INSTANCE_ID, UUID.randomUUID().toString());
+ jobProfile.set(PROXY_INLONG_GROUP_ID, UUID.randomUUID().toString());
+ jobProfile.set(PROXY_INLONG_STREAM_ID, UUID.randomUUID().toString());
+ SQLServerReader sqlServerReader = new SQLServerReader();
+ sqlServerReader.init(jobProfile);
+ while (true) {
+ Message message = sqlServerReader.read();
+ if (message != null) {
+ LOGGER.info("event content: {}", message);
}
}
}
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerReader.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerReader.java
index 23f2fc416..33e291fbe 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerReader.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerReader.java
@@ -17,15 +17,17 @@
package org.apache.inlong.agent.plugin.sources;
-import org.apache.commons.lang3.StringUtils;
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.constant.SqlServerConstants;
import org.apache.inlong.agent.metrics.AgentMetricItem;
import org.apache.inlong.agent.metrics.AgentMetricItemSet;
-import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.sources.reader.SQLServerReader;
-import org.apache.inlong.agent.utils.AgentDbUtils;
+import org.apache.inlong.agent.plugin.sources.snapshot.SqlServerSnapshotBase;
import org.apache.inlong.common.metric.MetricRegister;
import org.junit.Before;
import org.junit.Test;
@@ -35,11 +37,10 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.Types;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.Assert.assertEquals;
@@ -49,7 +50,6 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.powermock.api.mockito.PowerMockito.field;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
@@ -60,7 +60,7 @@ import static org.powermock.api.mockito.PowerMockito.whenNew;
* Test cases for {@link SQLServerReader}.
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest({AgentDbUtils.class, MetricRegister.class, AuditUtils.class, SQLServerReader.class})
+@PrepareForTest({DebeziumEngine.class, Executors.class, MetricRegister.class, SQLServerReader.class})
@PowerMockIgnore({"javax.management.*"})
public class TestSQLServerReader {
@@ -70,28 +70,31 @@ public class TestSQLServerReader {
private JobProfile jobProfile;
@Mock
- private Connection conn;
+ private AgentMetricItemSet agentMetricItemSet;
@Mock
- private PreparedStatement preparedStatement;
+ private AgentMetricItem agentMetricItem;
@Mock
- private ResultSet resultSet;
+ private SqlServerSnapshotBase sqlServerSnapshot;
@Mock
- private ResultSetMetaData metaData;
+ private DebeziumEngine.Builder builder;
@Mock
- private AgentMetricItemSet agentMetricItemSet;
+ private ExecutorService executorService;
@Mock
- private AgentMetricItem agentMetricItem;
+ private LinkedBlockingQueue<Pair<String, String>> sqlServerMessageQueue;
+
+ @Mock
+ private DebeziumEngine<ChangeEvent<String, String>> engine;
private AtomicLong atomicLong;
private AtomicLong atomicCountLong;
- private String sql;
+ private final String instanceId = "s4bc475560b4444dbd4e9812ab1fd64d";
@Before
public void setUp() throws Exception {
@@ -99,44 +102,60 @@ public class TestSQLServerReader {
final String password = "123456";
final String hostname = "127.0.0.1";
final String port = "1434";
- final String dbname = "inlong";
- final String typeName1 = "int";
- final String typeName2 = "varchar";
final String groupId = "group01";
final String streamId = "stream01";
+ final String dbName = "inlong";
+ final String serverName = "server1";
+ final String offsetFlushIntervalMs = "1000";
+ final String offsetStoreFileName = "/opt/offset.dat";
+ final String snapshotMode = SqlServerConstants.INITIAL;
+ final int queueSize = 1000;
+ final String databaseStoreHistoryName = "/opt/history.dat";
+ final String offset = "111";
+ final String specificOffsetFile = "";
+ final String specificOffsetPos = "-1";
+
atomicLong = new AtomicLong(0L);
atomicCountLong = new AtomicLong(0L);
- sql = "select * from dbo.test01";
-
+ when(jobProfile.getInstanceId()).thenReturn(instanceId);
when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_GROUP_ID), anyString())).thenReturn(groupId);
when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_STREAM_ID), anyString())).thenReturn(streamId);
when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_USER))).thenReturn(username);
when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_PASSWORD))).thenReturn(password);
when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_HOSTNAME))).thenReturn(hostname);
when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_PORT))).thenReturn(port);
- when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_DBNAME))).thenReturn(dbname);
- when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_DRIVER_CLASS), anyString())).thenReturn(
- SQLServerReader.DEFAULT_JOB_DATABASE_DRIVER_CLASS);
- when(jobProfile.getInt(eq(SQLServerReader.JOB_DATABASE_BATCH_SIZE), anyInt())).thenReturn(
- SQLServerReader.DEFAULT_JOB_DATABASE_BATCH_SIZE);
- when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_TYPE), anyString())).thenReturn(
- SQLServerReader.SQLSERVER);
- when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_SEPARATOR), anyString())).thenReturn(
- SQLServerReader.STD_FIELD_SEPARATOR_SHORT);
- mockStatic(AgentDbUtils.class);
- when(AgentDbUtils.getConnectionFailover(eq(SQLServerReader.DEFAULT_JOB_DATABASE_DRIVER_CLASS), anyString(),
- eq(username), eq(password))).thenReturn(conn);
- when(conn.prepareStatement(anyString())).thenReturn(preparedStatement);
- when(preparedStatement.executeQuery()).thenReturn(resultSet);
- when(resultSet.getMetaData()).thenReturn(metaData);
- when(metaData.getColumnCount()).thenReturn(2);
- when(metaData.getColumnName(1)).thenReturn("id");
- when(metaData.getColumnName(2)).thenReturn("cell");
- when(metaData.getColumnType(1)).thenReturn(Types.INTEGER);
- when(metaData.getColumnType(2)).thenReturn(Types.VARCHAR);
- when(metaData.getColumnTypeName(1)).thenReturn(typeName1);
- when(metaData.getColumnTypeName(2)).thenReturn(typeName2);
+ when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_DBNAME))).thenReturn(dbName);
+ when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_SERVER_NAME))).thenReturn(serverName);
+ when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_STORE_OFFSET_INTERVAL_MS), anyString())).thenReturn(
+ offsetFlushIntervalMs);
+ when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_STORE_HISTORY_FILENAME), anyString())).thenReturn(
+ offsetStoreFileName);
+ when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_SNAPSHOT_MODE), anyString())).thenReturn(snapshotMode);
+ when(jobProfile.getInt(eq(SQLServerReader.JOB_DATABASE_QUEUE_SIZE), anyInt())).thenReturn(queueSize);
+ when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_STORE_HISTORY_FILENAME))).thenReturn(
+ databaseStoreHistoryName);
+ when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_OFFSETS), anyString())).thenReturn(offset);
+ when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE), anyString())).thenReturn(
+ specificOffsetFile);
+ when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS), anyString())).thenReturn(
+ specificOffsetPos);
+ whenNew(SqlServerSnapshotBase.class).withAnyArguments().thenReturn(sqlServerSnapshot);
+
+ //mock sqlServerMessageQueue
+ whenNew(LinkedBlockingQueue.class).withAnyArguments().thenReturn(sqlServerMessageQueue);
+
+ //mock DebeziumEngine
+ mockStatic(DebeziumEngine.class);
+ when(DebeziumEngine.create(io.debezium.engine.format.Json.class)).thenReturn(builder);
+ when(builder.using(any(Properties.class))).thenReturn(builder);
+ when(builder.notifying(any(DebeziumEngine.ChangeConsumer.class))).thenReturn(builder);
+ when(builder.using(any(DebeziumEngine.CompletionCallback.class))).thenReturn(builder);
+ when(builder.build()).thenReturn(engine);
+
+ //mock executorService
+ mockStatic(Executors.class);
+ when(Executors.newSingleThreadExecutor()).thenReturn(executorService);
//mock metrics
whenNew(AgentMetricItemSet.class).withArguments(anyString()).thenReturn(agentMetricItemSet);
@@ -146,7 +165,7 @@ public class TestSQLServerReader {
//init method
mockStatic(MetricRegister.class);
- (reader = new SQLServerReader(sql)).init(jobProfile);
+ (reader = new SQLServerReader()).init(jobProfile);
}
/**
@@ -154,23 +173,16 @@ public class TestSQLServerReader {
*/
@Test
public void testRead() throws Exception {
- final String v11 = "11";
- final String v12 = "12";
- final String v21 = "aa";
- final String v22 = "bb";
-
- final String msg1 = String.join(SQLServerReader.STD_FIELD_SEPARATOR_SHORT, v11, v12);
- final String msg2 = String.join(SQLServerReader.STD_FIELD_SEPARATOR_SHORT, v21, v22);
-
- when(resultSet.next()).thenReturn(true, true, false);
- when(resultSet.getString(1)).thenReturn(v11, v21);
- when(resultSet.getString(2)).thenReturn(v12, v22);
- Message message1 = reader.read();
- assertEquals(msg1, message1.toString());
- verify(preparedStatement, times(1)).setFetchSize(SQLServerReader.DEFAULT_JOB_DATABASE_BATCH_SIZE);
- Message message2 = reader.read();
- assertEquals(msg2, message2.toString());
- assertEquals(2L, atomicLong.get());
+ final String right = "value";
+ final String left = "key";
+ final String dataKey = "dataKey";
+ when(sqlServerMessageQueue.isEmpty()).thenReturn(true);
+ assertEquals(null, reader.read());
+ when(sqlServerMessageQueue.isEmpty()).thenReturn(false);
+ when(sqlServerMessageQueue.poll()).thenReturn(Pair.of(left, right));
+ Message result = reader.read();
+ assertEquals(String.join(right, "\"", "\""), result.toString());
+ assertEquals(left, result.getHeader().get(dataKey));
}
/**
@@ -178,12 +190,11 @@ public class TestSQLServerReader {
*/
@Test
public void testDestroy() throws Exception {
- assertFalse(reader.isFinished());
+ assertFalse(reader.isDestroyed());
reader.destroy();
- verify(resultSet).close();
- verify(preparedStatement).close();
- verify(conn).close();
- assertTrue(reader.isFinished());
+ verify(executorService).shutdownNow();
+ verify(sqlServerSnapshot).close();
+ assertTrue(reader.isDestroyed());
}
/**
@@ -192,10 +203,7 @@ public class TestSQLServerReader {
@Test
public void testFinishRead() throws Exception {
assertFalse(reader.isFinished());
- reader.destroy();
- verify(resultSet).close();
- verify(preparedStatement).close();
- verify(conn).close();
+ reader.finishRead();
assertTrue(reader.isFinished());
}
@@ -212,7 +220,9 @@ public class TestSQLServerReader {
*/
@Test
public void testGetSnapshot() {
- assertEquals(StringUtils.EMPTY, reader.getSnapshot());
+ final String snapShort = "snapShort";
+ when(sqlServerSnapshot.getSnapshot()).thenReturn(snapShort);
+ assertEquals(snapShort, reader.getSnapshot());
}
/**
@@ -220,6 +230,6 @@ public class TestSQLServerReader {
*/
@Test
public void testGetReadSource() {
- assertEquals(sql, reader.getReadSource());
+ assertEquals(instanceId, reader.getReadSource());
}
}
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
index 97646ccbf..90b71df9a 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
@@ -17,9 +17,7 @@
package org.apache.inlong.agent.plugin.sources;
-import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.agent.conf.JobProfile;
-import org.apache.inlong.agent.constant.CommonConstants;
import org.apache.inlong.agent.metrics.AgentMetricItem;
import org.apache.inlong.agent.metrics.AgentMetricItemSet;
import org.apache.inlong.common.metric.MetricItem;
@@ -38,7 +36,6 @@ import java.util.concurrent.atomic.AtomicLong;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
import static org.powermock.api.mockito.PowerMockito.when;
import static org.powermock.api.mockito.PowerMockito.whenNew;
import static org.powermock.api.support.membermodification.MemberMatcher.field;
@@ -84,20 +81,10 @@ public class TestSQLServerSource {
*/
@Test
public void testSplit() {
- final String sql1 = "select * from dbo.test01";
- final String sql2 = "select * from dbo.test${01,99}";
// build mock
- when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_GROUP_ID), anyString())).thenReturn("test_group");
- when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_STREAM_ID), anyString())).thenReturn("test_stream");
- when(jobProfile.get(eq(SQLServerSource.JOB_DATABASE_SQL), eq(StringUtils.EMPTY))).thenReturn(StringUtils.EMPTY,
- sql1, sql2);
-
final SQLServerSource source = new SQLServerSource();
-
// assert
- assertEquals(null, source.split(jobProfile));
assertEquals(1, source.split(jobProfile).size());
- assertEquals(99, source.split(jobProfile).size());
}
}
diff --git a/pom.xml b/pom.xml
index eccb11a27..08785dc82 100644
--- a/pom.xml
+++ b/pom.xml
@@ -606,6 +606,12 @@
<version>${debezium.version}</version>
</dependency>
+ <dependency>
+ <groupId>io.debezium</groupId>
+ <artifactId>debezium-connector-sqlserver</artifactId>
+ <version>${debezium.version}</version>
+ </dependency>
+
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mongodb</artifactId>