You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/08/14 06:23:02 UTC
[02/13] incubator-eagle git commit: EAGLE-341 clean inner process
alert engine code clean inner process alert engine code
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/PolicyProcessExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/PolicyProcessExecutor.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/PolicyProcessExecutor.java
deleted file mode 100644
index 30d2179..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/PolicyProcessExecutor.java
+++ /dev/null
@@ -1,442 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.policy.executor;
-
-import com.codahale.metrics.MetricRegistry;
-import com.sun.jersey.client.impl.CopyOnWriteHashMap;
-import com.typesafe.config.Config;
-import org.apache.commons.lang3.time.DateUtils;
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.common.config.EagleConfigConstants;
-import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
-import org.apache.eagle.dataproc.core.ValuesArray;
-import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.datastream.JavaStormStreamExecutor2;
-import org.apache.eagle.metric.reportor.EagleCounterMetric;
-import org.apache.eagle.metric.reportor.EagleMetricListener;
-import org.apache.eagle.metric.reportor.EagleServiceReporterMetricListener;
-import org.apache.eagle.metric.reportor.MetricKeyCodeDecoder;
-import org.apache.eagle.policy.*;
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.policy.config.AbstractPolicyDefinition;
-import org.apache.eagle.policy.dao.AlertStreamSchemaDAO;
-import org.apache.eagle.policy.dao.AlertStreamSchemaDAOImpl;
-import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
-import org.apache.eagle.policy.siddhi.StreamMetadataManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Tuple2;
-
-import java.lang.management.ManagementFactory;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-/**
- * The stream process executor based on two types
- * @since Dec 17, 2015
- *
- * @param <T> - The policy definition entity type
- * @param <K> - The stream entity type
- */
-public abstract class PolicyProcessExecutor<T extends AbstractPolicyDefinitionEntity, K>
- extends JavaStormStreamExecutor2<String, K>
- implements PolicyLifecycleMethods<T>, PolicyDistributionReportMethods, IPolicyExecutor<T, K>
-{
-
- private static final long serialVersionUID = 1L;
- private static final Logger LOG = LoggerFactory.getLogger(PolicyProcessExecutor.class);
-
- public static final String EAGLE_EVENT_COUNT = "eagle.event.count";
- public static final String EAGLE_POLICY_EVAL_COUNT = "eagle.policy.eval.count";
- public static final String EAGLE_POLICY_EVAL_FAIL_COUNT = "eagle.policy.eval.fail.count";
- public static final String EAGLE_ALERT_COUNT = "eagle.alert.count";
- public static final String EAGLE_ALERT_FAIL_COUNT = "eagle.alert.fail.count";
-
- private static long MERITE_GRANULARITY = DateUtils.MILLIS_PER_MINUTE;
-
- private final Class<T> policyDefinitionClz;
- private String executorId;
- private volatile CopyOnWriteHashMap<String, PolicyEvaluator<T>> policyEvaluators;
- private PolicyPartitioner partitioner;
- private int numPartitions;
- private int partitionSeq;
- private Config config;
- private Map<String, Map<String, T>> initialAlertDefs;
- private String[] sourceStreams;
-
- /**
- * metricMap's key = metricName[#policyId]
- */
- private Map<String, Map<String, String>> dimensionsMap; // cache it for performance
- private Map<String, String> baseDimensions;
-
- private MetricRegistry registry;
- private EagleMetricListener listener;
-
- private PolicyDefinitionDAO<T> policyDefinitionDao;
-
- public PolicyProcessExecutor(String alertExecutorId, PolicyPartitioner partitioner, int numPartitions, int partitionSeq,
- PolicyDefinitionDAO<T> alertDefinitionDao, String[] sourceStreams, Class<T> clz){
- this.executorId = alertExecutorId;
- this.partitioner = partitioner;
- this.numPartitions = numPartitions;
- this.partitionSeq = partitionSeq;
- this.policyDefinitionDao = alertDefinitionDao;
- this.sourceStreams = sourceStreams;
- this.policyDefinitionClz = clz;
- }
-
- public String getExecutorId(){
- return this.executorId;
- }
-
- public int getNumPartitions() {
- return this.numPartitions;
- }
-
- public int getPartitionSeq(){
- return this.partitionSeq;
- }
-
- public PolicyPartitioner getPolicyPartitioner() {
- return this.partitioner;
- }
-
- public Map<String, Map<String, T>> getInitialAlertDefs() {
- return this.initialAlertDefs;
- }
-
- public PolicyDefinitionDAO<T> getPolicyDefinitionDao() {
- return policyDefinitionDao;
- }
-
- public Map<String, PolicyEvaluator<T>> getPolicyEvaluators(){
- return policyEvaluators;
- }
-
- @Override
- public void prepareConfig(Config config) {
- this.config = config;
- }
-
- private void initMetricReportor() {
- String host = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
- int port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
-
- String username = config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME) ?
- config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME) : null;
- String password = config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD) ?
- config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD) : null;
-
- registry = new MetricRegistry();
- listener = new EagleServiceReporterMetricListener(host, port, username, password);
-
- baseDimensions = new HashMap<>();
- baseDimensions = new HashMap<String, String>();
- baseDimensions.put(Constants.ALERT_EXECUTOR_ID, executorId);
- baseDimensions.put(Constants.PARTITIONSEQ, String.valueOf(partitionSeq));
- baseDimensions.put(Constants.SOURCE, ManagementFactory.getRuntimeMXBean().getName());
- baseDimensions.put(EagleConfigConstants.APPLICATION, config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.APPLICATION));
- baseDimensions.put(EagleConfigConstants.SITE, config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.SITE));
-
- dimensionsMap = new HashMap<String, Map<String, String>>();
- }
-
- public AlertStreamSchemaDAO getAlertStreamSchemaDAO(Config config){
- return new AlertStreamSchemaDAOImpl(config);
- }
-
- @Override
- public void init() {
- // initialize StreamMetadataManager before it is used
- StreamMetadataManager.getInstance().init(config, getAlertStreamSchemaDAO(config));
- // for each AlertDefinition, to create a PolicyEvaluator
- Map<String, PolicyEvaluator<T>> tmpPolicyEvaluators = new HashMap<String, PolicyEvaluator<T>>();
-
- String site = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.SITE);
- String application = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.APPLICATION);
- try {
- initialAlertDefs = policyDefinitionDao.findActivePoliciesGroupbyExecutorId(site, application);
- }
- catch (Exception ex) {
- LOG.error("fail to initialize initialAlertDefs: ", ex);
- throw new IllegalStateException("fail to initialize initialAlertDefs: ", ex);
- }
- if(initialAlertDefs == null || initialAlertDefs.isEmpty()){
- LOG.warn("No alert definitions was found for site: " + site + ", application: " + application);
- }
- else if (initialAlertDefs.get(executorId) != null) {
- for(T alertDef : initialAlertDefs.get(executorId).values()){
- int part = partitioner.partition(numPartitions, alertDef.getTags().get(Constants.POLICY_TYPE), alertDef.getTags().get(Constants.POLICY_ID));
- if (part == partitionSeq) {
- tmpPolicyEvaluators.put(alertDef.getTags().get(Constants.POLICY_ID), createPolicyEvaluator(alertDef));
- }
- }
- }
-
- policyEvaluators = new CopyOnWriteHashMap<>();
- // for efficiency, we don't put single policy evaluator
- policyEvaluators.putAll(tmpPolicyEvaluators);
- DynamicPolicyLoader<T> policyLoader = DynamicPolicyLoader.getInstanceOf(policyDefinitionClz);
- policyLoader.init(initialAlertDefs, policyDefinitionDao, config);
- String fullQualifiedAlertExecutorId = executorId + "_" + partitionSeq;
- policyLoader.addPolicyChangeListener(fullQualifiedAlertExecutorId, this);
- policyLoader.addPolicyDistributionReporter(fullQualifiedAlertExecutorId, this);
- LOG.info("Alert Executor created, partitionSeq: " + partitionSeq + " , numPartitions: " + numPartitions);
- LOG.info("All policy evaluators: " + policyEvaluators);
-
- initMetricReportor();
- }
-
- /**
- * Create PolicyEvaluator instance according to policyType-mapped policy evaluator class
- *
- * @param alertDef alert definition
- * @return PolicyEvaluator instance
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- protected PolicyEvaluator<T> createPolicyEvaluator(T alertDef){
- String policyType = alertDef.getTags().get(Constants.POLICY_TYPE);
- Class<? extends PolicyEvaluator> evalCls = PolicyManager.getInstance().getPolicyEvaluator(policyType);
- if(evalCls == null){
- String msg = "No policy evaluator defined for policy type : " + policyType;
- LOG.error(msg);
- throw new IllegalStateException(msg);
- }
-
- // check out whether strong incoming data validation is necessary
- String needValidationConfigKey= Constants.ALERT_EXECUTOR_CONFIGS + "." + executorId + ".needValidation";
-
- // Default: true
- boolean needValidation = !config.hasPath(needValidationConfigKey) || config.getBoolean(needValidationConfigKey);
-
- AbstractPolicyDefinition policyDef = null;
- PolicyEvaluator<T> pe;
- try {
- policyDef = JsonSerDeserUtils.deserialize(alertDef.getPolicyDef(), AbstractPolicyDefinition.class,
- PolicyManager.getInstance().getPolicyModules(policyType));
-
- PolicyEvaluationContext<T, K> context = new PolicyEvaluationContext<>();
- context.policyId = alertDef.getTags().get("policyId");
- context.alertExecutor = this;
- context.resultRender = this.getResultRender();
- // create evaluator instance
- pe = (PolicyEvaluator<T>) evalCls
- .getConstructor(Config.class, PolicyEvaluationContext.class, AbstractPolicyDefinition.class, String[].class, boolean.class)
- .newInstance(config, context, policyDef, sourceStreams, needValidation);
- if (pe.isMarkdownEnabled()) // updating markdown details only if the policy is found invalid
- updateMarkdownDetails(alertDef, pe.isMarkdownEnabled(), pe.getMarkdownReason());
- } catch(Exception ex) {
- LOG.error("Fail creating new policyEvaluator", ex);
- LOG.warn("Broken policy definition and stop running : " + alertDef.getPolicyDef());
- throw new IllegalStateException(ex);
- }
- return pe;
- }
-
- /**
- * verify both alertExecutor logic name and partition id
- * @param alertDef alert definition
- *
- * @return whether accept the alert definition
- */
- private boolean accept(T alertDef){
- String executorID = alertDef.getTags().containsKey("executorId") ? alertDef.getTags().get("executorId")
- : alertDef.getTags().get("alertExecutorId");
-
- if(!executorID.equals(executorId)) {
- if(LOG.isDebugEnabled()){
- LOG.debug("alertDef does not belong to this alertExecutorId : " + executorId + ", alertDef : " + alertDef);
- }
- return false;
- }
- int targetPartitionSeq = partitioner.partition(numPartitions, alertDef.getTags().get(Constants.POLICY_TYPE), alertDef.getTags().get(Constants.POLICY_ID));
- if(targetPartitionSeq == partitionSeq)
- return true;
- return false;
- }
-
- private void updateCounter(String name, Map<String, String> dimensions, double value) {
- long current = System.currentTimeMillis();
- String metricName = MetricKeyCodeDecoder.codeMetricKey(name, dimensions);
- if (registry.getMetrics().get(metricName) == null) {
- EagleCounterMetric metric = new EagleCounterMetric(current, metricName, value, MERITE_GRANULARITY);
- metric.registerListener(listener);
- registry.register(metricName, metric);
- } else {
- EagleCounterMetric metric = (EagleCounterMetric) registry.getMetrics().get(metricName);
- metric.update(value, current);
- // TODO: need remove unused metric from registry
- }
- }
-
- private void updateCounter(String name, Map<String, String> dimensions) {
- updateCounter(name, dimensions, 1.0);
- }
-
- protected Map<String, String> getDimensions(String policyId) {
- if (dimensionsMap.get(policyId) == null) {
- Map<String, String> newDimensions = new HashMap<String, String>(baseDimensions);
- newDimensions.put(Constants.POLICY_ID, policyId);
- dimensionsMap.put(policyId, newDimensions);
- }
- return dimensionsMap.get(policyId);
- }
-
- /**
- * within this single executor, execute all PolicyEvaluator sequentially
- * the contract for input:
- * 1. total # of fields for input is 3, which is fixed
- * 2. the first field is key
- * 3. the second field is stream name
- * 4. the third field is value which is java SortedMap
- */
- @Override
- public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, K>> outputCollector){
- if(input.size() != 3)
- throw new IllegalStateException("AlertExecutor always consumes exactly 3 fields: key, stream name and value(SortedMap)");
- if(LOG.isDebugEnabled()) LOG.debug("Msg is coming " + input.get(2));
- if(LOG.isDebugEnabled()) LOG.debug("Current policyEvaluators: " + policyEvaluators.keySet().toString());
-
- updateCounter(EAGLE_EVENT_COUNT, baseDimensions);
- try{
- synchronized(this.policyEvaluators) {
- for(Entry<String, PolicyEvaluator<T>> entry : policyEvaluators.entrySet()){
- String policyId = entry.getKey();
- PolicyEvaluator<T> evaluator = entry.getValue();
- if (!evaluator.isMarkdownEnabled()) { // not evaluated for a marked down policy
- updateCounter(EAGLE_POLICY_EVAL_COUNT, getDimensions(policyId));
- try {
- evaluator.evaluate(new ValuesArray(outputCollector, input.get(1), input.get(2)));
- } catch (Exception ex) {
- LOG.error("Got an exception, but continue to run " + input.get(2).toString(), ex);
- updateCounter(EAGLE_POLICY_EVAL_COUNT, getDimensions(policyId));
- }
- }
- }
- }
- } catch(Exception ex){
- LOG.error(executorId + ", partition " + partitionSeq + ", error fetching alerts, but continue to run", ex);
- updateCounter(EAGLE_ALERT_FAIL_COUNT, baseDimensions);
- }
- }
-
- @Override
- public void onPolicyCreated(Map<String, T> added) {
- if(LOG.isDebugEnabled()) LOG.debug(executorId + ", partition " + partitionSeq + " policy added : " + added + " policyEvaluators " + policyEvaluators);
- for(T alertDef : added.values()){
- if(!accept(alertDef))
- continue;
- LOG.info(executorId + ", partition " + partitionSeq + " policy really added " + alertDef);
- PolicyEvaluator<T> newEvaluator = createPolicyEvaluator(alertDef);
- if(newEvaluator != null){
- synchronized(this.policyEvaluators) {
- policyEvaluators.put(alertDef.getTags().get(Constants.POLICY_ID), newEvaluator);
- }
- }
- }
- }
-
- @Override
- public void onPolicyChanged(Map<String, T> changed) {
- if(LOG.isDebugEnabled()) LOG.debug(executorId + ", partition " + partitionSeq + " policy changed : " + changed);
- for(T alertDef : changed.values()){
- if(!accept(alertDef))
- continue;
- LOG.info(executorId + ", partition " + partitionSeq + " policy really changed " + alertDef);
- synchronized(this.policyEvaluators) {
- PolicyEvaluator<T> pe = policyEvaluators.get(alertDef.getTags().get(Constants.POLICY_ID));
- boolean previousMarkdown = pe.isMarkdownEnabled();
- String previousMarkdownReason = pe.getMarkdownReason();
- pe.onPolicyUpdate(alertDef);
- if (isMarkdownUpdateRequired(previousMarkdown, pe.isMarkdownEnabled(), previousMarkdownReason, pe.getMarkdownReason()))
- updateMarkdownDetails(alertDef, pe.isMarkdownEnabled(), pe.getMarkdownReason());
- }
- }
- }
-
- @Override
- public void onPolicyDeleted(Map<String, T> deleted) {
- if(LOG.isDebugEnabled()) LOG.debug(executorId + ", partition " + partitionSeq + " policy deleted : " + deleted);
- for(T alertDef : deleted.values()){
- if(!accept(alertDef))
- continue;
- LOG.info(executorId + ", partition " + partitionSeq + " policy really deleted " + alertDef);
- String policyId = alertDef.getTags().get(Constants.POLICY_ID);
- synchronized(this.policyEvaluators) {
- if (policyEvaluators.containsKey(policyId)) {
- PolicyEvaluator<T> pe = policyEvaluators.remove(alertDef.getTags().get(Constants.POLICY_ID));
- pe.onPolicyDelete();
- }
- }
- }
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- public void onEvalEvents(PolicyEvaluationContext<T, K> context, List<K> alerts) {
- if(alerts != null && !alerts.isEmpty()){
- String policyId = context.policyId;
- LOG.info(String.format("Detected %d alerts for policy %s", alerts.size(), policyId));
- Collector outputCollector = context.outputCollector;
- PolicyEvaluator<T> evaluator = context.evaluator;
- updateCounter(EAGLE_ALERT_COUNT, getDimensions(policyId), alerts.size());
- for (K entity : alerts) {
- synchronized(this) {
- outputCollector.collect(new Tuple2(policyId, entity));
- }
- if(LOG.isDebugEnabled()) LOG.debug("A new alert is triggered: " + executorId + ", partition " + partitionSeq + ", Got an alert with output context: " + entity + ", for policy " + evaluator);
- }
- }
- }
-
- public abstract ResultRender<T, K> getResultRender();
-
- @Override
- public void report() {
- PolicyDistroStatsLogReporter appender = new PolicyDistroStatsLogReporter();
- appender.reportPolicyMembership(executorId + "_" + partitionSeq, policyEvaluators.keySet());
- }
-
- /**
- * Method to check if updating markdown details in DB is required.
- * @return boolean: If markdown details needs to be updated for the policy
- */
- private boolean isMarkdownUpdateRequired (boolean previousMarkdown, boolean presentMarkdown, String previousReason, String presentReason) {
- boolean isUpdateRequired = true;
- if (!previousMarkdown && !presentMarkdown) { // not updating when previous/present policies are both valid
- isUpdateRequired = false;
- } else if (previousMarkdown && presentMarkdown) {
- if (previousReason.equals(presentReason))
- isUpdateRequired = false; // not updating when there is no change with the markdown reason
- }
- return isUpdateRequired;
- }
-
- /**
- * Method to invoke Eagle Service call to update the markdown details for the policy.
- */
- private void updateMarkdownDetails(T entity, boolean markdownEnabled, String markdownReason) {
- AlertDefinitionAPIEntity alertEntity = (AlertDefinitionAPIEntity) entity;
- alertEntity.setMarkdownEnabled(markdownEnabled);
- alertEntity.setMarkdownReason(null != markdownReason ? markdownReason : "");
- policyDefinitionDao.updatePolicyDetails(entity);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiOutputStreamCallback.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiOutputStreamCallback.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiOutputStreamCallback.java
deleted file mode 100644
index 22506aa..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiOutputStreamCallback.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.policy.siddhi;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-import org.apache.eagle.policy.PolicyEvaluationContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.stream.output.StreamCallback;
-
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Siddhi stream call back
- *
- * Created on 1/20/16.
- */
-public class SiddhiOutputStreamCallback<T extends AbstractPolicyDefinitionEntity, K> extends StreamCallback {
-
- public static final Logger LOG = LoggerFactory.getLogger(SiddhiOutputStreamCallback.class);
-
- private SiddhiPolicyEvaluator<T, K> evaluator;
- public Config config;
-
- public SiddhiOutputStreamCallback(Config config, SiddhiPolicyEvaluator<T, K> evaluator) {
- this.config = config;
- this.evaluator = evaluator;
- }
-
- @Override
- public void receive(Event[] events) {
- long timeStamp = System.currentTimeMillis();
- List<K> alerts = new LinkedList<>();
- PolicyEvaluationContext<T, K> siddhiContext = null;
-
- for (Event event : events) {
- Object[] data = event.getData();
- List<Object> returns = SiddhiQueryCallbackImpl.getOutputObject(event.getData());
- K alert = siddhiContext.resultRender.render(config, returns, siddhiContext, timeStamp);
- alerts.add(alert);
-
- if (siddhiContext == null) {
- siddhiContext = (PolicyEvaluationContext<T, K>) data[0];
- }
- }
-
- if (siddhiContext != null) {
- siddhiContext.alertExecutor.onEvalEvents(siddhiContext, alerts);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyDefinition.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyDefinition.java
deleted file mode 100644
index 639c15b..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyDefinition.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.policy.siddhi;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import org.apache.eagle.policy.config.AbstractPolicyDefinition;
-
-/**
- * siddhi policy definition has the following format
- * {
- "type":"SiddhiCEPEngine",
- "expression" : "from every b1=HeapUsage[metric == 'eagle.metric.gc'] -> a1=FullGCEvent[eventName == 'full gc'] -> b2=HeapUsage[metric == b1.metric and host == b1.host and value >= b1.value * 0.8] within 100 sec select a1.eventName, b1.metric, b2.timestamp, 60 as timerange insert into GCMonitor; "
- }
- */
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible=true)
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class SiddhiPolicyDefinition extends AbstractPolicyDefinition {
- private String expression;
-
- private boolean containsDefinition;
-
- public boolean isContainsDefinition() {
- return containsDefinition;
- }
-
- public void setContainsDefinition(boolean containsDefinition) {
- this.containsDefinition = containsDefinition;
- }
-
- public String getExpression() {
- return expression;
- }
- public void setExpression(String expression) {
- this.expression = expression;
- }
-
- @Override
- public String toString(){
- return expression;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java
deleted file mode 100644
index cbee286..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java
+++ /dev/null
@@ -1,356 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.policy.siddhi;
-
-import com.lmax.disruptor.ExceptionHandler;
-import com.lmax.disruptor.FatalExceptionHandler;
-import com.typesafe.config.Config;
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
-import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
-import org.apache.eagle.dataproc.core.ValuesArray;
-import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.policy.PolicyEvaluationContext;
-import org.apache.eagle.policy.PolicyEvaluator;
-import org.apache.eagle.policy.PolicyManager;
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.policy.config.AbstractPolicyDefinition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.query.output.callback.QueryCallback;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.query.api.execution.query.Query;
-import org.wso2.siddhi.query.api.execution.query.selection.OutputAttribute;
-import org.wso2.siddhi.query.compiler.exception.SiddhiParserException;
-
-import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.util.*;
-
-/**
- * when policy is updated or deleted, SiddhiManager.shutdown should be invoked to release resources.
- * during this time, synchronization is important
- */
-public class SiddhiPolicyEvaluator<T extends AbstractPolicyDefinitionEntity, K> implements PolicyEvaluator<T> {
-
- private final static String EXECUTION_PLAN_NAME = "query";
- private final static Logger LOG = LoggerFactory.getLogger(SiddhiPolicyEvaluator.class);
-
- private volatile SiddhiRuntime siddhiRuntime;
- private final String[] sourceStreams;
- private final boolean needValidation;
- private final Config config;
- private final PolicyEvaluationContext<T, K> context;
-
- /**
- * everything dependent on policyDef should be together and switched in runtime
- */
- public static class SiddhiRuntime {
- QueryCallback queryCallback;
- Map<String, InputHandler> siddhiInputHandlers;
- SiddhiManager siddhiManager;
- SiddhiPolicyDefinition policyDef;
- List<String> outputFields;
- String executionPlanName;
- boolean markdownEnabled;
- String markdownReason;
- }
-
- public SiddhiPolicyEvaluator(Config config, PolicyEvaluationContext<T, K> context, AbstractPolicyDefinition policyDef, String[] sourceStreams) {
- this(config, context, policyDef, sourceStreams, false);
- }
-
- public SiddhiPolicyEvaluator(Config config, PolicyEvaluationContext<T, K> context, AbstractPolicyDefinition policyDef, String[] sourceStreams, boolean needValidation) {
- this.config = config;
- this.context = context;
- this.context.evaluator = this;
- this.needValidation = needValidation;
- this.sourceStreams = sourceStreams;
- init(policyDef);
- }
-
- public void init(AbstractPolicyDefinition policyDef) {
- siddhiRuntime = createSiddhiRuntime((SiddhiPolicyDefinition) policyDef);
- }
-
- public static String addContextFieldIfNotExist(String expression) {
- // select fieldA, fieldB --> select eagleAlertContext, fieldA, fieldB
- int pos = expression.indexOf("select ") + 7;
- int index = pos;
- boolean isSelectStarPattern = true;
- while (index < expression.length()) {
- if (expression.charAt(index) == ' ') index++;
- else if (expression.charAt(index) == '*') break;
- else {
- isSelectStarPattern = false;
- break;
- }
- }
- if (isSelectStarPattern) return expression;
- StringBuilder sb = new StringBuilder();
- sb.append(expression.substring(0, pos));
- sb.append(SiddhiStreamMetadataUtils.EAGLE_ALERT_CONTEXT_FIELD + ",");
- sb.append(expression.substring(pos, expression.length()));
- return sb.toString();
- }
-
- private SiddhiRuntime createSiddhiRuntime(SiddhiPolicyDefinition policyDef) {
- SiddhiManager siddhiManager = new SiddhiManager();
- Map<String, InputHandler> siddhiInputHandlers = new HashMap<String, InputHandler>();
- SiddhiRuntime runtime = new SiddhiRuntime();
-
- // compose execution plan sql
- String executionPlan = policyDef.getExpression();
- if (!policyDef.isContainsDefinition()) {
- StringBuilder sb = new StringBuilder();
- for (String sourceStream : sourceStreams) {
- String streamDef = SiddhiStreamMetadataUtils.convertToStreamDef(sourceStream);
- LOG.info("Siddhi stream definition : " + streamDef);
- sb.append(streamDef);
- }
-
- String expression = policyDef.getExpression();
- executionPlan = sb.toString() + " @info(name = '" + EXECUTION_PLAN_NAME + "') " + expression;
- }
-
- ExecutionPlanRuntime executionPlanRuntime = null;
-
- try {
- executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
- executionPlanRuntime.handleExceptionWith(new SiddhiPolicyExceptionHandler());
-
- for (String sourceStream : sourceStreams) {
- siddhiInputHandlers.put(sourceStream, executionPlanRuntime.getInputHandler(sourceStream));
- }
-
- executionPlanRuntime.start();
- LOG.info("Siddhi query: " + executionPlan);
- attachCallback(runtime, executionPlanRuntime, context);
-
- runtime.markdownEnabled = false;
- runtime.markdownReason = null;
- } catch (SiddhiParserException exception) { // process is not interrupted in case of an invalid policy defined by marking down
- LOG.error("Exception in parsing Siddhi query: " + executionPlan + ", reason being: " + exception.getMessage());
- runtime.queryCallback = null;
- runtime.outputFields = null;
- runtime.markdownEnabled = true;
- runtime.markdownReason = exception.getMessage();
- }
-
- runtime.siddhiInputHandlers = siddhiInputHandlers;
- runtime.siddhiManager = siddhiManager;
- runtime.policyDef = policyDef;
- runtime.executionPlanName = (null != executionPlanRuntime) ? executionPlanRuntime.getName() : null; // executionPlanRuntime will be set to null in case of an invalid policy
- return runtime;
- }
-
- private void attachCallback(SiddhiRuntime runtime, ExecutionPlanRuntime executionPlanRuntime, PolicyEvaluationContext<T, K> context) {
- List<String> outputFields = new ArrayList<>();
-// String outputStreamName = config.getString("alertExecutorConfigs." + executorId + "." + "outputStream");
-// if (StringUtils.isNotEmpty(outputStreamName)) {
-// StreamCallback streamCallback = new SiddhiOutputStreamCallback<>(config, this);
-// executionPlanRuntime.addCallback(outputStreamName, streamCallback);
-// runtime.outputStreamCallback = streamCallback;
-// // find output attribute from stream call back
-// try {
-// Field field = StreamCallback.class.getDeclaredField("streamDefinition");
-// field.setAccessible(true);
-// AbstractDefinition outStreamDef = (AbstractDefinition) field.get(streamCallback);
-// outputFields = Arrays.asList(outStreamDef.getAttributeNameArray());
-// } catch (Exception ex) {
-// LOG.error("Got an Exception when initial outputFields ", ex);
-// }
-// } else {
- QueryCallback callback = new SiddhiQueryCallbackImpl<T, K>(config, context);
- executionPlanRuntime.addCallback(EXECUTION_PLAN_NAME, callback);
- runtime.queryCallback = callback;
- // find output attribute from query call back
- try {
- Field field = QueryCallback.class.getDeclaredField(EXECUTION_PLAN_NAME);
- field.setAccessible(true);
- Query query = (Query) field.get(callback);
- List<OutputAttribute> list = query.getSelector().getSelectionList();
- for (OutputAttribute output : list) {
- outputFields.add(output.getRename());
- }
- } catch (Exception ex) {
- LOG.error("Got an Exception when initial outputFields ", ex);
- }
-// }
- runtime.outputFields = outputFields;
- }
-
- /**
- * 1. input has 3 fields, first is siddhi context, second is streamName, the last one is map of attribute name/value
- * 2. runtime check for input data (This is very expensive, so we ignore for now)
- * the size of input map should be equal to size of attributes which stream metadata defines
- * the attribute names should be equal to attribute names which stream metadata defines
- * the input field cannot be null
- */
- @SuppressWarnings({"rawtypes"})
- @Override
- public void evaluate(ValuesArray data) throws Exception {
- if (!siddhiRuntime.markdownEnabled) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Siddhi policy evaluator consumers data :" + data);
- }
- Collector outputCollector = (Collector) data.get(0);
- String streamName = (String) data.get(1);
- SortedMap dataMap = (SortedMap) data.get(2);
-
- // Get metadata keyset for the stream.
- Set<String> metadataKeys = StreamMetadataManager.getInstance()
- .getMetadataEntityMapForStream(streamName).keySet();
-
- validateEventInRuntime(streamName, dataMap, metadataKeys);
-
- synchronized (siddhiRuntime) {
- // retain the collector in the context. This assignment is idempotent
- context.outputCollector = outputCollector;
-
- List<Object> input = new ArrayList<Object>();
- putAttrsIntoInputStream(input, streamName, metadataKeys, dataMap);
- siddhiRuntime.siddhiInputHandlers.get(streamName).send(input.toArray(new Object[0]));
- }
- }
- }
-
- /**
- * This is a heavy operation, we should avoid to use.
- * <p/>
- * This validation method will skip invalid fields in event which are not declared in stream schema otherwise it will cause exception for siddhi engine.
- *
- * @param sourceStream source steam id
- * @param data input event
- * @see <a href="https://issues.apache.org/jira/browse/EAGLE-49">https://issues.apache.org/jira/browse/EAGLE-49</a>
- */
- private void validateEventInRuntime(String sourceStream, SortedMap data, Set<String> metadataKeys) {
- if (!needValidation) {
- return;
- }
-
- if (!metadataKeys.equals(data.keySet())) {
- Set<Object> badKeys = new TreeSet<>();
- for (Object key : data.keySet()) {
- if (!metadataKeys.contains(key)) {
- badKeys.add(key);
- }
- }
- LOG.warn(String.format("Ignore invalid fields %s in event: %s from stream: %s, valid fields are: %s",
- badKeys.toString(), data.toString(), sourceStream, metadataKeys.toString()));
-
- for (Object key : badKeys) {
- data.remove(key);
- }
- }
- }
-
- private void putAttrsIntoInputStream(List<Object> input, String streamName, Set<String> metadataKeys, SortedMap dataMap) {
- if (!needValidation) {
- input.addAll(dataMap.values());
- return;
- }
-
- // If a metadata field is not set, we put null for the field's value.
- for (String key : metadataKeys) {
- Object value = dataMap.get(key);
- if (value == null) {
- input.add(SiddhiStreamMetadataUtils.getAttrDefaultValue(streamName, key));
- } else {
- input.add(value);
- }
- }
- }
-
- @Override
- public void onPolicyUpdate(T newAlertDef) {
- AbstractPolicyDefinition policyDef = null;
- try {
- policyDef = JsonSerDeserUtils.deserialize(newAlertDef.getPolicyDef(),
- AbstractPolicyDefinition.class, PolicyManager.getInstance().getPolicyModules(newAlertDef.getTags().get(Constants.POLICY_TYPE)));
- } catch (Exception ex) {
- LOG.error("Initial policy def error, ", ex);
- }
- SiddhiRuntime previous = siddhiRuntime;
- siddhiRuntime = createSiddhiRuntime((SiddhiPolicyDefinition) policyDef);
- synchronized (previous) {
- if (!previous.markdownEnabled) // condition to check if previous SiddhiRuntime was started after policy validation
- previous.siddhiManager.getExecutionPlanRuntime(previous.executionPlanName).shutdown();
- }
- }
-
- @Override
- public void onPolicyDelete() {
- synchronized (siddhiRuntime) {
- LOG.info("Going to shutdown siddhi execution plan, planName: " + siddhiRuntime.executionPlanName);
- if (!siddhiRuntime.markdownEnabled) // condition to check if previous SiddhiRuntime was started after policy validation
- siddhiRuntime.siddhiManager.getExecutionPlanRuntime(siddhiRuntime.executionPlanName).shutdown();
- LOG.info("Siddhi execution plan " + siddhiRuntime.executionPlanName + " is successfully shutdown ");
- }
- }
-
- @Override
- public String toString() {
- return siddhiRuntime.policyDef.toString();
- }
-
- public String[] getStreamNames() {
- return sourceStreams;
- }
-
- public Map<String, String> getAdditionalContext() {
- Map<String, String> context = new HashMap<String, String>();
- StringBuilder sourceStreams = new StringBuilder();
- for (String streamName : getStreamNames()) {
- sourceStreams.append(streamName + ",");
- }
- if (sourceStreams.length() > 0) {
- sourceStreams.deleteCharAt(sourceStreams.length() - 1);
- }
- context.put(Constants.SOURCE_STREAMS, sourceStreams.toString());
- context.put(Constants.POLICY_ID, this.context.policyId);
- return context;
- }
-
- public List<String> getOutputStreamAttrNameList() {
- return siddhiRuntime.outputFields;
- }
-
- @Override
- public boolean isMarkdownEnabled() { return siddhiRuntime.markdownEnabled; }
-
- @Override
- public String getMarkdownReason() { return siddhiRuntime.markdownReason; }
-
- private static class SiddhiPolicyExceptionHandler implements Serializable, ExceptionHandler<Object> {
- private final static Logger LOG = LoggerFactory.getLogger(SiddhiPolicyExceptionHandler.class);
-
- public void handleEventException(Throwable ex, long sequence, Object event) {
- LOG.warn("Exception processing event: " + sequence + " " + event, ex);
- }
-
- public void handleOnStartException(Throwable ex) {
- LOG.warn("Exception during onStart()", ex);
- }
-
- public void handleOnShutdownException(Throwable ex) {
- LOG.warn("Exception during onShutdown()", ex);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluatorServiceProviderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluatorServiceProviderImpl.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluatorServiceProviderImpl.java
deleted file mode 100644
index 1bd5830..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluatorServiceProviderImpl.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.policy.siddhi;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-import org.apache.eagle.policy.PolicyEvaluatorServiceProvider;
-
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.jsontype.NamedType;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-
-public class SiddhiPolicyEvaluatorServiceProviderImpl<T extends AbstractPolicyDefinitionEntity> implements PolicyEvaluatorServiceProvider<T> {
- @Override
- public String getPolicyType() {
- return Constants.policyType.siddhiCEPEngine.name();
- }
-
- @Override
- public Class getPolicyEvaluator() {
- return SiddhiPolicyEvaluator.class;
- }
-
- @Override
- public List<Module> getBindingModules() {
- Module module1 = new SimpleModule(Constants.POLICY_DEFINITION).registerSubtypes(new NamedType(SiddhiPolicyDefinition.class, getPolicyType()));
- return Arrays.asList(module1);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiQueryCallbackImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiQueryCallbackImpl.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiQueryCallbackImpl.java
deleted file mode 100644
index 43422f8..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiQueryCallbackImpl.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.policy.siddhi;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-import org.apache.eagle.policy.PolicyEvaluationContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.query.output.callback.QueryCallback;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Siddhi call back implementation
- *
- * @param <T> - The policy definition type
- * @param <K> - K the alert entity type
- */
-public class SiddhiQueryCallbackImpl<T extends AbstractPolicyDefinitionEntity, K> extends QueryCallback{
-
- private static final Logger LOG = LoggerFactory.getLogger(SiddhiQueryCallbackImpl.class);
-
- private final Config config;
- private final PolicyEvaluationContext<T, K> siddhiEvaluateContext;
-
- public SiddhiQueryCallbackImpl(Config config, PolicyEvaluationContext<T, K> siddhiContext) {
- this.config = config;
- this.siddhiEvaluateContext = siddhiContext;
- }
-
- public static List<String> convertToString(List<Object> data) {
- List<String> rets = new ArrayList<String>();
- for (Object object : data) {
- String value = null;
- if (object instanceof Double) {
- value = String.valueOf((Double)object);
- }
- else if (object instanceof Integer) {
- value = String.valueOf((Integer)object);
- }
- else if (object instanceof Long) {
- value = String.valueOf((Long)object);
- }
- else if (object instanceof String) {
- value = (String)object;
- }
- else if (object instanceof Boolean) {
- value = String.valueOf((Boolean)object);
- }
- rets.add(value);
- }
- return rets;
- }
-
- public static List<Object> getOutputObject(Object[] data) {
- List<Object> rets = new ArrayList<>(data.length);
-// boolean isFirst = true;
- for (Object object : data) {
-// // The first field is siddhiAlertContext, skip it
-// if (isFirst) {
-// isFirst = false;
-// continue;
-// }
- rets.add(object);
- }
- return rets;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
- List<Object> rets = getOutputObject(inEvents[0].getData());
- K alert = siddhiEvaluateContext.resultRender.render(config, rets, siddhiEvaluateContext, timeStamp);
- SiddhiEvaluationHandler<T, K> handler = siddhiEvaluateContext.alertExecutor;
- handler.onEvalEvents(siddhiEvaluateContext, Arrays.asList(alert));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiStreamMetadataUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiStreamMetadataUtils.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiStreamMetadataUtils.java
deleted file mode 100644
index e4c3481..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiStreamMetadataUtils.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.policy.siddhi;
-
-import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.SortedMap;
-
-/**
- * convert metadata entities for a stream to stream definition for siddhi cep engine
- * define stream HeapUsage (metric string, host string, value double, timestamp long)
- */
-public class SiddhiStreamMetadataUtils {
- private final static Logger LOG = LoggerFactory.getLogger(SiddhiStreamMetadataUtils.class);
-
- public final static String EAGLE_ALERT_CONTEXT_FIELD = "eagleAlertContext";
-
- public static SortedMap<String, AlertStreamSchemaEntity> getAttrMap(String streamName) {
- SortedMap<String, AlertStreamSchemaEntity> map = StreamMetadataManager.getInstance().getMetadataEntityMapForStream(streamName);
- if(map == null || map.size() == 0){
- throw new IllegalStateException("Alert stream schema ["+streamName+"] should never be empty");
- }
- return map;
- }
-
- /**
- * @see org.wso2.siddhi.query.api.definition.Attribute.Type
- * make sure StreamMetadataManager.init is invoked before this method
- * @param streamName
- * @return
- */
- public static String convertToStreamDef(String streamName){
- SortedMap<String, AlertStreamSchemaEntity> map = getAttrMap(streamName);
- StringBuilder sb = new StringBuilder();
-// sb.append(EAGLE_ALERT_CONTEXT_FIELD + " object, ");
- for(Map.Entry<String, AlertStreamSchemaEntity> entry : map.entrySet()){
- appendAttributeNameType(sb, entry.getKey(), entry.getValue().getAttrType());
- }
- if(sb.length() > 0){
- sb.deleteCharAt(sb.length()-1);
- }
-
- String siddhiStreamDefFormat = "define stream " + streamName + "(" + "%s" + ");";
- return String.format(siddhiStreamDefFormat, sb.toString());
- }
-
- public static String convertToStreamDef(String streamName, Map<String, String> eventSchema){
- StringBuilder sb = new StringBuilder();
- sb.append("context" + " object,");
- for(Map.Entry<String, String> entry : eventSchema.entrySet()){
- appendAttributeNameType(sb, entry.getKey(), entry.getValue());
- }
- if(sb.length() > 0){
- sb.deleteCharAt(sb.length()-1);
- }
-
- String siddhiStreamDefFormat = "define stream " + streamName + "(" + "%s" + ");";
- return String.format(siddhiStreamDefFormat, sb.toString());
- }
-
- private static void appendAttributeNameType(StringBuilder sb, String attrName, String attrType){
- sb.append(attrName);
- sb.append(" ");
- if(attrType.equalsIgnoreCase(AttributeType.STRING.name())){
- sb.append("string");
- }else if(attrType.equalsIgnoreCase(AttributeType.INTEGER.name())){
- sb.append("int");
- }else if(attrType.equalsIgnoreCase(AttributeType.LONG.name())){
- sb.append("long");
- }else if(attrType.equalsIgnoreCase(AttributeType.BOOL.name())){
- sb.append("bool");
- }else if(attrType.equalsIgnoreCase(AttributeType.FLOAT.name())){
- sb.append("float");
- }else if(attrType.equalsIgnoreCase(AttributeType.DOUBLE.name())){
- sb.append("double");
- }else{
- LOG.warn("AttrType is not recognized, ignore : " + attrType);
- }
- sb.append(",");
- }
-
- public static Object getAttrDefaultValue(String streamName, String attrName){
- SortedMap<String, AlertStreamSchemaEntity> map = getAttrMap(streamName);
- AlertStreamSchemaEntity entity = map.get(attrName);
- if (entity.getDefaultValue() != null) {
- return entity.getDefaultValue();
- }
- else {
- String attrType = entity.getAttrType();
- if (attrType.equalsIgnoreCase(AttributeType.STRING.name())) {
- return "NA";
- } else if (attrType.equalsIgnoreCase(AttributeType.INTEGER.name()) || attrType.equalsIgnoreCase(AttributeType.LONG.name())) {
- return -1;
- } else if (attrType.equalsIgnoreCase(AttributeType.BOOL.name())) {
- return true;
- } else {
- LOG.warn("AttrType is not recognized: " + attrType + ", treat it as string");
- return "N/A";
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/StreamMetadataManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/StreamMetadataManager.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/StreamMetadataManager.java
deleted file mode 100644
index 83d30e0..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/StreamMetadataManager.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.policy.siddhi;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
-import com.typesafe.config.Config;
-import org.apache.eagle.policy.dao.AlertStreamSchemaDAO;
-import org.apache.eagle.common.config.EagleConfigConstants;
-import org.apache.commons.collections.map.UnmodifiableMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * centralized memory where all stream metadata sit on, it is not mutable data
- */
-public class StreamMetadataManager {
- private static final Logger LOG = LoggerFactory.getLogger(StreamMetadataManager.class);
-
- private static StreamMetadataManager instance = new StreamMetadataManager();
- private Map<String, List<AlertStreamSchemaEntity>> map = new HashMap<String, List<AlertStreamSchemaEntity>>();
- private Map<String, SortedMap<String, AlertStreamSchemaEntity>> map2 = new HashMap<String, SortedMap<String, AlertStreamSchemaEntity>>();
- private volatile boolean initialized = false;
-
- private StreamMetadataManager(){
- }
-
- public static StreamMetadataManager getInstance(){
- return instance;
- }
-
- private void internalInit(Config config, AlertStreamSchemaDAO dao){
- try{
- String application = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.APPLICATION);
- List<AlertStreamSchemaEntity> list = dao.findAlertStreamSchemaByApplication(application);
- if(list == null)
- return;
- for (AlertStreamSchemaEntity entity : list) {
- String streamName = entity.getTags().get(Constants.STREAM_NAME);
- if (map.get(streamName) == null) {
- map.put(streamName, new ArrayList<AlertStreamSchemaEntity>());
- map2.put(streamName, new TreeMap<String, AlertStreamSchemaEntity>());
- }
- map.get(streamName).add(entity);
- map2.get(streamName).put(entity.getTags().get(Constants.ATTR_NAME), entity);
- }
- }catch(Exception ex){
- LOG.error("Fail building metadata manger", ex);
- throw new IllegalStateException(ex);
- }
- }
-
- /**
- * singleton with init would be good for unit test as well, and it ensures that
- * initialization happens only once before you use it.
- * @param config
- * @param dao
- */
- public void init(Config config, AlertStreamSchemaDAO dao){
- if(!initialized){
- synchronized(this){
- if(!initialized){
- if(LOG.isDebugEnabled()) LOG.debug("Initializing ...");
- internalInit(config, dao);
- initialized = true;
- LOG.info("Successfully initialized");
- }
- }
- }else{
- LOG.info("Already initialized, skip");
- }
- }
-
- // Only for unit test purpose
- public void reset() {
- synchronized (this) {
- initialized = false;
- map.clear();
- map2.clear();
- }
- }
-
- private void ensureInitialized(){
- if(!initialized)
- throw new IllegalStateException("StreamMetadataManager should be initialized before using it");
- }
-
- public List<AlertStreamSchemaEntity> getMetadataEntitiesForStream(String streamName){
- ensureInitialized();
- return getMetadataEntitiesForAllStreams().get(streamName);
- }
-
- public Map<String, List<AlertStreamSchemaEntity>> getMetadataEntitiesForAllStreams(){
- ensureInitialized();
- return UnmodifiableMap.decorate(map);
- }
-
- public SortedMap<String, AlertStreamSchemaEntity> getMetadataEntityMapForStream(String streamName){
- ensureInitialized();
- return getMetadataEntityMapForAllStreams().get(streamName);
- }
-
- public Map<String, SortedMap<String, AlertStreamSchemaEntity>> getMetadataEntityMapForAllStreams(){
- ensureInitialized();
- return UnmodifiableMap.decorate(map2);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider b/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
deleted file mode 100644
index eac2bfd..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.eagle.policy.siddhi.SiddhiPolicyEvaluatorServiceProviderImpl
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/test/java/org/apache/eagle/policy/dao/TestSchemaDao.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/test/java/org/apache/eagle/policy/dao/TestSchemaDao.java b/eagle-core/eagle-policy/eagle-policy-base/src/test/java/org/apache/eagle/policy/dao/TestSchemaDao.java
deleted file mode 100644
index c88d9bb..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/test/java/org/apache/eagle/policy/dao/TestSchemaDao.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.policy.dao;
-
-import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.List;
-
-/**
- * Created on 12/31/15.
- */
-public class TestSchemaDao {
-
- @Ignore
- @Test
- public void test() throws Exception {
- AlertStreamSchemaDAO dao = new AlertStreamSchemaDAOImpl("localhost", 9099, "admin", "secret");
- List<AlertStreamSchemaEntity> entities = dao.findAlertStreamSchemaByApplication("hdfsAuditLog");
- System.out.print(entities);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-examples/eagle-topology-example/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-topology-example/pom.xml b/eagle-examples/eagle-topology-example/pom.xml
deleted file mode 100644
index 5d4a419..0000000
--- a/eagle-examples/eagle-topology-example/pom.xml
+++ /dev/null
@@ -1,68 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>eagle-examples</artifactId>
- <groupId>org.apache.eagle</groupId>
- <version>0.5.0-incubating-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>eagle-topology-example</artifactId>
- <dependencies>
- <dependency>
- <groupId>org.apache.eagle</groupId>
- <artifactId>eagle-stream-process-api</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.eagle</groupId>
- <artifactId>eagle-query-base</artifactId>
- <version>${project.version}</version>
- </dependency>
- </dependencies>
-
- <build>
- <resources>
- <resource>
- <directory>src/resources</directory>
- </resource>
- </resources>
- <plugins>
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <configuration>
- <descriptor>src/assembly/eagle-topology-example-assembly.xml</descriptor>
- <finalName>eagle-topology-example-${project.version}</finalName>
- </configuration>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- <configuration>
- <tarLongFileMode>posix</tarLongFileMode>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-examples/eagle-topology-example/src/assembly/eagle-topology-example-assembly.xml
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-topology-example/src/assembly/eagle-topology-example-assembly.xml b/eagle-examples/eagle-topology-example/src/assembly/eagle-topology-example-assembly.xml
deleted file mode 100644
index 0acf619..0000000
--- a/eagle-examples/eagle-topology-example/src/assembly/eagle-topology-example-assembly.xml
+++ /dev/null
@@ -1,63 +0,0 @@
-<?xml version="1.0"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
- <id>assembly</id>
- <formats>
- <format>jar</format>
- </formats>
- <includeBaseDirectory>false</includeBaseDirectory>
- <dependencySets>
- <dependencySet>
- <outputDirectory>/</outputDirectory>
- <useProjectArtifact>false</useProjectArtifact>
- <unpack>true</unpack>
- <scope>runtime</scope>
- <unpackOptions>
- <excludes>
- <exclude>**/application.conf</exclude>
- <exclude>**/defaults.yaml</exclude>
- <exclude>**/storm.yaml</exclude>
- <exclude>**/storm.yaml.1</exclude>
- <exclude>**/log4j.properties</exclude>
- </excludes>
- </unpackOptions>
- <excludes>
- <exclude>org.apache.storm:storm-core</exclude>
- <exclude>org.slf4j:slf4j-api</exclude>
- <exclude>org.slf4j:log4j-over-slf4j</exclude>
- <exclude>org.slf4j:slf4j-log4j12</exclude>
- <exclude>log4j:log4j</exclude>
- <exclude>asm:asm</exclude>
- <exclude>org.apache.log4j.wso2:log4j</exclude>
- </excludes>
- </dependencySet>
- </dependencySets>
- <fileSets>
- <fileSet>
- <directory>${project.build.outputDirectory}</directory>
- <outputDirectory>/</outputDirectory>
- <excludes>
- <exclude>application.conf</exclude>
- <exclude>log4j.properties</exclude>
- <exclude>**/storm.yaml.1</exclude>
- </excludes>
- </fileSet>
- </fileSets>
-</assembly>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/notificationplugin/NotificationPluginTestMain.java
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/notificationplugin/NotificationPluginTestMain.java b/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/notificationplugin/NotificationPluginTestMain.java
deleted file mode 100644
index a5e39a5..0000000
--- a/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/notificationplugin/NotificationPluginTestMain.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.example.notificationplugin;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-import com.typesafe.config.Config;
-import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider;
-import org.apache.eagle.datastream.ExecutionEnvironments;
-import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * Created on 2/16/16.
- */
-public class NotificationPluginTestMain {
- public static void main(String[] args){
- System.setProperty("config.resource", "/application-plugintest.conf");
- StormExecutionEnvironment env = ExecutionEnvironments.getStorm();
- env.fromSpout(createProvider(env.getConfig())).withOutputFields(2).nameAs("testSpout").alertWithConsumer("testStream", "testExecutor");
- env.execute();
- }
-
- public static StormSpoutProvider createProvider(Config config) {
- return new StormSpoutProvider(){
-
- @Override
- public BaseRichSpout getSpout(Config context) {
- return new TestSpout();
- }
- };
- }
-
- public static class TestSpout extends BaseRichSpout {
- private static final Logger LOG = LoggerFactory.getLogger(TestSpout.class);
- private SpoutOutputCollector collector;
- public TestSpout() {
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- }
-
- @Override
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
- this.collector = collector;
- }
-
- @Override
- public void nextTuple() {
- Utils.sleep(5000);
- LOG.info("emitted tuple ...");
- Map<String, Object> map = new TreeMap<>();
- map.put("testAttribute", "testValue");
- collector.emit(new Values("testStream", map));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/MetricSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/MetricSerializer.java b/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/MetricSerializer.java
deleted file mode 100644
index 58dfe48..0000000
--- a/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/MetricSerializer.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.example.persist;
-
-import org.apache.eagle.dataproc.impl.storm.kafka.SpoutKafkaMessageDeserializer;
-
-/**
- * Created on 1/4/16.
- */
-public class MetricSerializer implements SpoutKafkaMessageDeserializer {
- @Override
- public Object deserialize(byte[] arg0) {
- String logLine = new String(arg0);
-
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/PersistTopoTestMain.java
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/PersistTopoTestMain.java b/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/PersistTopoTestMain.java
deleted file mode 100644
index 8f105fc..0000000
--- a/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/PersistTopoTestMain.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.example.persist;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-import com.typesafe.config.Config;
-import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider;
-import org.apache.eagle.datastream.ExecutionEnvironments;
-import org.apache.eagle.datastream.core.StorageType;
-import org.apache.eagle.datastream.core.StreamProducer;
-import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
-import org.apache.eagle.partition.PartitionStrategy;
-
-import java.util.Arrays;
-import java.util.Map;
-import java.util.Random;
-
-/**
- * Created on 1/4/16.
- *
- * This test demonstrates how user could use the new aggregate and persist feature for case like metrics processing&storage.
- *
- */
-public class PersistTopoTestMain {
-
- public static void main(String[] args) {
-// System.setProperty("config.resource", "application.conf");
- StormExecutionEnvironment env = ExecutionEnvironments.getStorm();
- StormSpoutProvider provider = createProvider(env.getConfig());
- execWithDefaultPartition(env, provider);
- }
-
- @SuppressWarnings("unchecked")
- public static void execWithDefaultPartition(StormExecutionEnvironment env, StormSpoutProvider provider) {
- StreamProducer source = env.fromSpout(provider).withOutputFields(2).nameAs("kafkaMsgConsumer");
- StreamProducer filter = source;
-
- // required : persistTestEventStream schema be created in metadata manager
- // required : policy for aggregateExecutor1 be created in metadata manager
- StreamProducer aggregate = filter.aggregate(Arrays.asList("persistTestEventStream"), "aggregateExecutor1", new PartitionStrategy() {
- @Override
- public int balance(String key, int buckNum) {
- return 0;
- }
- });
-
- StreamProducer persist = aggregate.persist("persistExecutor1", StorageType.KAFKA());
-
- env.execute();
- }
-
- public static StormSpoutProvider createProvider(Config config) {
-
- return new StormSpoutProvider(){
-
- @Override
- public BaseRichSpout getSpout(Config context) {
- return new StaticMetricSpout();
- }
- };
- }
-
- public static class StaticMetricSpout extends BaseRichSpout {
-
- private long base;
- private SpoutOutputCollector collector;
-
- public StaticMetricSpout() {
- base = System.currentTimeMillis();
- }
-
- private Random cpuRandom = new Random();
- private Random memRandom = new Random();
- private static final long FULL_MEM_SIZE_BYTES = 512 * 1024 * 1024 * 1024;// 16g memory upbound limit
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("timestamp", "host", "cpu", "mem"));
- }
-
- @Override
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
- this.collector = collector;
- }
-
- @Override
- public void nextTuple() {
- Utils.sleep(100);
- base = base + 100;// with fix steps..
- long mem = Double.valueOf(memRandom.nextGaussian() * FULL_MEM_SIZE_BYTES).longValue();
- collector.emit(new Values(base, "host", cpuRandom.nextInt(100), mem));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/PersistTopoTestMain2.java
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/PersistTopoTestMain2.java b/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/PersistTopoTestMain2.java
deleted file mode 100644
index d9a7bbb..0000000
--- a/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/PersistTopoTestMain2.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.example.persist;
-
-import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider;
-import org.apache.eagle.datastream.ExecutionEnvironments;
-import org.apache.eagle.datastream.core.StorageType;
-import org.apache.eagle.datastream.core.StreamProducer;
-import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
-import org.apache.eagle.partition.PartitionStrategy;
-
-import java.util.Arrays;
-
-/**
- * Created on 1/10/16.
- */
-public class PersistTopoTestMain2 {
-
- public static void main(String[] args) {
- System.setProperty("config.resource", "application.conf");// customize the application.conf
- StormExecutionEnvironment env = ExecutionEnvironments.getStorm();
- StormSpoutProvider provider = PersistTopoTestMain.createProvider(env.getConfig());
- exec(env, provider);
- }
-
- private static void exec(StormExecutionEnvironment env, StormSpoutProvider provider) {
- StreamProducer source = env.fromSpout(provider).withOutputFields(4).nameAs("kafkaMsgConsumer");
- StreamProducer filter = source;
-
- // "timestamp", "host", "cpu", "mem"
- String cql = " define stream eagleQuery(eagleAlertContext object, timestamp long, host string, cpu int, mem long);"
- + " @info(name='query')"
- + " from eagleQuery#window.externalTime(timestamp, 10 min) "
- + " select eagleAlertContext, min(timestamp) as starttime, avg(cpu) as avgCpu, avg(mem) as avgMem insert into tmp;";
- StreamProducer aggregate = filter.aggregate(Arrays.asList("ealgeQuery"), cql, new PartitionStrategy() {
- @Override
- public int balance(String key, int buckNum) {
- return 0;
- }
- });
-
- StreamProducer persist = aggregate.persist("persistExecutor1", StorageType.KAFKA());
-
- env.execute();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-examples/eagle-topology-example/src/main/resources/add-notification-for-plugin-test.sh
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-topology-example/src/main/resources/add-notification-for-plugin-test.sh b/eagle-examples/eagle-topology-example/src/main/resources/add-notification-for-plugin-test.sh
deleted file mode 100644
index e9288fa..0000000
--- a/eagle-examples/eagle-topology-example/src/main/resources/add-notification-for-plugin-test.sh
+++ /dev/null
@@ -1,50 +0,0 @@
-#!/bin/bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with`
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-CUR_DIR=$(dirname $0)
-source $CUR_DIR/../../../../../eagle-assembly/src/main/bin/eagle-env.sh
-
-##### delete email notification ##########
-echo ""
-echo "Importing policy ... "
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
- "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService" \
- -d '
- [
- {
- "prefix": "alertdef",
- "tags": {
- "site": "sandbox",
- "dataSource": "testSpout",
- "policyId": "pluginTestPolicy",
- "alertExecutorId": "testExecutor",
- "policyType": "siddhiCEPEngine"
- },
- "description": "pluginTest",
- "policyDef": "{\"expression\":\"from testStream[(testAttribute == \\\"testValue\\\")] select * insert into outputStream;\",\"type\":\"siddhiCEPEngine\"}",
- "notificationDef": "[{\"notificationType\":\"eagleStore\"},{\"subject\":\"just for test\",\"sender\":\"nobody@test.com\",\"recipients\":\"nobody@test.com\",\"flavor\":\"email\",\"id\":\"email_1\",\"tplFileName\":\"\"}]",
- "remediationDef":"",
- "enabled":true
- }
- ]
- '
-
-## Finished
-echo ""
-echo "Finished initialization for NotificationPluginTest"
-
-exit 0