You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2015/11/19 11:47:54 UTC
[47/55] [abbrv] [partial] incubator-eagle git commit: [EAGLE-46]
Rename package name as "org.apache.eagle"
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyLifecycleMethods.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyLifecycleMethods.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyLifecycleMethods.java
deleted file mode 100644
index af9357d..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyLifecycleMethods.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.policy;
-
-import java.util.Map;
-
-import eagle.alert.entity.AlertDefinitionAPIEntity;
-
-/**
- * Lifecycle callbacks for alert policies: one hook each for created, changed and
- * deleted policy definitions.
- * <p>
- * Every hook receives a map of {@link AlertDefinitionAPIEntity} values keyed by String.
- * NOTE(review): the key is presumably the policy id -- confirm against the caller.
- */
-public interface PolicyLifecycleMethods {
- /** Hook for newly created definitions, passed in {@code added}. */
- void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> added);
- /** Hook for modified definitions, passed in {@code changed}. */
- void onPolicyChanged(Map<String, AlertDefinitionAPIEntity> changed);
- /** Hook for removed definitions, passed in {@code deleted}. */
- void onPolicyDeleted(Map<String, AlertDefinitionAPIEntity> deleted);
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyManager.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyManager.java
deleted file mode 100644
index 1080d08..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyManager.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.policy;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.ServiceLoader;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.databind.Module;
-
-/**
- * Process-wide registry of policy evaluator implementations.
- * <p>
- * The singleton discovers every {@link PolicyEvaluatorServiceProvider} through
- * {@link ServiceLoader} and records, per policy type, the evaluator class and the Jackson
- * binding modules that the provider reports. The registry is filled once in the private
- * constructor and is only read afterwards.
- */
-public class PolicyManager {
-    private static final Logger LOG = LoggerFactory.getLogger(PolicyManager.class);
-    // Declared after LOG on purpose: the constructor logs while the class is being initialized.
-    private static final PolicyManager instance = new PolicyManager();
-
-    private final ServiceLoader<PolicyEvaluatorServiceProvider> loader;
-
-    private final Map<String, Class<? extends PolicyEvaluator>> policyEvaluators = new HashMap<String, Class<? extends PolicyEvaluator>>();
-    private final Map<String, List<Module>> policyModules = new HashMap<String, List<Module>>();
-
-    /** Loads all providers visible to {@link ServiceLoader} and indexes them by policy type. */
-    private PolicyManager(){
-        loader = ServiceLoader.load(PolicyEvaluatorServiceProvider.class);
-        for (PolicyEvaluatorServiceProvider provider : loader) {
-            // Ask the provider once so the log line and both registry entries agree.
-            String policyType = provider.getPolicyType();
-            LOG.info("Supported policy type : " + policyType);
-            if (policyEvaluators.put(policyType, provider.getPolicyEvaluator()) != null) {
-                // The later provider still wins, exactly as before; the clash is now visible.
-                LOG.warn("Policy type " + policyType + " is provided more than once, using " + provider.getClass().getName());
-            }
-            policyModules.put(policyType, provider.getBindingModules());
-        }
-    }
-
-    /** @return the shared instance */
-    public static PolicyManager getInstance(){
-        return instance;
-    }
-
-    /**
-     * @param policyType policy type name as reported by a provider
-     * @return the evaluator class registered for that type, or {@code null} when none is registered
-     */
-    public Class<? extends PolicyEvaluator> getPolicyEvaluator(String policyType){
-        return policyEvaluators.get(policyType);
-    }
-
-    /**
-     * @param policyType policy type name as reported by a provider
-     * @return the Jackson modules registered for that type, or {@code null} when none is registered
-     */
-    public List<Module> getPolicyModules(String policyType){
-        return policyModules.get(policyType);
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyPartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyPartitioner.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyPartitioner.java
deleted file mode 100644
index ecdb667..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyPartitioner.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.policy;
-
-import java.io.Serializable;
-
-/**
- * Partitions policies so that they can be distributed across different alert evaluators.
- * <p>
- * The interface extends {@link Serializable}.
- * NOTE(review): presumably so implementations can be shipped to remote workers -- confirm.
- */
-public interface PolicyPartitioner extends Serializable {
- /**
- * Chooses the partition for one policy.
- *
- * @param numTotalPartitions total number of partitions available
- * @param policyType type of the policy being placed
- * @param policyId id of the policy being placed
- * @return the partition number; NOTE(review): presumably in
- * {@code [0, numTotalPartitions)} -- verify against the implementations
- */
- int partition(int numTotalPartitions, String policyType, String policyId);
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/AttributeType.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/AttributeType.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/AttributeType.java
deleted file mode 100644
index c6aeedc..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/AttributeType.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.siddhi;
-
-/**
- * Attribute value types, counterpart of Siddhi's attribute type enum referenced below.
- * NOTE(review): OBJECT is commented out at the end of the list -- presumably left
- * unsupported on purpose; confirm before enabling it.
- *
- * @see org.wso2.siddhi.query.api.definition.Attribute.Type
- */
-public enum AttributeType {
- STRING,
- LONG,
- INTEGER,
- BOOL,
- FLOAT,
- DOUBLE
-// , OBJECT
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/EagleAlertContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/EagleAlertContext.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/EagleAlertContext.java
deleted file mode 100644
index a39d991..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/EagleAlertContext.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.siddhi;
-
-import eagle.executor.AlertExecutor;
-import eagle.alert.policy.PolicyEvaluator;
-import eagle.datastream.Collector;
-
-/**
- * Plain holder of the objects that travel with an event through a Siddhi policy.
- * All members are public fields that are read directly; for example
- * SiddhiAlertAPIEntityRendner.render reads {@code evaluator} and {@code alertExecutor}.
- */
-public class EagleAlertContext {
-
- /** Executor the alert belongs to; render() takes the alert executor id from it. */
- public AlertExecutor alertExecutor;
-
- /** Id of the policy. NOTE(review): who populates it is not visible here -- confirm. */
- public String policyId;
-
- /** Evaluator of the policy; render() casts it to SiddhiPolicyEvaluator. */
- public PolicyEvaluator evaluator;
-
- /** Output collector. NOTE(review): presumably where produced alerts are emitted -- confirm. */
- public Collector outputCollector;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/SiddhiAlertAPIEntityRendner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/SiddhiAlertAPIEntityRendner.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/SiddhiAlertAPIEntityRendner.java
deleted file mode 100644
index 15d3e85..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/SiddhiAlertAPIEntityRendner.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.siddhi;
-
-import java.lang.management.ManagementFactory;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import eagle.alert.common.AlertConstants;
-import eagle.alert.entity.AlertAPIEntity;
-import eagle.alert.entity.AlertStreamSchemaEntity;
-import eagle.alert.notification.UrlBuilder;
-import eagle.common.config.EagleConfigConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import eagle.common.DateTimeUtil;
-import eagle.common.metric.AlertContext;
-import com.typesafe.config.Config;
-
-/**
- * Builds an {@link AlertAPIEntity} (tags plus alert context) from one output row of a Siddhi policy.
- * <p>
- * The class keeps its historical spelling ("Rendner") because callers reference it by that name.
- */
-public class SiddhiAlertAPIEntityRendner {
-
-    public static final Logger LOG = LoggerFactory.getLogger(SiddhiAlertAPIEntityRendner.class);
-    /** Name of this JVM's runtime MX bean; stored in the ALERT_SOURCE tag of every alert. */
-    public static final String source = ManagementFactory.getRuntimeMXBean().getName();
-
-    /**
-     * Renders one alert.
-     *
-     * @param config             read for site, data source and the eagle service host/port
-     * @param rets               output values of the query WITHOUT the leading context column, i.e.
-     *                           {@code rets.get(i)} is the value of the attribute named
-     *                           {@code evaluator.getOutputStreamAttrNameList().get(i + 1)}
-     * @param siddhiAlertContext supplies the evaluator (must be a {@link SiddhiPolicyEvaluator}) and
-     *                           the alert executor
-     * @param timestamp          timestamp stored on the returned entity
-     * @return the populated alert entity
-     */
-    public static AlertAPIEntity render(Config config, List<String> rets, EagleAlertContext siddhiAlertContext, long timestamp) {
-        SiddhiPolicyEvaluator evaluator = (SiddhiPolicyEvaluator)siddhiAlertContext.evaluator;
-        String alertExecutorId = siddhiAlertContext.alertExecutor.getAlertExecutorId();
-        AlertAPIEntity entity = new AlertAPIEntity();
-        AlertContext context = new AlertContext();
-        // Fetch the additional context once and reuse it below instead of asking the evaluator twice.
-        Map<String, String> additionalContext = evaluator.getAdditionalContext();
-        String[] sourceStreamsArr = additionalContext.get(AlertConstants.SOURCE_STREAMS).split(",");
-        List<String> attrRenameList = evaluator.getOutputStreamAttrNameList();
-        Map<String, String> tags = new HashMap<String, String>();
-        for (String sourceStream : sourceStreamsArr) {
-            List<AlertStreamSchemaEntity> list = StreamMetadataManager.getInstance().getMetadataEntitiesForStream(sourceStream.trim());
-            if (list == null) {
-                // No metadata for this stream: nothing can be promoted to a tag.
-                continue;
-            }
-            for (AlertStreamSchemaEntity alertStream : list) {
-                if (!Boolean.TRUE.equals(alertStream.getUsedAsTag())) {
-                    continue;
-                }
-                String attrName = alertStream.getTags().get(AlertConstants.ATTR_NAME);
-                // attrRenameList starts with "eagleAlertContext" while rets does not, so the value
-                // of the attribute at position p in attrRenameList is rets.get(p - 1).
-                int valueIndex = attrRenameList.indexOf(attrName) - 1;
-                // valueIndex < 0 means the query does not select this attribute (or it is the
-                // context column itself); such attributes simply yield no tag.
-                if (valueIndex >= 0 && valueIndex < rets.size()) {
-                    tags.put(attrName, rets.get(valueIndex));
-                }
-            }
-        }
-
-        for (int index = 0; index < rets.size(); index++) {
-            //attrRenameList.get(0) -> "eagleAlertContext". We need to skip "eagleAlertContext", index is from 1 for attRenameList.
-            context.addProperty(attrRenameList.get(index + 1), rets.get(index));
-        }
-
-        // Human readable dump of the selected attributes; built before the policy context is merged
-        // in so that it only contains the query output.
-        StringBuilder sb = new StringBuilder();
-        for (Entry<String, String> entry : context.getProperties().entrySet()) {
-            sb.append(entry.getKey()).append("=\"").append(entry.getValue()).append("\" ");
-        }
-        context.addAll(additionalContext);
-        String policyId = context.getProperty(AlertConstants.POLICY_ID);
-        String alertMessage = "The Policy \"" + policyId + "\" has been detected with the below information: " + sb.toString() ;
-        String site = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.SITE);
-        String dataSource = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.DATA_SOURCE);
-        String host = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
-        Integer port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
-
-        context.addProperty(AlertConstants.ALERT_EVENT, sb.toString());
-        context.addProperty(AlertConstants.ALERT_MESSAGE, alertMessage);
-        // The timestamp property is the rendering time; the entity itself carries the caller's timestamp.
-        context.addProperty(AlertConstants.ALERT_TIMESTAMP_PROPERTY, DateTimeUtil.millisecondsToHumanDateWithSeconds(System.currentTimeMillis()));
-        context.addProperty(EagleConfigConstants.DATA_SOURCE, dataSource);
-        context.addProperty(EagleConfigConstants.SITE, site);
-        entity.setTimestamp(timestamp);
-        /** If we need to add a severity tag, we should add a severity field in AbstractPolicyDefinition, and pass it down **/
-        tags.put(EagleConfigConstants.SITE, site);
-        tags.put(EagleConfigConstants.DATA_SOURCE, dataSource);
-        tags.put(AlertConstants.SOURCE_STREAMS, context.getProperty(AlertConstants.SOURCE_STREAMS));
-        tags.put(AlertConstants.POLICY_ID, context.getProperty(AlertConstants.POLICY_ID));
-        tags.put(AlertConstants.ALERT_SOURCE, source);
-        tags.put(AlertConstants.ALERT_EXECUTOR_ID, alertExecutorId);
-        entity.setTags(tags);
-
-        // The URLs are built after the tags are set because the alert detail URL is derived from the entity.
-        context.addProperty(AlertConstants.POLICY_DETAIL_URL, UrlBuilder.buiildPolicyDetailUrl(host, port, tags));
-        context.addProperty(AlertConstants.ALERT_DETAIL_URL, UrlBuilder.buildAlertDetailUrl(host, port, entity));
-        entity.setAlertContext(context);
-        return entity;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/SiddhiAlertHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/SiddhiAlertHandler.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/SiddhiAlertHandler.java
deleted file mode 100644
index 2aa778c..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/SiddhiAlertHandler.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.siddhi;
-
-import eagle.alert.entity.AlertAPIEntity;
-
-import java.util.List;
-
-/**
- * Receiver of alerts; a single callback that is handed a context and a list of alert entities.
- */
-public interface SiddhiAlertHandler {
-
- /**
- * @param context the {@link EagleAlertContext} associated with the alerts
- * @param alerts the alert entities being delivered
- */
- void onAlerts(EagleAlertContext context, List<AlertAPIEntity> alerts);
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/SiddhiPolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/SiddhiPolicyDefinition.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/SiddhiPolicyDefinition.java
deleted file mode 100644
index b97dead..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/SiddhiPolicyDefinition.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.siddhi;
-
-import eagle.alert.config.AbstractPolicyDefinition;
-
-/**
- * Policy definition carrying a Siddhi query expression. The JSON form looks like this:
- * <pre>{@code
- * {
- *   "type":"SiddhiCEPEngine",
- *   "expression" : "from every b1=HeapUsage[metric == 'eagle.metric.gc'] -> a1=FullGCEvent[eventName == 'full gc'] -> b2=HeapUsage[metric == b1.metric and host == b1.host and value >= b1.value * 0.8] within 100 sec select a1.eventName, b1.metric, b2.timestamp, 60 as timerange insert into GCMonitor; "
- * }
- * }</pre>
- */
-public class SiddhiPolicyDefinition extends AbstractPolicyDefinition {
-    private String expression;
-
-    /** @return the Siddhi query expression, or {@code null} when none has been set */
-    public String getExpression() {
-        return expression;
-    }
-
-    /** @param expression the Siddhi query expression */
-    public void setExpression(String expression) {
-        this.expression = expression;
-    }
-
-    /**
-     * @return the expression itself; the text {@code "null"} when no expression has been set,
-     *         because {@code toString()} must never return {@code null}
-     */
-    @Override
-    public String toString(){
-        return String.valueOf(expression);
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/SiddhiPolicyEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/SiddhiPolicyEvaluator.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/SiddhiPolicyEvaluator.java
deleted file mode 100644
index cc27636..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/SiddhiPolicyEvaluator.java
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.siddhi;
-
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-
-import eagle.alert.config.AbstractPolicyDefinition;
-import eagle.alert.entity.AlertStreamSchemaEntity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.query.output.callback.QueryCallback;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.query.api.execution.query.Query;
-import org.wso2.siddhi.query.api.execution.query.selection.OutputAttribute;
-
-import eagle.alert.common.AlertConstants;
-import eagle.alert.entity.AlertAPIEntity;
-import eagle.alert.entity.AlertDefinitionAPIEntity;
-import eagle.alert.policy.PolicyEvaluator;
-import eagle.alert.policy.PolicyManager;
-import eagle.dataproc.core.JsonSerDeserUtils;
-import eagle.dataproc.core.ValuesArray;
-import com.typesafe.config.Config;
-
-/**
- * {@link PolicyEvaluator} backed by a Siddhi execution plan.
- * <p>
- * Everything derived from the policy definition is kept in one {@link SiddhiRuntime} object that is
- * replaced as a whole when the policy is updated. When a policy is updated or deleted the affected
- * execution plan is shut down to release resources; evaluation, update and delete synchronize on the
- * runtime object they work with so that no event is sent while that plan is being shut down.
- */
-public class SiddhiPolicyEvaluator implements PolicyEvaluator{
-    private final static Logger LOG = LoggerFactory.getLogger(SiddhiPolicyEvaluator.class);
-    public static final int DEFAULT_QUEUE_SIZE = 1000;
-    private final static String EXECUTION_PLAN_NAME = "query";
-    /** Name of the private QueryCallback field holding the parsed Query; it is read reflectively. */
-    private final static String QUERY_CALLBACK_QUERY_FIELD = "query";
-
-    private volatile SiddhiRuntime siddhiRuntime;
-    private final String[] sourceStreams;
-    private final boolean needValidation;
-    private final String policyId;
-    private final Config config;
-
-    /**
-     * everything dependent on policyDef should be together and switched in runtime
-     */
-    public static class SiddhiRuntime{
-        QueryCallback callback;
-        Map<String, InputHandler> siddhiInputHandlers;
-        SiddhiManager siddhiManager;
-        SiddhiPolicyDefinition policyDef;
-        List<String> outputFields;
-        String executionPlanName;
-    }
-
-    /** Same as the five-argument constructor with runtime validation switched off. */
-    public SiddhiPolicyEvaluator(Config config, String policyName, AbstractPolicyDefinition policyDef, String[] sourceStreams){
-        this(config, policyName, policyDef, sourceStreams, false);
-    }
-
-    /**
-     * @param config         configuration handed to the query callback
-     * @param policyId       id of the policy, reported through {@link #getAdditionalContext()}
-     * @param policyDef      policy definition; must be a {@link SiddhiPolicyDefinition}
-     * @param sourceStreams  names of the streams the policy reads from
-     * @param needValidation whether incoming events are checked against the stream metadata
-     */
-    public SiddhiPolicyEvaluator(Config config, String policyId, AbstractPolicyDefinition policyDef, String[] sourceStreams, boolean needValidation){
-        this.config = config;
-        this.policyId = policyId;
-        this.needValidation = needValidation;
-        this.sourceStreams = sourceStreams;
-        init(policyDef);
-    }
-
-    /** Creates the Siddhi runtime for the given definition and makes it the current one. */
-    public void init(AbstractPolicyDefinition policyDef){
-        siddhiRuntime = createSiddhiRuntime((SiddhiPolicyDefinition)policyDef);
-    }
-
-    /**
-     * Makes sure the alert context field is the first selected attribute:
-     * {@code select fieldA, fieldB} becomes {@code select eagleAlertContext,fieldA, fieldB}.
-     * <p>
-     * The expression is returned unchanged when it has no {@code "select "} clause, when it selects
-     * {@code *}, or when the context field already is the first selected attribute.
-     *
-     * @param expression Siddhi query expression
-     * @return the expression with the context field selected first
-     */
-    public static String addContextFieldIfNotExist(String expression) {
-        final String selectKeyword = "select ";
-        int selectAt = expression.indexOf(selectKeyword);
-        if (selectAt < 0) {
-            // Without a select clause there is no attribute list to extend.
-            return expression;
-        }
-        int pos = selectAt + selectKeyword.length();
-        int index = pos;
-        while (index < expression.length() && expression.charAt(index) == ' ') {
-            index++;
-        }
-        if (index >= expression.length() || expression.charAt(index) == '*') {
-            // "select *" already carries every stream attribute.
-            return expression;
-        }
-        String contextField = SiddhiStreamMetadataUtils.EAGLE_ALERT_CONTEXT_FIELD;
-        if (expression.startsWith(contextField, index)) {
-            int end = index + contextField.length();
-            // Only a complete identifier counts, not a longer name that merely starts with it.
-            if (end >= expression.length() || expression.charAt(end) == ',' || expression.charAt(end) == ' ') {
-                return expression;
-            }
-        }
-        StringBuilder sb = new StringBuilder();
-        sb.append(expression.substring(0, pos));
-        sb.append(contextField).append(",");
-        sb.append(expression.substring(pos, expression.length()));
-        return sb.toString();
-    }
-
-    /**
-     * Builds a started execution plan for the definition: stream definitions of all source streams,
-     * the (context-extended) query, one input handler per source stream and the query callback.
-     */
-    private SiddhiRuntime createSiddhiRuntime(SiddhiPolicyDefinition policyDef){
-        SiddhiManager siddhiManager = new SiddhiManager();
-        Map<String, InputHandler> siddhiInputHandlers = new HashMap<String, InputHandler>();
-
-        StringBuilder sb = new StringBuilder();
-        for(String sourceStream : sourceStreams){
-            String streamDef = SiddhiStreamMetadataUtils.convertToStreamDef(sourceStream);
-            LOG.info("Siddhi stream definition : " + streamDef);
-            sb.append(streamDef);
-        }
-
-        String expression = addContextFieldIfNotExist(policyDef.getExpression());
-        String executionPlan = sb.toString() + " @info(name = '" + EXECUTION_PLAN_NAME + "') " + expression;
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
-
-        for(String sourceStream : sourceStreams){
-            siddhiInputHandlers.put(sourceStream, executionPlanRuntime.getInputHandler(sourceStream));
-        }
-        executionPlanRuntime.start();
-
-        QueryCallback callback = new SiddhiQueryCallbackImpl(config, this);
-
-        LOG.info("Siddhi query: " + expression);
-        executionPlanRuntime.addCallback(EXECUTION_PLAN_NAME, callback);
-
-        // The output attribute names are only reachable through the callback's private query field.
-        List<String> outputFields = new ArrayList<String>();
-        try {
-            Field field = QueryCallback.class.getDeclaredField(QUERY_CALLBACK_QUERY_FIELD);
-            field.setAccessible(true);
-            Query query = (Query)field.get(callback);
-            List<OutputAttribute> list = query.getSelector().getSelectionList();
-            for (OutputAttribute output : list) {
-                outputFields.add(output.getRename());
-            }
-        }
-        catch (Exception ex) {
-            LOG.error("Got an Exception when initial outputFields ", ex);
-        }
-        SiddhiRuntime runtime = new SiddhiRuntime();
-        runtime.siddhiInputHandlers = siddhiInputHandlers;
-        runtime.siddhiManager = siddhiManager;
-        runtime.callback = callback;
-        runtime.policyDef = policyDef;
-        runtime.outputFields = outputFields;
-        runtime.executionPlanName = executionPlanRuntime.getName();
-        return runtime;
-    }
-
-    /**
-     * 1. input has 3 fields, first is siddhi context, second is streamName, the last one is map of attribute name/value
-     * 2. runtime check for input data (This is very expensive, so we ignore for now)
-     *     the size of input map should be equal to size of attributes which stream metadata defines
-     *     the attribute names should be equal to attribute names which stream metadata defines
-     *     the input field cannot be null
-     *
-     * @throws IllegalStateException if the policy has no input handler for the stream, or if
-     *         validation is enabled and the event does not match the stream schema
-     */
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    @Override
-    public void evaluate(ValuesArray data) throws Exception {
-        if(LOG.isDebugEnabled()) LOG.debug("Siddhi policy evaluator consumers data :" + data);
-        Object siddhiAlertContext = data.get(0);
-        String streamName = (String)data.get(1);
-        SortedMap map = (SortedMap)data.get(2);
-        validateEventInRuntime(streamName, map);
-        // Read the volatile field once so that the lock and the handlers belong to the same runtime
-        // even if onPolicyUpdate swaps the field concurrently.
-        final SiddhiRuntime runtime = siddhiRuntime;
-        synchronized(runtime){
-            InputHandler handler = runtime.siddhiInputHandlers.get(streamName);
-            if (handler == null) {
-                throw new IllegalStateException("no siddhi input handler for stream " + streamName + " in policy " + policyId);
-            }
-            //insert siddhiAlertContext into the first field
-            List<Object> input = new ArrayList<>();
-            input.add(siddhiAlertContext);
-            putAttrsIntoInputStream(input, streamName, map);
-            handler.send(input.toArray(new Object[0]));
-        }
-    }
-
-    /**
-     * this is a heavy operation, we should avoid to use
-     * @param sourceStream name of the stream the event belongs to
-     * @param data attribute name/value map of the event
-     * @throws IllegalStateException if validation is enabled and the stream has no metadata or the
-     *         attribute names differ from the schema
-     */
-    private void validateEventInRuntime(String sourceStream, SortedMap<?, ?> data){
-        if(!needValidation)
-            return;
-        SortedMap<String, AlertStreamSchemaEntity> map = StreamMetadataManager.getInstance().getMetadataEntityMapForStream(sourceStream);
-        if(map == null)
-            throw new IllegalStateException("no schema metadata found for stream " + sourceStream);
-        if(!map.keySet().equals(data.keySet()))
-            throw new IllegalStateException("incoming data schema is different from supported data schema, incoming data: " + data.keySet() + ", schema: " + map.keySet());
-    }
-
-    /**
-     * Appends the attribute values to the input row in map order. With validation enabled a
-     * {@code null} value is replaced by the attribute's default value.
-     */
-    private void putAttrsIntoInputStream(List<Object> input, String streamName, SortedMap<?, ?> map) {
-        if(!needValidation) {
-            input.addAll(map.values());
-            return;
-        }
-        for (Map.Entry<?, ?> entry : map.entrySet()) {
-            Object value = entry.getValue();
-            if (value == null) {
-                input.add(SiddhiStreamMetadataUtils.getAttrDefaultValue(streamName, (String)entry.getKey()));
-            }
-            else input.add(value);
-        }
-    }
-
-    /**
-     * Replaces the running execution plan with one built from the new definition and shuts the
-     * previous plan down. If the new definition cannot be deserialized the error is logged and the
-     * previous plan keeps running.
-     */
-    @Override
-    public void onPolicyUpdate(AlertDefinitionAPIEntity newAlertDef) {
-        AbstractPolicyDefinition policyDef;
-        try {
-            policyDef = JsonSerDeserUtils.deserialize(newAlertDef.getPolicyDef(),
-                    AbstractPolicyDefinition.class, PolicyManager.getInstance().getPolicyModules(newAlertDef.getTags().get(AlertConstants.POLICY_TYPE)));
-        }
-        catch (Exception ex) {
-            LOG.error("Initial policy def error, ", ex);
-            // Nothing usable to switch to; building a runtime from null would only fail later.
-            return;
-        }
-        if (policyDef == null) {
-            LOG.error("Initial policy def error, policy definition is empty for policy " + policyId);
-            return;
-        }
-        SiddhiRuntime previous = siddhiRuntime;
-        siddhiRuntime = createSiddhiRuntime((SiddhiPolicyDefinition)policyDef);
-        synchronized(previous){
-            previous.siddhiManager.getExecutionPlanRuntime(previous.executionPlanName).shutdown();
-        }
-    }
-
-    /** Shuts down the execution plan of the current runtime. */
-    @Override
-    public void onPolicyDelete(){
-        // Same single read as in evaluate(): lock and shut down the very same runtime object.
-        final SiddhiRuntime runtime = siddhiRuntime;
-        synchronized(runtime){
-            LOG.info("Going to shutdown siddhi execution plan, planName: " + runtime.executionPlanName);
-            runtime.siddhiManager.getExecutionPlanRuntime(runtime.executionPlanName).shutdown();
-            LOG.info("Siddhi execution plan " + runtime.executionPlanName + " is successfully shutdown ");
-        }
-    }
-
-    @Override
-    public String toString(){
-        // show the policyDef
-        return siddhiRuntime.policyDef.toString();
-    }
-
-    /** @return the source stream names this evaluator was created with (the internal array, not a copy) */
-    public String[] getStreamNames() {
-        return sourceStreams;
-    }
-
-    /**
-     * @return a new map holding the comma separated source stream names under
-     *         {@code AlertConstants.SOURCE_STREAMS} and the policy id under {@code AlertConstants.POLICY_ID}
-     */
-    public Map<String, String> getAdditionalContext() {
-        Map<String, String> context = new HashMap<String, String>();
-        StringBuilder joined = new StringBuilder();
-        for (String streamName : getStreamNames()) {
-            if (joined.length() > 0) {
-                joined.append(',');
-            }
-            joined.append(streamName);
-        }
-        context.put(AlertConstants.SOURCE_STREAMS, joined.toString());
-        context.put(AlertConstants.POLICY_ID, policyId);
-        return context;
-    }
-
-    /** @return the output attribute names of the current query, in selection order */
-    public List<String> getOutputStreamAttrNameList() {
-        return siddhiRuntime.outputFields;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/SiddhiPolicyEvaluatorServiceProviderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/SiddhiPolicyEvaluatorServiceProviderImpl.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/SiddhiPolicyEvaluatorServiceProviderImpl.java
deleted file mode 100644
index e8f4432..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/SiddhiPolicyEvaluatorServiceProviderImpl.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.siddhi;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-
-import eagle.alert.common.AlertConstants;
-import eagle.alert.policy.PolicyEvaluator;
-import eagle.alert.policy.PolicyEvaluatorServiceProvider;
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.jsontype.NamedType;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-
-public class SiddhiPolicyEvaluatorServiceProviderImpl implements PolicyEvaluatorServiceProvider {
- @Override
- public String getPolicyType() {
- return AlertConstants.policyType.siddhiCEPEngine.name();
- }
-
- @Override
- public Class<? extends PolicyEvaluator> getPolicyEvaluator() {
- return SiddhiPolicyEvaluator.class;
- }
-
- @Override
- public List<Module> getBindingModules() {
- Module module1 = new SimpleModule(AlertConstants.POLICY_DEFINITION).registerSubtypes(new NamedType(SiddhiPolicyDefinition.class, getPolicyType()));
- return Arrays.asList(module1);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/SiddhiQueryCallbackImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/SiddhiQueryCallbackImpl.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/SiddhiQueryCallbackImpl.java
deleted file mode 100644
index 5585e08..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/SiddhiQueryCallbackImpl.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.siddhi;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import eagle.alert.entity.AlertAPIEntity;
-import eagle.executor.AlertExecutor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.query.output.callback.QueryCallback;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-
-public class SiddhiQueryCallbackImpl extends QueryCallback{
-
- private SiddhiPolicyEvaluator evaluator;
- public static final Logger LOG = LoggerFactory.getLogger(SiddhiQueryCallbackImpl.class);
- public static final ObjectMapper mapper = new ObjectMapper();
- public Config config;
-
- public SiddhiQueryCallbackImpl(Config config, SiddhiPolicyEvaluator evaluator) {
- this.config = config;
- this.evaluator = evaluator;
- }
-
- public List<String> getOutputMessage(Event event) {
- Object[] data = event.getData();
- List<String> rets = new ArrayList<String>();
- boolean isFirst = true;
- for (Object object : data) {
- // The first field is siddhiAlertContext, skip it
- if (isFirst) {
- isFirst = false;
- continue;
- }
- String value = null;
- if (object instanceof Double) {
- value = String.valueOf((Double)object);
- }
- else if (object instanceof Integer) {
- value = String.valueOf((Integer)object);
- }
- else if (object instanceof Long) {
- value = String.valueOf((Long)object);
- }
- else if (object instanceof String) {
- value = (String)object;
- }
- else if (object instanceof Boolean) {
- value = String.valueOf((Boolean)object);
- }
- rets.add(value);
- }
- return rets;
- }
-
- @Override
- public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
- Object[] data = inEvents[0].getData();
- EagleAlertContext siddhiAlertContext = (EagleAlertContext)data[0];
- List<String> rets = getOutputMessage(inEvents[0]);
- AlertAPIEntity alert = SiddhiAlertAPIEntityRendner.render(config, rets, siddhiAlertContext, timeStamp);
- AlertExecutor alertExecutor = siddhiAlertContext.alertExecutor;
- alertExecutor.onAlerts(siddhiAlertContext, Arrays.asList(alert));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/SiddhiStreamMetadataUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/SiddhiStreamMetadataUtils.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/SiddhiStreamMetadataUtils.java
deleted file mode 100644
index c0ff7b0..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/SiddhiStreamMetadataUtils.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.siddhi;
-
-import java.util.Map;
-import java.util.SortedMap;
-
-import eagle.alert.entity.AlertStreamSchemaEntity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * convert metadata entities for a stream to stream definition for siddhi cep engine
- * define stream HeapUsage (metric string, host string, value double, timestamp long)
- */
-public class SiddhiStreamMetadataUtils {
- private final static Logger LOG = LoggerFactory.getLogger(SiddhiStreamMetadataUtils.class);
-
- public final static String EAGLE_ALERT_CONTEXT_FIELD = "eagleAlertContext";
-
- public static SortedMap<String, AlertStreamSchemaEntity> getAttrMap(String streamName) {
- SortedMap<String, AlertStreamSchemaEntity> map = StreamMetadataManager.getInstance().getMetadataEntityMapForStream(streamName);
- if(map == null || map.size() == 0){
- throw new IllegalStateException("alert stream schema should never be empty");
- }
- return map;
- }
-
- /**
- * @see org.wso2.siddhi.query.api.definition.Attribute.Type
- * make sure StreamMetadataManager.init is invoked before this method
- * @param streamName
- * @return
- */
- public static String convertToStreamDef(String streamName){
- SortedMap<String, AlertStreamSchemaEntity> map = getAttrMap(streamName);
- StringBuilder sb = new StringBuilder();
- sb.append(EAGLE_ALERT_CONTEXT_FIELD + " object,");
- for(Map.Entry<String, AlertStreamSchemaEntity> entry : map.entrySet()){
- String attrName = entry.getKey();
- sb.append(attrName);
- sb.append(" ");
- String attrType = entry.getValue().getAttrType();
- if(attrType.equalsIgnoreCase(AttributeType.STRING.name())){
- sb.append("string");
- }else if(attrType.equalsIgnoreCase(AttributeType.INTEGER.name())){
- sb.append("int");
- }else if(attrType.equalsIgnoreCase(AttributeType.LONG.name())){
- sb.append("long");
- }else if(attrType.equalsIgnoreCase(AttributeType.BOOL.name())){
- sb.append("bool");
- }else if(attrType.equalsIgnoreCase(AttributeType.FLOAT.name())){
- sb.append("float");
- }else if(attrType.equalsIgnoreCase(AttributeType.DOUBLE.name())){
- sb.append("double");
- }else{
- LOG.warn("AttrType is not recognized, ignore : " + attrType);
- }
- sb.append(",");
- }
- if(sb.length() > 0){
- sb.deleteCharAt(sb.length()-1);
- }
-
- String siddhiStreamDefFormat = "define stream " + streamName + "(" + "%s" + ");";
- return String.format(siddhiStreamDefFormat, sb.toString());
- }
-
- public static Object getAttrDefaultValue(String streamName, String attrName){
- SortedMap<String, AlertStreamSchemaEntity> map = getAttrMap(streamName);
- AlertStreamSchemaEntity entity = map.get(attrName);
- if (entity.getDefaultValue() != null) {
- return entity.getDefaultValue();
- }
- else {
- String attrType = entity.getAttrType();
- if (attrType.equalsIgnoreCase(AttributeType.STRING.name())) {
- return "NA";
- } else if (attrType.equalsIgnoreCase(AttributeType.INTEGER.name()) || attrType.equalsIgnoreCase(AttributeType.LONG.name())) {
- return -1;
- } else if (attrType.equalsIgnoreCase(AttributeType.BOOL.name())) {
- return true;
- } else {
- LOG.warn("AttrType is not recognized: " + attrType + ", treat it as string");
- return "N/A";
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/StreamMetadataManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/StreamMetadataManager.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/StreamMetadataManager.java
deleted file mode 100644
index 93522de..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/StreamMetadataManager.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.siddhi;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-import eagle.alert.common.AlertConstants;
-import eagle.alert.entity.AlertStreamSchemaEntity;
-import com.typesafe.config.Config;
-import eagle.alert.dao.AlertStreamSchemaDAO;
-import eagle.common.config.EagleConfigConstants;
-import org.apache.commons.collections.map.UnmodifiableMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * centralized memory where all stream metadata sit on, it is not mutable data
- */
-public class StreamMetadataManager {
- private static final Logger LOG = LoggerFactory.getLogger(StreamMetadataManager.class);
-
- private static StreamMetadataManager instance = new StreamMetadataManager();
- private Map<String, List<AlertStreamSchemaEntity>> map = new HashMap<String, List<AlertStreamSchemaEntity>>();
- private Map<String, SortedMap<String, AlertStreamSchemaEntity>> map2 = new HashMap<String, SortedMap<String, AlertStreamSchemaEntity>>();
- private volatile boolean initialized = false;
-
- private StreamMetadataManager(){
- }
-
- public static StreamMetadataManager getInstance(){
- return instance;
- }
-
- private void internalInit(Config config, AlertStreamSchemaDAO dao){
- try{
- String dataSource = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.DATA_SOURCE);
- List<AlertStreamSchemaEntity> list = dao.findAlertStreamSchemaByDataSource(dataSource);
- if(list == null)
- return;
- for (AlertStreamSchemaEntity entity : list) {
- String streamName = entity.getTags().get(AlertConstants.STREAM_NAME);
- if (map.get(streamName) == null) {
- map.put(streamName, new ArrayList<AlertStreamSchemaEntity>());
- map2.put(streamName, new TreeMap<String, AlertStreamSchemaEntity>());
- }
- map.get(streamName).add(entity);
- map2.get(streamName).put(entity.getTags().get(AlertConstants.ATTR_NAME), entity);
- }
- }catch(Exception ex){
- LOG.error("Fail building metadata manger", ex);
- throw new IllegalStateException(ex);
- }
- }
-
- /**
- * singleton with init would be good for unit test as well, and it ensures that
- * initialization happens only once before you use it.
- * @param config
- * @param dao
- */
- public void init(Config config, AlertStreamSchemaDAO dao){
- if(!initialized){
- synchronized(this){
- if(!initialized){
- if(LOG.isDebugEnabled()) LOG.debug("Initializing ...");
- internalInit(config, dao);
- initialized = true;
- LOG.info("Successfully initialized");
- }
- }
- }else{
- LOG.info("Already initialized, skip");
- }
- }
-
- // Only for unit test purpose
- public void reset() {
- synchronized (this) {
- initialized = false;
- map.clear();
- map2.clear();
- }
- }
-
- private void ensureInitialized(){
- if(!initialized)
- throw new IllegalStateException("StreamMetadataManager should be initialized before using it");
- }
-
- public List<AlertStreamSchemaEntity> getMetadataEntitiesForStream(String streamName){
- ensureInitialized();
- return getMetadataEntitiesForAllStreams().get(streamName);
- }
-
- public Map<String, List<AlertStreamSchemaEntity>> getMetadataEntitiesForAllStreams(){
- ensureInitialized();
- return UnmodifiableMap.decorate(map);
- }
-
- public SortedMap<String, AlertStreamSchemaEntity> getMetadataEntityMapForStream(String streamName){
- ensureInitialized();
- return getMetadataEntityMapForAllStreams().get(streamName);
- }
-
- public Map<String, SortedMap<String, AlertStreamSchemaEntity>> getMetadataEntityMapForAllStreams(){
- ensureInitialized();
- return UnmodifiableMap.decorate(map2);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/extension/ContainsIgnoreCaseExtension.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/extension/ContainsIgnoreCaseExtension.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/extension/ContainsIgnoreCaseExtension.java
deleted file mode 100644
index e4f7b71..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/extension/ContainsIgnoreCaseExtension.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package eagle.alert.siddhi.extension;
-
-import org.wso2.siddhi.core.config.ExecutionPlanContext;
-import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException;
-import org.wso2.siddhi.core.executor.ExpressionExecutor;
-import org.wso2.siddhi.core.executor.function.FunctionExecutor;
-import org.wso2.siddhi.query.api.definition.Attribute;
-import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
-
-public class ContainsIgnoreCaseExtension extends FunctionExecutor {
-
- Attribute.Type returnType = Attribute.Type.BOOL;
-
- @Override
- protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
- if (attributeExpressionExecutors.length != 2) {
- throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:containsIgnoreCase() function, required 2, " +
- "but found " + attributeExpressionExecutors.length);
- }
- if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.STRING) {
- throw new ExecutionPlanValidationException("Invalid parameter type found for the first argument of str:containsIgnoreCase() function, " +
- "required "+ Attribute.Type.STRING+", but found "+attributeExpressionExecutors[0].getReturnType().toString());
- }
- if (attributeExpressionExecutors[1].getReturnType() != Attribute.Type.STRING) {
- throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:containsIgnoreCase() function, " +
- "required "+ Attribute.Type.STRING+", but found "+attributeExpressionExecutors[1].getReturnType().toString());
- }
- }
-
- @Override
- protected Object execute(Object[] data) {
- if (data[0] == null) {
- throw new ExecutionPlanRuntimeException("Invalid input given to str:containsIgnoreCase() function. First argument cannot be null");
- }
- if (data[1] == null) {
- throw new ExecutionPlanRuntimeException("Invalid input given to str:containsIgnoreCase() function. Second argument cannot be null");
- }
- String str1 = (String)data[0];
- String str2 = (String)data[1];
- return str1.toUpperCase().contains(str2.toUpperCase());
- }
-
- @Override
- protected Object execute(Object data) {
- return null; //Since the containsIgnoreCase function takes in 2 parameters, this method does not get called. Hence, not implemented.
- }
-
- @Override
- public void start() {
- //Nothing to start
- }
-
- @Override
- public void stop() {
- //Nothing to stop
- }
-
- @Override
- public Attribute.Type getReturnType() {
- return returnType;
- }
-
- @Override
- public Object[] currentState() {
- return new Object[]{};
- }
-
- @Override
- public void restoreState(Object[] state) {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/extension/EqualsIgnoreCaseExtension.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/extension/EqualsIgnoreCaseExtension.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/extension/EqualsIgnoreCaseExtension.java
deleted file mode 100644
index 19e8b21..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/extension/EqualsIgnoreCaseExtension.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package eagle.alert.siddhi.extension;
-
-import org.wso2.siddhi.core.config.ExecutionPlanContext;
-import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException;
-import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
-import org.wso2.siddhi.core.executor.ExpressionExecutor;
-import org.wso2.siddhi.core.executor.function.FunctionExecutor;
-import org.wso2.siddhi.query.api.definition.Attribute;
-import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-public class EqualsIgnoreCaseExtension extends FunctionExecutor {
-
- Attribute.Type returnType = Attribute.Type.BOOL;
-
- @Override
- protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
- if (attributeExpressionExecutors.length != 2) {
- throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:equalsIgnoreCase() function, required 2, " +
- "but found " + attributeExpressionExecutors.length);
- }
- if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.STRING) {
- throw new ExecutionPlanValidationException("Invalid parameter type found for the first argument of str:equalsIgnoreCase() function, " +
- "required "+ Attribute.Type.STRING+", but found "+attributeExpressionExecutors[0].getReturnType().toString());
- }
- if (attributeExpressionExecutors[1].getReturnType() != Attribute.Type.STRING) {
- throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:equalsIgnoreCase() function, " +
- "required "+ Attribute.Type.STRING+", but found "+attributeExpressionExecutors[1].getReturnType().toString());
- }
- }
-
- @Override
- protected Object execute(Object[] data) {
- if (data[0] == null) {
- throw new ExecutionPlanRuntimeException("Invalid input given to str:equalsIgnoreCase() function. First argument cannot be null");
- }
- if (data[1] == null) {
- throw new ExecutionPlanRuntimeException("Invalid input given to str:equalsIgnoreCase() function. Second argument cannot be null");
- }
- String str1 = (String)data[0];
- String str2 = (String)data[1];
- return str1.equalsIgnoreCase(str2);
- }
-
- @Override
- protected Object execute(Object data) {
- return null; //Since the equalsIgnoreCase function takes in 2 parameters, this method does not get called. Hence, not implemented.
- }
-
- @Override
- public void start() {
- //Nothing to start
- }
-
- @Override
- public void stop() {
- //Nothing to stop
- }
-
- @Override
- public Attribute.Type getReturnType() {
- return returnType;
- }
-
- @Override
- public Object[] currentState() {
- return new Object[]{};
- }
-
- @Override
- public void restoreState(Object[] state) {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java
deleted file mode 100644
index a98674a..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package eagle.alert.siddhi.extension;
-
-import org.wso2.siddhi.core.config.ExecutionPlanContext;
-import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException;
-import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
-import org.wso2.siddhi.core.executor.ExpressionExecutor;
-import org.wso2.siddhi.extension.string.RegexpFunctionExtension;
-import org.wso2.siddhi.query.api.definition.Attribute;
-import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * regexpIgnoreCase(string, regex)
- * Tells whether or not this 'string' matches the given regular expression 'regex'.
- * Accept Type(s): (STRING,STRING)
- * Return Type(s): BOOLEAN
- */
-public class RegexpIgnoreCaseFunctionExtension extends RegexpFunctionExtension {
-
- //state-variables
- boolean isRegexConstant = false;
- String regexConstant;
- Pattern patternConstant;
-
- @Override
- protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
- if (attributeExpressionExecutors.length != 2) {
- throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:regexpIgnoreCase() function, required 2, " +
- "but found " + attributeExpressionExecutors.length);
- }
- if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.STRING) {
- throw new ExecutionPlanValidationException("Invalid parameter type found for the first argument of str:regexpIgnoreCase() function, " +
- "required "+ Attribute.Type.STRING+", but found "+attributeExpressionExecutors[0].getReturnType().toString());
- }
- if (attributeExpressionExecutors[1].getReturnType() != Attribute.Type.STRING) {
- throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:regexpIgnoreCase() function, " +
- "required "+ Attribute.Type.STRING+", but found "+attributeExpressionExecutors[1].getReturnType().toString());
- }
- if(attributeExpressionExecutors[1] instanceof ConstantExpressionExecutor){
- isRegexConstant = true;
- regexConstant = (String) ((ConstantExpressionExecutor) attributeExpressionExecutors[1]).getValue();
- patternConstant = Pattern.compile(regexConstant, Pattern.CASE_INSENSITIVE);
- }
- }
-
- @Override
- protected Object execute(Object[] data) {
- String regex;
- Pattern pattern;
- Matcher matcher;
-
- if (data[0] == null) {
- throw new ExecutionPlanRuntimeException("Invalid input given to str:regexpIgnoreCase() function. First argument cannot be null");
- }
- if (data[1] == null) {
- throw new ExecutionPlanRuntimeException("Invalid input given to str:regexpIgnoreCase() function. Second argument cannot be null");
- }
- String source = (String) data[0];
-
- if(!isRegexConstant){
- regex = (String) data[1];
- pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE);
- matcher = pattern.matcher(source);
- return matcher.matches();
-
- } else {
- matcher = patternConstant.matcher(source);
- return matcher.matches();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/executor/AlertExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/executor/AlertExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/executor/AlertExecutor.java
deleted file mode 100644
index 4328029..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/executor/AlertExecutor.java
+++ /dev/null
@@ -1,413 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.executor;
-
-import eagle.alert.common.AlertConstants;
-import eagle.alert.config.AbstractPolicyDefinition;
-import eagle.alert.dao.AlertDefinitionDAO;
-import eagle.alert.dao.AlertStreamSchemaDAOImpl;
-import eagle.alert.entity.AlertAPIEntity;
-import eagle.alert.entity.AlertDefinitionAPIEntity;
-import eagle.common.config.EagleConfig;
-import eagle.common.config.EagleConfigConstants;
-import eagle.datastream.Collector;
-import eagle.datastream.JavaStormStreamExecutor2;
-import eagle.datastream.Tuple2;
-import eagle.metric.CountingMetric;
-import eagle.metric.Metric;
-import eagle.metric.report.EagleSerivceMetricReport;
-import com.sun.jersey.client.impl.CopyOnWriteHashMap;
-import com.typesafe.config.Config;
-import eagle.alert.policy.*;
-import eagle.alert.siddhi.EagleAlertContext;
-import eagle.alert.siddhi.SiddhiAlertHandler;
-import eagle.alert.siddhi.StreamMetadataManager;
-import eagle.dataproc.core.JsonSerDeserUtils;
-import eagle.dataproc.core.ValuesArray;
-import org.apache.commons.lang3.time.DateUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.management.ManagementFactory;
-import java.util.HashMap;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEntity> implements PolicyLifecycleMethods, SiddhiAlertHandler {
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(AlertExecutor.class);
-
- private String alertExecutorId;
- private volatile CopyOnWriteHashMap<String, PolicyEvaluator> policyEvaluators;
- private PolicyPartitioner partitioner;
- private int numPartitions;
- private int partitionSeq;
- private Config config;
- private Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs;
- private AlertDefinitionDAO alertDefinitionDao;
- private String[] sourceStreams;
- private static String EAGLE_EVENT_COUNT = "eagle.event.count";
- private static String EAGLE_POLICY_EVAL_COUNT = "eagle.policy.eval.count";
- private static String EAGLE_POLICY_EVAL_FAIL_COUNT = "eagle.policy.eval.fail.count";
- private static String EAGLE_ALERT_COUNT = "eagle.alert.count";
- private static String EAGLE_ALERT_FAIL_COUNT = "eagle.alert.fail.count";
- private static long MERITE_GRANULARITY = DateUtils.MILLIS_PER_MINUTE;
- private Map<String, Metric> metricMap; // metricMap's key = metricName[#policyId]
- private Map<String, Map<String, String>> dimensionsMap; // cache it for performance
- private Map<String, String> baseDimensions;
- private Thread metricReportThread;
- private EagleSerivceMetricReport metricReport;
-
- public AlertExecutor(String alertExecutorId, PolicyPartitioner partitioner, int numPartitions, int partitionSeq,
- AlertDefinitionDAO alertDefinitionDao, String[] sourceStreams){
- this.alertExecutorId = alertExecutorId;
- this.partitioner = partitioner;
- this.numPartitions = numPartitions;
- this.partitionSeq = partitionSeq;
- this.alertDefinitionDao = alertDefinitionDao;
- this.sourceStreams = sourceStreams;
- }
-
- public String getAlertExecutorId(){
- return this.alertExecutorId;
- }
-
- public int getNumPartitions() {
- return this.numPartitions;
- }
-
- public int getPartitionSeq(){
- return this.partitionSeq;
- }
-
- public PolicyPartitioner getPolicyPartitioner() {
- return this.partitioner;
- }
-
- public Map<String, Map<String, AlertDefinitionAPIEntity>> getInitialAlertDefs() {
- return this.initialAlertDefs;
- }
-
- public AlertDefinitionDAO getAlertDefinitionDao() {
- return alertDefinitionDao;
- }
-
- @Override
- public void prepareConfig(Config config) {
- this.config = config;
- }
-
- public void initMetricReportor() {
- String eagleServiceHost = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
- int eagleServicePort = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
-
- String username = config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME) ?
- config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME) : null;
- String password = config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD) ?
- config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD) : null;
- this.metricReport = new EagleSerivceMetricReport(eagleServiceHost, eagleServicePort, username, password);
-
- metricMap = new ConcurrentHashMap<String, Metric>();
- baseDimensions = new HashMap<String, String>();
- baseDimensions.put(AlertConstants.ALERT_EXECUTOR_ID, alertExecutorId);
- baseDimensions.put(AlertConstants.PARTITIONSEQ, String.valueOf(partitionSeq));
- baseDimensions.put(AlertConstants.SOURCE, ManagementFactory.getRuntimeMXBean().getName());
- baseDimensions.put(EagleConfigConstants.DATA_SOURCE, config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.DATA_SOURCE));
- baseDimensions.put(EagleConfigConstants.SITE, config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.SITE));
-
- dimensionsMap = new HashMap<String, Map<String, String>>();
- this.metricReportThread = new Thread() {
- @Override
- public void run() {
- runMetricReporter();
- }
- };
- this.metricReportThread.setDaemon(true);
- metricReportThread.start();
- }
-
- @Override
- public void init() {
- // initialize StreamMetadataManager before it is used
- StreamMetadataManager.getInstance().init(config, new AlertStreamSchemaDAOImpl(config));
- // for each AlertDefinition, to create a PolicyEvaluator
- Map<String, PolicyEvaluator> tmpPolicyEvaluators = new HashMap<String, PolicyEvaluator>();
-
- String site = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.SITE);
- String dataSource = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.DATA_SOURCE);
- try {
- initialAlertDefs = alertDefinitionDao.findActiveAlertDefsGroupbyAlertExecutorId(site, dataSource);
- }
- catch (Exception ex) {
- LOG.error("fail to initialize initialAlertDefs: ", ex);
- throw new IllegalStateException("fail to initialize initialAlertDefs: ", ex);
- }
- if(initialAlertDefs == null || initialAlertDefs.isEmpty()){
- LOG.warn("No alert definitions was found for site: " + site + ", dataSource: " + dataSource);
- }
- else if (initialAlertDefs.get(alertExecutorId) != null) {
- for(AlertDefinitionAPIEntity alertDef : initialAlertDefs.get(alertExecutorId).values()){
- int part = partitioner.partition(numPartitions, alertDef.getTags().get(AlertConstants.POLICY_TYPE), alertDef.getTags().get(AlertConstants.POLICY_ID));
- if (part == partitionSeq) {
- tmpPolicyEvaluators.put(alertDef.getTags().get(AlertConstants.POLICY_ID), createPolicyEvaluator(alertDef));
- }
- }
- }
-
- policyEvaluators = new CopyOnWriteHashMap<>();
- // for efficency, we don't put single policy evaluator
- policyEvaluators.putAll(tmpPolicyEvaluators);
- DynamicPolicyLoader policyLoader = DynamicPolicyLoader.getInstance();
-
- policyLoader.init(initialAlertDefs, alertDefinitionDao, config);
- policyLoader.addPolicyChangeListener(alertExecutorId + "_" + partitionSeq, this);
- LOG.info("Alert Executor created, partitionSeq: " + partitionSeq + " , numPartitions: " + numPartitions);
- LOG.info("All policy evaluators: " + policyEvaluators);
-
- initMetricReportor();
- }
-
- /**
- * Create PolicyEvaluator instance according to policyType-mapped policy evaluator class
- *
- * @param alertDef alert definition
- * @return PolicyEvaluator instance
- */
- private PolicyEvaluator createPolicyEvaluator(AlertDefinitionAPIEntity alertDef){
- String policyType = alertDef.getTags().get(AlertConstants.POLICY_TYPE);
- Class<? extends PolicyEvaluator> evalCls = PolicyManager.getInstance().getPolicyEvaluator(policyType);
- if(evalCls == null){
- String msg = "No policy evaluator defined for policy type : " + policyType;
- LOG.error(msg);
- throw new IllegalStateException(msg);
- }
-
- // check out whether strong incoming data validation is necessary
- String needValidationConfigKey= AlertConstants.ALERT_EXECUTOR_CONFIGS + "." + alertExecutorId + ".needValidation";
-
- // Default: true
- boolean needValidation = !config.hasPath(needValidationConfigKey) || config.getBoolean(needValidationConfigKey);
-
- AbstractPolicyDefinition policyDef = null;
- try {
- policyDef = JsonSerDeserUtils.deserialize(alertDef.getPolicyDef(), AbstractPolicyDefinition.class, PolicyManager.getInstance().getPolicyModules(policyType));
- } catch (Exception ex) {
- LOG.error("Fail initial alert policy def: "+alertDef.getPolicyDef(), ex);
- }
- PolicyEvaluator pe;
- try{
- // Create evaluator instances
- pe = evalCls.getConstructor(Config.class, String.class, AbstractPolicyDefinition.class, String[].class, boolean.class).newInstance(config, alertDef.getTags().get("policyId"), policyDef, sourceStreams, needValidation);
- }catch(Exception ex){
- LOG.error("Fail creating new policyEvaluator", ex);
- LOG.warn("Broken policy definition and stop running : " + alertDef.getPolicyDef());
- throw new IllegalStateException(ex);
- }
- return pe;
- }
-
- /**
- * verify both alertExecutor logic name and partition id
- * @param alertDef alert definition
- *
- * @return whether accept the alert definition
- */
- private boolean accept(AlertDefinitionAPIEntity alertDef){
- if(!alertDef.getTags().get("alertExecutorId").equals(alertExecutorId)) {
- if(LOG.isDebugEnabled()){
- LOG.debug("alertDef does not belong to this alertExecutorId : " + alertExecutorId + ", alertDef : " + alertDef);
- }
- return false;
- }
- int targetPartitionSeq = partitioner.partition(numPartitions, alertDef.getTags().get(AlertConstants.POLICY_TYPE), alertDef.getTags().get(AlertConstants.POLICY_ID));
- if(targetPartitionSeq == partitionSeq)
- return true;
- return false;
- }
-
- public long trim(long value, long granularity) {
- return value / granularity * granularity;
- }
-
- public void runMetricReporter() {
- while(true) {
- try {
- long current = System.currentTimeMillis();
- List<Metric> metricList = new ArrayList<Metric>();
- synchronized (this.metricMap) {
- for (Entry<String, Metric> entry : metricMap.entrySet()) {
- String name = entry.getKey();
- Metric metric = entry.getValue();
- long previous = metric.getTimestamp();
- if (current > previous + MERITE_GRANULARITY) {
- metricList.add(metric);
- metricMap.put(name, new CountingMetric(trim(current, MERITE_GRANULARITY), metric.getDemensions(), metric.getMetricName()));
- }
- }
- }
- if (metricList.size() > 0) {
- LOG.info("Going to persist alert metrics, size: " + metricList.size());
- metricReport.emit(metricList);
- }
- try {
- Thread.sleep(MERITE_GRANULARITY / 2);
- } catch (InterruptedException ex) { /* Do nothing */ }
- }
- catch (Throwable t) {
- LOG.error("Got a throwable in metricReporter " , t);
- }
- }
- }
-
- public void updateCounter(String name, Map<String, String> dimensions, double value) {
- long current = System.currentTimeMillis();
- synchronized (metricMap) {
- if (metricMap.get(name) == null) {
- String metricName = name.split("#")[0];
- metricMap.put(name, new CountingMetric(trim(current, MERITE_GRANULARITY), dimensions, metricName));
- }
- metricMap.get(name).update(value);
- }
- }
-
- public void updateCounter(String name, Map<String, String> dimensions) {
- updateCounter(name, dimensions, 1.0);
- }
-
- public Map<String, String> getDimensions(String policyId) {
- if (dimensionsMap.get(policyId) == null) {
- Map<String, String> newDimensions = new HashMap<String, String>(baseDimensions);
- newDimensions.put(AlertConstants.POLICY_ID, policyId);
- dimensionsMap.put(policyId, newDimensions);
- }
- return dimensionsMap.get(policyId);
- }
-
- public String getMetricKey(String metricName, String policy) {
- return metricName + "#" + policy;
- }
-
- /**
- * within this single executor, execute all PolicyEvaluator sequentially
- * the contract for input:
- * 1. total # of fields for input is 3, which is fixed
- * 2. the first field is key
- * 3. the second field is stream name
- * 4. the third field is value which is java SortedMap
- */
- @Override
- public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, AlertAPIEntity>> outputCollector){
- if(input.size() != 3)
- throw new IllegalStateException("AlertExecutor always consumes exactly 3 fields: key, stream name and value(SortedMap)");
- if(LOG.isDebugEnabled()) LOG.debug("Msg is coming " + input.get(2));
- if(LOG.isDebugEnabled()) LOG.debug("Current policyEvaluators: " + policyEvaluators.keySet().toString());
-
- updateCounter(EAGLE_EVENT_COUNT, baseDimensions);
- try{
- synchronized(this.policyEvaluators) {
- for(Entry<String, PolicyEvaluator> entry : policyEvaluators.entrySet()){
- String policyId = entry.getKey();
- PolicyEvaluator evaluator = entry.getValue();
- updateCounter(getMetricKey(EAGLE_POLICY_EVAL_COUNT, policyId), getDimensions(policyId));
- try {
- EagleAlertContext siddhiAlertContext = new EagleAlertContext();
- siddhiAlertContext.alertExecutor = this;
- siddhiAlertContext.policyId = policyId;
- siddhiAlertContext.evaluator = evaluator;
- siddhiAlertContext.outputCollector = outputCollector;
- evaluator.evaluate(new ValuesArray(siddhiAlertContext, input.get(1), input.get(2)));
- }
- catch (Exception ex) {
- LOG.error("Got an exception, but continue to run " + input.get(2).toString(), ex);
- updateCounter(getMetricKey(EAGLE_POLICY_EVAL_FAIL_COUNT, policyId), getDimensions(policyId));
- }
- }
- }
- } catch(Exception ex){
- LOG.error(alertExecutorId + ", partition " + partitionSeq + ", error fetching alerts, but continue to run", ex);
- updateCounter(EAGLE_ALERT_FAIL_COUNT, baseDimensions);
- }
- }
-
- @Override
- public void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> added) {
- if(LOG.isDebugEnabled()) LOG.debug(alertExecutorId + ", partition " + partitionSeq + " policy added : " + added + " policyEvaluators " + policyEvaluators);
- for(AlertDefinitionAPIEntity alertDef : added.values()){
- if(!accept(alertDef))
- continue;
- LOG.info(alertExecutorId + ", partition " + partitionSeq + " policy really added " + alertDef);
- PolicyEvaluator newEvaluator = createPolicyEvaluator(alertDef);
- if(newEvaluator != null){
- synchronized(this.policyEvaluators) {
- policyEvaluators.put(alertDef.getTags().get(AlertConstants.POLICY_ID), newEvaluator);
- }
- }
- }
- }
-
- @Override
- public void onPolicyChanged(Map<String, AlertDefinitionAPIEntity> changed) {
- if(LOG.isDebugEnabled()) LOG.debug(alertExecutorId + ", partition " + partitionSeq + " policy changed : " + changed);
- for(AlertDefinitionAPIEntity alertDef : changed.values()){
- if(!accept(alertDef))
- continue;
- LOG.info(alertExecutorId + ", partition " + partitionSeq + " policy really changed " + alertDef);
- synchronized(this.policyEvaluators) {
- PolicyEvaluator pe = policyEvaluators.get(alertDef.getTags().get(AlertConstants.POLICY_ID));
- pe.onPolicyUpdate(alertDef);
- }
- }
- }
-
- @Override
- public void onPolicyDeleted(Map<String, AlertDefinitionAPIEntity> deleted) {
- if(LOG.isDebugEnabled()) LOG.debug(alertExecutorId + ", partition " + partitionSeq + " policy deleted : " + deleted);
- for(AlertDefinitionAPIEntity alertDef : deleted.values()){
- if(!accept(alertDef))
- continue;
- LOG.info(alertExecutorId + ", partition " + partitionSeq + " policy really deleted " + alertDef);
- String policyId = alertDef.getTags().get(AlertConstants.POLICY_ID);
- synchronized(this.policyEvaluators) {
- if (policyEvaluators.containsKey(policyId)) {
- PolicyEvaluator pe = policyEvaluators.remove(alertDef.getTags().get(AlertConstants.POLICY_ID));
- pe.onPolicyDelete();
- }
- }
- }
- }
-
- @Override
- public void onAlerts(EagleAlertContext context, List<AlertAPIEntity> alerts) {
- if(alerts != null && !alerts.isEmpty()){
- String policyId = context.policyId;
- LOG.info(String.format("Detected %s alerts for policy %s",alerts.size(),policyId));
- Collector outputCollector = context.outputCollector;
- PolicyEvaluator evaluator = context.evaluator;
- updateCounter(getMetricKey(EAGLE_ALERT_COUNT, policyId), getDimensions(policyId), alerts.size());
- for (AlertAPIEntity entity : alerts) {
- synchronized(this) {
- outputCollector.collect(new Tuple2(policyId, entity));
- }
- if(LOG.isDebugEnabled()) LOG.debug("A new alert is triggered: "+alertExecutorId + ", partition " + partitionSeq + ", Got an alert with output context: " + entity.getAlertContext() + ", for policy " + evaluator);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/executor/AlertExecutorCreationUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/executor/AlertExecutorCreationUtils.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/executor/AlertExecutorCreationUtils.java
deleted file mode 100644
index 2f68dc9..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/executor/AlertExecutorCreationUtils.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.executor;
-
-import eagle.alert.common.AlertConstants;
-import eagle.alert.dao.*;
-import eagle.alert.entity.AlertExecutorEntity;
-import eagle.alert.policy.DefaultPolicyPartitioner;
-import eagle.alert.policy.PolicyPartitioner;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValue;
-import eagle.common.config.EagleConfigConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Create alert executors and provide callback for programmer to link alert executor to immediate parent executors
- *
- * <br/><br/>
- * Explanations for programId, alertExecutorId and policy<br/><br/>
- * - programId - distributed or single-process program for example one storm topology<br/>
- * - alertExecutorId - one process/thread which executes multiple policies<br/>
- * - policy - some rules to be evaluated<br/>
- *
- * <br/>
- *
- * Normally the mapping is like following:
- * <pre>
- * programId (1:N) alertExecutorId
- * alertExecutorId (1:N) policy
- * </pre>
- */
-public class AlertExecutorCreationUtils {
-    private final static Logger LOG = LoggerFactory.getLogger(AlertExecutorCreationUtils.class);
-
-    /**
-     * Looks up the source streams registered for {@code alertExecutorId} under the
-     * configured data source and creates the alert executors for them.
-     *
-     * @param config          topology configuration; must contain the data source
-     * @param alertExecutorId logical id of the alert executor
-     * @return one executor per partition
-     * @throws IllegalStateException if no stream is registered for the alert executor
-     * @throws Exception             if the lookup or the executor creation fails
-     */
-    public static AlertExecutor[] createAlertExecutors(Config config, String alertExecutorId) throws Exception{
-        // Read dataSource from configuration.
-        String dataSource = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.DATA_SOURCE);
-        LOG.info("Loading alerting definitions for dataSource: " + dataSource);
-
-        // Get map from alertExecutorId to alert stream
-        // (dataSource) => Map[alertExecutorId:String,streamName:List[String]]
-        List<String> streamNames = new ArrayList<String>();
-        AlertExecutorDAOImpl alertExecutorDAO = new AlertExecutorDAOImpl(config);
-        List<AlertExecutorEntity> alertExecutorEntities = alertExecutorDAO.findAlertExecutor(dataSource, alertExecutorId);
-        for(AlertExecutorEntity entity : alertExecutorEntities){
-            streamNames.add(entity.getTags().get(AlertConstants.STREAM_NAME));
-        }
-
-        if(streamNames.isEmpty()){
-            throw new IllegalStateException("upstream names should not be empty for alert " + alertExecutorId);
-        }
-        return createAlertExecutors(config, new AlertDefinitionDAOImpl(config),
-                streamNames, alertExecutorId);
-    }
-
-    /**
-     * Build DAG Tasks based on persisted alert definition and schemas from eagle store.
-     *
-     * <h3>Require configuration:</h3>
-     *
-     * <ul>
-     * <li>eagleProps.site: program site id.</li>
-     * <li>eagleProps.dataSource: program data source.</li>
-     * <li>alertExecutorConfigs: only configured executor will be built into execution tasks.</li>
-     * </ul>
-     *
-     * <h3>Steps:</h3>
-     *
-     * <ol>
-     * <li>(upstreamTasks) => Map[streamName:String,upstreamTask:Task]</li>
-     * <li>(dataSource) => Map[alertExecutorId:String,streamName:List[String]]</li>
-     * <li>(site,dataSource) => Map[alertExecutorId,Map[policyId,alertDefinition]]</li>
-     * <li>(config["alertExecutorConfigs"]) => AlertExecutor(alertExecutorID, partitioner, numPartitions, partitionSeq, alertDefs, alertDefDAO, sourceStreams)[]</li>
-     * </ol>
-     *
-     * <p>When {@code alertExecutorConfigs} has no entry for {@code alertExecutorId},
-     * a single partition and {@link DefaultPolicyPartitioner} are used. A missing,
-     * zero or negative {@code parallelism} also falls back to a single partition.</p>
-     *
-     * @param config          topology configuration
-     * @param alertDefDAO     DAO handed to every created executor
-     * @param streamNames     names of the source streams
-     * @param alertExecutorId logical id of the alert executor
-     * @return one executor per partition
-     * @throws Exception if the partitioner class cannot be loaded or instantiated
-     */
-    public static AlertExecutor[] createAlertExecutors(Config config, AlertDefinitionDAO alertDefDAO,
-                                                       List<String> streamNames, String alertExecutorId) throws Exception{
-        // Read `alertExecutorConfigs` from configuration and get config for this alertExecutorId
-        int numPartitions = 1;
-        String partitionerCls = DefaultPolicyPartitioner.class.getCanonicalName();
-        String alertExecutorConfigsKey = "alertExecutorConfigs";
-        if(config.hasPath(alertExecutorConfigsKey)) {
-            Map<String, ConfigValue> alertExecutorConfigs = config.getObject(alertExecutorConfigsKey);
-            // Look the executor up by its id (the map itself is never a key of the map).
-            if(alertExecutorConfigs != null && alertExecutorConfigs.containsKey(alertExecutorId)) {
-                @SuppressWarnings("unchecked")
-                Map<String, Object> alertExecutorConfig = (Map<String, Object>) alertExecutorConfigs.get(alertExecutorId).unwrapped();
-                int parts = 0;
-                // Accept any numeric type the configuration library may produce.
-                if(alertExecutorConfig.containsKey("parallelism")) parts = ((Number) alertExecutorConfig.get("parallelism")).intValue();
-                numPartitions = parts > 0 ? parts : 1;
-                if(alertExecutorConfig.containsKey("partitioner")) partitionerCls = (String) alertExecutorConfig.get("partitioner");
-            }
-        }
-
-        return createAlertExecutors(alertDefDAO, streamNames, alertExecutorId, numPartitions, partitionerCls);
-    }
-
-    /**
-     * Build alert executors and assign alert definitions between these executors by partitioner (alertExecutorConfigs["${alertExecutorId}"]["partitioner"])
-     *
-     * @param alertDefDAO     DAO handed to every created executor
-     * @param sourceStreams   names of the source streams
-     * @param alertExecutorID logical id of the alert executor
-     * @param numPartitions   number of executors to create
-     * @param partitionerCls  fully qualified class name of the {@link PolicyPartitioner} to instantiate
-     * @return {@code numPartitions} executors with partition sequence 0..numPartitions-1, all sharing one partitioner instance
-     * @throws Exception if the partitioner class cannot be loaded or instantiated
-     */
-    public static AlertExecutor[] createAlertExecutors(AlertDefinitionDAO alertDefDAO, List<String> sourceStreams,
-                                                       String alertExecutorID, int numPartitions, String partitionerCls) throws Exception{
-        LOG.info("Creating alert executors with alertExecutorID: " + alertExecutorID + ", numPartitions: " + numPartitions + ", Partition class is: "+ partitionerCls);
-
-        PolicyPartitioner partitioner = (PolicyPartitioner)Class.forName(partitionerCls).newInstance();
-        AlertExecutor[] alertExecutors = new AlertExecutor[numPartitions];
-        String[] _sourceStreams = sourceStreams.toArray(new String[0]);
-
-        for(int i = 0; i < numPartitions; i++){
-            alertExecutors[i] = new AlertExecutor(alertExecutorID, partitioner, numPartitions, i, alertDefDAO,_sourceStreams);
-        }
-        return alertExecutors;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/AbstractPolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/AbstractPolicyDefinition.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/AbstractPolicyDefinition.java
new file mode 100644
index 0000000..9a9a9e2
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/AbstractPolicyDefinition.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.config;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+/**
+ * Base class holding the fields shared by all policy definitions.
+ *
+ * <p>The JSON property {@code type} is declared as the type identifier and is
+ * kept visible so that it is also bound to {@link #setType(String)}; unknown
+ * JSON properties are ignored.</p>
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible = true)
+public class AbstractPolicyDefinition {
+    /** Policy type name. */
+    private String type;
+
+    /**
+     * Returns the policy type.
+     *
+     * @return type in string
+     */
+    public String getType() {
+        return this.type;
+    }
+
+    /**
+     * Sets the policy type.
+     *
+     * @param type set type value
+     */
+    public void setType(String type) {
+        this.type = type;
+    }
+}
\ No newline at end of file