You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/01/19 01:58:03 UTC

[incubator-inlong] branch master updated: [INLONG-2179][Feature] [Agent] support db SQL collect - add mysql connector (#2180)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c84517  [INLONG-2179][Feature] [Agent] support db SQL collect - add mysql connector (#2180)
1c84517 is described below

commit 1c8451777b06eb191fd13664a5122cda46cd9592
Author: ziruipeng <zp...@connect.ust.hk>
AuthorDate: Wed Jan 19 09:57:58 2022 +0800

    [INLONG-2179][Feature] [Agent] support db SQL collect - add mysql connector (#2180)
---
 .../src/main/java/org/apache/inlong/agent/db/JobProfileDb.java   | 2 +-
 .../src/main/java/org/apache/inlong/agent/core/job/Job.java      | 1 +
 .../main/java/org/apache/inlong/agent/core/job/JobWrapper.java   | 2 +-
 inlong-agent/agent-plugins/pom.xml                               | 6 ++++++
 .../org/apache/inlong/agent/plugin/sources/DataBaseSource.java   | 9 +++++++--
 5 files changed, 16 insertions(+), 4 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/JobProfileDb.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/JobProfileDb.java
index 086091c..5898299 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/JobProfileDb.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/JobProfileDb.java
@@ -62,7 +62,7 @@ public class JobProfileDb {
             String keyName = jobProfile.get(JobConstants.JOB_INSTANCE_ID);
             jobProfile.setLong(JobConstants.JOB_STORE_TIME, System.currentTimeMillis());
             KeyValueEntity entity = new KeyValueEntity(keyName,
-                jobProfile.toJsonStr(), jobProfile.get(JobConstants.JOB_DIR_FILTER_PATTERN));
+                jobProfile.toJsonStr(), jobProfile.get(JobConstants.JOB_DIR_FILTER_PATTERN, ""));
             entity.setStateSearchKey(StateSearchKey.ACCEPTED);
             db.put(entity);
         }
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
index 2109e66..4c9ac92 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
@@ -92,6 +92,7 @@ public class Job {
                 taskList.add(new Task(taskId, reader, writer, channel, getJobConf()));
             }
         } catch (Exception ex) {
+            LOGGER.error("create taks fail", ex);
             throw new RuntimeException(ex);
         }
         return taskList;
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java
index 81f897c..21900ff 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java
@@ -113,7 +113,7 @@ public class JobWrapper extends AbstractStateWrapper {
         } catch (Exception ex) {
             doChangeState(State.FAILED);
             LOGGER.error("error caught: {}, message: {}",
-                    job.getJobConf().toJsonStr(), ex.getMessage());
+                    job.getJobConf().toJsonStr(), ex);
         }
     }
 
diff --git a/inlong-agent/agent-plugins/pom.xml b/inlong-agent/agent-plugins/pom.xml
index 6802cc0..763085e 100644
--- a/inlong-agent/agent-plugins/pom.xml
+++ b/inlong-agent/agent-plugins/pom.xml
@@ -42,6 +42,7 @@
     </repositories>
     <properties>
         <powermock.version>2.0.2</powermock.version>
+        <mysql-connector-java.version>8.0.26</mysql-connector-java.version>
     </properties>
 
     <dependencies>
@@ -62,6 +63,11 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>${mysql-connector-java.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.inlong</groupId>
             <artifactId>agent-core</artifactId>
         </dependency>
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DataBaseSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DataBaseSource.java
index 9dd6efe..8b9ad8c 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DataBaseSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DataBaseSource.java
@@ -19,6 +19,7 @@ package org.apache.inlong.agent.plugin.sources;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.plugin.Reader;
 import org.apache.inlong.agent.plugin.Source;
@@ -27,6 +28,7 @@ import org.apache.inlong.agent.plugin.metrics.SourceMetrics;
 import org.apache.inlong.agent.plugin.metrics.SourcePrometheusMetrics;
 import org.apache.inlong.agent.plugin.sources.reader.SqlReader;
 import org.apache.inlong.agent.utils.AgentDbUtils;
+import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.agent.utils.ConfigUtil;
 
 /**
@@ -39,12 +41,15 @@ public class DataBaseSource implements Source {
     private static final String DATABASE_SOURCE_TAG_NAME = "AgentDatabaseSourceMetric";
 
     private final SourceMetrics sourceMetrics;
+    private static AtomicLong metricsIndex = new AtomicLong(0);
 
     public DataBaseSource() {
         if (ConfigUtil.isPrometheusEnabled()) {
-            this.sourceMetrics = new SourcePrometheusMetrics(DATABASE_SOURCE_TAG_NAME);
+            this.sourceMetrics = new SourcePrometheusMetrics(AgentUtils.getUniqId(
+                DATABASE_SOURCE_TAG_NAME, metricsIndex.incrementAndGet()));
         } else {
-            this.sourceMetrics = new SourceJmxMetric(DATABASE_SOURCE_TAG_NAME);
+            this.sourceMetrics = new SourceJmxMetric(AgentUtils.getUniqId(
+                DATABASE_SOURCE_TAG_NAME, metricsIndex.incrementAndGet()));
         }
     }