You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ra...@apache.org on 2016/12/14 03:38:33 UTC

incubator-eagle git commit: [EAGLE-837] fix duplicated policy update and add tests

Repository: incubator-eagle
Updated Branches:
  refs/heads/master 84ceeb150 -> d6987af2b


[EAGLE-837] fix duplicated policy update and add tests

fix duplicated policy populated during alertBolt reload
refine alert bolt reload test

Author: Li, Garrett
Reviewer: ralphsu

This closes #738


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/d6987af2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/d6987af2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/d6987af2

Branch: refs/heads/master
Commit: d6987af2bbd04e1fe44263720d0b8eb41905bd02
Parents: 84ceeb1
Author: Xiancheng Li <xi...@ebay.com>
Authored: Wed Dec 14 11:01:41 2016 +0800
Committer: Ralph, Su <su...@gmail.com>
Committed: Wed Dec 14 11:29:06 2016 +0800

----------------------------------------------------------------------
 .../engine/evaluator/PolicyChangeListener.java  |  3 +-
 .../impl/PolicyGroupEvaluatorImpl.java          |  8 ++---
 .../eagle/alert/engine/runner/AlertBolt.java    | 15 +++++----
 .../alert/engine/router/TestAlertBolt.java      | 35 ++++++++++++++++----
 4 files changed, 42 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d6987af2/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyChangeListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyChangeListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyChangeListener.java
index 40351ad..20c2e1d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyChangeListener.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyChangeListener.java
@@ -23,7 +23,8 @@ import java.util.List;
 import java.util.Map;
 
 public interface PolicyChangeListener {
-    void onPolicyChange(List<PolicyDefinition> added,
+    void onPolicyChange(String version,
+                        List<PolicyDefinition> added,
                         List<PolicyDefinition> removed,
                         List<PolicyDefinition> modified, Map<String, StreamDefinition> sds);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d6987af2/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
index 9b1d76c..af35551 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * <p/>
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -110,7 +110,7 @@ public class PolicyGroupEvaluatorImpl implements PolicyGroupEvaluator {
     }
 
     @Override
-    public void onPolicyChange(List<PolicyDefinition> added, List<PolicyDefinition> removed, List<PolicyDefinition> modified, Map<String, StreamDefinition> sds) {
+    public void onPolicyChange(String version, List<PolicyDefinition> added, List<PolicyDefinition> removed, List<PolicyDefinition> modified, Map<String, StreamDefinition> sds) {
         Map<String, PolicyDefinition> copyPolicies = new HashMap<>(policyDefinitionMap);
         Map<String, CompositePolicyHandler> copyHandlers = new HashMap<>(policyStreamHandlerMap);
         for (PolicyDefinition pd : added) {
@@ -125,7 +125,7 @@ public class PolicyGroupEvaluatorImpl implements PolicyGroupEvaluator {
         }
 
         // logging
-        LOG.info("Policy metadata updated with added={}, removed={}, modified={}", added, removed, modified);
+        LOG.info("{} with {} Policy metadata updated with added={}, removed={}, modified={}", policyEvaluatorId, version, added, removed, modified);
 
         // switch reference
         this.policyDefinitionMap = copyPolicies;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d6987af2/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
index edf1b6f..144f9aa 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
@@ -20,6 +20,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -204,16 +205,16 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
         List<PolicyDefinition> cachedPoliciesTemp = new ArrayList<>(cachedPolicies.values());
         addOrUpdatedStreams.forEach(s -> {
             cachedPoliciesTemp.stream().filter(p -> p.getInputStreams().contains(s.getStreamId())
-                || p.getOutputStreams().contains(s.getStreamId())).forEach(
-                    p -> {
-                        if (!comparator.getModified().contains(p) && !comparator.getAdded().contains(p)) {
-                            comparator.getModified().add(p);
-                        }
-                    });
+                || p.getOutputStreams().contains(s.getStreamId())).forEach(p -> {
+                    if (comparator.getModified().stream().filter(x -> x.getName().equals(p.getName())).count() <= 0
+                        && comparator.getAdded().stream().filter(x -> x.getName().equals(p.getName())).count() <= 0) {
+                        comparator.getModified().add(p);
+                    }
+                });
             ;
         });
 
-        policyGroupEvaluator.onPolicyChange(comparator.getAdded(), comparator.getRemoved(), comparator.getModified(), sds);
+        policyGroupEvaluator.onPolicyChange(spec.getVersion(), comparator.getAdded(), comparator.getRemoved(), comparator.getModified(), sds);
 
         // update alert output collector
         Set<PublishPartition> newPublishPartitions = new HashSet<>();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d6987af2/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
index 8ae29d5..440f555 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
@@ -500,13 +500,6 @@ public class TestAlertBolt {
         boltSpecs.addPublishPartition(TEST_STREAM, "policy-definition", "testAlertPublish", null);
 
         bolt.onAlertBoltSpecChange(boltSpecs, sds);
-
-        // how to assert
-        Tuple t = createTuple(bolt, boltSpecs.getVersion());
-
-        bolt.execute(t);
-
-        Assert.assertTrue(recieved.get());
         
         LOG.info("Update stream");
         sds = new HashMap();
@@ -516,6 +509,27 @@ public class TestAlertBolt {
         sdTest.setDescription("update the stream");
         bolt.onAlertBoltSpecChange(boltSpecs, sds);
         
+        LOG.info("Update stream & update policy");
+        sds = new HashMap();
+        sdTest = new StreamDefinition();
+        sdTest.setStreamId(TEST_STREAM);
+        sds.put(sdTest.getStreamId(), sdTest);
+        sdTest.setDescription("update the stream & update policy");
+        
+        def = new PolicyDefinition();
+        def.setName("policy-definition");
+        def.setInputStreams(Arrays.asList(TEST_STREAM));
+
+        definition = new PolicyDefinition.Definition();
+        definition.setType(PolicyStreamHandlers.CUSTOMIZED_ENGINE);
+        definition.setHandlerClass("org.apache.eagle.alert.engine.router.CustomizedHandler");
+        definition.setValue("PT0M,plain,1,host,host2");
+        def.setDefinition(definition);
+        def.setPartitionSpec(Arrays.asList(createPartition()));
+        boltSpecs.getBoltPoliciesMap().put(bolt.getBoltId(), Arrays.asList(def));
+        
+        bolt.onAlertBoltSpecChange(boltSpecs, sds);
+        
         LOG.info("No any change");
         sds = new HashMap();
         sdTest = new StreamDefinition();
@@ -523,6 +537,13 @@ public class TestAlertBolt {
         sds.put(sdTest.getStreamId(), sdTest);
         sdTest.setDescription("update the stream");
         bolt.onAlertBoltSpecChange(boltSpecs, sds);
+        
+        // how to assert
+        Tuple t = createTuple(bolt, boltSpecs.getVersion());
+
+        bolt.execute(t);
+
+        Assert.assertTrue(recieved.get());
     }
 
     @Test @Ignore