You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/09/16 01:31:46 UTC

[GitHub] [inlong] GanfengTan opened a new pull request, #5910: [INLONG-5909][Agent] Fix WatchService is no effect in the k8s

GanfengTan opened a new pull request, #5910:
URL: https://github.com/apache/inlong/pull/5910

    Fix WatchService is no effect in the k8s.
   
   - Fixes #5909 
   
   ### Motivation
   
   Trigger a new task when Data changes, for example: add data.
   
   ### Modifications
   
   Validate the job for same GroupId and StreamId.
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [x] This change is a trivial rework/code cleanup without any test coverage.
   
   - [ ] This change is already covered by existing tests, such as:
     *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] GanfengTan commented on a diff in pull request #5910: [INLONG-5909][Agent] Fix WatchService is no effect in the k8s

Posted by GitBox <gi...@apache.org>.
GanfengTan commented on code in PR #5910:
URL: https://github.com/apache/inlong/pull/5910#discussion_r974857126


##########
inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java:
##########
@@ -169,4 +170,11 @@ public void addCallbacks() {
     public List<Task> getAllTasks() {
         return allTasks;
     }
+

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] dockerzhang merged pull request #5910: [INLONG-5909][Agent] Fix WatchService is no effect in the k8s

Posted by GitBox <gi...@apache.org>.
dockerzhang merged PR #5910:
URL: https://github.com/apache/inlong/pull/5910


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] GanfengTan commented on a diff in pull request #5910: [INLONG-5909][Agent] Fix WatchService is no effect in the k8s

Posted by GitBox <gi...@apache.org>.
GanfengTan commented on code in PR #5910:
URL: https://github.com/apache/inlong/pull/5910#discussion_r974855940


##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java:
##########
@@ -300,6 +307,24 @@ private void dealWithFetchResult(TaskResult taskResult) {
         }
     }
 
