You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/09/07 17:42:21 UTC

[25/52] [abbrv] incubator-eagle git commit: EAGLE-506 : AlertEngine : Make policy definition handler extensible

EAGLE-506 : AlertEngine : Make policy definition handler extensible

Author : ralphsu


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/da7f5520
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/da7f5520
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/da7f5520

Branch: refs/heads/master
Commit: da7f5520613c4860fb2dfb2cb5fad64f9d5d727b
Parents: 3cc1830
Author: Ralph, Su <su...@gmail.com>
Authored: Mon Aug 29 23:33:27 2016 +0800
Committer: Ralph, Su <su...@gmail.com>
Committed: Mon Aug 29 23:33:27 2016 +0800

----------------------------------------------------------------------
 .../engine/coordinator/PolicyDefinition.java    |   9 ++
 .../evaluator/CompositePolicyHandler.java       |   2 +-
 .../engine/evaluator/PolicyStreamHandlers.java  |  25 ++++-
 .../eagle/alert/engine/runner/AlertBolt.java    |   1 +
 .../alert/engine/router/CustomizedHandler.java  |  51 +++++++++
 .../alert/engine/router/TestAlertBolt.java      | 109 +++++++++++++++----
 6 files changed, 168 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da7f5520/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
index 363264e..827172f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
@@ -162,6 +162,7 @@ public class PolicyDefinition implements Serializable{
 
         public String type;
         public String value;
+        public String handlerClass;
 
         private List<String> inputStreams = new ArrayList<String>();
         private List<String> outputStreams = new ArrayList<String>();
@@ -228,6 +229,14 @@ public class PolicyDefinition implements Serializable{
             return outputStreams;
         }
 
+        public String getHandlerClass() {
+            return handlerClass;
+        }
+
+        public void setHandlerClass(String handlerClass) {
+            this.handlerClass = handlerClass;
+        }
+
         @Override
         public String toString() {
             return String.format("{type=\"%s\",value=\"%s\", inputStreams=\"%s\", outputStreams=\"%s\" }",type,value, inputStreams, outputStreams);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da7f5520/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/CompositePolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/CompositePolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/CompositePolicyHandler.java
index 047ee6f..4d69bca 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/CompositePolicyHandler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/CompositePolicyHandler.java
@@ -49,7 +49,7 @@ public class CompositePolicyHandler implements PolicyStreamHandler {
     public void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception {
         this.collector = collector;
         // TODO: create two handlers
-        policyHandler = PolicyStreamHandlers.createHandler(context.getPolicyDefinition().getDefinition().type, sds);
+        policyHandler = PolicyStreamHandlers.createHandler(context.getPolicyDefinition().getDefinition(), sds);
         policyHandler.prepare(collector, context);
         handlers.add(policyHandler);
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da7f5520/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
index 93327b7..ef9caf0 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
@@ -16,11 +16,14 @@
  */
 package org.apache.eagle.alert.engine.evaluator;
 
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.evaluator.absence.AbsencePolicyHandler;
 import org.apache.eagle.alert.engine.evaluator.impl.SiddhiPolicyHandler;
 import org.apache.eagle.alert.engine.evaluator.impl.SiddhiPolicyStateHandler;
 import org.apache.eagle.alert.engine.evaluator.nodata.NoDataPolicyHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
@@ -28,19 +31,31 @@ import java.util.Map;
  * TODO/FIXME: to support multiple stage definition in single policy. The methods in this class is not good to understand now.(Hard code of 0/1).
  */
 public class PolicyStreamHandlers {
+    private static final Logger LOG = LoggerFactory.getLogger(PolicyStreamHandlers.class);
+
     public static final String SIDDHI_ENGINE = "siddhi";
     public static final String NO_DATA_ALERT_ENGINE = "nodataalert";
     public static final String ABSENCE_ALERT_ENGINE = "absencealert";
+    public static final String CUSTOMIZED_ENGINE = "Custom";
 
-    public static PolicyStreamHandler createHandler(String type, Map<String, StreamDefinition> sds) {
-        if (SIDDHI_ENGINE.equals(type)) {
+    public static PolicyStreamHandler createHandler(PolicyDefinition.Definition definition, Map<String, StreamDefinition> sds) {
+        if (SIDDHI_ENGINE.equals(definition.getType())) {
             return new SiddhiPolicyHandler(sds, 0);// // FIXME: 8/2/16 
-        } else if (NO_DATA_ALERT_ENGINE.equals(type)) {
+        } else if (NO_DATA_ALERT_ENGINE.equals(definition.getType())) {
             return new NoDataPolicyHandler(sds);
-        } else if (ABSENCE_ALERT_ENGINE.equals(type)) {
+        } else if (ABSENCE_ALERT_ENGINE.equals(definition.getType())) {
             return new AbsencePolicyHandler(sds);
+        } else if (CUSTOMIZED_ENGINE.equals(definition.getType())) {
+            try {
+                Class<?> handlerClz = Class.forName(definition.getHandlerClass());
+                PolicyStreamHandler handler = (PolicyStreamHandler) handlerClz.getConstructor(Map.class).newInstance(sds);
+                return handler;
+            } catch (Exception e) {
+                LOG.error("Not able to create policy handler for handler class " + definition.getHandlerClass(), e);
+                throw new IllegalArgumentException("Illegal extended policy handler class!" + definition.getHandlerClass());
+            }
         }
-        throw new IllegalArgumentException("Illegal policy stream handler type " + type);
+        throw new IllegalArgumentException("Illegal policy stream handler type " + definition.getType());
     }
 
     public static PolicyStreamHandler createStateHandler(String type, Map<String, StreamDefinition> sds) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da7f5520/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
index ce5125b..fecb2f1 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
@@ -70,6 +70,7 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
     private volatile Map<String, PolicyDefinition> cachedPolicies = new HashMap<>(); // for one streamGroup, there are multiple policies
 
     private AlertBoltSpec spec;
+
     public AlertBolt(String boltId, Config config, IMetadataChangeNotifyService changeNotifyService){
         super(boltId, changeNotifyService, config);
         this.boltId = boltId;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da7f5520/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java
new file mode 100644
index 0000000..be69ffb
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.router;
+
+import org.apache.eagle.alert.engine.Collector;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
+import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+
+import java.util.Map;
+
+/**
+ * Created on 8/29/16.
+ */
+public class CustomizedHandler implements PolicyStreamHandler {
+    private Collector<AlertStreamEvent> collector;
+
+    public CustomizedHandler(Map<String, StreamDefinition> sds) {
+    }
+
+    @Override
+    public void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception {
+        this.collector = collector;
+    }
+
+    @Override
+    public void send(StreamEvent event) throws Exception {
+        this.collector.emit(new AlertStreamEvent());
+    }
+
+    @Override
+    public void close() throws Exception {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da7f5520/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
index 9d2bb38..f3548d8 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
@@ -41,13 +41,14 @@ import org.apache.eagle.alert.engine.runner.AlertBolt;
 import org.apache.eagle.alert.engine.runner.TestStreamRouterBolt;
 import org.apache.eagle.alert.engine.serialization.impl.PartitionedEventSerializerImpl;
 import org.apache.eagle.alert.utils.DateTimeUtil;
-import org.jetbrains.annotations.NotNull;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.mockito.Matchers.any;
@@ -59,6 +60,9 @@ import static org.mockito.Mockito.when;
  */
 @SuppressWarnings({"rawtypes", "unused"})
 public class TestAlertBolt {
+
+    public static final String TEST_STREAM = "test-stream";
+
     /**
      * Following knowledge is guaranteed in
      *
@@ -164,7 +168,6 @@ public class TestAlertBolt {
         bolt.cleanup();
     }
 
-    @NotNull
     public static AlertBolt createAlertBolt(OutputCollector collector) {
         Config config = ConfigFactory.load();
         PolicyGroupEvaluator policyGroupEvaluator = new PolicyGroupEvaluatorImpl("testPolicyGroupEvaluatorImpl");
@@ -260,6 +263,7 @@ public class TestAlertBolt {
         }
     }
 
+    //TODO: no data alert failed, need to check when no data alert merged.
     @Test
     public void testMetaversionConflict() throws Exception {
         AtomicInteger failedCount = new AtomicInteger();
@@ -283,25 +287,6 @@ public class TestAlertBolt {
         });
         AlertBolt bolt = createAlertBolt(collector);
 
-        GeneralTopologyContext context = mock(GeneralTopologyContext.class);
-        int taskId = 1;
-        when(context.getComponentId(taskId)).thenReturn("comp1");
-        when(context.getComponentOutputFields("comp1", "default")).thenReturn(new Fields("f0"));
-        // case 1: bolt prepared but metadata not initialized (no bolt.onAlertBoltSpecChange)
-        PartitionedEvent pe = new PartitionedEvent();
-        pe.setPartitionKey(1);
-        pe.setPartition(createPartition());
-        StreamEvent streamEvent = new StreamEvent();
-        streamEvent.setStreamId("test-stream");
-        streamEvent.setTimestamp(System.currentTimeMillis());
-        streamEvent.setMetaVersion("spec_version_"+System.currentTimeMillis());
-        pe.setEvent(streamEvent);
-
-        PartitionedEventSerializerImpl peSer = new PartitionedEventSerializerImpl(bolt);
-        byte[] serializedEvent = peSer.serialize(pe);
-        Tuple input = new TupleImpl(context, Collections.singletonList(serializedEvent), taskId, "default");
-
-
         Map<String, StreamDefinition> sds = new HashMap();
         StreamDefinition sdTest = new StreamDefinition();
         String streamId = "test-stream";
@@ -318,13 +303,16 @@ public class TestAlertBolt {
 
         PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
         definition.setType(PolicyStreamHandlers.NO_DATA_ALERT_ENGINE);
-        definition.setValue("PT0M,plain,1,host,host1");
+        definition.setValue("PT0M,provided,1,host,host1");
         def.setDefinition(definition);
+        def.setPartitionSpec(Arrays.asList(createPartition()));
+        def.setOutputStreams(Arrays.asList("out"));
 
         boltSpecs.getBoltPoliciesMap().put(bolt.getBoltId(), Arrays.asList(def));
         bolt = createAlertBolt(collector);
         bolt.onAlertBoltSpecChange(boltSpecs, sds);
 
+        Tuple input = createTuple(bolt, boltSpecs.getVersion());
         bolt.execute(input);
 
         // Sleep 10s to wait thread in bolt.execute() to finish works
@@ -335,12 +323,87 @@ public class TestAlertBolt {
 
     }
 
-    @NotNull
+    private Tuple createTuple(AlertBolt bolt, String version) throws IOException {
+        GeneralTopologyContext context = mock(GeneralTopologyContext.class);
+        int taskId = 1;
+        when(context.getComponentId(taskId)).thenReturn("comp1");
+        when(context.getComponentOutputFields("comp1", "default")).thenReturn(new Fields("f0"));
+        // case 1: bolt prepared but metadata not initialized (no bolt.onAlertBoltSpecChange)
+        PartitionedEvent pe = new PartitionedEvent();
+        pe.setPartitionKey(1);
+        pe.setPartition(createPartition());
+        StreamEvent streamEvent = new StreamEvent();
+        streamEvent.setStreamId(TEST_STREAM);
+        streamEvent.setTimestamp(System.currentTimeMillis());
+        streamEvent.setMetaVersion(version);
+        pe.setEvent(streamEvent);
+
+        PartitionedEventSerializerImpl peSer = new PartitionedEventSerializerImpl(bolt);
+        byte[] serializedEvent = peSer.serialize(pe);
+        return new TupleImpl(context, Collections.singletonList(serializedEvent), taskId, "default");
+    }
+
     private StreamPartition createPartition() {
         StreamPartition sp = new StreamPartition();
+        sp.setStreamId(TEST_STREAM);
         sp.setType(StreamPartition.Type.GROUPBY);
         return sp;
     }
 
+    @Test
+    public void testExtendDefinition() throws IOException {
+        PolicyDefinition def = new PolicyDefinition();
+        def.setName("policy-definition");
+        def.setInputStreams(Arrays.asList(TEST_STREAM));
+
+        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+        definition.setType(PolicyStreamHandlers.CUSTOMIZED_ENGINE);
+        definition.setHandlerClass("org.apache.eagle.alert.engine.router.CustomizedHandler");
+        definition.setValue("PT0M,plain,1,host,host1");
+        def.setDefinition(definition);
+        def.setPartitionSpec(Arrays.asList(createPartition()));
+
+        AlertBoltSpec boltSpecs = new AlertBoltSpec();
+
+        AtomicBoolean recieved = new AtomicBoolean(false);
+        OutputCollector collector = new OutputCollector(new IOutputCollector() {
+            @Override
+            public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+                recieved.set(true);
+                return Collections.emptyList();
+            }
+
+            @Override
+            public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {}
+
+            @Override
+            public void ack(Tuple input) {}
+
+            @Override
+            public void fail(Tuple input) {}
+
+            @Override
+            public void reportError(Throwable error) {}
+        });
+        AlertBolt bolt = createAlertBolt(collector);
+
+        boltSpecs.getBoltPoliciesMap().put(bolt.getBoltId(), Arrays.asList(def));
+        boltSpecs.setVersion("spec_"+System.currentTimeMillis());
+        // stream def map
+        Map<String, StreamDefinition> sds = new HashMap();
+        StreamDefinition sdTest = new StreamDefinition();
+        sdTest.setStreamId(TEST_STREAM);
+        sds.put(sdTest.getStreamId(), sdTest);
+
+        bolt.onAlertBoltSpecChange(boltSpecs, sds);
+
+        // how to assert
+        Tuple t = createTuple(bolt, boltSpecs.getVersion());
+
+        bolt.execute(t);
+
+        Assert.assertTrue(recieved.get());
+    }
 
 }
+