You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ra...@apache.org on 2016/09/27 17:24:35 UTC

incubator-eagle git commit: [EAGLE-569]: AlertPublishImpl: Concurrency : Inplace change metadata cause concurrent modification issue

Repository: incubator-eagle
Updated Branches:
  refs/heads/master c897f74b5 -> 0fac33799


[EAGLE-569]: AlertPublishImpl: Concurrency : Inplace change metadata cause concurrent modification issue

Author : ralphsu

This closes #457


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/0fac3379
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/0fac3379
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/0fac3379

Branch: refs/heads/master
Commit: 0fac33799e21ae92d9b4a7e98e5090b032145b06
Parents: c897f74
Author: Ralph, Su <su...@gmail.com>
Authored: Tue Sep 27 10:18:40 2016 -0700
Committer: Ralph, Su <su...@gmail.com>
Committed: Tue Sep 27 10:23:53 2016 -0700

----------------------------------------------------------------------
 .../publisher/impl/AlertPublisherImpl.java      | 85 +++++++++++++-------
 1 file changed, 55 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0fac3379/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
index fe1438e..e97a763 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
@@ -17,16 +17,19 @@
 
 package org.apache.eagle.alert.engine.publisher.impl;
 
+import com.typesafe.config.Config;
+import org.apache.commons.collections.ListUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.eagle.alert.engine.coordinator.Publishment;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
 import org.apache.eagle.alert.engine.publisher.AlertPublisher;
