You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/01/17 11:48:12 UTC

[incubator-inlong] branch master updated: [INLONG-2167][Feature] [Agent] support db SQL collect (#2168)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a53492  [INLONG-2167][Feature] [Agent] support db SQL collect (#2168)
8a53492 is described below

commit 8a53492d2c0a651e7ce57a6d57f467fa472a3283
Author: ziruipeng <zp...@connect.ust.hk>
AuthorDate: Mon Jan 17 19:48:07 2022 +0800

    [INLONG-2167][Feature] [Agent] support db SQL collect (#2168)
---
 .../org/apache/inlong/agent/plugin/Reader.java     |  4 +--
 .../java/org/apache/inlong/agent/plugin/Sink.java  |  2 +-
 .../apache/inlong/agent/core/conf/ConfigJetty.java |  5 +++-
 .../java/org/apache/inlong/agent/core/job/Job.java |  2 +-
 .../agent/core/task/TaskPositionManager.java       | 14 ++++-----
 .../apache/inlong/agent/task/TestTaskWrapper.java  |  4 +--
 .../inlong/agent/plugin/sinks/ConsoleSink.java     |  2 +-
 .../inlong/agent/plugin/sinks/ProxySink.java       | 14 ++++-----
 .../inlong/agent/plugin/sinks/PulsarSink.java      |  2 +-
 .../inlong/agent/plugin/sinks/SenderManager.java   |  8 ++---
 .../agent/plugin/sources/DataBaseSource.java       |  2 +-
 .../agent/plugin/sources/reader/SqlReader.java     | 34 ++++++++++++----------
 .../plugin/sources/reader/TextFileReader.java      |  2 +-
 .../agent/plugin/filter/TestStreamIdFilter.java    |  2 +-
 .../apache/inlong/agent/plugin/sinks/MockSink.java |  4 +--
 15 files changed, 53 insertions(+), 48 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Reader.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Reader.java
index ccc9fbc..1ade2af 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Reader.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Reader.java
@@ -37,10 +37,10 @@ public interface Reader extends Stage {
     boolean isFinished();
 
     /**
-     * Return the reader's reading file name
+     * Return the reader's reading source name
      * @return
      */
-    String getReadFile();
+    String getReadSource();
 
     /**
      * set readTimeout
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Sink.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Sink.java
index 22ee95f..f9fc7d3 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Sink.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Sink.java
@@ -35,7 +35,7 @@ public interface Sink extends Stage {
      * set source file name where the message is generated
      * @param sourceFileName
      */
-    void setSourceFile(String sourceFileName);
+    void setSourceName(String sourceFileName);
 
     /**
      * every sink should include a message filter to filter out stream id
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java
index 49f40b8..3afee90 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.agent.core.conf;
 
+import static org.apache.inlong.agent.constants.JobConstants.JOB_DIR_FILTER_PATTERN;
 import static org.apache.inlong.agent.constants.JobConstants.JOB_TRIGGER;
 
 import java.io.Closeable;
@@ -78,8 +79,10 @@ public class ConfigJetty implements Closeable {
             if (jobProfile.hasKey(JOB_TRIGGER)) {
                 triggerManager.submitTrigger(
                     TriggerProfile.parseJsonStr(jobProfile.toJsonStr()));
-            } else {
+            } else if (jobProfile.hasKey(JOB_DIR_FILTER_PATTERN)) {
                 jobManager.submitFileJobProfile(jobProfile);
+            } else {
+                jobManager.submitSqlJobProfile(jobProfile);
             }
         }
     }
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
index f24091e..2109e66 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
@@ -86,7 +86,7 @@ public class Job {
             Source source = (Source) Class.forName(jobConf.get(JobConstants.JOB_SOURCE)).newInstance();
             for (Reader reader : source.split(jobConf)) {
                 Sink writer = (Sink) Class.forName(jobConf.get(JobConstants.JOB_SINK)).newInstance();
-                writer.setSourceFile(reader.getReadFile());
+                writer.setSourceName(reader.getReadSource());
                 Channel channel = (Channel) Class.forName(jobConf.get(JobConstants.JOB_CHANNEL)).newInstance();
                 String taskId = String.format("%s_%d", jobInstanceId, index++);
                 taskList.add(new Task(taskId, reader, writer, channel, getJobConf()));
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
index 0080810..b4392fc 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
@@ -126,14 +126,14 @@ public class TaskPositionManager extends AbstractDaemon {
         waitForTerminate();
     }
 
-    public void updateFileSinkPosition(String jobInstanceId, String sourceFilePath, long size) {
-        ConcurrentHashMap<String, Long> filePositionTemp = new ConcurrentHashMap<>();
-        ConcurrentHashMap<String, Long> filePosition = jobTaskPositionMap.putIfAbsent(jobInstanceId, filePositionTemp);
-        if (filePosition == null) {
-            filePosition = filePositionTemp;
+    public void updateSinkPosition(String jobInstanceId, String sourcePath, long size) {
+        ConcurrentHashMap<String, Long> positionTemp = new ConcurrentHashMap<>();
+        ConcurrentHashMap<String, Long> position = jobTaskPositionMap.putIfAbsent(jobInstanceId, positionTemp);
+        if (position == null) {
+            position = positionTemp;
         }
-        Long beforePosition = filePosition.getOrDefault(sourceFilePath, 1L);
-        filePosition.put(sourceFilePath, beforePosition + size);
+        Long beforePosition = position.getOrDefault(sourcePath, 1L);
+        position.put(sourcePath, beforePosition + size);
     }
 
     public ConcurrentHashMap<String, Long> getTaskPositionMap(String jobId) {
diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/task/TestTaskWrapper.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/task/TestTaskWrapper.java
index 7ba221a..6ed78e7 100755
--- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/task/TestTaskWrapper.java
+++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/task/TestTaskWrapper.java
@@ -130,7 +130,7 @@ public class TestTaskWrapper {
         }
 
         @Override
-        public String getReadFile() {
+        public String getReadSource() {
             return null;
         }
 
@@ -170,7 +170,7 @@ public class TestTaskWrapper {
         }
 
         @Override
-        public void setSourceFile(String sourceFileName) {
+        public void setSourceName(String sourceFileName) {
 
         }
 
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ConsoleSink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ConsoleSink.java
index 724c86f..c9a408d 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ConsoleSink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ConsoleSink.java
@@ -57,7 +57,7 @@ public class ConsoleSink implements Sink {
     }
 
     @Override
-    public void setSourceFile(String sourceFileName) {
+    public void setSourceName(String sourceFileName) {
 
     }
 
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index b28eebb..a0e30e7 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -78,7 +78,7 @@ public class ProxySink extends AbstractSink {
     private byte[] fieldSplitter;
     private String inlongGroupId;
     private String inlongStreamId;
-    private String sourceFile;
+    private String sourceName;
     private String jobInstanceId;
     private int maxBatchSize;
     private int maxBatchTimeoutMs;
@@ -150,8 +150,8 @@ public class ProxySink extends AbstractSink {
     }
 
     @Override
-    public void setSourceFile(String sourceFileName) {
-        this.sourceFile = sourceFileName;
+    public void setSourceName(String sourceFileName) {
+        this.sourceName = sourceFileName;
     }
 
     /**
@@ -169,9 +169,9 @@ public class ProxySink extends AbstractSink {
                         if (result != null) {
                             senderManager.sendBatch(jobInstanceId, inlongGroupId, result.getKey(),
                                     result.getValue(), 0, dataTime);
-                            LOGGER.info("send group id {} with message size {}, the job id is {}, read file is {}"
+                            LOGGER.info("send group id {} with message size {}, the job id is {}, read source is {}"
                                     + "dataTime is {}", inlongGroupId, result.getRight().size(),
-                                jobInstanceId, sourceFile, dataTime);
+                                jobInstanceId, sourceName, dataTime);
                         }
 
                     });
@@ -202,7 +202,7 @@ public class ProxySink extends AbstractSink {
         fieldSplitter = jobConf.get(CommonConstants.FIELD_SPLITTER, DEFAULT_FIELD_SPLITTER).getBytes(
             StandardCharsets.UTF_8);
         executorService.execute(flushCache());
-        senderManager = new SenderManager(jobConf, inlongGroupId, sourceFile);
+        senderManager = new SenderManager(jobConf, inlongGroupId, sourceName);
         try {
             senderManager.addMessageSender();
         } catch (Exception ex) {
@@ -229,7 +229,7 @@ public class ProxySink extends AbstractSink {
 
     @Override
     public void destroy() {
-        LOGGER.info("destroy sink which sink from source file {}", sourceFile);
+        LOGGER.info("destroy sink which sink from source name {}", sourceName);
         while (!sinkFinish()) {
             LOGGER.info("job {} wait until cache all flushed to proxy", jobInstanceId);
             AgentUtils.silenceSleepInMs(batchFlushInterval);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
index b89d9b6..560068b 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
@@ -120,7 +120,7 @@ public class PulsarSink extends AbstractDaemon implements Sink {
     }
 
     @Override
-    public void setSourceFile(String sourceFileName) {
+    public void setSourceName(String sourceFileName) {
 
     }
 
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
index 1ac662c..6d27c90 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
@@ -86,10 +86,10 @@ public class SenderManager {
     private final String inlongGroupId;
     private TaskPositionManager taskPositionManager;
     private final int maxSenderPerGroup;
-    private final String sourceFilePath;
+    private final String sourcePath;
     private final PluginMetric metric;
 
-    public SenderManager(JobProfile jobConf, String inlongGroupId, String sourceFilePath) {
+    public SenderManager(JobProfile jobConf, String inlongGroupId, String sourcePath) {
         AgentConfiguration conf = AgentConfiguration.getAgentConf();
         managerHost = conf.get(AGENT_MANAGER_VIP_HTTP_HOST);
         managerPort = conf.getInt(AGENT_MANAGER_VIP_HTTP_PORT);
@@ -116,7 +116,7 @@ public class SenderManager {
             CommonConstants.PROXY_RETRY_SLEEP, CommonConstants.DEFAULT_PROXY_RETRY_SLEEP);
         isFile = jobConf.getBoolean(CommonConstants.PROXY_IS_FILE, CommonConstants.DEFAULT_IS_FILE);
         taskPositionManager = TaskPositionManager.getTaskPositionManager();
-        this.sourceFilePath = sourceFilePath;
+        this.sourcePath = sourcePath;
         this.inlongGroupId = inlongGroupId;
 
         if (ConfigUtil.isPrometheusEnabled()) {
@@ -205,7 +205,7 @@ public class SenderManager {
                 return;
             }
             metric.incSendSuccessNum(bodyList.size());
-            taskPositionManager.updateFileSinkPosition(jobId, sourceFilePath, bodyList.size());
+            taskPositionManager.updateSinkPosition(jobId, sourcePath, bodyList.size());
         }
 
         @Override
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DataBaseSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DataBaseSource.java
index 76cb02d..9dd6efe 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DataBaseSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DataBaseSource.java
@@ -34,7 +34,7 @@ import org.apache.inlong.agent.utils.ConfigUtil;
  */
 public class DataBaseSource implements Source {
 
-    private static final String JOB_DATABASE_SQL = "job.database.sql";
+    private static final String JOB_DATABASE_SQL = "job.sql.command";
 
     private static final String DATABASE_SOURCE_TAG_NAME = "AgentDatabaseSourceMetric";
 
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
index 9d4088c..e8647e9 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
@@ -34,6 +34,7 @@ import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang3.CharUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.message.DefaultMessage;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.Reader;
 import org.apache.inlong.agent.plugin.metrics.PluginJmxMetric;
@@ -54,12 +55,12 @@ public class SqlReader implements Reader {
 
     private static final String SQL_READER_TAG_NAME = "AgentSqlMetric";
 
-    private static final String JOB_DATABASE_USER = "job.database.user";
-    private static final String JOB_DATABASE_PASSWORD = "job.database.password";
-    private static final String JOB_DATABASE_HOSTNAME = "job.database.hostname";
-    private static final String JOB_DATABASE_PORT = "job.database.port";
+    private static final String JOB_DATABASE_USER = "job.sql.user";
+    private static final String JOB_DATABASE_PASSWORD = "job.sql.password";
+    private static final String JOB_DATABASE_HOSTNAME = "job.sql.hostname";
+    private static final String JOB_DATABASE_PORT = "job.sql.port";
 
-    private static final String JOB_DATABASE_BATCH_SIZE = "job.database.batchSize";
+    private static final String JOB_DATABASE_BATCH_SIZE = "job.sql.batchSize";
     private static final int DEFAULT_JOB_DATABASE_BATCH_SIZE = 1000;
 
     private static final String JOB_DATABASE_DRIVER_CLASS = "job.database.driverClass";
@@ -72,7 +73,7 @@ public class SqlReader implements Reader {
 
     /* Standard short field separator */
     private static final String STD_FIELD_SEPARATOR_SHORT = "\001";
-    private static final String JOB_DATABASE_SEPARATOR = "job.database.separator";
+    private static final String JOB_DATABASE_SEPARATOR = "job.sql.separator";
     private static final String[] NEW_LINE_CHARS = new String[]{String.valueOf(CharUtils.CR),
             String.valueOf(CharUtils.LF)};
     private static final String[] EMPTY_CHARS = new String[]{StringUtils.EMPTY, StringUtils.EMPTY};
@@ -129,8 +130,7 @@ public class SqlReader implements Reader {
                 }
                 sqlFileMetric.incReadNum();
 
-                // TODO: return message.
-                return null;
+                return generateMessage(lineColumns);
             } else {
                 finished = true;
             }
@@ -142,14 +142,18 @@ public class SqlReader implements Reader {
         return null;
     }
 
+    private Message generateMessage(List<String> lineColumns) {
+        return new DefaultMessage(StringUtils.join(lineColumns, separator).getBytes(StandardCharsets.UTF_8));
+    }
+
     @Override
     public boolean isFinished() {
         return finished;
     }
 
     @Override
-    public String getReadFile() {
-        return null;
+    public String getReadSource() {
+        return sql;
     }
 
     @Override
@@ -182,7 +186,6 @@ public class SqlReader implements Reader {
 
     @Override
     public void init(JobProfile jobConf) {
-        String setSql = jobConf.get(JOB_DATABASE_PRESET);
         int batchSize = jobConf.getInt(JOB_DATABASE_BATCH_SIZE, DEFAULT_JOB_DATABASE_BATCH_SIZE);
         String userName = jobConf.get(JOB_DATABASE_USER);
         String password = jobConf.get(JOB_DATABASE_PASSWORD);
@@ -190,18 +193,17 @@ public class SqlReader implements Reader {
         int port = jobConf.getInt(JOB_DATABASE_PORT);
 
         String driverClass = jobConf.get(JOB_DATABASE_DRIVER_CLASS,
-                DEFAULT_JOB_DATABASE_DRIVER_CLASS);
+            DEFAULT_JOB_DATABASE_DRIVER_CLASS);
         separator = jobConf.get(JOB_DATABASE_SEPARATOR, STD_FIELD_SEPARATOR_SHORT);
         finished = false;
         try {
-            String databaseType = jobConf.get(JOB_DATABASE_TYPE);
+            String databaseType = jobConf.get(JOB_DATABASE_TYPE, MYSQL);
             String url = String.format("jdbc:%s://%s:%d", databaseType, hostName, port);
             conn = AgentDbUtils.getConnectionFailover(
-                    driverClass, url, userName, password);
+                driverClass, url, userName, password);
             if (databaseType.equals(MYSQL)) {
                 statement = conn.createStatement(
-                        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
-                statement.executeQuery(setSql);
+                    ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
                 statement.setFetchSize(Integer.MIN_VALUE);
                 resultSet = statement.executeQuery(sql);
             } else {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
index 073cb2f..646fe9d 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
@@ -127,7 +127,7 @@ public class TextFileReader implements Reader {
     }
 
     @Override
-    public String getReadFile() {
+    public String getReadSource() {
         return file.getAbsolutePath();
     }
 
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/filter/TestStreamIdFilter.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/filter/TestStreamIdFilter.java
index cf81adf..51841d6 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/filter/TestStreamIdFilter.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/filter/TestStreamIdFilter.java
@@ -78,7 +78,7 @@ public class TestStreamIdFilter {
         }
 
         @Override
-        public void setSourceFile(String sourceFileName) {
+        public void setSourceName(String sourceFileName) {
 
         }
 
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java
index 276a0f5..698bd29 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java
@@ -61,7 +61,7 @@ public class MockSink implements Sink {
     public void write(Message message) {
         if (message != null) {
             number.incrementAndGet();
-            taskPositionManager.updateFileSinkPosition(jobInstanceId, sourceFileName, 1);
+            taskPositionManager.updateSinkPosition(jobInstanceId, sourceFileName, 1);
             // increment the count of successful sinks
             sinkMetrics.incSinkSuccessCount();
         } else {
@@ -71,7 +71,7 @@ public class MockSink implements Sink {
     }
 
     @Override
-    public void setSourceFile(String sourceFileName) {
+    public void setSourceName(String sourceFileName) {
         this.sourceFileName = sourceFileName;
     }