You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/26 02:54:15 UTC

[inlong] branch master updated: [INLONG-5988][Agent] Add the Partition Key settings for file source (#6008)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new affd000ae [INLONG-5988][Agent] Add the Partition Key settings for file source (#6008)
affd000ae is described below

commit affd000ae773d2b109785e1347c96ddef5cf5fa6
Author: ganfengtan <Ga...@users.noreply.github.com>
AuthorDate: Mon Sep 26 10:54:11 2022 +0800

    [INLONG-5988][Agent] Add the Partition Key settings for file source (#6008)
    
    Co-authored-by: healchow <he...@gmail.com>
---
 .../inlong/agent/constant/CommonConstants.java     |  5 +++-
 .../apache/inlong/agent/constant/JobConstants.java |  2 +-
 .../apache/inlong/agent/pojo/JobProfileDto.java    |  4 +++
 .../java/org/apache/inlong/agent/core/job/Job.java |  5 +++-
 .../agent/core/task/TaskPositionManager.java       |  2 +-
 .../inlong/agent/core/trigger/TriggerManager.java  | 30 +++++++++++++---------
 .../apache/inlong/agent/task/TestTaskWrapper.java  |  8 +++---
 .../sources/reader/file/FileReaderOperator.java    |  9 ++++++-
 .../inlong/common/pojo/agent/DataConfig.java       |  1 +
 9 files changed, 45 insertions(+), 21 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
index 8a8de1780..9ecfd8ca7 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
@@ -61,11 +61,14 @@ public class CommonConstants {
     // determine if the send method is sync or async
     public static final String PROXY_SEND_SYNC = "proxy.sync";
 
+    // the same task must use the same partition key when sync send is chosen
+    public static final String PROXY_SEND_PARTITION_KEY = "proxy.partitionKey";
+
     // max size of single batch in bytes, default is 200KB.
     public static final int DEFAULT_PROXY_PACKAGE_MAX_SIZE = 200000;
 
     public static final String PROXY_MESSAGE_SEMAPHORE = "proxy.semaphore";
-    public static final int DEFAULT_PROXY_MESSAGE_SEMAPHORE = 10000;
+    public static final int DEFAULT_PROXY_MESSAGE_SEMAPHORE = 20000;
 
     public static final String PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER = "proxy.group.queue.maxNumber";
     public static final int DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER = 10000;
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
index 02937c6c3..87b7303ff 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
@@ -133,7 +133,7 @@ public class JobConstants extends CommonConstants {
 
     public static final String JOB_READ_WAIT_TIMEOUT = "job.file.read.wait";
 
-    public static final int DEFAULT_JOB_READ_WAIT_TIMEOUT = 10;
+    public static final int DEFAULT_JOB_READ_WAIT_TIMEOUT = 5;
 
     public static final String JOB_ID_PREFIX = "job_";
 
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index 8ed03b800..b0c356732 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -234,6 +234,9 @@ public class JobProfileDto {
         if (null != dataConfigs.getSyncSend()) {
             proxy.setSync(dataConfigs.getSyncSend() == SYNC_SEND_OPEN);
         }
+        if (null != dataConfigs.getSyncPartitionKey()) {
+            proxy.setPartitionKey(dataConfigs.getSyncPartitionKey());
+        }
         return proxy;
     }
 
@@ -331,6 +334,7 @@ public class JobProfileDto {
         private String inlongStreamId;
         private Manager manager;
         private Boolean sync;
+        private String partitionKey;
     }
 
 }
\ No newline at end of file
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
index 16e5b8f1f..db99948d8 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.agent.core.job;
 
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.constant.JobConstants;
 import org.apache.inlong.agent.core.task.Task;
@@ -115,7 +116,9 @@ public class Job {
                 Channel channel = (Channel) Class.forName(jobConf.get(JobConstants.JOB_CHANNEL)).newInstance();
                 String taskId = String.format("%s_%d", jobInstanceId, threadNum.get());
                 threadNum.set(threadNum.get() + COUNTER);
-                taskList.add(new Task(taskId, reader, writer, channel, getJobConf()));
+                JobProfile jobProfile = getJobConf();
+                jobProfile.set(reader.getReadSource(), DigestUtils.md5Hex(reader.getReadSource()));
+                taskList.add(new Task(taskId, reader, writer, channel, jobProfile));
             }
         } catch (Throwable ex) {
             LOGGER.error("create task failed", ex);
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
index b0094abf8..c3bfa0abe 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
@@ -141,7 +141,7 @@ public class TaskPositionManager extends AbstractDaemon {
         if (position == null) {
             position = positionTemp;
         }
-        Long beforePosition = position.getOrDefault(sourcePath, 1L);
+        Long beforePosition = position.getOrDefault(sourcePath, 0L);
         position.put(sourcePath, beforePosition + size);
     }
 
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
index fa2308139..38df48aef 100755
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
@@ -161,7 +161,7 @@ public class TriggerManager extends AbstractDaemon {
                     });
                     TimeUnit.SECONDS.sleep(triggerFetchInterval);
                 } catch (Throwable e) {
-                    LOGGER.info("ignored Exception ", e);
+                    LOGGER.info("ignored exception: ", e);
                     ThreadUtils.threadThrowableHandler(Thread.currentThread(), e);
                 }
             }
