You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/03/21 18:39:19 UTC
[camel] branch camel-3.x updated: CAMEL-19174: camel-jira - Fix duplicate messages created by Jira issues consumer (#9589) (#9595)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.x by this push:
new 71be63b471f CAMEL-19174: camel-jira - Fix duplicate messages created by Jira issues consumer (#9589) (#9595)
71be63b471f is described below
commit 71be63b471f698a7d0788f6c487b38da5c082291
Author: Christoph Deppisch <cd...@redhat.com>
AuthorDate: Tue Mar 21 19:39:11 2023 +0100
CAMEL-19174: camel-jira - Fix duplicate messages created by Jira issues consumer (#9589) (#9595)
- Issues returned are already in descending order - do not reverse the order of issues a 2nd time
- Make sure to exit polling loop early when no further issues are returned (total # of issues returned is lower than query page size)
- Avoid duplicates in returned issue list
- Adjust query page size according to given max results limitation
- Fix ordering of mocked issue return values in unit tests (mocks should return issues in descending order)
- Add more unit tests on new issues consumer (testing filter offset, pagination, duplicates)
---
.../jira/consumer/AbstractJiraConsumer.java | 22 +++--
.../jira/consumer/NewCommentsConsumer.java | 2 +-
.../component/jira/consumer/NewIssuesConsumer.java | 14 +--
.../jira/consumer/WatchUpdatesConsumer.java | 2 +-
.../jira/consumer/NewCommentsConsumerTest.java | 14 +--
.../jira/consumer/NewIssuesConsumerTest.java | 108 ++++++++++++++++++---
.../jira/consumer/WatchUpdatesConsumerTest.java | 36 +++----
7 files changed, 147 insertions(+), 51 deletions(-)
diff --git a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/AbstractJiraConsumer.java b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/AbstractJiraConsumer.java
index d19fbd9817f..29b74c9d72b 100644
--- a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/AbstractJiraConsumer.java
+++ b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/AbstractJiraConsumer.java
@@ -17,7 +17,9 @@
package org.apache.camel.component.jira.consumer;
import java.util.ArrayList;
+import java.util.LinkedHashSet;
import java.util.List;
+import java.util.Set;
import com.atlassian.jira.rest.client.api.JiraRestClient;
import com.atlassian.jira.rest.client.api.RestClientException;
@@ -33,7 +35,7 @@ import org.slf4j.LoggerFactory;
public abstract class AbstractJiraConsumer extends ScheduledPollConsumer {
- private static final transient Logger LOG = LoggerFactory.getLogger(AbstractJiraConsumer.class);
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractJiraConsumer.class);
private final JiraEndpoint endpoint;
@@ -79,25 +81,33 @@ public abstract class AbstractJiraConsumer extends ScheduledPollConsumer {
protected List<Issue> getIssues(String jql, int start, int maxPerQuery, int maxResults) {
LOG.debug("Start indexing current JIRA issues...");
- List<Issue> issues = new ArrayList<>();
+ if (maxResults < maxPerQuery) {
+ maxPerQuery = maxResults;
+ }
+
+ // Avoid duplicates
+ Set<Issue> issues = new LinkedHashSet<>();
while (true) {
SearchRestClient searchRestClient = endpoint.getClient().getSearchClient();
- SearchResult searchResult = searchRestClient.searchJql(jql, maxResults, start, null).claim();
+ SearchResult searchResult = searchRestClient.searchJql(jql, maxPerQuery, start, null).claim();
for (Issue issue : searchResult.getIssues()) {
issues.add(issue);
}
// Note: #getTotal == the total # the query would return *without* pagination, effectively telling us
- // we've reached the end. Also exit early if we're limiting the # of results.
- if (start >= searchResult.getTotal() || maxResults > 0 && issues.size() >= maxResults) {
+ // we've reached the end. Also exit early if we're limiting the # of results or
+ // if total # of returned issues is lower than the actual page size.
+ if (maxPerQuery >= searchResult.getTotal() ||
+ start >= searchResult.getTotal() ||
+ maxResults > 0 && issues.size() >= maxResults) {
break;
}
start += maxPerQuery;
}
LOG.debug("End indexing current JIRA issues. {} issues indexed.", issues.size());
- return issues;
+ return new ArrayList<>(issues);
}
protected JiraRestClient client() {
diff --git a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewCommentsConsumer.java b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewCommentsConsumer.java
index a25121902ae..78dd7c865dc 100644
--- a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewCommentsConsumer.java
+++ b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewCommentsConsumer.java
@@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory;
*/
public class NewCommentsConsumer extends AbstractJiraConsumer {
- private static final transient Logger LOG = LoggerFactory.getLogger(NewCommentsConsumer.class);
+ private static final Logger LOG = LoggerFactory.getLogger(NewCommentsConsumer.class);
private Long lastCommentId = -1L;
diff --git a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewIssuesConsumer.java b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewIssuesConsumer.java
index ec76103d688..4347da19342 100644
--- a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewIssuesConsumer.java
+++ b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewIssuesConsumer.java
@@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory;
*/
public class NewIssuesConsumer extends AbstractJiraConsumer {
- private static final transient Logger LOG = LoggerFactory.getLogger(NewIssuesConsumer.class);
+ private static final Logger LOG = LoggerFactory.getLogger(NewIssuesConsumer.class);
private final String jql;
private long latestIssueId = -1;
@@ -58,13 +58,14 @@ public class NewIssuesConsumer extends AbstractJiraConsumer {
// grab only the top
try {
List<Issue> issues = getIssues(jql, 0, 1, 1);
- // in case there aren't any issues...
if (!issues.isEmpty()) {
+ // Issues returned are ordered descendant so this is the newest issue
return issues.get(0).getId();
}
} catch (Exception e) {
// ignore
}
+ // in case there aren't any issues...
return -1;
}
@@ -72,9 +73,8 @@ public class NewIssuesConsumer extends AbstractJiraConsumer {
// it may happen the poll() is called while the route is doing the initial load,
// this way we need to wait for the latestIssueId being associated to the last indexed issue id
List<Issue> newIssues = getNewIssues();
- // In the end, we want only *new* issues oldest to newest.
- for (int i = newIssues.size() - 1; i > -1; i--) {
- Issue newIssue = newIssues.get(i);
+ // In the end, we want only *new* issues oldest to newest. New issues returned are ordered descendant already.
+ for (Issue newIssue : newIssues) {
Exchange e = createExchange(true);
e.getIn().setBody(newIssue);
getProcessor().process(e);
@@ -113,8 +113,8 @@ public class NewIssuesConsumer extends AbstractJiraConsumer {
if (!issues.isEmpty()) {
// remember last id we have processed
- int last = issues.size() - 1;
- latestIssueId = issues.get(last).getId();
+ // issues are ordered descendant so save the first issue in the list as the newest
+ latestIssueId = issues.get(0).getId();
}
return issues;
}
diff --git a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumer.java b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumer.java
index f91dc8a2aac..774eea83cf3 100644
--- a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumer.java
+++ b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumer.java
@@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory;
public class WatchUpdatesConsumer extends AbstractJiraConsumer {
- private static final transient Logger LOG = LoggerFactory.getLogger(WatchUpdatesConsumer.class);
+ private static final Logger LOG = LoggerFactory.getLogger(WatchUpdatesConsumer.class);
HashMap<Long, Issue> watchedIssues;
List<String> watchedFieldsList;
String watchedIssuesKeys;
diff --git a/components/camel-jira/src/test/java/org/apache/camel/component/jira/consumer/NewCommentsConsumerTest.java b/components/camel-jira/src/test/java/org/apache/camel/component/jira/consumer/NewCommentsConsumerTest.java
index c0accd0f9cf..485c46a2a0b 100644
--- a/components/camel-jira/src/test/java/org/apache/camel/component/jira/consumer/NewCommentsConsumerTest.java
+++ b/components/camel-jira/src/test/java/org/apache/camel/component/jira/consumer/NewCommentsConsumerTest.java
@@ -57,7 +57,7 @@ import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class NewCommentsConsumerTest extends CamelTestSupport {
- private static List<Issue> issues = new ArrayList<>();
+ private static final List<Issue> ISSUES = new ArrayList<>();
@Mock
private JiraRestClient jiraClient;
@@ -81,13 +81,13 @@ public class NewCommentsConsumerTest extends CamelTestSupport {
@BeforeAll
public static void beforeAll() {
- issues.add(createIssueWithComments(1L, 1));
- issues.add(createIssueWithComments(2L, 1));
- issues.add(createIssueWithComments(3L, 1));
+ ISSUES.add(createIssueWithComments(3L, 1));
+ ISSUES.add(createIssueWithComments(2L, 1));
+ ISSUES.add(createIssueWithComments(1L, 1));
}
public void setMocks() {
- SearchResult result = new SearchResult(0, 50, 100, issues);
+ SearchResult result = new SearchResult(0, 50, 100, ISSUES);
Promise<SearchResult> promiseSearchResult = Promises.promise(result);
Issue issue = createIssueWithComments(4L, 1);
Promise<Issue> promiseIssue = Promises.promise(issue);
@@ -159,9 +159,9 @@ public class NewCommentsConsumerTest extends CamelTestSupport {
Issue issue2 = createIssueWithComments(21L, 3000);
Issue issue3 = createIssueWithComments(22L, 1000);
List<Issue> newIssues = new ArrayList<>();
- newIssues.add(issue1);
- newIssues.add(issue2);
newIssues.add(issue3);
+ newIssues.add(issue2);
+ newIssues.add(issue1);
Issue issueWithNoComments = createIssue(31L);
reset(searchRestClient);
diff --git a/components/camel-jira/src/test/java/org/apache/camel/component/jira/consumer/NewIssuesConsumerTest.java b/components/camel-jira/src/test/java/org/apache/camel/component/jira/consumer/NewIssuesConsumerTest.java
index 76a4cfcb105..c3931b8003a 100644
--- a/components/camel-jira/src/test/java/org/apache/camel/component/jira/consumer/NewIssuesConsumerTest.java
+++ b/components/camel-jira/src/test/java/org/apache/camel/component/jira/consumer/NewIssuesConsumerTest.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.jira.consumer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -34,6 +35,7 @@ import org.apache.camel.component.jira.JiraComponent;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.spi.Registry;
import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -52,7 +54,7 @@ import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class NewIssuesConsumerTest extends CamelTestSupport {
- private static List<Issue> issues = new ArrayList<>();
+ private static final List<Issue> ISSUES = new ArrayList<>();
@Mock
private JiraRestClient jiraClient;
@@ -73,13 +75,13 @@ public class NewIssuesConsumerTest extends CamelTestSupport {
@BeforeAll
public static void beforeAll() {
- issues.add(createIssue(1L));
- issues.add(createIssue(2L));
- issues.add(createIssue(3L));
+ ISSUES.add(createIssue(3L));
+ ISSUES.add(createIssue(2L));
+ ISSUES.add(createIssue(1L));
}
public void setMocks() {
- SearchResult result = new SearchResult(0, 50, 100, issues);
+ SearchResult result = new SearchResult(0, 50, 3, ISSUES);
Promise<SearchResult> promiseSearchResult = Promises.promise(result);
when(jiraClient.getSearchClient()).thenReturn(searchRestClient);
@@ -102,7 +104,7 @@ public class NewIssuesConsumerTest extends CamelTestSupport {
return new RouteBuilder() {
@Override
public void configure() {
- from("jira://newIssues?jiraUrl=" + JIRA_CREDENTIALS + "&jql=project=" + PROJECT + "&delay=5000")
+ from("jira://newIssues?jiraUrl=" + JIRA_CREDENTIALS + "&jql=project=" + PROJECT + "&delay=1000")
.to(mockResult);
}
};
@@ -121,12 +123,12 @@ public class NewIssuesConsumerTest extends CamelTestSupport {
reset(searchRestClient);
AtomicBoolean searched = new AtomicBoolean();
when(searchRestClient.searchJql(any(), any(), any(), any())).then(invocation -> {
- List<Issue> newIissues = new ArrayList<>();
+ List<Issue> newIssues = new ArrayList<>();
if (!searched.get()) {
- newIissues.add(issue);
+ newIssues.add(issue);
searched.set(true);
}
- SearchResult result = new SearchResult(0, 50, 100, newIissues);
+ SearchResult result = new SearchResult(0, 50, 100, newIssues);
return Promises.promise(result);
});
mockResult.expectedBodiesReceived(issue);
@@ -144,9 +146,9 @@ public class NewIssuesConsumerTest extends CamelTestSupport {
when(searchRestClient.searchJql(any(), any(), any(), any())).then(invocation -> {
List<Issue> newIssues = new ArrayList<>();
if (!searched.get()) {
- newIssues.add(issue1);
- newIssues.add(issue2);
newIssues.add(issue3);
+ newIssues.add(issue2);
+ newIssues.add(issue1);
searched.set(true);
}
SearchResult result = new SearchResult(0, 50, 3, newIssues);
@@ -157,4 +159,88 @@ public class NewIssuesConsumerTest extends CamelTestSupport {
mockResult.assertIsSatisfied();
}
+ @Test
+ public void multipleIssuesPaginationTest() throws Exception {
+ Issue issue1 = createIssue(31);
+ Issue issue2 = createIssue(32);
+ Issue issue3 = createIssue(33);
+ Issue issue4 = createIssue(34);
+ Issue issue5 = createIssue(35);
+
+ reset(searchRestClient);
+ when(searchRestClient.searchJql(any(), any(), any(), any())).then(invocation -> {
+ int startAt = invocation.getArgument(2);
+ Assertions.assertEquals(0, startAt);
+
+ // return getTotal=100 to force next page query
+ SearchResult result = new SearchResult(0, 50, 100, List.of(issue5, issue4, issue3));
+ return Promises.promise(result);
+ }).then(invocation -> {
+ int startAt = invocation.getArgument(2);
+ Assertions.assertEquals(50, startAt);
+ SearchResult result = new SearchResult(0, 50, 100, List.of(issue2, issue1));
+ return Promises.promise(result);
+ }).then(invocation -> {
+ int startAt = invocation.getArgument(2);
+ Assertions.assertEquals(100, startAt);
+ SearchResult result = new SearchResult(0, 50, 0, Collections.emptyList());
+ return Promises.promise(result);
+ });
+
+ mockResult.expectedBodiesReceived(issue5, issue4, issue3, issue2, issue1);
+ mockResult.assertIsSatisfied();
+ }
+
+ @Test
+ public void multipleIssuesAvoidDuplicatesTest() throws Exception {
+ Issue issue1 = createIssue(41);
+ Issue issue2 = createIssue(42);
+ Issue issue3 = createIssue(43);
+
+ reset(searchRestClient);
+ when(searchRestClient.searchJql(any(), any(), any(), any())).then(invocation -> {
+ // return getTotal=100 to force next page query
+ SearchResult result = new SearchResult(0, 50, 100, List.of(issue3, issue2));
+ return Promises.promise(result);
+ }).then(invocation -> {
+ SearchResult result = new SearchResult(0, 50, 100, List.of(issue3, issue2, issue1));
+ return Promises.promise(result);
+ }).then(invocation -> {
+ SearchResult result = new SearchResult(0, 50, 0, Collections.emptyList());
+ return Promises.promise(result);
+ });
+
+ mockResult.expectedBodiesReceived(issue3, issue2, issue1);
+ mockResult.assertIsSatisfied();
+ }
+
+ @Test
+ public void multipleQueriesOffsetFilterTest() throws Exception {
+ Issue issue1 = createIssue(51);
+ Issue issue2 = createIssue(52);
+ Issue issue3 = createIssue(53);
+ Issue issue4 = createIssue(54);
+
+ reset(searchRestClient);
+ when(searchRestClient.searchJql(any(), any(), any(), any())).then(invocation -> {
+ SearchResult result = new SearchResult(0, 50, 3, List.of(issue3, issue2, issue1));
+ return Promises.promise(result);
+ }).then(invocation -> {
+ int startAt = invocation.getArgument(2);
+ Assertions.assertEquals(0, startAt);
+
+ String jqlFilter = invocation.getArgument(0);
+ Assertions.assertTrue(jqlFilter.startsWith("id > 53"));
+ SearchResult result = new SearchResult(0, 50, 1, Collections.singletonList(issue4));
+ return Promises.promise(result);
+ });
+
+ mockResult.expectedBodiesReceived(issue3, issue2, issue1);
+ mockResult.assertIsSatisfied();
+
+ mockResult.reset();
+
+ mockResult.expectedBodiesReceived(issue4);
+ mockResult.assertIsSatisfied();
+ }
}
diff --git a/components/camel-jira/src/test/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumerTest.java b/components/camel-jira/src/test/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumerTest.java
index 369393fae28..b74cb5d3bae 100644
--- a/components/camel-jira/src/test/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumerTest.java
+++ b/components/camel-jira/src/test/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumerTest.java
@@ -52,7 +52,7 @@ import static org.mockito.Mockito.*;
@ExtendWith(MockitoExtension.class)
public class WatchUpdatesConsumerTest extends CamelTestSupport {
- private static List<Issue> issues = new ArrayList<>();
+ private static final List<Issue> ISSUES = new ArrayList<>();
@Mock
private JiraRestClient jiraClient;
@@ -73,14 +73,14 @@ public class WatchUpdatesConsumerTest extends CamelTestSupport {
@BeforeAll
public static void beforeAll() {
- issues.clear();
- issues.add(createIssue(1L));
- issues.add(createIssue(2L));
- issues.add(createIssue(3L));
+ ISSUES.clear();
+ ISSUES.add(createIssue(1L));
+ ISSUES.add(createIssue(2L));
+ ISSUES.add(createIssue(3L));
}
public void setMocks() {
- SearchResult result = new SearchResult(0, 50, 100, issues);
+ SearchResult result = new SearchResult(0, 50, 100, ISSUES);
Promise<SearchResult> promiseSearchResult = Promises.promise(result);
when(jiraClient.getSearchClient()).thenReturn(searchRestClient);
@@ -118,17 +118,17 @@ public class WatchUpdatesConsumerTest extends CamelTestSupport {
@Test
public void singleChangeTest() throws Exception {
- Issue issue = setPriority(issues.get(0), new Priority(
+ Issue issue = setPriority(ISSUES.get(0), new Priority(
null, 4L, "High", null, null, null));
reset(searchRestClient);
AtomicBoolean searched = new AtomicBoolean();
when(searchRestClient.searchJql(any(), any(), any(), any())).then(invocation -> {
if (!searched.get()) {
- issues.remove(0);
- issues.add(0, issue);
+ ISSUES.remove(0);
+ ISSUES.add(0, issue);
}
- SearchResult result = new SearchResult(0, 50, 100, issues);
+ SearchResult result = new SearchResult(0, 50, 100, ISSUES);
return Promises.promise(result);
});
@@ -141,23 +141,23 @@ public class WatchUpdatesConsumerTest extends CamelTestSupport {
@Test
public void multipleChangesWithAddedNewIssueTest() throws Exception {
- final Issue issue = transitionIssueDone(issues.get(1));
- final Issue issue2 = setPriority(issues.get(2), new Priority(
+ final Issue issue = transitionIssueDone(ISSUES.get(1));
+ final Issue issue2 = setPriority(ISSUES.get(2), new Priority(
null, 4L, "High", null, null, null));
reset(searchRestClient);
AtomicBoolean searched = new AtomicBoolean();
when(searchRestClient.searchJql(any(), any(), any(), any())).then(invocation -> {
if (!searched.get()) {
- issues.add(createIssue(4L));
- issues.remove(1);
- issues.add(1, issue);
- issues.remove(2);
- issues.add(2, issue2);
+ ISSUES.add(createIssue(4L));
+ ISSUES.remove(1);
+ ISSUES.add(1, issue);
+ ISSUES.remove(2);
+ ISSUES.add(2, issue2);
searched.set(true);
}
- SearchResult result = new SearchResult(0, 50, 3, issues);
+ SearchResult result = new SearchResult(0, 50, 3, ISSUES);
return Promises.promise(result);
});