You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/01/19 01:58:03 UTC
[incubator-inlong] branch master updated: [INLONG-2179][Feature] [Agent] support db SQL collect - add mysql connector (#2180)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 1c84517 [INLONG-2179][Feature] [Agent] support db SQL collect - add mysql connector (#2180)
1c84517 is described below
commit 1c8451777b06eb191fd13664a5122cda46cd9592
Author: ziruipeng <zp...@connect.ust.hk>
AuthorDate: Wed Jan 19 09:57:58 2022 +0800
[INLONG-2179][Feature] [Agent] support db SQL collect - add mysql connector (#2180)
---
.../src/main/java/org/apache/inlong/agent/db/JobProfileDb.java | 2 +-
.../src/main/java/org/apache/inlong/agent/core/job/Job.java | 1 +
.../main/java/org/apache/inlong/agent/core/job/JobWrapper.java | 2 +-
inlong-agent/agent-plugins/pom.xml | 6 ++++++
.../org/apache/inlong/agent/plugin/sources/DataBaseSource.java | 9 +++++++--
5 files changed, 16 insertions(+), 4 deletions(-)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/JobProfileDb.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/JobProfileDb.java
index 086091c..5898299 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/JobProfileDb.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/JobProfileDb.java
@@ -62,7 +62,7 @@ public class JobProfileDb {
String keyName = jobProfile.get(JobConstants.JOB_INSTANCE_ID);
jobProfile.setLong(JobConstants.JOB_STORE_TIME, System.currentTimeMillis());
KeyValueEntity entity = new KeyValueEntity(keyName,
- jobProfile.toJsonStr(), jobProfile.get(JobConstants.JOB_DIR_FILTER_PATTERN));
+ jobProfile.toJsonStr(), jobProfile.get(JobConstants.JOB_DIR_FILTER_PATTERN, ""));
entity.setStateSearchKey(StateSearchKey.ACCEPTED);
db.put(entity);
}
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
index 2109e66..4c9ac92 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
@@ -92,6 +92,7 @@ public class Job {
taskList.add(new Task(taskId, reader, writer, channel, getJobConf()));
}
} catch (Exception ex) {
+ LOGGER.error("create taks fail", ex);
throw new RuntimeException(ex);
}
return taskList;
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java
index 81f897c..21900ff 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java
@@ -113,7 +113,7 @@ public class JobWrapper extends AbstractStateWrapper {
} catch (Exception ex) {
doChangeState(State.FAILED);
LOGGER.error("error caught: {}, message: {}",
- job.getJobConf().toJsonStr(), ex.getMessage());
+ job.getJobConf().toJsonStr(), ex);
}
}
diff --git a/inlong-agent/agent-plugins/pom.xml b/inlong-agent/agent-plugins/pom.xml
index 6802cc0..763085e 100644
--- a/inlong-agent/agent-plugins/pom.xml
+++ b/inlong-agent/agent-plugins/pom.xml
@@ -42,6 +42,7 @@
</repositories>
<properties>
<powermock.version>2.0.2</powermock.version>
+ <mysql-connector-java.version>8.0.26</mysql-connector-java.version>
</properties>
<dependencies>
@@ -62,6 +63,11 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ <version>${mysql-connector-java.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>agent-core</artifactId>
</dependency>
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DataBaseSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DataBaseSource.java
index 9dd6efe..8b9ad8c 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DataBaseSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DataBaseSource.java
@@ -19,6 +19,7 @@ package org.apache.inlong.agent.plugin.sources;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.plugin.Reader;
import org.apache.inlong.agent.plugin.Source;
@@ -27,6 +28,7 @@ import org.apache.inlong.agent.plugin.metrics.SourceMetrics;
import org.apache.inlong.agent.plugin.metrics.SourcePrometheusMetrics;
import org.apache.inlong.agent.plugin.sources.reader.SqlReader;
import org.apache.inlong.agent.utils.AgentDbUtils;
+import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ConfigUtil;
/**
@@ -39,12 +41,15 @@ public class DataBaseSource implements Source {
private static final String DATABASE_SOURCE_TAG_NAME = "AgentDatabaseSourceMetric";
private final SourceMetrics sourceMetrics;
+ private static AtomicLong metricsIndex = new AtomicLong(0);
public DataBaseSource() {
if (ConfigUtil.isPrometheusEnabled()) {
- this.sourceMetrics = new SourcePrometheusMetrics(DATABASE_SOURCE_TAG_NAME);
+ this.sourceMetrics = new SourcePrometheusMetrics(AgentUtils.getUniqId(
+ DATABASE_SOURCE_TAG_NAME, metricsIndex.incrementAndGet()));
} else {
- this.sourceMetrics = new SourceJmxMetric(DATABASE_SOURCE_TAG_NAME);
+ this.sourceMetrics = new SourceJmxMetric(AgentUtils.getUniqId(
+ DATABASE_SOURCE_TAG_NAME, metricsIndex.incrementAndGet()));
}
}