You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/03/21 18:39:19 UTC

[camel] branch camel-3.x updated: CAMEL-19174: camel-jira - Fix duplicate messages created by Jira issues consumer (#9589) (#9595)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.x by this push:
     new 71be63b471f CAMEL-19174: camel-jira - Fix duplicate messages created by Jira issues consumer (#9589) (#9595)
71be63b471f is described below

commit 71be63b471f698a7d0788f6c487b38da5c082291
Author: Christoph Deppisch <cd...@redhat.com>
AuthorDate: Tue Mar 21 19:39:11 2023 +0100

    CAMEL-19174: camel-jira - Fix duplicate messages created by Jira issues consumer (#9589) (#9595)
    
    - Issues returned are already in descending order - do not reverse the order of issues a second time
    - Make sure to exit polling loop early when no further issues are returned (total # of issues returned is lower than query page size)
    - Avoid duplicates in returned issue list
    - Adjust query page size according to given max results limitation
    - Fix ordering of mocked issue return values in unit tests (mocks should return issues in descending order)
    - Add more unit tests on new issues consumer (testing filter offset, pagination, duplicates)
---
 .../jira/consumer/AbstractJiraConsumer.java        |  22 +++--
 .../jira/consumer/NewCommentsConsumer.java         |   2 +-
 .../component/jira/consumer/NewIssuesConsumer.java |  14 +--
 .../jira/consumer/WatchUpdatesConsumer.java        |   2 +-
 .../jira/consumer/NewCommentsConsumerTest.java     |  14 +--
 .../jira/consumer/NewIssuesConsumerTest.java       | 108 ++++++++++++++++++---
 .../jira/consumer/WatchUpdatesConsumerTest.java    |  36 +++----
 7 files changed, 147 insertions(+), 51 deletions(-)

diff --git a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/AbstractJiraConsumer.java b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/AbstractJiraConsumer.java
index d19fbd9817f..29b74c9d72b 100644
--- a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/AbstractJiraConsumer.java
+++ b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/AbstractJiraConsumer.java
@@ -17,7 +17,9 @@
 package org.apache.camel.component.jira.consumer;
 
 import java.util.ArrayList;
+import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Set;
 
 import com.atlassian.jira.rest.client.api.JiraRestClient;
 import com.atlassian.jira.rest.client.api.RestClientException;
@@ -33,7 +35,7 @@ import org.slf4j.LoggerFactory;
 
 public abstract class AbstractJiraConsumer extends ScheduledPollConsumer {
 
-    private static final transient Logger LOG = LoggerFactory.getLogger(AbstractJiraConsumer.class);
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractJiraConsumer.class);
 
     private final JiraEndpoint endpoint;
 
@@ -79,25 +81,33 @@ public abstract class AbstractJiraConsumer extends ScheduledPollConsumer {
     protected List<Issue> getIssues(String jql, int start, int maxPerQuery, int maxResults) {
         LOG.debug("Start indexing current JIRA issues...");
 
-        List<Issue> issues = new ArrayList<>();
+        if (maxResults < maxPerQuery) {
+            maxPerQuery = maxResults;
+        }
+
+        // Avoid duplicates
+        Set<Issue> issues = new LinkedHashSet<>();
         while (true) {
             SearchRestClient searchRestClient = endpoint.getClient().getSearchClient();
-            SearchResult searchResult = searchRestClient.searchJql(jql, maxResults, start, null).claim();
+            SearchResult searchResult = searchRestClient.searchJql(jql, maxPerQuery, start, null).claim();
 
             for (Issue issue : searchResult.getIssues()) {
                 issues.add(issue);
             }
 
             // Note: #getTotal == the total # the query would return *without* pagination, effectively telling us
-            // we've reached the end. Also exit early if we're limiting the # of results.
-            if (start >= searchResult.getTotal() || maxResults > 0 && issues.size() >= maxResults) {
+            // we've reached the end. Also exit early if we're limiting the # of results or
+            // if total # of returned issues is lower than the actual page size.
+            if (maxPerQuery >= searchResult.getTotal() ||
+                    start >= searchResult.getTotal() ||
+                    maxResults > 0 && issues.size() >= maxResults) {
                 break;
             }
 
             start += maxPerQuery;
         }
         LOG.debug("End indexing current JIRA issues. {} issues indexed.", issues.size());
-        return issues;
+        return new ArrayList<>(issues);
     }
 
     protected JiraRestClient client() {
diff --git a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewCommentsConsumer.java b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewCommentsConsumer.java
index a25121902ae..78dd7c865dc 100644
--- a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewCommentsConsumer.java
+++ b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewCommentsConsumer.java
@@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory;
  */
 public class NewCommentsConsumer extends AbstractJiraConsumer {
 
-    private static final transient Logger LOG = LoggerFactory.getLogger(NewCommentsConsumer.class);
+    private static final Logger LOG = LoggerFactory.getLogger(NewCommentsConsumer.class);
 
     private Long lastCommentId = -1L;
 
diff --git a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewIssuesConsumer.java b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewIssuesConsumer.java
index ec76103d688..4347da19342 100644
--- a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewIssuesConsumer.java
+++ b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewIssuesConsumer.java
@@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory;
  */
 public class NewIssuesConsumer extends AbstractJiraConsumer {
 
-    private static final transient Logger LOG = LoggerFactory.getLogger(NewIssuesConsumer.class);
+    private static final Logger LOG = LoggerFactory.getLogger(NewIssuesConsumer.class);
 
     private final String jql;
     private long latestIssueId = -1;
@@ -58,13 +58,14 @@ public class NewIssuesConsumer extends AbstractJiraConsumer {
         // grab only the top
         try {
             List<Issue> issues = getIssues(jql, 0, 1, 1);
-            // in case there aren't any issues...
             if (!issues.isEmpty()) {
+                // Issues are returned in descending order, so the first one is the newest issue
                 return issues.get(0).getId();
             }
         } catch (Exception e) {
             // ignore
         }
+        // in case there aren't any issues...
         return -1;
     }
 
@@ -72,9 +73,8 @@ public class NewIssuesConsumer extends AbstractJiraConsumer {
         // it may happen the poll() is called while the route is doing the initial load,
         // this way we need to wait for the latestIssueId being associated to the last indexed issue id
         List<Issue> newIssues = getNewIssues();
-        // In the end, we want only *new* issues oldest to newest.
-        for (int i = newIssues.size() - 1; i > -1; i--) {
-            Issue newIssue = newIssues.get(i);
+        // Only *new* issues are returned, already in descending order (newest first), so process them as-is.
+        for (Issue newIssue : newIssues) {
             Exchange e = createExchange(true);
             e.getIn().setBody(newIssue);
             getProcessor().process(e);
@@ -113,8 +113,8 @@ public class NewIssuesConsumer extends AbstractJiraConsumer {
 
         if (!issues.isEmpty()) {
             // remember last id we have processed
-            int last = issues.size() - 1;
-            latestIssueId = issues.get(last).getId();
+            // issues are in descending order, so save the first issue in the list as the newest
+            latestIssueId = issues.get(0).getId();
         }
         return issues;
     }
diff --git a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumer.java b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumer.java
index f91dc8a2aac..774eea83cf3 100644
--- a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumer.java
+++ b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumer.java
@@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory;
 
 public class WatchUpdatesConsumer extends AbstractJiraConsumer {
 
-    private static final transient Logger LOG = LoggerFactory.getLogger(WatchUpdatesConsumer.class);
+    private static final Logger LOG = LoggerFactory.getLogger(WatchUpdatesConsumer.class);
     HashMap<Long, Issue> watchedIssues;
     List<String> watchedFieldsList;
     String watchedIssuesKeys;
diff --git a/components/camel-jira/src/test/java/org/apache/camel/component/jira/consumer/NewCommentsConsumerTest.java b/components/camel-jira/src/test/java/org/apache/camel/component/jira/consumer/NewCommentsConsumerTest.java
index c0accd0f9cf..485c46a2a0b 100644
--- a/components/camel-jira/src/test/java/org/apache/camel/component/jira/consumer/NewCommentsConsumerTest.java
+++ b/components/camel-jira/src/test/java/org/apache/camel/component/jira/consumer/NewCommentsConsumerTest.java
@@ -57,7 +57,7 @@ import static org.mockito.Mockito.when;
 @ExtendWith(MockitoExtension.class)
 public class NewCommentsConsumerTest extends CamelTestSupport {
 
-    private static List<Issue> issues = new ArrayList<>();
+    private static final List<Issue> ISSUES = new ArrayList<>();
 
     @Mock
     private JiraRestClient jiraClient;
@@ -81,13 +81,13 @@ public class NewCommentsConsumerTest extends CamelTestSupport {
 
     @BeforeAll
     public static void beforeAll() {
-        issues.add(createIssueWithComments(1L, 1));
-        issues.add(createIssueWithComments(2L, 1));
-        issues.add(createIssueWithComments(3L, 1));
+        ISSUES.add(createIssueWithComments(3L, 1));
+        ISSUES.add(createIssueWithComments(2L, 1));
+        ISSUES.add(createIssueWithComments(1L, 1));
     }
 
     public void setMocks() {
-        SearchResult result = new SearchResult(0, 50, 100, issues);
+        SearchResult result = new SearchResult(0, 50, 100, ISSUES);
         Promise<SearchResult> promiseSearchResult = Promises.promise(result);
         Issue issue = createIssueWithComments(4L, 1);
         Promise<Issue> promiseIssue = Promises.promise(issue);
@@ -159,9 +159,9 @@ public class NewCommentsConsumerTest extends CamelTestSupport {
         Issue issue2 = createIssueWithComments(21L, 3000);
         Issue issue3 = createIssueWithComments(22L, 1000);
         List<Issue> newIssues = new ArrayList<>();
-        newIssues.add(issue1);
-        newIssues.add(issue2);
         newIssues.add(issue3);
+        newIssues.add(issue2);
+        newIssues.add(issue1);
         Issue issueWithNoComments = createIssue(31L);
 
         reset(searchRestClient);
diff --git a/components/camel-jira/src/test/java/org/apache/camel/component/jira/consumer/NewIssuesConsumerTest.java b/components/camel-jira/src/test/java/org/apache/camel/component/jira/consumer/NewIssuesConsumerTest.java
index 76a4cfcb105..c3931b8003a 100644
--- a/components/camel-jira/src/test/java/org/apache/camel/component/jira/consumer/NewIssuesConsumerTest.java
+++ b/components/camel-jira/src/test/java/org/apache/camel/component/jira/consumer/NewIssuesConsumerTest.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.jira.consumer;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -34,6 +35,7 @@ import org.apache.camel.component.jira.JiraComponent;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.spi.Registry;
 import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -52,7 +54,7 @@ import static org.mockito.Mockito.when;
 @ExtendWith(MockitoExtension.class)
 public class NewIssuesConsumerTest extends CamelTestSupport {
 
-    private static List<Issue> issues = new ArrayList<>();
+    private static final List<Issue> ISSUES = new ArrayList<>();
 
     @Mock
     private JiraRestClient jiraClient;
@@ -73,13 +75,13 @@ public class NewIssuesConsumerTest extends CamelTestSupport {
 
     @BeforeAll
     public static void beforeAll() {
-        issues.add(createIssue(1L));
-        issues.add(createIssue(2L));
-        issues.add(createIssue(3L));
+        ISSUES.add(createIssue(3L));
+        ISSUES.add(createIssue(2L));
+        ISSUES.add(createIssue(1L));
     }
 
     public void setMocks() {
-        SearchResult result = new SearchResult(0, 50, 100, issues);
+        SearchResult result = new SearchResult(0, 50, 3, ISSUES);
         Promise<SearchResult> promiseSearchResult = Promises.promise(result);
 
         when(jiraClient.getSearchClient()).thenReturn(searchRestClient);
@@ -102,7 +104,7 @@ public class NewIssuesConsumerTest extends CamelTestSupport {
         return new RouteBuilder() {
             @Override
             public void configure() {
-                from("jira://newIssues?jiraUrl=" + JIRA_CREDENTIALS + "&jql=project=" + PROJECT + "&delay=5000")
+                from("jira://newIssues?jiraUrl=" + JIRA_CREDENTIALS + "&jql=project=" + PROJECT + "&delay=1000")
                         .to(mockResult);
             }
         };
@@ -121,12 +123,12 @@ public class NewIssuesConsumerTest extends CamelTestSupport {
         reset(searchRestClient);
         AtomicBoolean searched = new AtomicBoolean();
         when(searchRestClient.searchJql(any(), any(), any(), any())).then(invocation -> {
-            List<Issue> newIissues = new ArrayList<>();
+            List<Issue> newIssues = new ArrayList<>();
             if (!searched.get()) {
-                newIissues.add(issue);
+                newIssues.add(issue);
                 searched.set(true);
             }
-            SearchResult result = new SearchResult(0, 50, 100, newIissues);
+            SearchResult result = new SearchResult(0, 50, 100, newIssues);
             return Promises.promise(result);
         });
         mockResult.expectedBodiesReceived(issue);
@@ -144,9 +146,9 @@ public class NewIssuesConsumerTest extends CamelTestSupport {
         when(searchRestClient.searchJql(any(), any(), any(), any())).then(invocation -> {
             List<Issue> newIssues = new ArrayList<>();
             if (!searched.get()) {
-                newIssues.add(issue1);
-                newIssues.add(issue2);
                 newIssues.add(issue3);
+                newIssues.add(issue2);
+                newIssues.add(issue1);
                 searched.set(true);
             }
             SearchResult result = new SearchResult(0, 50, 3, newIssues);
@@ -157,4 +159,88 @@ public class NewIssuesConsumerTest extends CamelTestSupport {
         mockResult.assertIsSatisfied();
     }
 
+    @Test
+    public void multipleIssuesPaginationTest() throws Exception {
+        Issue issue1 = createIssue(31);
+        Issue issue2 = createIssue(32);
+        Issue issue3 = createIssue(33);
+        Issue issue4 = createIssue(34);
+        Issue issue5 = createIssue(35);
+
+        reset(searchRestClient);
+        when(searchRestClient.searchJql(any(), any(), any(), any())).then(invocation -> {
+            int startAt = invocation.getArgument(2);
+            Assertions.assertEquals(0, startAt);
+
+            // return getTotal=100 to force next page query
+            SearchResult result = new SearchResult(0, 50, 100, List.of(issue5, issue4, issue3));
+            return Promises.promise(result);
+        }).then(invocation -> {
+            int startAt = invocation.getArgument(2);
+            Assertions.assertEquals(50, startAt);
+            SearchResult result = new SearchResult(0, 50, 100, List.of(issue2, issue1));
+            return Promises.promise(result);
+        }).then(invocation -> {
+            int startAt = invocation.getArgument(2);
+            Assertions.assertEquals(100, startAt);
+            SearchResult result = new SearchResult(0, 50, 0, Collections.emptyList());
+            return Promises.promise(result);
+        });
+
+        mockResult.expectedBodiesReceived(issue5, issue4, issue3, issue2, issue1);
+        mockResult.assertIsSatisfied();
+    }
+
+    @Test
+    public void multipleIssuesAvoidDuplicatesTest() throws Exception {
+        Issue issue1 = createIssue(41);
+        Issue issue2 = createIssue(42);
+        Issue issue3 = createIssue(43);
+
+        reset(searchRestClient);
+        when(searchRestClient.searchJql(any(), any(), any(), any())).then(invocation -> {
+            // return getTotal=100 to force next page query
+            SearchResult result = new SearchResult(0, 50, 100, List.of(issue3, issue2));
+            return Promises.promise(result);
+        }).then(invocation -> {
+            SearchResult result = new SearchResult(0, 50, 100, List.of(issue3, issue2, issue1));
+            return Promises.promise(result);
+        }).then(invocation -> {
+            SearchResult result = new SearchResult(0, 50, 0, Collections.emptyList());
+            return Promises.promise(result);
+        });
+
+        mockResult.expectedBodiesReceived(issue3, issue2, issue1);
+        mockResult.assertIsSatisfied();
+    }
+
+    @Test
+    public void multipleQueriesOffsetFilterTest() throws Exception {
+        Issue issue1 = createIssue(51);
+        Issue issue2 = createIssue(52);
+        Issue issue3 = createIssue(53);
+        Issue issue4 = createIssue(54);
+
+        reset(searchRestClient);
+        when(searchRestClient.searchJql(any(), any(), any(), any())).then(invocation -> {
+            SearchResult result = new SearchResult(0, 50, 3, List.of(issue3, issue2, issue1));
+            return Promises.promise(result);
+        }).then(invocation -> {
+            int startAt = invocation.getArgument(2);
+            Assertions.assertEquals(0, startAt);
+
+            String jqlFilter = invocation.getArgument(0);
+            Assertions.assertTrue(jqlFilter.startsWith("id > 53"));
+            SearchResult result = new SearchResult(0, 50, 1, Collections.singletonList(issue4));
+            return Promises.promise(result);
+        });
+
+        mockResult.expectedBodiesReceived(issue3, issue2, issue1);
+        mockResult.assertIsSatisfied();
+
+        mockResult.reset();
+
+        mockResult.expectedBodiesReceived(issue4);
+        mockResult.assertIsSatisfied();
+    }
 }
diff --git a/components/camel-jira/src/test/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumerTest.java b/components/camel-jira/src/test/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumerTest.java
index 369393fae28..b74cb5d3bae 100644
--- a/components/camel-jira/src/test/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumerTest.java
+++ b/components/camel-jira/src/test/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumerTest.java
@@ -52,7 +52,7 @@ import static org.mockito.Mockito.*;
 
 @ExtendWith(MockitoExtension.class)
 public class WatchUpdatesConsumerTest extends CamelTestSupport {
-    private static List<Issue> issues = new ArrayList<>();
+    private static final List<Issue> ISSUES = new ArrayList<>();
 
     @Mock
     private JiraRestClient jiraClient;
@@ -73,14 +73,14 @@ public class WatchUpdatesConsumerTest extends CamelTestSupport {
 
     @BeforeAll
     public static void beforeAll() {
-        issues.clear();
-        issues.add(createIssue(1L));
-        issues.add(createIssue(2L));
-        issues.add(createIssue(3L));
+        ISSUES.clear();
+        ISSUES.add(createIssue(1L));
+        ISSUES.add(createIssue(2L));
+        ISSUES.add(createIssue(3L));
     }
 
     public void setMocks() {
-        SearchResult result = new SearchResult(0, 50, 100, issues);
+        SearchResult result = new SearchResult(0, 50, 100, ISSUES);
         Promise<SearchResult> promiseSearchResult = Promises.promise(result);
 
         when(jiraClient.getSearchClient()).thenReturn(searchRestClient);
@@ -118,17 +118,17 @@ public class WatchUpdatesConsumerTest extends CamelTestSupport {
 
     @Test
     public void singleChangeTest() throws Exception {
-        Issue issue = setPriority(issues.get(0), new Priority(
+        Issue issue = setPriority(ISSUES.get(0), new Priority(
                 null, 4L, "High", null, null, null));
         reset(searchRestClient);
         AtomicBoolean searched = new AtomicBoolean();
         when(searchRestClient.searchJql(any(), any(), any(), any())).then(invocation -> {
 
             if (!searched.get()) {
-                issues.remove(0);
-                issues.add(0, issue);
+                ISSUES.remove(0);
+                ISSUES.add(0, issue);
             }
-            SearchResult result = new SearchResult(0, 50, 100, issues);
+            SearchResult result = new SearchResult(0, 50, 100, ISSUES);
             return Promises.promise(result);
         });
 
@@ -141,23 +141,23 @@ public class WatchUpdatesConsumerTest extends CamelTestSupport {
 
     @Test
     public void multipleChangesWithAddedNewIssueTest() throws Exception {
-        final Issue issue = transitionIssueDone(issues.get(1));
-        final Issue issue2 = setPriority(issues.get(2), new Priority(
+        final Issue issue = transitionIssueDone(ISSUES.get(1));
+        final Issue issue2 = setPriority(ISSUES.get(2), new Priority(
                 null, 4L, "High", null, null, null));
 
         reset(searchRestClient);
         AtomicBoolean searched = new AtomicBoolean();
         when(searchRestClient.searchJql(any(), any(), any(), any())).then(invocation -> {
             if (!searched.get()) {
-                issues.add(createIssue(4L));
-                issues.remove(1);
-                issues.add(1, issue);
-                issues.remove(2);
-                issues.add(2, issue2);
+                ISSUES.add(createIssue(4L));
+                ISSUES.remove(1);
+                ISSUES.add(1, issue);
+                ISSUES.remove(2);
+                ISSUES.add(2, issue2);
                 searched.set(true);
             }
 
-            SearchResult result = new SearchResult(0, 50, 3, issues);
+            SearchResult result = new SearchResult(0, 50, 3, ISSUES);
             return Promises.promise(result);
         });