@@ -169,21 +169,27 @@ public class TriggerManager extends AbstractDaemon {
     }
 
     private boolean isRunningJob(JobProfile profile, Map<String, JobWrapper> jobWrapperMap) {
-        JobWrapper jobWrapper;
         try {
-            jobWrapper = jobWrapperMap.get(profile.getInstanceId());
-        } catch (Exception e) {
-            LOGGER.warn("get jobWrapper error: ", e);
-            return false;
-        }
-        List<Task> tasks = jobWrapper.getAllTasks();
-        for (Task task : tasks) {
-            JobProfile runJobProfile = task.getJobConf();
-            if (Objects.equals(runJobProfile.get(JOB_DIR_FILTER_PATTERN), profile.get(JOB_DIR_FILTER_PATTERN))) {
+            if (jobWrapperMap == null || jobWrapperMap.get(profile.getInstanceId()) == null) {
                 return false;
             }
+
+            JobWrapper jobWrapper = jobWrapperMap.get(profile.getInstanceId());
+            List<Task> tasks = jobWrapper.getAllTasks();
+            if (tasks == null) {
+                return true;
+            }
+            for (Task task : tasks) {
+                if (task.getJobConf().hasKey(profile.get(JOB_DIR_FILTER_PATTERN))) {
+                    return false;
+                }
+            }
+
+            return true;
+        } catch (Exception e) {
+            LOGGER.warn("not found job {} in the jobs, error: ", profile.toJsonStr(), e);
+            return false;
         }
-        return true;
     }
 
     private boolean isExistJob(JobProfile profile) {
diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/task/TestTaskWrapper.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/task/TestTaskWrapper.java
index 438db7fc3..84846e68e 100755
--- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/task/TestTaskWrapper.java
+++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/task/TestTaskWrapper.java
@@ -77,9 +77,9 @@ public class TestTaskWrapper {
             TimeUnit.MILLISECONDS.sleep(100);
         }
         await().atMost(80, TimeUnit.SECONDS).until(()
-                -> reader.getCount() == writer.getWriterCount() + 1);
-        Assert.assertEquals("reader is not equals to writer",
-                reader.getCount(), writer.getWriterCount() + 1);
+                -> writer.getWriterCount() > 0);
+        Assert.assertEquals("reader and writer are running",
+                reader.getCount() > 0, writer.getWriterCount() > 0);
     }
 
     public static class MockChannel implements Channel {
@@ -119,7 +119,7 @@ public class TestTaskWrapper {
         @Override
         public Message read() {
             count += 1;
-            return new DefaultMessage("".getBytes(StandardCharsets.UTF_8));
+            return new DefaultMessage("test".getBytes(StandardCharsets.UTF_8));
         }
 
         @Override
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
index d9a9f5451..3419ee160 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.agent.plugin.sources.reader.file;
 
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.message.DefaultMessage;
@@ -34,6 +35,7 @@ import java.io.File;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -42,6 +44,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
 import static org.apache.inlong.agent.constant.CommonConstants.COMMA;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_PARTITION_KEY;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MAX_WAIT;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_META_ENV_LIST;
 import static org.apache.inlong.agent.constant.MetadataConstants.KUBERNETES;
@@ -90,7 +94,10 @@ public class FileReaderOperator extends AbstractReader {
                 AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS,
                         inlongGroupId, inlongStreamId, System.currentTimeMillis());
                 readerMetric.pluginReadCount.incrementAndGet();
-                return new DefaultMessage(message.getBytes(StandardCharsets.UTF_8));
+                String proxyPartitionKey = jobConf.get(PROXY_SEND_PARTITION_KEY, DigestUtils.md5Hex(inlongGroupId));
+                Map<String, String> header = new HashMap<>();
+                header.put(PROXY_KEY_DATA, proxyPartitionKey);
+                return new DefaultMessage(message.getBytes(StandardCharsets.UTF_8), header);
             }
         }
         AgentUtils.silenceSleepInMs(waitTimeout);
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
index 854727e34..dae48131d 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
@@ -35,6 +35,7 @@ public class DataConfig {
     private String taskName;
     private String snapshot;
     private Integer syncSend;
+    private String syncPartitionKey;
     private String extParams;
     /**
      * The task version.