-import com.typesafe.config.Config;
-import org.apache.commons.collections.ListUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.text.MessageFormat;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -67,16 +70,18 @@ public class AlertPublisherImpl implements AlertPublisher {
 
     private void notifyAlert(AlertStreamEvent event) {
         String policyId = event.getPolicyId();
-        if (policyId == null || !policyPublishPluginMapping.containsKey(policyId)) {
-            LOG.warn("Policy {} does NOT subscribe any publishments", policyId);
+        if (StringUtils.isEmpty(policyId)) {
+            LOG.warn("policyId cannot be null for event to be published");
             return;
         }
-        for (String id : policyPublishPluginMapping.get(policyId)) {
-            AlertPublishPlugin plugin = publishPluginMapping.get(id);
+        for (String pubId : policyPublishPluginMapping.get(policyId)) {
+            AlertPublishPlugin plugin = pubId != null ? publishPluginMapping.get(pubId) : null;
+            if (plugin == null) {
+                LOG.warn("Policy {} does *NOT* subscribe any publishment!", policyId);
+                continue;
+            }
             try {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Execute alert publisher " + plugin.getClass().getCanonicalName());
-                }
+                LOG.debug("Execute alert publisher {}", plugin.getClass().getCanonicalName());
                 plugin.onAlert(event);
             } catch (Exception ex) {
                 LOG.error("Fail invoking publisher's onAlert, continue ", ex);
@@ -91,7 +96,7 @@ public class AlertPublisherImpl implements AlertPublisher {
 
     @SuppressWarnings("unchecked")
     @Override
-    public void onPublishChange(List<Publishment> added,
+    public synchronized void onPublishChange(List<Publishment> added,
                                 List<Publishment> removed,
                                 List<Publishment> afterModified,
                                 List<Publishment> beforeModified) {
@@ -113,25 +118,31 @@ public class AlertPublisherImpl implements AlertPublisher {
             return;
         }
 
+        // copy and swap to avoid concurrency issue
+        Map<String, List<String>> newPolicyPublishPluginMapping = new HashMap<>(policyPublishPluginMapping);
+        Map<String, AlertPublishPlugin> newPublishMap = new HashMap<>(publishPluginMapping);
+
+        // added
         for (Publishment publishment : added) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug(publishment.toString());
-            }
+            LOG.debug("OnPublishmentChange : add publishment : {} ", publishment);
 
             AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(publishment, config, conf);
             if (plugin != null) {
-                publishPluginMapping.put(publishment.getName(), plugin);
-                onPolicyAdded(publishment.getPolicyIds(), publishment.getName());
+                newPublishMap.put(publishment.getName(), plugin);
+                addPublishmentPolicies(newPolicyPublishPluginMapping, publishment.getPolicyIds(), publishment.getName());
             } else {
-                LOG.error("Initialized alertPublisher {} failed due to invalid format", publishment);
+                LOG.error("OnPublishChange alertPublisher {} failed due to invalid format", publishment);
             }
         }
+        //removed
+        List<AlertPublishPlugin> toBeClosed = new ArrayList<>();
         for (Publishment publishment : removed) {
             String pubName = publishment.getName();
-            onPolicyDeleted(publishment.getPolicyIds(), pubName);
-            publishPluginMapping.get(pubName).close();
-            publishPluginMapping.remove(publishment.getName());
+            removePublihsPolicies(newPolicyPublishPluginMapping, publishment.getPolicyIds(), pubName);
+            toBeClosed.add(newPublishMap.get(pubName));
+            newPublishMap.remove(publishment.getName());
         }
+        // updated
         for (int i = 0; i < afterModified.size(); i++) {
             String pubName = afterModified.get(i).getName();
             List<String> newPolicies = afterModified.get(i).getPolicyIds();
@@ -139,36 +150,50 @@ public class AlertPublisherImpl implements AlertPublisher {
 
             if (!newPolicies.equals(oldPolicies)) {
                 List<String> deletedPolicies = ListUtils.subtract(oldPolicies, newPolicies);
-                onPolicyDeleted(deletedPolicies, pubName);
+                removePublihsPolicies(newPolicyPublishPluginMapping, deletedPolicies, pubName);
                 List<String> addedPolicies = ListUtils.subtract(newPolicies, oldPolicies);
-                onPolicyAdded(addedPolicies, pubName);
+                addPublishmentPolicies(newPolicyPublishPluginMapping, addedPolicies, pubName);
             }
             Publishment newPub = afterModified.get(i);
-            publishPluginMapping.get(pubName).update(newPub.getDedupIntervalMin(), newPub.getProperties());
+            newPublishMap.get(pubName).update(newPub.getDedupIntervalMin(), newPub.getProperties());
         }
+
+        // now do the swap
+        publishPluginMapping = newPublishMap;
+        policyPublishPluginMapping = newPolicyPublishPluginMapping;
+
+        // safely close : it depend on plugin to check if want to wait all data to be flushed.
+        closePlugins(toBeClosed);
     }
 
-    private synchronized void onPolicyAdded(List<String> addedPolicyIds, String pubName) {
+    private void closePlugins(List<AlertPublishPlugin> toBeClosed) {
+        for (AlertPublishPlugin p : toBeClosed) {
+            try {
+                p.close();
+            } catch (Exception e) {
+                LOG.error(MessageFormat.format("Error when close publish plugin {}, {}!", p.getClass().getCanonicalName()), e);
+            }
+        }
+    }
+
+    private void addPublishmentPolicies(Map<String, List<String>> newPolicyPublishPluginMapping, List<String> addedPolicyIds, String pubName) {
         if (addedPolicyIds == null || pubName == null) {
             return;
         }
 
         for (String policyId : addedPolicyIds) {
-            if (policyPublishPluginMapping.get(policyId) == null) {
-                policyPublishPluginMapping.put(policyId, new ArrayList<>());
-            }
-            List<String> publishIds = policyPublishPluginMapping.get(policyId);
-            publishIds.add(pubName);
+            newPolicyPublishPluginMapping.putIfAbsent(policyId, new ArrayList<>());
+            newPolicyPublishPluginMapping.get(policyId).add(pubName);
         }
     }
 
-    private synchronized void onPolicyDeleted(List<String> deletedPolicyIds, String pubName) {
+    private synchronized void removePublihsPolicies(Map<String, List<String>> newPolicyPublishPluginMapping, List<String> deletedPolicyIds, String pubName) {
         if (deletedPolicyIds == null || pubName == null) {
             return;
         }
 
         for (String policyId : deletedPolicyIds) {
-            List<String> publishIds = policyPublishPluginMapping.get(policyId);
+            List<String> publishIds = newPolicyPublishPluginMapping.get(policyId);
             publishIds.remove(pubName);
         }
     }