You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by jk...@apache.org on 2023/01/06 08:09:55 UTC
[unomi] branch unomi-1.x updated: UNOMI-723: Backport UNOMI-597 on 1-x to avoid persisting unwanted events into ES (#564)
This is an automated email from the ASF dual-hosted git repository.
jkevan pushed a commit to branch unomi-1.x
in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/unomi-1.x by this push:
new fd774006e UNOMI-723: Backport UNOMI-597 on 1-x to avoid persisting unwanted events into ES (#564)
fd774006e is described below
commit fd774006e85a6ddcb5c391a272422371d6dfe243
Author: kevan Jahanshahi <jk...@apache.org>
AuthorDate: Fri Jan 6 09:09:50 2023 +0100
UNOMI-723: Backport UNOMI-597 on 1-x to avoid persisting unwanted events into ES (#564)
* UNOMI-597: do not persist internal events (#441)
* UNOMI-597: do not persist internal events
* feedback
* UNOMI-597: rollback missing function to make the iTests working on 1.x
* UNOMI-597: fix backport by backporting profile deserialization
Co-authored-by: jsinovassin <58...@users.noreply.github.com>
---
.../unomi/lists/actions/AddToListsAction.java | 4 +-
.../unomi/privacy/internal/PrivacyServiceImpl.java | 12 +-
.../test/java/org/apache/unomi/itests/AllITs.java | 3 +-
.../test/java/org/apache/unomi/itests/BaseIT.java | 27 +++++
.../org/apache/unomi/itests/SendEventActionIT.java | 124 +++++++++++++++++++++
.../unomi/persistence/spi/CustomObjectMapper.java | 1 +
.../actions/MergeProfilesOnPropertyAction.java | 3 +-
.../baseplugin/actions/SendEventAction.java | 10 +-
.../unomi/rest/endpoints/ContextJsonEndpoint.java | 2 +-
.../services/impl/goals/GoalsServiceImpl.java | 1 +
10 files changed, 168 insertions(+), 19 deletions(-)
diff --git a/extensions/lists-extension/actions/src/main/java/org/apache/unomi/lists/actions/AddToListsAction.java b/extensions/lists-extension/actions/src/main/java/org/apache/unomi/lists/actions/AddToListsAction.java
index f120310dd..7f60f0881 100644
--- a/extensions/lists-extension/actions/src/main/java/org/apache/unomi/lists/actions/AddToListsAction.java
+++ b/extensions/lists-extension/actions/src/main/java/org/apache/unomi/lists/actions/AddToListsAction.java
@@ -65,9 +65,7 @@ public class AddToListsAction implements ActionExecutor {
if (listsChanged) {
profile.getSystemProperties().put("lists", existingListIdentifiers);
- Event profileUpdated = new Event("profileUpdated", null, profile, event.getScope(), null, profile, new Date());
- profileUpdated.setPersistent(false);
- eventService.send(profileUpdated);
+ eventService.send(new Event("profileUpdated", null, profile, event.getScope(), null, profile, null,new Date(), false));
return EventService.PROFILE_UPDATED;
} else {
return EventService.NO_CHANGE;
diff --git a/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java b/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java
index 3d3d68d7f..db57eb20b 100644
--- a/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java
+++ b/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java
@@ -88,9 +88,7 @@ public class PrivacyServiceImpl implements PrivacyService {
if (profile == null) {
return false;
}
- Event profileDeletedEvent = new Event("profileDeleted", null, profile, null, null, profile, new Date());
- profileDeletedEvent.setPersistent(true);
- eventService.send(profileDeletedEvent);
+ eventService.send(new Event("profileDeleted", null, profile, null, null, profile,null, new Date(), false));
// we simply overwrite the existing profile with an empty one.
Profile emptyProfile = new Profile(profileId);
profileService.save(emptyProfile);
@@ -105,15 +103,11 @@ public class PrivacyServiceImpl implements PrivacyService {
}
// first we send out the anonymize profile event to make sure other systems can still use external identifiers to lookup the profile and anonymize it.
- Event anonymizeProfileEvent = new Event("anonymizeProfile", null, profile, scope, null, profile, new Date());
- anonymizeProfileEvent.setPersistent(true);
- eventService.send(anonymizeProfileEvent);
+ eventService.send(new Event("anonymizeProfile", null, profile, scope, null, profile, null, new Date(), false));
boolean res = profile.getProperties().keySet().removeAll(getDeniedProperties(profile.getItemId()));
- Event profileUpdatedEvent = new Event("profileUpdated", null, profile, scope, null, profile, new Date());
- profileUpdatedEvent.setPersistent(false);
- eventService.send(profileUpdatedEvent);
+ eventService.send(new Event("profileUpdated", null, profile, scope, null, profile, null, new Date(), false));
profileService.save(profile);
diff --git a/itests/src/test/java/org/apache/unomi/itests/AllITs.java b/itests/src/test/java/org/apache/unomi/itests/AllITs.java
index e6db9969c..19c4744fb 100644
--- a/itests/src/test/java/org/apache/unomi/itests/AllITs.java
+++ b/itests/src/test/java/org/apache/unomi/itests/AllITs.java
@@ -49,7 +49,8 @@ import org.junit.runners.Suite.SuiteClasses;
ContextServletIT.class,
SecurityIT.class,
RuleServiceIT.class,
- GroovyActionsServiceIT.class
+ GroovyActionsServiceIT.class,
+ SendEventActionIT.class
})
public class AllITs {
}
diff --git a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
index 584d7e2fb..27588c61a 100644
--- a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
@@ -76,6 +76,8 @@ public abstract class BaseIT {
protected static final String URL = "http://localhost:" + HTTP_PORT;
protected static final String KARAF_DIR = "target/exam";
protected static final String UNOMI_KEY = "670c26d1cc413346c3b2fd9ce65dab41";
+ protected static final int DEFAULT_TRYING_TIMEOUT = 2000;
+ protected static final int DEFAULT_TRYING_TRIES = 30;
@Inject
@Filter(timeout = 600000)
@@ -257,6 +259,31 @@ public abstract class BaseIT {
return value;
}
+ protected <T> void waitForNullValue(String failMessage, Supplier<T> call, int timeout, int retries) throws InterruptedException {
+ int count = 0;
+ while (call.get() != null) {
+ if (count++ > retries) {
+ Assert.fail(failMessage);
+ }
+ Thread.sleep(timeout);
+ }
+ }
+
+ protected <T> T shouldBeTrueUntilEnd(String failMessage, Supplier<T> call, Predicate<T> predicate, int timeout, int retries)
+ throws InterruptedException {
+ int count = 0;
+ T value = null;
+ while (count <= retries) {
+ count++;
+ value = call.get();
+ if (!predicate.test(value)) {
+ Assert.fail(failMessage);
+ }
+ Thread.sleep(timeout);
+ }
+ return value;
+ }
+
protected String bundleResourceAsString(final String resourcePath) throws IOException {
final java.net.URL url = bundleContext.getBundle().getResource(resourcePath);
if (url != null) {
diff --git a/itests/src/test/java/org/apache/unomi/itests/SendEventActionIT.java b/itests/src/test/java/org/apache/unomi/itests/SendEventActionIT.java
new file mode 100644
index 000000000..bae27b344
--- /dev/null
+++ b/itests/src/test/java/org/apache/unomi/itests/SendEventActionIT.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.unomi.itests;
+
+import org.apache.unomi.api.Event;
+import org.apache.unomi.api.Metadata;
+import org.apache.unomi.api.Profile;
+import org.apache.unomi.api.actions.Action;
+import org.apache.unomi.api.conditions.Condition;
+import org.apache.unomi.api.rules.Rule;
+import org.apache.unomi.api.services.EventService;
+import org.apache.unomi.api.services.ProfileService;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.junit.PaxExam;
+import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
+import org.ops4j.pax.exam.spi.reactors.PerSuite;
+import org.ops4j.pax.exam.util.Filter;
+
+import javax.inject.Inject;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+
+@RunWith(PaxExam.class)
+@ExamReactorStrategy(PerSuite.class)
+public class SendEventActionIT extends BaseIT {
+
+ private final static String TEST_RULE_ID = "sendEventTest";
+ private final static String EVENT_ID = "sendEventTestId";
+ private final static String TEST_EVENT_TYPE = "sendEventTestEventType";
+ private final static String TEST_PROFILE_ID = "sendEventTestProfileId";
+
+ @Inject
+ @Filter(timeout = 600000)
+ protected ProfileService profileService;
+ @Inject
+ @Filter(timeout = 600000)
+ protected EventService eventService;
+
+ @After
+ public void tearDown() throws InterruptedException {
+ eventService.removeProfileEvents(TEST_PROFILE_ID);
+ rulesService.removeRule(TEST_RULE_ID);
+ waitForNullValue("Event has not been deleted", () -> eventService.getEvent(EVENT_ID), DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+ waitForNullValue("Rule " + TEST_RULE_ID + "has not been deleted", () -> rulesService.getRule(TEST_RULE_ID), DEFAULT_TRYING_TIMEOUT,
+ DEFAULT_TRYING_TRIES);
+ }
+
+ @Test
+ public void testSendEventNotPersisted() throws InterruptedException {
+ createAndWaitForRule(createSendEventRule(false));
+
+ Assert.assertEquals(TEST_PROFILE_ID, sendEvent().getProfile().getItemId());
+
+ shouldBeTrueUntilEnd("Event should not have been persisted", () -> eventService.searchEvents(getSearchCondition(), 0, 1),
+ (eventPartialList -> eventPartialList.size() == 0), DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+ }
+
+ @Test
+ public void testSendEventPersisted() throws InterruptedException {
+ createAndWaitForRule(createSendEventRule(true));
+
+ Assert.assertEquals(TEST_PROFILE_ID, sendEvent().getProfile().getItemId());
+
+ keepTrying("Event should have been persisted", () -> eventService.searchEvents(getSearchCondition(), 0, 1),
+ (eventPartialList -> eventPartialList.size() == 1), DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+ }
+
+ private Event sendEvent() {
+ Profile profile = new Profile();
+ profile.setProperties(new HashMap<>());
+ profile.setItemId(TEST_PROFILE_ID);
+ profile.setProperty("j:nodename", "michel");
+ Event testEvent = new Event(TEST_EVENT_TYPE, null, profile, null, null, profile, new Date());
+ testEvent.setItemId(EVENT_ID);
+ eventService.send(testEvent);
+ return testEvent;
+ }
+
+ private Condition getSearchCondition() {
+ Condition condition = new Condition(definitionsService.getConditionType("eventPropertyCondition"));
+ condition.setParameter("propertyName", "eventType");
+ condition.setParameter("propertyValue", "sentFromAction");
+ condition.setParameter("comparisonOperator", "equals");
+
+ return condition;
+ }
+
+ private Rule createSendEventRule(boolean toBePersisted) {
+ Rule sendEventRule = new Rule();
+ sendEventRule.setMetadata(new Metadata(null, TEST_RULE_ID, TEST_RULE_ID, "Test rule for testing SendEventAction"));
+
+ Condition condition = new Condition(definitionsService.getConditionType("eventTypeCondition"));
+ condition.setParameter("eventTypeId", TEST_EVENT_TYPE);
+ sendEventRule.setCondition(condition);
+
+ final Action action = new Action(definitionsService.getActionType("sendEventAction"));
+ action.setParameter("eventType", "sentFromAction");
+ action.setParameter("eventTarget", profileService.load(TEST_PROFILE_ID));
+ action.setParameter("eventProperties", new HashMap<String, Object>());
+ action.setParameter("toBePersisted", toBePersisted);
+ sendEventRule.setActions(Collections.singletonList(action));
+
+ return sendEventRule;
+ }
+}
diff --git a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/CustomObjectMapper.java b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/CustomObjectMapper.java
index 3e33f13e6..c84da9d70 100644
--- a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/CustomObjectMapper.java
+++ b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/CustomObjectMapper.java
@@ -70,6 +70,7 @@ public class CustomObjectMapper extends ObjectMapper {
classes.put(Event.ITEM_TYPE, Event.class);
classes.put(Goal.ITEM_TYPE, Goal.class);
classes.put(Persona.ITEM_TYPE, Persona.class);
+ classes.put(Profile.ITEM_TYPE, Profile.class);
classes.put(Rule.ITEM_TYPE, Rule.class);
classes.put(Scoring.ITEM_TYPE, Scoring.class);
classes.put(Segment.ITEM_TYPE, Segment.class);
diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
index 92d7c6983..7a3782079 100644
--- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
+++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
@@ -124,7 +124,8 @@ public class MergeProfilesOnPropertyAction implements ActionExecutor {
if (currentSession != null) {
currentSession.setProfile(profile);
- eventService.send(new Event("sessionReassigned", currentSession, profile, event.getScope(), event, currentSession, event.getTimeStamp()));
+ eventService.send(new Event("sessionReassigned", currentSession, profile, event.getScope(), event, currentSession,
+ null, event.getTimeStamp(), false));
}
return EventService.PROFILE_UPDATED + EventService.SESSION_UPDATED;
diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/SendEventAction.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/SendEventAction.java
index 9f522464f..909fb59a5 100644
--- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/SendEventAction.java
+++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/SendEventAction.java
@@ -17,6 +17,7 @@
package org.apache.unomi.plugins.baseplugin.actions;
+import org.apache.commons.lang3.StringUtils;
import org.apache.unomi.api.Event;
import org.apache.unomi.api.Item;
import org.apache.unomi.api.actions.Action;
@@ -36,18 +37,19 @@ public class SendEventAction implements ActionExecutor {
@Override
public int execute(Action action, Event event) {
String eventType = (String) action.getParameterValues().get("eventType");
+ Boolean toBePersisted = (Boolean) action.getParameterValues().get("toBePersisted");
+
@SuppressWarnings("unchecked")
Map<String, Object> eventProperties = (Map<String, Object>) action.getParameterValues().get("eventProperties");
Item target = (Item) action.getParameterValues().get("eventTarget");
-// String type = (String) target.get("type");
-
-// Item targetItem = new CustomItem();
-// BeanUtils.populate(targetItem, target);
Event subEvent = new Event(eventType, event.getSession(), event.getProfile(), event.getScope(), event, target, event.getTimeStamp());
subEvent.setProfileId(event.getProfileId());
subEvent.getAttributes().putAll(event.getAttributes());
subEvent.getProperties().putAll(eventProperties);
+ if (toBePersisted != null && !toBePersisted) {
+ subEvent.setPersistent(false);
+ }
return eventService.send(subEvent);
}
diff --git a/rest/src/main/java/org/apache/unomi/rest/endpoints/ContextJsonEndpoint.java b/rest/src/main/java/org/apache/unomi/rest/endpoints/ContextJsonEndpoint.java
index bc87d0f3f..a7ebea0da 100644
--- a/rest/src/main/java/org/apache/unomi/rest/endpoints/ContextJsonEndpoint.java
+++ b/rest/src/main/java/org/apache/unomi/rest/endpoints/ContextJsonEndpoint.java
@@ -267,7 +267,7 @@ public class ContextJsonEndpoint {
// Only save session and send event if a session id was provided, otherwise keep transient session
session = new Session(sessionId, sessionProfile, timestamp, scope);
changes |= EventService.SESSION_UPDATED;
- Event event = new Event("sessionCreated", session, profile, scope, null, session, timestamp);
+ Event event = new Event("sessionCreated", session, profile, scope, null, session, null, timestamp, false);
if (sessionProfile.isAnonymousProfile()) {
// Do not keep track of profile in event
event.setProfileId(null);
diff --git a/services/src/main/java/org/apache/unomi/services/impl/goals/GoalsServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/goals/GoalsServiceImpl.java
index 538fe5d1c..37da7a314 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/goals/GoalsServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/goals/GoalsServiceImpl.java
@@ -189,6 +189,7 @@ public class GoalsServiceImpl implements GoalsService, SynchronousBundleListener
action3.setParameter("eventType", "goal");
action3.setParameter("eventTarget", goal);
action3.setParameter("eventProperties", new HashMap<String, Object>());
+ action3.setParameter("toBePersisted", false);
rule.setActions(Arrays.asList(action1,action2,action3));
}