You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/09/07 17:42:21 UTC
[25/52] [abbrv] incubator-eagle git commit: EAGLE-506 : AlertEngine :
Make policy definition handler extensible
EAGLE-506 : AlertEngine : Make policy definition handler extensible
Author : ralphsu
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/da7f5520
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/da7f5520
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/da7f5520
Branch: refs/heads/master
Commit: da7f5520613c4860fb2dfb2cb5fad64f9d5d727b
Parents: 3cc1830
Author: Ralph, Su <su...@gmail.com>
Authored: Mon Aug 29 23:33:27 2016 +0800
Committer: Ralph, Su <su...@gmail.com>
Committed: Mon Aug 29 23:33:27 2016 +0800
----------------------------------------------------------------------
.../engine/coordinator/PolicyDefinition.java | 9 ++
.../evaluator/CompositePolicyHandler.java | 2 +-
.../engine/evaluator/PolicyStreamHandlers.java | 25 ++++-
.../eagle/alert/engine/runner/AlertBolt.java | 1 +
.../alert/engine/router/CustomizedHandler.java | 51 +++++++++
.../alert/engine/router/TestAlertBolt.java | 109 +++++++++++++++----
6 files changed, 168 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da7f5520/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
index 363264e..827172f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
@@ -162,6 +162,7 @@ public class PolicyDefinition implements Serializable{
public String type;
public String value;
+ public String handlerClass;
private List<String> inputStreams = new ArrayList<String>();
private List<String> outputStreams = new ArrayList<String>();
@@ -228,6 +229,14 @@ public class PolicyDefinition implements Serializable{
return outputStreams;
}
+ public String getHandlerClass() {
+ return handlerClass;
+ }
+
+ public void setHandlerClass(String handlerClass) {
+ this.handlerClass = handlerClass;
+ }
+
@Override
public String toString() {
return String.format("{type=\"%s\",value=\"%s\", inputStreams=\"%s\", outputStreams=\"%s\" }",type,value, inputStreams, outputStreams);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da7f5520/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/CompositePolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/CompositePolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/CompositePolicyHandler.java
index 047ee6f..4d69bca 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/CompositePolicyHandler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/CompositePolicyHandler.java
@@ -49,7 +49,7 @@ public class CompositePolicyHandler implements PolicyStreamHandler {
public void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception {
this.collector = collector;
// TODO: create two handlers
- policyHandler = PolicyStreamHandlers.createHandler(context.getPolicyDefinition().getDefinition().type, sds);
+ policyHandler = PolicyStreamHandlers.createHandler(context.getPolicyDefinition().getDefinition(), sds);
policyHandler.prepare(collector, context);
handlers.add(policyHandler);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da7f5520/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
index 93327b7..ef9caf0 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
@@ -16,11 +16,14 @@
*/
package org.apache.eagle.alert.engine.evaluator;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.evaluator.absence.AbsencePolicyHandler;
import org.apache.eagle.alert.engine.evaluator.impl.SiddhiPolicyHandler;
import org.apache.eagle.alert.engine.evaluator.impl.SiddhiPolicyStateHandler;
import org.apache.eagle.alert.engine.evaluator.nodata.NoDataPolicyHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Map;
@@ -28,19 +31,31 @@ import java.util.Map;
* TODO/FIXME: to support multiple stage definition in single policy. The methods in this class is not good to understand now.(Hard code of 0/1).
*/
public class PolicyStreamHandlers {
+ private static final Logger LOG = LoggerFactory.getLogger(PolicyStreamHandlers.class);
+
public static final String SIDDHI_ENGINE = "siddhi";
public static final String NO_DATA_ALERT_ENGINE = "nodataalert";
public static final String ABSENCE_ALERT_ENGINE = "absencealert";
+ public static final String CUSTOMIZED_ENGINE = "Custom";
- public static PolicyStreamHandler createHandler(String type, Map<String, StreamDefinition> sds) {
- if (SIDDHI_ENGINE.equals(type)) {
+ public static PolicyStreamHandler createHandler(PolicyDefinition.Definition definition, Map<String, StreamDefinition> sds) {
+ if (SIDDHI_ENGINE.equals(definition.getType())) {
return new SiddhiPolicyHandler(sds, 0);// // FIXME: 8/2/16
- } else if (NO_DATA_ALERT_ENGINE.equals(type)) {
+ } else if (NO_DATA_ALERT_ENGINE.equals(definition.getType())) {
return new NoDataPolicyHandler(sds);
- } else if (ABSENCE_ALERT_ENGINE.equals(type)) {
+ } else if (ABSENCE_ALERT_ENGINE.equals(definition.getType())) {
return new AbsencePolicyHandler(sds);
+ } else if (CUSTOMIZED_ENGINE.equals(definition.getType())) {
+ try {
+ Class<?> handlerClz = Class.forName(definition.getHandlerClass());
+ PolicyStreamHandler handler = (PolicyStreamHandler) handlerClz.getConstructor(Map.class).newInstance(sds);
+ return handler;
+ } catch (Exception e) {
+ LOG.error("Not able to create policy handler for handler class " + definition.getHandlerClass(), e);
+ throw new IllegalArgumentException("Illegal extended policy handler class!" + definition.getHandlerClass());
+ }
}
- throw new IllegalArgumentException("Illegal policy stream handler type " + type);
+ throw new IllegalArgumentException("Illegal policy stream handler type " + definition.getType());
}
public static PolicyStreamHandler createStateHandler(String type, Map<String, StreamDefinition> sds) {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da7f5520/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
index ce5125b..fecb2f1 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
@@ -70,6 +70,7 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
private volatile Map<String, PolicyDefinition> cachedPolicies = new HashMap<>(); // for one streamGroup, there are multiple policies
private AlertBoltSpec spec;
+
public AlertBolt(String boltId, Config config, IMetadataChangeNotifyService changeNotifyService){
super(boltId, changeNotifyService, config);
this.boltId = boltId;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da7f5520/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java
new file mode 100644
index 0000000..be69ffb
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.router;
+
+import org.apache.eagle.alert.engine.Collector;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
+import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+
+import java.util.Map;
+
+/**
+ * Created on 8/29/16.
+ */
+public class CustomizedHandler implements PolicyStreamHandler {
+ private Collector<AlertStreamEvent> collector;
+
+ public CustomizedHandler(Map<String, StreamDefinition> sds) {
+ }
+
+ @Override
+ public void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception {
+ this.collector = collector;
+ }
+
+ @Override
+ public void send(StreamEvent event) throws Exception {
+ this.collector.emit(new AlertStreamEvent());
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da7f5520/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
index 9d2bb38..f3548d8 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
@@ -41,13 +41,14 @@ import org.apache.eagle.alert.engine.runner.AlertBolt;
import org.apache.eagle.alert.engine.runner.TestStreamRouterBolt;
import org.apache.eagle.alert.engine.serialization.impl.PartitionedEventSerializerImpl;
import org.apache.eagle.alert.utils.DateTimeUtil;
-import org.jetbrains.annotations.NotNull;
import org.junit.Assert;
import org.junit.Test;
+import java.io.IOException;
import java.util.*;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.mockito.Matchers.any;
@@ -59,6 +60,9 @@ import static org.mockito.Mockito.when;
*/
@SuppressWarnings({"rawtypes", "unused"})
public class TestAlertBolt {
+
+ public static final String TEST_STREAM = "test-stream";
+
/**
* Following knowledge is guaranteed in
*
@@ -164,7 +168,6 @@ public class TestAlertBolt {
bolt.cleanup();
}
- @NotNull
public static AlertBolt createAlertBolt(OutputCollector collector) {
Config config = ConfigFactory.load();
PolicyGroupEvaluator policyGroupEvaluator = new PolicyGroupEvaluatorImpl("testPolicyGroupEvaluatorImpl");
@@ -260,6 +263,7 @@ public class TestAlertBolt {
}
}
+ //TODO: no data alert failed, need to check when no data alert merged.
@Test
public void testMetaversionConflict() throws Exception {
AtomicInteger failedCount = new AtomicInteger();
@@ -283,25 +287,6 @@ public class TestAlertBolt {
});
AlertBolt bolt = createAlertBolt(collector);
- GeneralTopologyContext context = mock(GeneralTopologyContext.class);
- int taskId = 1;
- when(context.getComponentId(taskId)).thenReturn("comp1");
- when(context.getComponentOutputFields("comp1", "default")).thenReturn(new Fields("f0"));
- // case 1: bolt prepared but metadata not initialized (no bolt.onAlertBoltSpecChange)
- PartitionedEvent pe = new PartitionedEvent();
- pe.setPartitionKey(1);
- pe.setPartition(createPartition());
- StreamEvent streamEvent = new StreamEvent();
- streamEvent.setStreamId("test-stream");
- streamEvent.setTimestamp(System.currentTimeMillis());
- streamEvent.setMetaVersion("spec_version_"+System.currentTimeMillis());
- pe.setEvent(streamEvent);
-
- PartitionedEventSerializerImpl peSer = new PartitionedEventSerializerImpl(bolt);
- byte[] serializedEvent = peSer.serialize(pe);
- Tuple input = new TupleImpl(context, Collections.singletonList(serializedEvent), taskId, "default");
-
-
Map<String, StreamDefinition> sds = new HashMap();
StreamDefinition sdTest = new StreamDefinition();
String streamId = "test-stream";
@@ -318,13 +303,16 @@ public class TestAlertBolt {
PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
definition.setType(PolicyStreamHandlers.NO_DATA_ALERT_ENGINE);
- definition.setValue("PT0M,plain,1,host,host1");
+ definition.setValue("PT0M,provided,1,host,host1");
def.setDefinition(definition);
+ def.setPartitionSpec(Arrays.asList(createPartition()));
+ def.setOutputStreams(Arrays.asList("out"));
boltSpecs.getBoltPoliciesMap().put(bolt.getBoltId(), Arrays.asList(def));
bolt = createAlertBolt(collector);
bolt.onAlertBoltSpecChange(boltSpecs, sds);
+ Tuple input = createTuple(bolt, boltSpecs.getVersion());
bolt.execute(input);
// Sleep 10s to wait thread in bolt.execute() to finish works
@@ -335,12 +323,87 @@ public class TestAlertBolt {
}
- @NotNull
+ private Tuple createTuple(AlertBolt bolt, String version) throws IOException {
+ GeneralTopologyContext context = mock(GeneralTopologyContext.class);
+ int taskId = 1;
+ when(context.getComponentId(taskId)).thenReturn("comp1");
+ when(context.getComponentOutputFields("comp1", "default")).thenReturn(new Fields("f0"));
+ // case 1: bolt prepared but metadata not initialized (no bolt.onAlertBoltSpecChange)
+ PartitionedEvent pe = new PartitionedEvent();
+ pe.setPartitionKey(1);
+ pe.setPartition(createPartition());
+ StreamEvent streamEvent = new StreamEvent();
+ streamEvent.setStreamId(TEST_STREAM);
+ streamEvent.setTimestamp(System.currentTimeMillis());
+ streamEvent.setMetaVersion(version);
+ pe.setEvent(streamEvent);
+
+ PartitionedEventSerializerImpl peSer = new PartitionedEventSerializerImpl(bolt);
+ byte[] serializedEvent = peSer.serialize(pe);
+ return new TupleImpl(context, Collections.singletonList(serializedEvent), taskId, "default");
+ }
+
private StreamPartition createPartition() {
StreamPartition sp = new StreamPartition();
+ sp.setStreamId(TEST_STREAM);
sp.setType(StreamPartition.Type.GROUPBY);
return sp;
}
+ @Test
+ public void testExtendDefinition() throws IOException {
+ PolicyDefinition def = new PolicyDefinition();
+ def.setName("policy-definition");
+ def.setInputStreams(Arrays.asList(TEST_STREAM));
+
+ PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+ definition.setType(PolicyStreamHandlers.CUSTOMIZED_ENGINE);
+ definition.setHandlerClass("org.apache.eagle.alert.engine.router.CustomizedHandler");
+ definition.setValue("PT0M,plain,1,host,host1");
+ def.setDefinition(definition);
+ def.setPartitionSpec(Arrays.asList(createPartition()));
+
+ AlertBoltSpec boltSpecs = new AlertBoltSpec();
+
+ AtomicBoolean recieved = new AtomicBoolean(false);
+ OutputCollector collector = new OutputCollector(new IOutputCollector() {
+ @Override
+ public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+ recieved.set(true);
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {}
+
+ @Override
+ public void ack(Tuple input) {}
+
+ @Override
+ public void fail(Tuple input) {}
+
+ @Override
+ public void reportError(Throwable error) {}
+ });
+ AlertBolt bolt = createAlertBolt(collector);
+
+ boltSpecs.getBoltPoliciesMap().put(bolt.getBoltId(), Arrays.asList(def));
+ boltSpecs.setVersion("spec_"+System.currentTimeMillis());
+ // stream def map
+ Map<String, StreamDefinition> sds = new HashMap();
+ StreamDefinition sdTest = new StreamDefinition();
+ sdTest.setStreamId(TEST_STREAM);
+ sds.put(sdTest.getStreamId(), sdTest);
+
+ bolt.onAlertBoltSpecChange(boltSpecs, sds);
+
+ // how to assert
+ Tuple t = createTuple(bolt, boltSpecs.getVersion());
+
+ bolt.execute(t);
+
+ Assert.assertTrue(recieved.get());
+ }
}
+