You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ra...@apache.org on 2016/09/27 17:24:35 UTC
incubator-eagle git commit: [EAGLE-569]: AlertPublishImpl:
Concurrency : Inplace change metadata cause concurrent modification issue
Repository: incubator-eagle
Updated Branches:
refs/heads/master c897f74b5 -> 0fac33799
[EAGLE-569]: AlertPublishImpl: Concurrency : Inplace change metadata cause concurrent modification issue
Author : ralphsu
This closes #457
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/0fac3379
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/0fac3379
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/0fac3379
Branch: refs/heads/master
Commit: 0fac33799e21ae92d9b4a7e98e5090b032145b06
Parents: c897f74
Author: Ralph, Su <su...@gmail.com>
Authored: Tue Sep 27 10:18:40 2016 -0700
Committer: Ralph, Su <su...@gmail.com>
Committed: Tue Sep 27 10:23:53 2016 -0700
----------------------------------------------------------------------
.../publisher/impl/AlertPublisherImpl.java | 85 +++++++++++++-------
1 file changed, 55 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0fac3379/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
index fe1438e..e97a763 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
@@ -17,16 +17,19 @@
package org.apache.eagle.alert.engine.publisher.impl;
+import com.typesafe.config.Config;
+import org.apache.commons.collections.ListUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.eagle.alert.engine.coordinator.Publishment;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
import org.apache.eagle.alert.engine.publisher.AlertPublisher;
-import com.typesafe.config.Config;
-import org.apache.commons.collections.ListUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.text.MessageFormat;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -67,16 +70,18 @@ public class AlertPublisherImpl implements AlertPublisher {
private void notifyAlert(AlertStreamEvent event) {
String policyId = event.getPolicyId();
- if (policyId == null || !policyPublishPluginMapping.containsKey(policyId)) {
- LOG.warn("Policy {} does NOT subscribe any publishments", policyId);
+ if (StringUtils.isEmpty(policyId)) {
+ LOG.warn("policyId cannot be null for event to be published");
return;
}
- for (String id : policyPublishPluginMapping.get(policyId)) {
- AlertPublishPlugin plugin = publishPluginMapping.get(id);
+ for (String pubId : policyPublishPluginMapping.get(policyId)) {
+ AlertPublishPlugin plugin = pubId != null ? publishPluginMapping.get(pubId) : null;
+ if (plugin == null) {
+ LOG.warn("Policy {} does *NOT* subscribe any publishment!", policyId);
+ continue;
+ }
try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Execute alert publisher " + plugin.getClass().getCanonicalName());
- }
+ LOG.debug("Execute alert publisher {}", plugin.getClass().getCanonicalName());
plugin.onAlert(event);
} catch (Exception ex) {
LOG.error("Fail invoking publisher's onAlert, continue ", ex);
@@ -91,7 +96,7 @@ public class AlertPublisherImpl implements AlertPublisher {
@SuppressWarnings("unchecked")
@Override
- public void onPublishChange(List<Publishment> added,
+ public synchronized void onPublishChange(List<Publishment> added,
List<Publishment> removed,
List<Publishment> afterModified,
List<Publishment> beforeModified) {
@@ -113,25 +118,31 @@ public class AlertPublisherImpl implements AlertPublisher {
return;
}
+ // copy and swap to avoid concurrency issue
+ Map<String, List<String>> newPolicyPublishPluginMapping = new HashMap<>(policyPublishPluginMapping);
+ Map<String, AlertPublishPlugin> newPublishMap = new HashMap<>(publishPluginMapping);
+
+ // added
for (Publishment publishment : added) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(publishment.toString());
- }
+ LOG.debug("OnPublishmentChange : add publishment : {} ", publishment);
AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(publishment, config, conf);
if (plugin != null) {
- publishPluginMapping.put(publishment.getName(), plugin);
- onPolicyAdded(publishment.getPolicyIds(), publishment.getName());
+ newPublishMap.put(publishment.getName(), plugin);
+ addPublishmentPolicies(newPolicyPublishPluginMapping, publishment.getPolicyIds(), publishment.getName());
} else {
- LOG.error("Initialized alertPublisher {} failed due to invalid format", publishment);
+ LOG.error("OnPublishChange alertPublisher {} failed due to invalid format", publishment);
}
}
+ //removed
+ List<AlertPublishPlugin> toBeClosed = new ArrayList<>();
for (Publishment publishment : removed) {
String pubName = publishment.getName();
- onPolicyDeleted(publishment.getPolicyIds(), pubName);
- publishPluginMapping.get(pubName).close();
- publishPluginMapping.remove(publishment.getName());
+ removePublihsPolicies(newPolicyPublishPluginMapping, publishment.getPolicyIds(), pubName);
+ toBeClosed.add(newPublishMap.get(pubName));
+ newPublishMap.remove(publishment.getName());
}
+ // updated
for (int i = 0; i < afterModified.size(); i++) {
String pubName = afterModified.get(i).getName();
List<String> newPolicies = afterModified.get(i).getPolicyIds();
@@ -139,36 +150,50 @@ public class AlertPublisherImpl implements AlertPublisher {
if (!newPolicies.equals(oldPolicies)) {
List<String> deletedPolicies = ListUtils.subtract(oldPolicies, newPolicies);
- onPolicyDeleted(deletedPolicies, pubName);
+ removePublihsPolicies(newPolicyPublishPluginMapping, deletedPolicies, pubName);
List<String> addedPolicies = ListUtils.subtract(newPolicies, oldPolicies);
- onPolicyAdded(addedPolicies, pubName);
+ addPublishmentPolicies(newPolicyPublishPluginMapping, addedPolicies, pubName);
}
Publishment newPub = afterModified.get(i);
- publishPluginMapping.get(pubName).update(newPub.getDedupIntervalMin(), newPub.getProperties());
+ newPublishMap.get(pubName).update(newPub.getDedupIntervalMin(), newPub.getProperties());
}
+
+ // now do the swap
+ publishPluginMapping = newPublishMap;
+ policyPublishPluginMapping = newPolicyPublishPluginMapping;
+
+ // safely close : it depend on plugin to check if want to wait all data to be flushed.
+ closePlugins(toBeClosed);
}
- private synchronized void onPolicyAdded(List<String> addedPolicyIds, String pubName) {
+ private void closePlugins(List<AlertPublishPlugin> toBeClosed) {
+ for (AlertPublishPlugin p : toBeClosed) {
+ try {
+ p.close();
+ } catch (Exception e) {
+ LOG.error(MessageFormat.format("Error when close publish plugin {}, {}!", p.getClass().getCanonicalName()), e);
+ }
+ }
+ }
+
+ private void addPublishmentPolicies(Map<String, List<String>> newPolicyPublishPluginMapping, List<String> addedPolicyIds, String pubName) {
if (addedPolicyIds == null || pubName == null) {
return;
}
for (String policyId : addedPolicyIds) {
- if (policyPublishPluginMapping.get(policyId) == null) {
- policyPublishPluginMapping.put(policyId, new ArrayList<>());
- }
- List<String> publishIds = policyPublishPluginMapping.get(policyId);
- publishIds.add(pubName);
+ newPolicyPublishPluginMapping.putIfAbsent(policyId, new ArrayList<>());
+ newPolicyPublishPluginMapping.get(policyId).add(pubName);
}
}
- private synchronized void onPolicyDeleted(List<String> deletedPolicyIds, String pubName) {
+ private synchronized void removePublihsPolicies(Map<String, List<String>> newPolicyPublishPluginMapping, List<String> deletedPolicyIds, String pubName) {
if (deletedPolicyIds == null || pubName == null) {
return;
}
for (String policyId : deletedPolicyIds) {
- List<String> publishIds = policyPublishPluginMapping.get(policyId);
+ List<String> publishIds = newPolicyPublishPluginMapping.get(policyId);
publishIds.remove(pubName);
}
}