You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/04/21 04:19:38 UTC
[james-project] 01/06: JAMES-3777 Incremental change POJO for JMAP filtering
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 07545e0e201d8da02a799cd3b15fb34e739cecb4
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Apr 18 17:10:25 2023 +0700
JAMES-3777 Incremental change POJO for JMAP filtering
TODO:
- Incremental changes for rule modification
- Plug it in the event source system (aggregate, cassadra
event serialization)
- Conditionally disable incremental change via ENV and
explain that it should be disable during rolling updates
of the upgrade.
- Rebase the subscriber JAMES-3777: subscriber first might
need to load the aggregate again?
---
.../api/filtering/impl/IncrementalRuleChange.java | 205 ++++++++++++++++++++
.../james/jmap/api/filtering/RuleFixture.java | 1 +
.../filtering/impl/IncrementalRuleChangeTest.java | 207 +++++++++++++++++++++
3 files changed, 413 insertions(+)
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/IncrementalRuleChange.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/IncrementalRuleChange.java
new file mode 100644
index 0000000000..792b0fb8b9
--- /dev/null
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/IncrementalRuleChange.java
@@ -0,0 +1,205 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.jmap.api.filtering.impl;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.james.eventsourcing.AggregateId;
+import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.EventId;
+import org.apache.james.jmap.api.filtering.Rule;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.collect.Sets;
+
+public class IncrementalRuleChange implements Event {
+ public static Optional<IncrementalRuleChange> ofDiff(FilteringAggregateId aggregateId, EventId eventId, List<Rule> before, List<Rule> after) {
+ ImmutableSet<Rule.Id> idsBefore = before.stream().map(Rule::getId).collect(ImmutableSet.toImmutableSet());
+ ImmutableSet<Rule.Id> idsAfter = after.stream().map(Rule::getId).collect(ImmutableSet.toImmutableSet());
+
+ ImmutableMap<Rule.Id, Rule> beforeIndexed = before.stream()
+ .collect(ImmutableMap.toImmutableMap(Rule::getId, rule -> rule));
+
+ ImmutableMap<Rule.Id, Rule> afterIndexed = after.stream()
+ .collect(ImmutableMap.toImmutableMap(Rule::getId, rule -> rule));
+
+ // Deleted elements appears in
+ ImmutableSet<Rule.Id> deleted = Sets.difference(idsBefore, idsAfter).immutableCopy();
+
+ List<Rule.Id> commonElements = ImmutableList.copyOf(Sets.intersection(idsBefore, idsAfter).immutableCopy());
+
+ ImmutableList<Rule.Id> idsAfterList = idsAfter.asList();
+ int prependedItems = 0;
+ int postPendedItems = 0;
+ boolean inPrepended = true;
+ boolean inCommonSection = false;
+ boolean inPostpended = false;
+ int position = 0;
+ while (position < idsAfter.size()) {
+ Rule.Id id = idsAfterList.get(position);
+ if (inPrepended) {
+ if (commonElements.contains(id)) {
+ inPrepended = false;
+ inCommonSection = true;
+ continue;
+ } else {
+ prependedItems++;
+ position++;
+ continue;
+ }
+ }
+ if (inPostpended) {
+ if (commonElements.contains(id)) {
+ return Optional.empty();
+ } else {
+ postPendedItems++;
+ position++;
+ continue;
+ }
+ }
+ if (inCommonSection) {
+ if (!commonElements.contains(id)) {
+ inCommonSection = false;
+ inPostpended = true;
+ continue;
+ }
+ int positionInCommonElements = position - prependedItems;
+ if (positionInCommonElements > commonElements.size()) {
+ // Safeguard
+ return Optional.empty();
+ }
+ if (!commonElements.get(positionInCommonElements).equals(id)) {
+ // Order of commons items changed
+ return Optional.empty();
+ }
+ if (!beforeIndexed.get(id).equals(afterIndexed.get(id))) {
+ // Rule content changed
+ return Optional.empty();
+ }
+ // All fine
+ position++;
+ continue;
+ }
+ throw new RuntimeException("Unexpected status");
+ }
+
+ ImmutableList<Rule> preprended = idsAfter.stream()
+ .limit(prependedItems)
+ .map(afterIndexed::get)
+ .collect(ImmutableList.toImmutableList());
+
+ ImmutableList<Rule> postPended = idsAfter.asList()
+ .reverse()
+ .stream()
+ .limit(postPendedItems)
+ .map(afterIndexed::get)
+ .collect(ImmutableList.toImmutableList())
+ .reverse();
+
+ return Optional.of(new IncrementalRuleChange(aggregateId, eventId,
+ preprended, postPended, deleted));
+ }
+
+ private final FilteringAggregateId aggregateId;
+ private final EventId eventId;
+ private final ImmutableList<Rule> rulesPrepended;
+ private final ImmutableList<Rule> rulesPostpended;
+ private final ImmutableSet<Rule.Id> rulesDeleted;
+
+ public IncrementalRuleChange(FilteringAggregateId aggregateId, EventId eventId, ImmutableList<Rule> rulesPrepended, ImmutableList<Rule> rulesPostpended, ImmutableSet<Rule.Id> rulesDeleted) {
+ this.aggregateId = aggregateId;
+ this.eventId = eventId;
+ this.rulesPrepended = rulesPrepended;
+ this.rulesPostpended = rulesPostpended;
+ this.rulesDeleted = rulesDeleted;
+ }
+
+ @Override
+ public EventId eventId() {
+ return eventId;
+ }
+
+ @Override
+ public AggregateId getAggregateId() {
+ return aggregateId;
+ }
+
+ public ImmutableList<Rule> getRulesPrepended() {
+ return rulesPrepended;
+ }
+
+ public ImmutableList<Rule> getRulesPostPended() {
+ return rulesPostpended;
+ }
+
+ public ImmutableSet<Rule.Id> getRulesDeleted() {
+ return rulesDeleted;
+ }
+
+ public ImmutableList<Rule> apply(ImmutableList<Rule> rules) {
+ return ImmutableList.<Rule>builder()
+ .addAll(rulesPrepended)
+ .addAll(rules.stream()
+ .filter(rule -> !rulesDeleted.contains(rule.getId()))
+ .collect(ImmutableList.toImmutableList()))
+ .addAll(rulesPostpended)
+ .build();
+ }
+
+ @Override
+ public final boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ IncrementalRuleChange that = (IncrementalRuleChange) o;
+ return Objects.equals(aggregateId, that.aggregateId) &&
+ Objects.equals(eventId, that.eventId) &&
+ Objects.equals(rulesDeleted, that.rulesDeleted) &&
+ Objects.equals(rulesPrepended, that.rulesPrepended) &&
+ Objects.equals(rulesPostpended, that.rulesPostpended);
+ }
+
+ @Override
+ public final int hashCode() {
+ return Objects.hash(aggregateId, eventId, rulesPrepended, rulesPostpended, rulesDeleted);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("aggregateId", aggregateId)
+ .add("eventId", eventId)
+ .add("rulesDeleted", rulesDeleted)
+ .add("rulesPrepended", rulesPrepended)
+ .add("rulesPostpended", rulesPostpended)
+ .toString();
+ }
+}
diff --git a/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/filtering/RuleFixture.java b/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/filtering/RuleFixture.java
index b318f63f0a..5166a2cefb 100644
--- a/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/filtering/RuleFixture.java
+++ b/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/filtering/RuleFixture.java
@@ -25,6 +25,7 @@ public interface RuleFixture {
Rule.Action ACTION = Rule.Action.of(Rule.Action.AppendInMailboxes.withMailboxIds("id-01"));
Rule.Builder RULE_BUILDER = Rule.builder().name(NAME).condition(CONDITION).action(ACTION);
Rule RULE_1 = RULE_BUILDER.id(Rule.Id.of("1")).build();
+ Rule RULE_1_MODIFIED = RULE_BUILDER.id(Rule.Id.of("1")).name("newname").build();
Rule RULE_2 = RULE_BUILDER.id(Rule.Id.of("2")).build();
Rule RULE_3 = RULE_BUILDER.id(Rule.Id.of("3")).build();
diff --git a/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/filtering/impl/IncrementalRuleChangeTest.java b/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/filtering/impl/IncrementalRuleChangeTest.java
new file mode 100644
index 0000000000..6fa4c3fd09
--- /dev/null
+++ b/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/filtering/impl/IncrementalRuleChangeTest.java
@@ -0,0 +1,207 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.jmap.api.filtering.impl;
+
+import static org.apache.james.jmap.api.filtering.RuleFixture.RULE_1;
+import static org.apache.james.jmap.api.filtering.RuleFixture.RULE_1_MODIFIED;
+import static org.apache.james.jmap.api.filtering.RuleFixture.RULE_2;
+import static org.apache.james.jmap.api.filtering.RuleFixture.RULE_3;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.james.core.Username;
+import org.apache.james.eventsourcing.EventId;
+import org.apache.james.jmap.api.filtering.Rule;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+class IncrementalRuleChangeTest {
+ public static final FilteringAggregateId AGGREGATE_ID = new FilteringAggregateId(Username.of("bob"));
+ public static final EventId EVENT_ID = EventId.first();
+
+ @Test
+ void removingOneRule() {
+ assertThat(IncrementalRuleChange.ofDiff(AGGREGATE_ID, EVENT_ID,
+ ImmutableList.of(RULE_1),
+ ImmutableList.of()))
+ .contains(new IncrementalRuleChange(AGGREGATE_ID, EVENT_ID, ImmutableList.of(), ImmutableList.of(),
+ ImmutableSet.of(RULE_1.getId())));
+ }
+
+ @Test
+ void removingOneRuleOutOfTwo() {
+ assertThat(IncrementalRuleChange.ofDiff(AGGREGATE_ID, EVENT_ID,
+ ImmutableList.of(RULE_1, RULE_2),
+ ImmutableList.of(RULE_2)))
+ .contains(new IncrementalRuleChange(AGGREGATE_ID, EVENT_ID, ImmutableList.of(), ImmutableList.of(),
+ ImmutableSet.of(RULE_1.getId())));
+ }
+
+ @Test
+ void removingMiddleRule() {
+ assertThat(IncrementalRuleChange.ofDiff(AGGREGATE_ID, EVENT_ID,
+ ImmutableList.of(RULE_1, RULE_2, RULE_3),
+ ImmutableList.of(RULE_1, RULE_3)))
+ .contains(new IncrementalRuleChange(AGGREGATE_ID, EVENT_ID, ImmutableList.of(), ImmutableList.of(),
+ ImmutableSet.of(RULE_2.getId())));
+ }
+
+ @Test
+ void noop() {
+ assertThat(IncrementalRuleChange.ofDiff(AGGREGATE_ID, EVENT_ID,
+ ImmutableList.of(RULE_1, RULE_2, RULE_3),
+ ImmutableList.of(RULE_1, RULE_2, RULE_3)))
+ .contains(new IncrementalRuleChange(AGGREGATE_ID, EVENT_ID, ImmutableList.of(), ImmutableList.of(), ImmutableSet.of()));
+ }
+
+ @Test
+ void reorderingRuleIsNotManagedByIncrements() {
+ assertThat(IncrementalRuleChange.ofDiff(AGGREGATE_ID, EVENT_ID,
+ ImmutableList.of(RULE_1, RULE_2, RULE_3),
+ ImmutableList.of(RULE_1, RULE_3, RULE_2)))
+ .isEmpty();
+ }
+
+ @Test
+ void addingRuleInTheMiddleIsNotManagedByIncrement() {
+ assertThat(IncrementalRuleChange.ofDiff(AGGREGATE_ID, EVENT_ID,
+ ImmutableList.of(RULE_1, RULE_2),
+ ImmutableList.of(RULE_1, RULE_3, RULE_2)))
+ .isEmpty();
+ }
+
+ @Test
+ void postPendingOneRule() {
+ assertThat(IncrementalRuleChange.ofDiff(AGGREGATE_ID, EVENT_ID,
+ ImmutableList.of(RULE_1, RULE_2),
+ ImmutableList.of(RULE_1, RULE_2, RULE_3)))
+ .contains(new IncrementalRuleChange(AGGREGATE_ID, EVENT_ID, ImmutableList.of(), ImmutableList.of(RULE_3), ImmutableSet.of()));
+ }
+
+ @Test
+ void prependingOneRule() {
+ assertThat(IncrementalRuleChange.ofDiff(AGGREGATE_ID, EVENT_ID,
+ ImmutableList.of(RULE_1, RULE_2),
+ ImmutableList.of(RULE_3, RULE_1, RULE_2)))
+ .contains(new IncrementalRuleChange(AGGREGATE_ID, EVENT_ID, ImmutableList.of(RULE_3), ImmutableList.of(), ImmutableSet.of()));
+ }
+
+ @Test
+ void prependingAndPostpending() {
+ assertThat(IncrementalRuleChange.ofDiff(AGGREGATE_ID, EVENT_ID,
+ ImmutableList.of(RULE_1),
+ ImmutableList.of(RULE_3, RULE_1, RULE_2)))
+ .contains(new IncrementalRuleChange(AGGREGATE_ID, EVENT_ID, ImmutableList.of(RULE_3), ImmutableList.of(RULE_2), ImmutableSet.of()));
+ }
+
+ @Test
+ void prependingAndPostpendingAndRemoval() {
+ assertThat(IncrementalRuleChange.ofDiff(AGGREGATE_ID, EVENT_ID,
+ ImmutableList.of(RULE_1),
+ ImmutableList.of(RULE_3, RULE_2)))
+ .contains(new IncrementalRuleChange(AGGREGATE_ID, EVENT_ID, ImmutableList.of(RULE_3, RULE_2), ImmutableList.of(), ImmutableSet.of(RULE_1.getId())));
+ }
+
+ @Test
+ void ruleModificationIsNotManagedByIncrement() {
+ assertThat(IncrementalRuleChange.ofDiff(AGGREGATE_ID, EVENT_ID,
+ ImmutableList.of(RULE_1),
+ ImmutableList.of(RULE_1_MODIFIED)))
+ .isEmpty();
+ }
+
+ @Test
+ void removingOneRuleShouldBeWellApplied() {
+ ImmutableList<Rule> origin = ImmutableList.of(RULE_1);
+ IncrementalRuleChange incrementalChange = new IncrementalRuleChange(AGGREGATE_ID, EVENT_ID, ImmutableList.of(), ImmutableList.of(),
+ ImmutableSet.of(RULE_1.getId()));
+ assertThat(incrementalChange.apply(origin))
+ .isEqualTo(ImmutableList.of());
+ }
+
+ @Test
+ void removingOneRuleOutOfTwoShouldBeWellApplied() {
+ ImmutableList<Rule> origin = ImmutableList.of(RULE_1, RULE_2);
+ IncrementalRuleChange incrementalChange = new IncrementalRuleChange(AGGREGATE_ID, EVENT_ID, ImmutableList.of(), ImmutableList.of(),
+ ImmutableSet.of(RULE_1.getId()));
+ assertThat(incrementalChange.apply(origin))
+ .isEqualTo(ImmutableList.of(RULE_2));
+ }
+
+ @Test
+ void removingMiddleRuleShouldBeWellApplied() {
+ ImmutableList<Rule> origin = ImmutableList.of(RULE_1, RULE_2, RULE_3);
+ IncrementalRuleChange incrementalChange = new IncrementalRuleChange(AGGREGATE_ID, EVENT_ID, ImmutableList.of(), ImmutableList.of(),
+ ImmutableSet.of(RULE_2.getId()));
+ assertThat(incrementalChange.apply(origin))
+ .isEqualTo(ImmutableList.of(RULE_1, RULE_3));
+ }
+
+ @Test
+ void noopShouldBeWellApplied() {
+ ImmutableList<Rule> origin = ImmutableList.of(RULE_1, RULE_2, RULE_3);
+
+ IncrementalRuleChange incrementalRuleChange = new IncrementalRuleChange(AGGREGATE_ID, EVENT_ID, ImmutableList.of(), ImmutableList.of(), ImmutableSet.of());
+
+ assertThat(incrementalRuleChange.apply(origin))
+ .isEqualTo(origin);
+ }
+
+ @Test
+ void postPendingOneRuleShouldBeWellApplied() {
+ ImmutableList<Rule> origin = ImmutableList.of(RULE_1, RULE_2);
+
+ IncrementalRuleChange incrementalRuleChange = new IncrementalRuleChange(AGGREGATE_ID, EVENT_ID, ImmutableList.of(), ImmutableList.of(RULE_3), ImmutableSet.of());
+
+ assertThat(incrementalRuleChange.apply(origin))
+ .isEqualTo(ImmutableList.of(RULE_1, RULE_2, RULE_3));
+ }
+
+ @Test
+ void prependingOneRuleShouldBeWellApplied() {
+ ImmutableList<Rule> origin = ImmutableList.of(RULE_1, RULE_2);
+
+ IncrementalRuleChange incrementalRuleChange = new IncrementalRuleChange(AGGREGATE_ID, EVENT_ID, ImmutableList.of(RULE_3), ImmutableList.of(), ImmutableSet.of());
+
+ assertThat(incrementalRuleChange.apply(origin))
+ .isEqualTo(ImmutableList.of(RULE_3, RULE_1, RULE_2));
+ }
+
+ @Test
+ void prependingAndPostpendingShouldBeWellApplied() {
+ ImmutableList<Rule> origin = ImmutableList.of(RULE_1);
+
+ IncrementalRuleChange incrementalRuleChange = new IncrementalRuleChange(AGGREGATE_ID, EVENT_ID, ImmutableList.of(RULE_3), ImmutableList.of(RULE_2), ImmutableSet.of());
+
+ assertThat(incrementalRuleChange.apply(origin))
+ .isEqualTo(ImmutableList.of(RULE_3, RULE_1, RULE_2));
+ }
+
+ @Test
+ void prependingAndPostpendingAndRemovalShouldBeWellApplied() {
+ ImmutableList<Rule> origin = ImmutableList.of(RULE_1);
+
+ IncrementalRuleChange incrementalRuleChange = new IncrementalRuleChange(AGGREGATE_ID, EVENT_ID, ImmutableList.of(RULE_3), ImmutableList.of(RULE_2), ImmutableSet.of(RULE_1.getId()));
+
+ assertThat(incrementalRuleChange.apply(origin))
+ .isEqualTo(ImmutableList.of(RULE_3, RULE_2));
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org