You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/26 03:00:19 UTC
[inlong] branch release-1.3.0 updated: [INLONG-5988][Agent] Add the Partition Key settings for file source (#6008)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push:
new c9de90953 [INLONG-5988][Agent] Add the Partition Key settings for file source (#6008)
c9de90953 is described below
commit c9de90953abff9768f84dfaad2ab89cef0b9409b
Author: ganfengtan <Ga...@users.noreply.github.com>
AuthorDate: Mon Sep 26 10:54:11 2022 +0800
[INLONG-5988][Agent] Add the Partition Key settings for file source (#6008)
Co-authored-by: healchow <he...@gmail.com>
---
.../inlong/agent/constant/CommonConstants.java | 5 +++-
.../apache/inlong/agent/constant/JobConstants.java | 2 +-
.../apache/inlong/agent/pojo/JobProfileDto.java | 4 +++
.../java/org/apache/inlong/agent/core/job/Job.java | 5 +++-
.../agent/core/task/TaskPositionManager.java | 2 +-
.../inlong/agent/core/trigger/TriggerManager.java | 30 +++++++++++++---------
.../apache/inlong/agent/task/TestTaskWrapper.java | 8 +++---
.../sources/reader/file/FileReaderOperator.java | 9 ++++++-
.../inlong/common/pojo/agent/DataConfig.java | 1 +
9 files changed, 45 insertions(+), 21 deletions(-)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
index 8a8de1780..9ecfd8ca7 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
@@ -61,11 +61,14 @@ public class CommonConstants {
// determine if the send method is sync or async
public static final String PROXY_SEND_SYNC = "proxy.sync";
+ // the same task must have the same Partition Key if choose sync
+ public static final String PROXY_SEND_PARTITION_KEY = "proxy.partitionKey";
+
// max size of single batch in bytes, default is 200KB.
public static final int DEFAULT_PROXY_PACKAGE_MAX_SIZE = 200000;
public static final String PROXY_MESSAGE_SEMAPHORE = "proxy.semaphore";
- public static final int DEFAULT_PROXY_MESSAGE_SEMAPHORE = 10000;
+ public static final int DEFAULT_PROXY_MESSAGE_SEMAPHORE = 20000;
public static final String PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER = "proxy.group.queue.maxNumber";
public static final int DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER = 10000;
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
index 02937c6c3..87b7303ff 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
@@ -133,7 +133,7 @@ public class JobConstants extends CommonConstants {
public static final String JOB_READ_WAIT_TIMEOUT = "job.file.read.wait";
- public static final int DEFAULT_JOB_READ_WAIT_TIMEOUT = 10;
+ public static final int DEFAULT_JOB_READ_WAIT_TIMEOUT = 5;
public static final String JOB_ID_PREFIX = "job_";
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index 8ed03b800..b0c356732 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -234,6 +234,9 @@ public class JobProfileDto {
if (null != dataConfigs.getSyncSend()) {
proxy.setSync(dataConfigs.getSyncSend() == SYNC_SEND_OPEN);
}
+ if (null != dataConfigs.getSyncPartitionKey()) {
+ proxy.setPartitionKey(dataConfigs.getSyncPartitionKey());
+ }
return proxy;
}
@@ -331,6 +334,7 @@ public class JobProfileDto {
private String inlongStreamId;
private Manager manager;
private Boolean sync;
+ private String partitionKey;
}
}
\ No newline at end of file
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
index 16e5b8f1f..db99948d8 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
@@ -17,6 +17,7 @@
package org.apache.inlong.agent.core.job;
+import org.apache.commons.codec.digest.DigestUtils;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.constant.JobConstants;
import org.apache.inlong.agent.core.task.Task;
@@ -115,7 +116,9 @@ public class Job {
Channel channel = (Channel) Class.forName(jobConf.get(JobConstants.JOB_CHANNEL)).newInstance();
String taskId = String.format("%s_%d", jobInstanceId, threadNum.get());
threadNum.set(threadNum.get() + COUNTER);
- taskList.add(new Task(taskId, reader, writer, channel, getJobConf()));
+ JobProfile jobProfile = getJobConf();
+ jobProfile.set(reader.getReadSource(), DigestUtils.md5Hex(reader.getReadSource()));
+ taskList.add(new Task(taskId, reader, writer, channel, jobProfile));
}
} catch (Throwable ex) {
LOGGER.error("create task failed", ex);
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
index b0094abf8..c3bfa0abe 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
@@ -141,7 +141,7 @@ public class TaskPositionManager extends AbstractDaemon {
if (position == null) {
position = positionTemp;
}
- Long beforePosition = position.getOrDefault(sourcePath, 1L);
+ Long beforePosition = position.getOrDefault(sourcePath, 0L);
position.put(sourcePath, beforePosition + size);
}
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
index fa2308139..38df48aef 100755
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
@@ -161,7 +161,7 @@ public class TriggerManager extends AbstractDaemon {
});
TimeUnit.SECONDS.sleep(triggerFetchInterval);
} catch (Throwable e) {
- LOGGER.info("ignored Exception ", e);
+ LOGGER.info("ignored exception: ", e);
ThreadUtils.threadThrowableHandler(Thread.currentThread(), e);
}
}
@@ -169,21 +169,27 @@ public class TriggerManager extends AbstractDaemon {
}
private boolean isRunningJob(JobProfile profile, Map<String, JobWrapper> jobWrapperMap) {
- JobWrapper jobWrapper;
try {
- jobWrapper = jobWrapperMap.get(profile.getInstanceId());
- } catch (Exception e) {
- LOGGER.warn("get jobWrapper error: ", e);
- return false;
- }
- List<Task> tasks = jobWrapper.getAllTasks();
- for (Task task : tasks) {
- JobProfile runJobProfile = task.getJobConf();
- if (Objects.equals(runJobProfile.get(JOB_DIR_FILTER_PATTERN), profile.get(JOB_DIR_FILTER_PATTERN))) {
+ if (jobWrapperMap == null || jobWrapperMap.get(profile.getInstanceId()) == null) {
return false;
}
+
+ JobWrapper jobWrapper = jobWrapperMap.get(profile.getInstanceId());
+ List<Task> tasks = jobWrapper.getAllTasks();
+ if (tasks == null) {
+ return true;
+ }
+ for (Task task : tasks) {
+ if (task.getJobConf().hasKey(profile.get(JOB_DIR_FILTER_PATTERN))) {
+ return false;
+ }
+ }
+
+ return true;
+ } catch (Exception e) {
+ LOGGER.warn("not found job {} in the jobs, error: ", profile.toJsonStr(), e);
+ return false;
}
- return true;
}
private boolean isExistJob(JobProfile profile) {
diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/task/TestTaskWrapper.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/task/TestTaskWrapper.java
index 438db7fc3..84846e68e 100755
--- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/task/TestTaskWrapper.java
+++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/task/TestTaskWrapper.java
@@ -77,9 +77,9 @@ public class TestTaskWrapper {
TimeUnit.MILLISECONDS.sleep(100);
}
await().atMost(80, TimeUnit.SECONDS).until(()
- -> reader.getCount() == writer.getWriterCount() + 1);
- Assert.assertEquals("reader is not equals to writer",
- reader.getCount(), writer.getWriterCount() + 1);
+ -> writer.getWriterCount() > 0);
+ Assert.assertEquals("reader and writer are running",
+ reader.getCount() > 0, writer.getWriterCount() > 0);
}
public static class MockChannel implements Channel {
@@ -119,7 +119,7 @@ public class TestTaskWrapper {
@Override
public Message read() {
count += 1;
- return new DefaultMessage("".getBytes(StandardCharsets.UTF_8));
+ return new DefaultMessage("test".getBytes(StandardCharsets.UTF_8));
}
@Override
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
index d9a9f5451..3419ee160 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
@@ -17,6 +17,7 @@
package org.apache.inlong.agent.plugin.sources.reader.file;
+import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.message.DefaultMessage;
@@ -34,6 +35,7 @@ import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -42,6 +44,8 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import static org.apache.inlong.agent.constant.CommonConstants.COMMA;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_PARTITION_KEY;
import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MAX_WAIT;
import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_META_ENV_LIST;
import static org.apache.inlong.agent.constant.MetadataConstants.KUBERNETES;
@@ -90,7 +94,10 @@ public class FileReaderOperator extends AbstractReader {
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS,
inlongGroupId, inlongStreamId, System.currentTimeMillis());
readerMetric.pluginReadCount.incrementAndGet();
- return new DefaultMessage(message.getBytes(StandardCharsets.UTF_8));
+ String proxyPartitionKey = jobConf.get(PROXY_SEND_PARTITION_KEY, DigestUtils.md5Hex(inlongGroupId));
+ Map<String, String> header = new HashMap<>();
+ header.put(PROXY_KEY_DATA, proxyPartitionKey);
+ return new DefaultMessage(message.getBytes(StandardCharsets.UTF_8), header);
}
}
AgentUtils.silenceSleepInMs(waitTimeout);
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
index 854727e34..dae48131d 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
@@ -35,6 +35,7 @@ public class DataConfig {
private String taskName;
private String snapshot;
private Integer syncSend;
+ private String syncPartitionKey;
private String extParams;
/**
* The task version.