+    private boolean triggerIsRunning(TriggerProfile newProfile) {
+        int type = ManagerOpEnum.getOpType(newProfile.getInt(JOB_OP)).getType();
+        if (ManagerOpEnum.ACTIVE.getType() != type || ManagerOpEnum.ADD.getType() != type) {
+            return false;
+        }
+        JobProfileDb jobProfileDb = agentManager.getJobProfileDb();
+        List<JobProfile> jobsByState = jobProfileDb.getJobsByState(StateSearchKey.ACCEPTED);
+        jobsByState.addAll(jobProfileDb.getJobsByState(StateSearchKey.RUNNING));
+        AtomicBoolean jobIsRunning = new AtomicBoolean(false);
+        jobsByState.forEach(jobProfile -> {
+            if (jobProfile.get(JOB_ID).equals(newProfile.get(JOB_ID))) {

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] healchow commented on a diff in pull request #5910: [INLONG-5909][Agent] Fix WatchService is no effect in the k8s

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #5910:
URL: https://github.com/apache/inlong/pull/5910#discussion_r974847679


##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java:
##########
@@ -300,6 +307,24 @@ private void dealWithFetchResult(TaskResult taskResult) {
         }
     }
 
+    private boolean triggerIsRunning(TriggerProfile newProfile) {
+        int type = ManagerOpEnum.getOpType(newProfile.getInt(JOB_OP)).getType();
+        if (ManagerOpEnum.ACTIVE.getType() != type || ManagerOpEnum.ADD.getType() != type) {
+            return false;
+        }
+        JobProfileDb jobProfileDb = agentManager.getJobProfileDb();
+        List<JobProfile> jobsByState = jobProfileDb.getJobsByState(StateSearchKey.ACCEPTED);
+        jobsByState.addAll(jobProfileDb.getJobsByState(StateSearchKey.RUNNING));
+        AtomicBoolean jobIsRunning = new AtomicBoolean(false);
+        jobsByState.forEach(jobProfile -> {
+            if (jobProfile.get(JOB_ID).equals(newProfile.get(JOB_ID))) {

Review Comment:
   Will `jobProfile.get(JOB_ID)` be null?
   Wouldn't it be better to use `Objects.equals()` here?



##########
inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java:
##########
@@ -83,8 +88,21 @@ public void setJobInstanceId(String jobInstanceId) {
      * @return taskList
      */
     public List<Task> createTasks() {
+        return getTasks(this.jobConf);
+    }
+
+    /**
+     * build task from job config
+     *
+     * @param jobConf

Review Comment:
   Please add a description for the param and return.



##########
inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java:
##########
@@ -156,6 +169,35 @@ private Runnable jobFetchThread() {
         };
     }
 
+    private boolean isRunningJob(JobProfile profile, Map<String, JobWrapper> jobWrapperMap) {
+        JobWrapper jobWrapper;
+        try {
+            jobWrapper = jobWrapperMap.get(profile.getInstanceId());
+        } catch (Exception exception) {
+            LOGGER.warn("jobWrapper is null");
+            return false;
+        }
+        List<Task> tasks = jobWrapper.getAllTasks();
+        for (Task task : tasks) {
+            JobProfile runJobProfile = task.getJobConf();
+            if (runJobProfile.get(JOB_DIR_FILTER_PATTERN).equals(profile.get(JOB_DIR_FILTER_PATTERN))) {

Review Comment:
   Will `jobProfile.get(JOB_DIR_FILTER_PATTERN)` be null?
   Wouldn't it be better to use `Objects.equals()` here?



##########
inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java:
##########
@@ -169,4 +170,11 @@ public void addCallbacks() {
     public List<Task> getAllTasks() {
         return allTasks;
     }
+

Review Comment:
   Suggest adding Java doc for public methods.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] healchow commented on a diff in pull request #5910: [INLONG-5909][Agent] Fix WatchService is no effect in the k8s

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #5910:
URL: https://github.com/apache/inlong/pull/5910#discussion_r973571347


##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java:
##########
@@ -128,13 +134,28 @@ private void registerAllSubDir(PathPattern entity,
             } else {
                 JobProfile copiedJobProfile = PluginUtils.copyJobProfile(profile,
                         entity.getSuitTime(), path.toFile());
+                if (!validateJobProfile(copiedJobProfile, path)) {
+                    return;
+                }
                 LOGGER.info("trigger {} generate job profile to read file {}",
                         getTriggerProfile().getTriggerId(), path.toString());
                 queue.offer(copiedJobProfile);
             }
         }
     }
 
+    private boolean validateJobProfile(JobProfile jobProfile, Path path) {
+        String groupId = jobProfile.get(JOB_GROUP_ID);
+        String streamId = jobProfile.get(JOB_STREAM_ID);
+        String pathUri = path.toUri().getPath();
+        String key = groupId.concat(DELIMITER_HYPHEN).concat(streamId).concat(DELIMITER_HYPHEN).concat(pathUri);

Review Comment:
   Adding more comments, please.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] GanfengTan commented on a diff in pull request #5910: [INLONG-5909][Agent] Fix WatchService is no effect in the k8s

Posted by GitBox <gi...@apache.org>.
GanfengTan commented on code in PR #5910:
URL: https://github.com/apache/inlong/pull/5910#discussion_r974833238


##########
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/JobProfileDb.java:
##########
@@ -192,4 +197,20 @@ public List<JobProfile> getJobsByState(StateSearchKey stateSearchKey) {
         }
         return profileList;
     }
+
+    /**
+     * check local job state.
+     *
+     * @return list of job profile.

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] healchow commented on a diff in pull request #5910: [INLONG-5909][Agent] Fix WatchService is no effect in the k8s

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #5910:
URL: https://github.com/apache/inlong/pull/5910#discussion_r973571069


##########
inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java:
##########
@@ -241,6 +241,8 @@ public Runnable dbStorageCheckThread() {
             while (isRunnable()) {
                 try {
                     jobProfileDb.removeExpireJobs(jobDbCacheTime);
+                    Map<String, String> jobStateMap = jobProfileDb.getJobsState();

Review Comment:
   Excuse me, why just print the state into the log, it seems not useful.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] GanfengTan commented on a diff in pull request #5910: [INLONG-5909][Agent] Fix WatchService is no effect in the k8s

Posted by GitBox <gi...@apache.org>.
GanfengTan commented on code in PR #5910:
URL: https://github.com/apache/inlong/pull/5910#discussion_r974834153


##########
inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java:
##########
@@ -241,6 +241,8 @@ public Runnable dbStorageCheckThread() {
             while (isRunnable()) {
                 try {
                     jobProfileDb.removeExpireJobs(jobDbCacheTime);
+                    Map<String, String> jobStateMap = jobProfileDb.getJobsState();

Review Comment:
   check local job, submit state to manager in the future.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] GanfengTan closed pull request #5910: [INLONG-5909][Agent] Fix WatchService is no effect in the k8s

Posted by GitBox <gi...@apache.org>.
GanfengTan closed pull request #5910: [INLONG-5909][Agent] Fix WatchService is no effect in the k8s
URL: https://github.com/apache/inlong/pull/5910


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] healchow commented on a diff in pull request #5910: [INLONG-5909][Agent] Fix WatchService is no effect in the k8s

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #5910:
URL: https://github.com/apache/inlong/pull/5910#discussion_r973570887


##########
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/JobProfileDb.java:
##########
@@ -192,4 +197,20 @@ public List<JobProfile> getJobsByState(StateSearchKey stateSearchKey) {
         }
         return profileList;
     }
+
+    /**
+     * check local job state.
+     *
+     * @return list of job profile.

Review Comment:
   The result type is a Map, please change the Java doc, and add the desc for the key-value.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] GanfengTan commented on a diff in pull request #5910: [INLONG-5909][Agent] Fix WatchService is no effect in the k8s

Posted by GitBox <gi...@apache.org>.
GanfengTan commented on code in PR #5910:
URL: https://github.com/apache/inlong/pull/5910#discussion_r974858242


##########
inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java:
##########
@@ -83,8 +88,21 @@ public void setJobInstanceId(String jobInstanceId) {
      * @return taskList
      */
     public List<Task> createTasks() {
+        return getTasks(this.jobConf);
+    }
+
+    /**
+     * build task from job config
+     *
+     * @param jobConf

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] GanfengTan commented on a diff in pull request #5910: [INLONG-5909][Agent] Fix WatchService is no effect in the k8s

Posted by GitBox <gi...@apache.org>.
GanfengTan commented on code in PR #5910:
URL: https://github.com/apache/inlong/pull/5910#discussion_r974865689


##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TextFileReader.java:
##########
@@ -79,6 +81,10 @@ public void getData() throws IOException {
                         }
                         resultLines.add(splitLines[i].trim());
                     }
+                    if (data.startsWith(splitStr, data.length() - splitStr.length() - 1)) {

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] healchow commented on a diff in pull request #5910: [INLONG-5909][Agent] Fix WatchService is no effect in the k8s

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #5910:
URL: https://github.com/apache/inlong/pull/5910#discussion_r974847223


##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TextFileReader.java:
##########
@@ -79,6 +81,10 @@ public void getData() throws IOException {
                         }
                         resultLines.add(splitLines[i].trim());
                     }
+                    if (data.startsWith(splitStr, data.length() - splitStr.length() - 1)) {

Review Comment:
   It's a bit difficult to understand here, can you add some explanations or examples?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] GanfengTan commented on a diff in pull request #5910: [INLONG-5909][Agent] Fix WatchService is no effect in the k8s

Posted by GitBox <gi...@apache.org>.
GanfengTan commented on code in PR #5910:
URL: https://github.com/apache/inlong/pull/5910#discussion_r974856370


##########
inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java:
##########
@@ -156,6 +169,35 @@ private Runnable jobFetchThread() {
         };
     }
 
+    private boolean isRunningJob(JobProfile profile, Map<String, JobWrapper> jobWrapperMap) {
+        JobWrapper jobWrapper;
+        try {
+            jobWrapper = jobWrapperMap.get(profile.getInstanceId());
+        } catch (Exception exception) {
+            LOGGER.warn("jobWrapper is null");
+            return false;
+        }
+        List<Task> tasks = jobWrapper.getAllTasks();
+        for (Task task : tasks) {
+            JobProfile runJobProfile = task.getJobConf();
+            if (runJobProfile.get(JOB_DIR_FILTER_PATTERN).equals(profile.get(JOB_DIR_FILTER_PATTERN))) {

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org