You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ra...@apache.org on 2016/12/14 03:38:33 UTC
incubator-eagle git commit: [EAGLE-837] fix duplicated policy update
and add tests
Repository: incubator-eagle
Updated Branches:
refs/heads/master 84ceeb150 -> d6987af2b
[EAGLE-837] fix duplicated policy update and add tests
fix duplicated policy populated during alertBolt reload
refine alert bolt reload test
Author: Li, Garrett
Reviewer: ralphsu
This closes #738
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/d6987af2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/d6987af2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/d6987af2
Branch: refs/heads/master
Commit: d6987af2bbd04e1fe44263720d0b8eb41905bd02
Parents: 84ceeb1
Author: Xiancheng Li <xi...@ebay.com>
Authored: Wed Dec 14 11:01:41 2016 +0800
Committer: Ralph, Su <su...@gmail.com>
Committed: Wed Dec 14 11:29:06 2016 +0800
----------------------------------------------------------------------
.../engine/evaluator/PolicyChangeListener.java | 3 +-
.../impl/PolicyGroupEvaluatorImpl.java | 8 ++---
.../eagle/alert/engine/runner/AlertBolt.java | 15 +++++----
.../alert/engine/router/TestAlertBolt.java | 35 ++++++++++++++++----
4 files changed, 42 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d6987af2/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyChangeListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyChangeListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyChangeListener.java
index 40351ad..20c2e1d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyChangeListener.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyChangeListener.java
@@ -23,7 +23,8 @@ import java.util.List;
import java.util.Map;
public interface PolicyChangeListener {
- void onPolicyChange(List<PolicyDefinition> added,
+ void onPolicyChange(String version,
+ List<PolicyDefinition> added,
List<PolicyDefinition> removed,
List<PolicyDefinition> modified, Map<String, StreamDefinition> sds);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d6987af2/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
index 9b1d76c..af35551 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- * <p/>
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -110,7 +110,7 @@ public class PolicyGroupEvaluatorImpl implements PolicyGroupEvaluator {
}
@Override
- public void onPolicyChange(List<PolicyDefinition> added, List<PolicyDefinition> removed, List<PolicyDefinition> modified, Map<String, StreamDefinition> sds) {
+ public void onPolicyChange(String version, List<PolicyDefinition> added, List<PolicyDefinition> removed, List<PolicyDefinition> modified, Map<String, StreamDefinition> sds) {
Map<String, PolicyDefinition> copyPolicies = new HashMap<>(policyDefinitionMap);
Map<String, CompositePolicyHandler> copyHandlers = new HashMap<>(policyStreamHandlerMap);
for (PolicyDefinition pd : added) {
@@ -125,7 +125,7 @@ public class PolicyGroupEvaluatorImpl implements PolicyGroupEvaluator {
}
// logging
- LOG.info("Policy metadata updated with added={}, removed={}, modified={}", added, removed, modified);
+ LOG.info("{} with {} Policy metadata updated with added={}, removed={}, modified={}", policyEvaluatorId, version, added, removed, modified);
// switch reference
this.policyDefinitionMap = copyPolicies;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d6987af2/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
index edf1b6f..144f9aa 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
@@ -20,6 +20,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -204,16 +205,16 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
List<PolicyDefinition> cachedPoliciesTemp = new ArrayList<>(cachedPolicies.values());
addOrUpdatedStreams.forEach(s -> {
cachedPoliciesTemp.stream().filter(p -> p.getInputStreams().contains(s.getStreamId())
- || p.getOutputStreams().contains(s.getStreamId())).forEach(
- p -> {
- if (!comparator.getModified().contains(p) && !comparator.getAdded().contains(p)) {
- comparator.getModified().add(p);
- }
- });
+ || p.getOutputStreams().contains(s.getStreamId())).forEach(p -> {
+ if (comparator.getModified().stream().filter(x -> x.getName().equals(p.getName())).count() <= 0
+ && comparator.getAdded().stream().filter(x -> x.getName().equals(p.getName())).count() <= 0) {
+ comparator.getModified().add(p);
+ }
+ });
;
});
- policyGroupEvaluator.onPolicyChange(comparator.getAdded(), comparator.getRemoved(), comparator.getModified(), sds);
+ policyGroupEvaluator.onPolicyChange(spec.getVersion(), comparator.getAdded(), comparator.getRemoved(), comparator.getModified(), sds);
// update alert output collector
Set<PublishPartition> newPublishPartitions = new HashSet<>();
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d6987af2/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
index 8ae29d5..440f555 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
@@ -500,13 +500,6 @@ public class TestAlertBolt {
boltSpecs.addPublishPartition(TEST_STREAM, "policy-definition", "testAlertPublish", null);
bolt.onAlertBoltSpecChange(boltSpecs, sds);
-
- // how to assert
- Tuple t = createTuple(bolt, boltSpecs.getVersion());
-
- bolt.execute(t);
-
- Assert.assertTrue(recieved.get());
LOG.info("Update stream");
sds = new HashMap();
@@ -516,6 +509,27 @@ public class TestAlertBolt {
sdTest.setDescription("update the stream");
bolt.onAlertBoltSpecChange(boltSpecs, sds);
+ LOG.info("Update stream & update policy");
+ sds = new HashMap();
+ sdTest = new StreamDefinition();
+ sdTest.setStreamId(TEST_STREAM);
+ sds.put(sdTest.getStreamId(), sdTest);
+ sdTest.setDescription("update the stream & update policy");
+
+ def = new PolicyDefinition();
+ def.setName("policy-definition");
+ def.setInputStreams(Arrays.asList(TEST_STREAM));
+
+ definition = new PolicyDefinition.Definition();
+ definition.setType(PolicyStreamHandlers.CUSTOMIZED_ENGINE);
+ definition.setHandlerClass("org.apache.eagle.alert.engine.router.CustomizedHandler");
+ definition.setValue("PT0M,plain,1,host,host2");
+ def.setDefinition(definition);
+ def.setPartitionSpec(Arrays.asList(createPartition()));
+ boltSpecs.getBoltPoliciesMap().put(bolt.getBoltId(), Arrays.asList(def));
+
+ bolt.onAlertBoltSpecChange(boltSpecs, sds);
+
LOG.info("No any change");
sds = new HashMap();
sdTest = new StreamDefinition();
@@ -523,6 +537,13 @@ public class TestAlertBolt {
sds.put(sdTest.getStreamId(), sdTest);
sdTest.setDescription("update the stream");
bolt.onAlertBoltSpecChange(boltSpecs, sds);
+
+ // how to assert
+ Tuple t = createTuple(bolt, boltSpecs.getVersion());
+
+ bolt.execute(t);
+
+ Assert.assertTrue(recieved.get());
}
@Test @Ignore