You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2017/04/03 11:54:34 UTC
[26/84] [partial] eagle git commit: Clean repo for eagle site
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
deleted file mode 100755
index 628b2e4..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.impl;
-
-import org.apache.eagle.alert.engine.Collector;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamNotDefinedException;
-import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
-import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-
-import java.util.List;
-import java.util.Map;
-
-public class SiddhiPolicyHandler implements PolicyStreamHandler {
- private static final Logger LOG = LoggerFactory.getLogger(SiddhiPolicyHandler.class);
- private ExecutionPlanRuntime executionRuntime;
- private SiddhiManager siddhiManager;
- private Map<String, StreamDefinition> sds;
- private PolicyDefinition policy;
- private PolicyHandlerContext context;
-
- private int currentIndex = 0; // the index of current definition statement inside the policy definition
-
- public SiddhiPolicyHandler(Map<String, StreamDefinition> sds, int index) {
- this.sds = sds;
- this.currentIndex = index;
- }
-
- protected String generateExecutionPlan(PolicyDefinition policyDefinition, Map<String, StreamDefinition> sds) throws StreamNotDefinedException {
- return SiddhiDefinitionAdapter.buildSiddhiExecutionPlan(policyDefinition,sds);
- }
-
- @Override
- public void prepare(final Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception {
- LOG.info("Initializing handler for policy {}", context.getPolicyDefinition());
- this.policy = context.getPolicyDefinition();
- this.siddhiManager = new SiddhiManager();
- String plan = generateExecutionPlan(policy, sds);
- try {
- this.executionRuntime = siddhiManager.createExecutionPlanRuntime(plan);
- LOG.info("Created siddhi runtime {}", executionRuntime.getName());
- } catch (Exception parserException) {
- LOG.error("Failed to create siddhi runtime for policy: {}, siddhi plan: \n\n{}\n", context.getPolicyDefinition().getName(), plan, parserException);
- throw parserException;
- }
-
- // add output stream callback
- List<String> outputStreams = getOutputStreams(policy);
- for (final String outputStream : outputStreams) {
- if (executionRuntime.getStreamDefinitionMap().containsKey(outputStream)) {
- StreamDefinition streamDefinition = SiddhiDefinitionAdapter.convertFromSiddiDefinition(executionRuntime.getStreamDefinitionMap().get(outputStream));
- this.executionRuntime.addCallback(outputStream,
- new AlertStreamCallback(outputStream, streamDefinition,
- collector, context, currentIndex));
- } else {
- throw new IllegalStateException("Undefined output stream " + outputStream);
- }
- }
- this.executionRuntime.start();
- this.context = context;
- LOG.info("Initialized policy handler for policy: {}", policy.getName());
- }
-
- protected List<String> getOutputStreams(PolicyDefinition policy) {
- return policy.getOutputStreams().isEmpty() ? policy.getDefinition().getOutputStreams() : policy.getOutputStreams();
- }
-
- public void send(StreamEvent event) throws Exception {
- context.getPolicyCounter().incr(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "receive_count"));
- String streamId = event.getStreamId();
- InputHandler inputHandler = executionRuntime.getInputHandler(streamId);
- if (inputHandler != null) {
- context.getPolicyCounter().incr(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "eval_count"));
- inputHandler.send(event.getTimestamp(), event.getData());
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("sent event to siddhi stream {} ", streamId);
- }
- } else {
- context.getPolicyCounter().incr(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "drop_count"));
- LOG.warn("No input handler found for stream {}", streamId);
- }
- }
-
- public void close() throws Exception {
- LOG.info("Closing handler for policy {}", this.policy.getName());
- this.executionRuntime.shutdown();
- LOG.info("Shutdown siddhi runtime {}", this.executionRuntime.getName());
- this.siddhiManager.shutdown();
- LOG.info("Shutdown siddhi manager {}", this.siddhiManager);
- LOG.info("Closed handler for policy {}", this.policy.getName());
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder("SiddhiPolicyHandler for policy: ");
- sb.append(this.policy == null ? "" : this.policy.getName());
- return sb.toString();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyStateHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyStateHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyStateHandler.java
deleted file mode 100644
index 141c819..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyStateHandler.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.impl;
-
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamNotDefinedException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Created on 7/27/16.
- */
-public class SiddhiPolicyStateHandler extends SiddhiPolicyHandler {
-
- private static final Logger LOG = LoggerFactory.getLogger(SiddhiPolicyStateHandler.class);
-
- public SiddhiPolicyStateHandler(Map<String, StreamDefinition> sds, int index) {
- super(sds, index);
- }
-
- @Override
- protected String generateExecutionPlan(PolicyDefinition policyDefinition, Map<String, StreamDefinition> sds) throws StreamNotDefinedException {
- StringBuilder builder = new StringBuilder();
- PolicyDefinition.Definition stateDefiniton = policyDefinition.getStateDefinition();
- List<String> inputStreams = stateDefiniton.getInputStreams();
- for (String inputStream : inputStreams) { // the state stream follow the output stream of the policy definition
- builder.append(SiddhiDefinitionAdapter.buildStreamDefinition(sds.get(inputStream)));
- builder.append("\n");
- }
- builder.append(stateDefiniton.value);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Generated siddhi state execution plan: {} from definiton: {}", builder.toString(), stateDefiniton);
- }
- return builder.toString();
- }
-
- @Override
- protected List<String> getOutputStreams(PolicyDefinition policy) {
- return policy.getStateDefinition().getOutputStreams();
- }
-
- // more validation on prepare
-
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeBatchWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeBatchWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeBatchWindow.java
deleted file mode 100644
index ef806fb..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeBatchWindow.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.nodata;
-
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-public class DistinctValuesInTimeBatchWindow {
-
- private static final Logger LOG = LoggerFactory.getLogger(DistinctValuesInTimeBatchWindow.class);
-
- private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
-
- // wisb (what it should be): the expected full set of values across multiple columns
- @SuppressWarnings("rawtypes")
- private volatile Set wisb = new HashSet();
-
- private NoDataPolicyTimeBatchHandler handler;
-
- /**
- * map from value to max timestamp for this value.
- */
- private Map<Object, Long> valueMaxTimeMap = new HashMap<>();
-
- private long startTime = -1;
- private long nextEmitTime = -1;
- private long timeInMilliSeconds;
-
- public DistinctValuesInTimeBatchWindow(NoDataPolicyTimeBatchHandler handler,
- long timeInMilliSeconds, @SuppressWarnings("rawtypes") Set wisb) {
- this.handler = handler;
- this.timeInMilliSeconds = timeInMilliSeconds;
- if (wisb != null) {
- this.wisb = wisb;
- }
- }
-
- public Map<Object, Long> distinctValues() {
- return valueMaxTimeMap;
- }
-
- public void send(StreamEvent event, Object value, long timestamp) {
- synchronized (this) {
- if (startTime < 0) {
- startTime = System.currentTimeMillis();
-
- scheduler.scheduleAtFixedRate(new Runnable() {
-
- @SuppressWarnings( {"unchecked", "rawtypes"})
- @Override
- public void run() {
- try {
- LOG.info("{}/{}: {}", startTime, nextEmitTime, valueMaxTimeMap.keySet());
- synchronized (valueMaxTimeMap) {
- boolean sendAlerts = false;
-
- if (nextEmitTime < 0) {
- nextEmitTime = startTime + timeInMilliSeconds;
- }
-
- if (System.currentTimeMillis() > nextEmitTime) {
- startTime = nextEmitTime;
- nextEmitTime += timeInMilliSeconds;
- sendAlerts = true;
- } else {
- sendAlerts = false;
- }
-
- if (sendAlerts) {
- // alert
- handler.compareAndEmit(wisb, distinctValues().keySet(), event);
- LOG.info("alert for wiri: {} compares to wisb: {}", distinctValues().keySet(), wisb);
-
- if (distinctValues().keySet().size() > 0) {
- wisb = new HashSet(distinctValues().keySet());
- }
- valueMaxTimeMap.clear();
- LOG.info("Clear wiri & update wisb to {}", wisb);
- }
- }
- } catch (Throwable t) {
- LOG.error("failed to run batch window for gap alert", t);
- }
- }
-
- }, 0, timeInMilliSeconds / 2, TimeUnit.MILLISECONDS);
- }
- }
-
- if (valueMaxTimeMap.containsKey(value)) {
- // check the timestamp already recorded for this value (this class keeps no timeSortedMap)
- long oldTime = valueMaxTimeMap.get(value);
- if (oldTime >= timestamp) {
- // no effect: the new timestamp is not later than the one
- // already recorded
- return;
- }
- }
- // update new timestamp in valueMaxTimeMap
- valueMaxTimeMap.put(value, timestamp);
-
- LOG.info("sent: {} with start: {}/next: {}", valueMaxTimeMap.keySet(), startTime, nextEmitTime);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java
deleted file mode 100644
index 4aae040..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.nodata;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-
-import java.util.*;
-
-/**
- * Since 6/28/16.
- * to get distinct values within a specified time window
- * valueMaxTimeMap : each distinct value is associated with max timestamp it ever had
- * timeSortedMap : map sorted by timestamp first and then value
- * With the above two data structures, we can get distinct values in O(log N).
- */
-public class DistinctValuesInTimeWindow {
- public static class ValueAndTime {
- Object value;
- long timestamp;
-
- public ValueAndTime(Object value, long timestamp) {
- this.value = value;
- this.timestamp = timestamp;
- }
-
- public String toString() {
- return "[" + value + "," + timestamp + "]";
- }
-
- public int hashCode() {
- return new HashCodeBuilder().append(value).append(timestamp).toHashCode();
- }
-
- public boolean equals(Object that) {
- if (!(that instanceof ValueAndTime)) {
- return false;
- }
- ValueAndTime another = (ValueAndTime) that;
- return another.timestamp == this.timestamp && another.value.equals(this.value);
- }
- }
-
- public static class ValueAndTimeComparator implements Comparator<ValueAndTime> {
- @Override
- public int compare(ValueAndTime o1, ValueAndTime o2) {
- if (o1.timestamp != o2.timestamp) {
- return (o1.timestamp > o2.timestamp) ? 1 : -1;
- }
- if (o1.value.equals(o2.value)) {
- return 0;
- } else {
- // this is not strictly correct, but I don't want to write too many comparators here :-)
- if (o1.hashCode() > o2.hashCode()) {
- return 1;
- } else {
- return -1;
- }
- }
- }
- }
-
- /**
- * map from value to max timestamp for this value.
- */
- private Map<Object, Long> valueMaxTimeMap = new HashMap<>();
- /**
- * map sorted by time(max timestamp for the value) and then value.
- */
- private SortedMap<ValueAndTime, ValueAndTime> timeSortedMap = new TreeMap<>(new ValueAndTimeComparator());
- private long maxTimestamp = 0L;
- private long window;
- private boolean windowSlided;
-
- /**
- * @param window - milliseconds.
- */
- public DistinctValuesInTimeWindow(long window) {
- this.window = window;
- }
-
- public void send(Object value, long timestamp) {
- ValueAndTime vt = new ValueAndTime(value, timestamp);
-
- // todo think of time out of order
- if (valueMaxTimeMap.containsKey(value)) {
- // remove that entry with old timestamp in timeSortedMap
- long oldTime = valueMaxTimeMap.get(value);
- if (oldTime >= timestamp) {
- // no effect: the new timestamp is not later than the one already recorded
- return;
- }
- timeSortedMap.remove(new ValueAndTime(value, oldTime));
- }
- // insert entry with new timestamp in timeSortedMap
- timeSortedMap.put(vt, vt);
- // update new timestamp in valueMaxTimeMap
- valueMaxTimeMap.put(value, timestamp);
-
- // evict old entries
- // store max timestamp if possible
- maxTimestamp = Math.max(maxTimestamp, timestamp);
-
- // check if some values should be evicted because of time window
- Iterator<Map.Entry<ValueAndTime, ValueAndTime>> it = timeSortedMap.entrySet().iterator();
- while (it.hasNext()) {
- Map.Entry<ValueAndTime, ValueAndTime> entry = it.next();
- if (entry.getKey().timestamp < maxTimestamp - window) {
- // should remove the entry in valueMaxTimeMap and timeSortedMap
- valueMaxTimeMap.remove(entry.getKey().value);
- windowSlided = true;
-
- it.remove();
- } else {
- break;
- }
- }
- }
-
- public Map<Object, Long> distinctValues() {
- return valueMaxTimeMap;
- }
-
- public boolean windowSlided() {
- return windowSlided;
- }
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
deleted file mode 100644
index ec6e6e9..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.nodata;
-
-import org.apache.eagle.alert.engine.Collector;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
-import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.utils.TimePeriodUtils;
-import org.apache.commons.collections.CollectionUtils;
-import org.joda.time.Period;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-/**
- * Since 6/28/16.
- * No Data Policy engine
- * based on the following information
- * 1. stream definition: group by columns
- * 2. timestamp field: timestamp column
- * 3. wiri safe time window: how long window is good for full set of wiri
- * 4. wisb: full set
- * No data policy definition should include
- * fixed fields and dynamic fields
- * fixed fields are leading fields : windowPeriod, type, numOfFields, f1_name, f2_name
- * dynamic fields depend on wisb type.
- * policy would be like:
- * {
- * "name": "noDataAlertPolicy",
- * "description": "noDataAlertPolicy",
- * "inputStreams": [
- * "noDataAlertStream"
- * ],
- * "outputStreams": [
- * "noDataAlertStream_out"
- * ],
- * "definition": {
- * "type": "nodataalert",
- * "value": "PT1M,plain,1,host,host1,host2" // or "value": "PT1M,dynamic,1,host"
- * },
- * "partitionSpec": [
- * {
- * "streamId": "noDataAlertStream",
- * "type": "GROUPBY"
- * }
- * ],
- * "parallelismHint": 2
- * }
- * "name": "noDataAlertPolicy",
- * "description": "noDataAlertPolicy",
- * "inputStreams": [
- * "noDataAlertStream"
- * ],
- * "outputStreams": [
- * "noDataAlertStream_out"
- * ],
- * "definition": {
- * "type": "nodataalert",
- * "value": "PT1M,plain,1,host,host1,host2" // or "value": "PT1M,dynamic,1,host"
- * },
- * "partitionSpec": [
- * {
- * "streamId": "noDataAlertStream",
- * "type": "GROUPBY"
- * }
- * ],
- * "parallelismHint": 2
- * }
- */
-public class NoDataPolicyHandler implements PolicyStreamHandler {
- private static final Logger LOG = LoggerFactory.getLogger(NoDataPolicyHandler.class);
- private Map<String, StreamDefinition> sds;
-
- // wisb (what it should be): the expected full set of values across multiple columns
- @SuppressWarnings("rawtypes")
- private volatile Set wisbValues = null;
- private volatile List<Integer> wisbFieldIndices = new ArrayList<>();
- // reuse the PolicyDefinition.definition.value field to store the full set of values, separated by commas
- private volatile PolicyDefinition policyDef;
- private volatile Collector<AlertStreamEvent> collector;
- private volatile PolicyHandlerContext context;
- private volatile NoDataWisbType wisbType;
- private volatile DistinctValuesInTimeWindow distinctWindow;
-
- public NoDataPolicyHandler(Map<String, StreamDefinition> sds) {
- this.sds = sds;
- }
-
- @Override
- public void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception {
- this.collector = collector;
- this.context = context;
- this.policyDef = context.getPolicyDefinition();
- List<String> inputStreams = policyDef.getInputStreams();
- // validate inputStreams has to contain only one stream
- if (inputStreams.size() != 1) {
- throw new IllegalArgumentException("policy inputStream size has to be 1 for no data alert");
- }
- // validate outputStream has to contain only one stream
- if (policyDef.getOutputStreams().size() != 1) {
- throw new IllegalArgumentException("policy outputStream size has to be 1 for no data alert");
- }
-
- String is = inputStreams.get(0);
- StreamDefinition sd = sds.get(is);
-
- String policyValue = policyDef.getDefinition().getValue();
- // assume that the no-data alert policy value consists of "windowPeriod, type, numOfFields, f1_name, f2_name, f1_value, f2_value, f1_value, f2_value"
- String[] segments = policyValue.split(",");
- long windowPeriod = TimePeriodUtils.getMillisecondsOfPeriod(Period.parse(segments[0]));
- distinctWindow = new DistinctValuesInTimeWindow(windowPeriod);
- this.wisbType = NoDataWisbType.valueOf(segments[1]);
- // for provided wisb values, need to parse, for dynamic wisb values, it is computed through a window
- if (wisbType == NoDataWisbType.provided) {
- wisbValues = new NoDataWisbProvidedParser().parse(segments);
- }
- // populate wisb field names
- int numOfFields = Integer.parseInt(segments[2]);
- for (int i = 3; i < 3 + numOfFields; i++) {
- String fn = segments[i];
- wisbFieldIndices.add(sd.getColumnIndex(fn));
- }
- }
-
- @SuppressWarnings( {"rawtypes", "unchecked"})
- @Override
- public void send(StreamEvent event) throws Exception {
- Object[] data = event.getData();
- List<Object> columnValues = new ArrayList<>();
- for (int i = 0; i < wisbFieldIndices.size(); i++) {
- Object o = data[wisbFieldIndices.get(i)];
- // convert value to string
- columnValues.add(o.toString());
- }
- distinctWindow.send(columnValues, event.getTimestamp());
- Set wiriValues = distinctWindow.distinctValues().keySet();
-
- LOG.debug("window slided: {}, with wiri: {}", distinctWindow.windowSlided(), distinctWindow.distinctValues());
-
- if (distinctWindow.windowSlided()) {
- compareAndEmit(wisbValues, wiriValues, event);
- }
-
- if (wisbType == NoDataWisbType.dynamic) {
- // deep copy
- wisbValues = new HashSet<>(wiriValues);
- }
- }
-
- @SuppressWarnings("rawtypes")
- private void compareAndEmit(Set wisb, Set wiri, StreamEvent event) {
- // compare with wisbValues if wisbValues are already there for dynamic type
- Collection noDataValues = CollectionUtils.subtract(wisb, wiri);
- LOG.debug("nodatavalues:" + noDataValues + ", wisb: " + wisb + ", wiri: " + wiri);
- if (noDataValues != null && noDataValues.size() > 0) {
- LOG.info("No data alert is triggered with no data values {} and wisb {}", noDataValues, wisbValues);
- AlertStreamEvent alertEvent = createAlertEvent(event.getTimestamp(), event.getData());
- collector.emit(alertEvent);
- }
- }
-
- private AlertStreamEvent createAlertEvent(long timestamp, Object[] triggerEvent) {
- String is = policyDef.getInputStreams().get(0);
- final StreamDefinition sd = sds.get(is);
-
- AlertStreamEvent event = new AlertStreamEvent();
- event.setTimestamp(timestamp);
- event.setData(triggerEvent);
- event.setStreamId(policyDef.getOutputStreams().get(0));
- event.setPolicyId(context.getPolicyDefinition().getName());
- if (this.context.getPolicyEvaluator() != null) {
- event.setCreatedBy(context.getPolicyEvaluator().getName());
- }
- event.setCreatedTime(System.currentTimeMillis());
- event.setSchema(sd);
- return event;
- }
-
- @Override
- public void close() throws Exception {
-
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyTimeBatchHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyTimeBatchHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyTimeBatchHandler.java
deleted file mode 100644
index b1e32bd..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyTimeBatchHandler.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.nodata;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.eagle.alert.engine.Collector;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
-import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.utils.TimePeriodUtils;
-import org.joda.time.Period;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Joiner;
-
-/**
- * {@link PolicyStreamHandler} that raises "no data" alerts: values that are expected
- * ({@code wisb}) but were not observed ({@code wiri}) in the configured column of the policy's
- * single input stream are turned into alert events on the policy's single output stream.
- *
- * <p>Incoming events are forwarded to a {@link DistinctValuesInTimeBatchWindow}, which is handed
- * this handler on construction; presumably the window calls
- * {@link #compareAndEmit(Set, Set, StreamEvent)} when a batch expires (the window class is not
- * part of this file, verify there).
- */
-public class NoDataPolicyTimeBatchHandler implements PolicyStreamHandler {
-
-    private static final Logger LOG = LoggerFactory.getLogger(NoDataPolicyTimeBatchHandler.class);
-
-    /**
-     * Joiner used only for log output. Guava's default joiner throws NullPointerException on null
-     * elements, and alert payloads may legitimately contain nulls (e.g. columns without a default).
-     */
-    private static final Joiner LOG_JOINER = Joiner.on(",").useForNull("null");
-
-    /** Property of the policy definition that names the column to watch. */
-    private static final String NODATA_COLUMN_NAME_KEY = "nodataColumnName";
-
-    /** Stream definitions looked up by stream id. */
-    private Map<String, StreamDefinition> sds;
-
-    /** Indices (within the input stream's columns) of the watched field(s). */
-    private volatile List<Integer> wisbFieldIndices = new ArrayList<>();
-    // reuse PolicyDefinition.definition.value field to store full set of values
-    // separated by comma
-    private volatile PolicyDefinition policyDef;
-    private volatile Collector<AlertStreamEvent> collector;
-    private volatile PolicyHandlerContext context;
-    private volatile NoDataWisbType wisbType;
-    private volatile DistinctValuesInTimeBatchWindow distinctWindow;
-
-    /**
-     * Creates a handler.
-     *
-     * @param sds stream definitions keyed by stream id
-     */
-    public NoDataPolicyTimeBatchHandler(Map<String, StreamDefinition> sds) {
-        this.sds = sds;
-    }
-
-    /**
-     * Validates the policy and sets up the distinct-value window.
-     *
-     * @param collector sink for generated alerts
-     * @param context   handler context providing the policy definition
-     * @throws IllegalArgumentException if the policy does not have exactly one input and one
-     *                                  output stream, if its value does not contain at least
-     *                                  "windowPeriod,type", if the type is not a
-     *                                  {@link NoDataWisbType} name, if the no-data column name
-     *                                  property is missing, or if the input stream is unknown
-     */
-    @Override
-    public void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception {
-        this.collector = collector;
-        this.context = context;
-        this.policyDef = context.getPolicyDefinition();
-        List<String> inputStreams = policyDef.getInputStreams();
-        // validate inputStreams has to contain only one stream
-        if (inputStreams.size() != 1) {
-            throw new IllegalArgumentException("policy inputStream size has to be 1 for no data alert");
-        }
-        // validate outputStream has to contain only one stream
-        if (policyDef.getOutputStreams().size() != 1) {
-            throw new IllegalArgumentException("policy outputStream size has to be 1 for no data alert");
-        }
-
-        String policyValue = policyDef.getDefinition().getValue();
-        // assume that no data alert policy value consists of "windowPeriod,
-        // type, numOfFields, f1_name, f2_name, f1_value, f2_value, f1_value,
-        // f2_value}
-        String[] segments = policyValue.split(",");
-        if (segments.length < 2) {
-            // fail with a readable message instead of ArrayIndexOutOfBoundsException
-            throw new IllegalArgumentException(
-                "policy value has to start with \"windowPeriod,type\" for no data alert, but was: " + policyValue);
-        }
-        this.wisbType = NoDataWisbType.valueOf(segments[1]);
-        // for provided wisb values, need to parse, for dynamic wisb values, it
-        // is computed through a window
-        Set<String> wisbValues = new HashSet<String>();
-        if (wisbType == NoDataWisbType.provided) {
-            for (int i = 2; i < segments.length; i++) {
-                wisbValues.add(segments[i]);
-            }
-        }
-
-        long windowPeriod = TimePeriodUtils.getMillisecondsOfPeriod(Period.parse(segments[0]));
-        distinctWindow = new DistinctValuesInTimeBatchWindow(this, windowPeriod, wisbValues);
-        // populate wisb field names
-        String is = inputStreams.get(0);
-        StreamDefinition sd = sds.get(is);
-        if (!policyDef.getDefinition().getProperties().containsKey(NODATA_COLUMN_NAME_KEY)) {
-            throw new IllegalArgumentException("policy nodata column name has to be defined for no data alert");
-        }
-        if (sd == null) {
-            // previously surfaced as a bare NullPointerException
-            throw new IllegalArgumentException("no stream definition found for input stream " + is);
-        }
-        wisbFieldIndices.add(sd.getColumnIndex((String) policyDef.getDefinition().getProperties().get(NODATA_COLUMN_NAME_KEY)));
-    }
-
-    /**
-     * Extracts the watched column value(s) of the event as strings and hands them to the window,
-     * stamped with the local wall-clock time.
-     *
-     * @param event incoming stream event
-     */
-    @Override
-    public void send(StreamEvent event) throws Exception {
-        Object[] data = event.getData();
-
-        List<Object> columnValues = new ArrayList<>();
-        for (Integer fieldIndex : wisbFieldIndices) {
-            Object o = data[fieldIndex];
-            // convert value to string
-            columnValues.add(o.toString());
-        }
-        // use local timestamp rather than event timestamp
-        distinctWindow.send(event, columnValues, System.currentTimeMillis());
-        LOG.debug("event sent to window with wiri: {}", distinctWindow.distinctValues());
-    }
-
-    /**
-     * Emits one alert per value contained in {@code wisb} but missing from {@code wiri}.
-     *
-     * <p>Each alert payload has one slot per column of the output stream definition: the
-     * "timestamp" column receives the current time, "host" the missing value,
-     * "originalStreamName" the stream id of {@code event}, and every other column its declared
-     * default value.
-     *
-     * @param wisb  expected values
-     * @param wiri  observed values
-     * @param event event whose timestamp and stream id are copied into the alerts
-     */
-    @SuppressWarnings("rawtypes")
-    public void compareAndEmit(Set wisb, Set wiri, StreamEvent event) {
-        // compare with wisbValues if wisbValues are already there for dynamic
-        // type
-        Collection noDataValues = CollectionUtils.subtract(wisb, wiri);
-        LOG.debug("nodatavalues:{}, wisb: {}, wiri: {}", noDataValues, wisb, wiri);
-        if (noDataValues == null || noDataValues.isEmpty()) {
-            return;
-        }
-        LOG.info("No data alert is triggered with no data values {} and wisb {}", noDataValues, wisb);
-
-        String is = policyDef.getOutputStreams().get(0);
-        StreamDefinition sd = sds.get(is);
-        int timestampIndex = sd.getColumnIndex("timestamp");
-        int hostIndex = sd.getColumnIndex("host");
-        int originalStreamNameIndex = sd.getColumnIndex("originalStreamName");
-        int columnCount = sd.getColumns().size();
-
-        for (Object one : noDataValues) {
-            Object[] triggerEvent = new Object[columnCount];
-            for (int i = 0; i < columnCount; i++) {
-                if (i == timestampIndex) {
-                    triggerEvent[i] = System.currentTimeMillis();
-                } else if (i == hostIndex) {
-                    // Values sent by send() are lists of column values; expected values parsed in
-                    // prepare() are plain strings, so accept both instead of risking a
-                    // ClassCastException.
-                    triggerEvent[i] = (one instanceof List) ? ((List) one).get(0) : one;
-                } else if (i == originalStreamNameIndex) {
-                    triggerEvent[i] = event.getStreamId();
-                } else {
-                    triggerEvent[i] = sd.getColumns().get(i).getDefaultValue();
-                }
-            }
-            AlertStreamEvent alertEvent = createAlertEvent(sd, event.getTimestamp(), triggerEvent);
-            LOG.info("Nodata alert {} generated and will be emitted", LOG_JOINER.join(triggerEvent));
-            collector.emit(alertEvent);
-        }
-    }
-
-    /**
-     * Wraps a payload into an alert event for the policy's output stream.
-     *
-     * @param sd           schema attached to the alert
-     * @param timestamp    timestamp to set on the alert
-     * @param triggerEvent alert payload
-     * @return the populated alert event
-     */
-    private AlertStreamEvent createAlertEvent(StreamDefinition sd, long timestamp, Object[] triggerEvent) {
-        AlertStreamEvent event = new AlertStreamEvent();
-        event.setTimestamp(timestamp);
-        event.setData(triggerEvent);
-        event.setStreamId(policyDef.getOutputStreams().get(0));
-        event.setPolicyId(context.getPolicyDefinition().getName());
-        if (this.context.getPolicyEvaluator() != null) {
-            event.setCreatedBy(context.getPolicyEvaluator().getName());
-        }
-        event.setCreatedTime(System.currentTimeMillis());
-        event.setSchema(sd);
-        return event;
-    }
-
-    /**
-     * Does nothing; this handler performs no work on close.
-     */
-    @Override
-    public void close() throws Exception {
-        // nothing to do
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbParser.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbParser.java
deleted file mode 100644
index fa27108..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbParser.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.nodata;
-
-import java.util.List;
-import java.util.Set;
-
-/**
- * Strategy for extracting the expected ("WISB") values of a no-data policy.
- *
- * <p>Since 6/29/16.
- */
-public interface NoDataWisbParser {
-    /**
-     * Parses a policy definition and returns the WISB values for one or more fields.
-     * For example, if host and data center are the two fields of a no-data alert, every
-     * element of the result is a list of two values.
-     *
-     * @param args information parsed from the policy definition
-     * @return set of field-value lists, one list per expected combination
-     */
-    Set<List<String>> parse(String[] args);
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbProvidedParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbProvidedParser.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbProvidedParser.java
deleted file mode 100644
index 4f54358..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbProvidedParser.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.nodata;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-/**
- * {@link NoDataWisbParser} for policies whose expected values are listed in the policy value
- * itself.
- *
- * <p>Since 6/29/16.
- */
-public class NoDataWisbProvidedParser implements NoDataWisbParser {
-    /** Position of the {@code numOfFields} segment within the split policy value. */
-    private static final int NUM_OF_FIELDS_INDEX = 2;
-
-    /**
-     * policy value consists of "windowPeriod, type, numOfFields, f1_name, f2_name, f1_value, f2_value, f1_value, f2_value".
-     *
-     * <p>The segments after the field names are grouped into tuples of {@code numOfFields}
-     * values each. If there are no value segments at all, an empty set is returned.
-     *
-     * @param args the policy value split into its segments
-     * @return the distinct value tuples, each of size {@code numOfFields}
-     * @throws IllegalArgumentException if {@code args} is null or too short to contain
-     *                                  {@code numOfFields}, if {@code numOfFields} is not a
-     *                                  positive integer (this includes
-     *                                  {@link NumberFormatException}), or if the value segments
-     *                                  do not form complete tuples
-     */
-    @Override
-    public Set<List<String>> parse(String[] args) {
-        if (args == null || args.length <= NUM_OF_FIELDS_INDEX) {
-            throw new IllegalArgumentException(
-                "policy value has to contain at least \"windowPeriod,type,numOfFields\"");
-        }
-        int numOfFields = Integer.parseInt(args[NUM_OF_FIELDS_INDEX].trim());
-        if (numOfFields <= 0) {
-            // zero would never advance the cursor below (endless loop); negative would index
-            // before the start of the array
-            throw new IllegalArgumentException("numOfFields has to be positive, but was " + numOfFields);
-        }
-
-        Set<List<String>> wisbValues = new HashSet<>();
-        // segments following numOfFields: first the field names, then the value tuples
-        int remaining = args.length - (NUM_OF_FIELDS_INDEX + 1);
-        if (numOfFields >= remaining) {
-            // only (some of) the field names are present, no values; comparing this way also
-            // avoids overflowing "3 + numOfFields"
-            return wisbValues;
-        }
-        int valueCount = remaining - numOfFields;
-        if (valueCount % numOfFields != 0) {
-            throw new IllegalArgumentException("expected value segments in groups of " + numOfFields
-                + ", but found " + valueCount + " value segment(s)");
-        }
-
-        int i = NUM_OF_FIELDS_INDEX + 1 + numOfFields;
-        while (i < args.length) {
-            List<String> fields = new ArrayList<>(numOfFields);
-            for (int j = 0; j < numOfFields; j++) {
-                fields.add(args[i + j]);
-            }
-            wisbValues.add(fields);
-            i += numOfFields;
-        }
-        return wisbValues;
-    }
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbType.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbType.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbType.java
deleted file mode 100644
index 887d099..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbType.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.nodata;
-
-/**
- * How the expected ("WISB") values of a no-data policy are obtained.
- *
- * <p>Since 6/29/16.
- */
-public enum NoDataWisbType {
-    /** Expected values are provided as part of the policy. */
-    provided,
-    /** Expected values are not provided with the policy. */
-    dynamic;
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlan.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlan.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlan.java
deleted file mode 100644
index 7ecc36f..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlan.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.interpreter;
-
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.wso2.siddhi.query.api.ExecutionPlan;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Mutable bean holding the result of interpreting a policy's execution plan: the input and
- * output streams with their columns, the plan source, the parsed Siddhi plan, a description
- * and the stream partitions.
- *
- * <p>The class is annotated for Jackson with NON_NULL inclusion, i.e. properties whose value
- * is null are left out when the bean is serialized.
- */
-@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-public class PolicyExecutionPlan {
- /**
- * Actual input streams.
- */
- private Map<String, List<StreamColumn>> inputStreams;
-
- /**
- * Actual output streams.
- */
- private Map<String, List<StreamColumn>> outputStreams;
-
- /**
- * Execution plan source.
- */
- private String executionPlanSource;
-
- /**
- * Execution plan.
- */
- private ExecutionPlan internalExecutionPlan;
-
- // Textual description of the plan; PolicyExecutionPlannerImpl sets it to the
- // toString() of the parsed Siddhi execution plan.
- private String executionPlanDesc;
-
- // Partitions of the plan's streams; PolicyExecutionPlannerImpl fills this from the
- // partitions it collected per stream id.
- private List<StreamPartition> streamPartitions;
-
- public String getExecutionPlanSource() {
- return executionPlanSource;
- }
-
- public void setExecutionPlanSource(String executionPlanSource) {
- this.executionPlanSource = executionPlanSource;
- }
-
- public ExecutionPlan getInternalExecutionPlan() {
- return internalExecutionPlan;
- }
-
- public void setInternalExecutionPlan(ExecutionPlan internalExecutionPlan) {
- this.internalExecutionPlan = internalExecutionPlan;
- }
-
- public String getExecutionPlanDesc() {
- return executionPlanDesc;
- }
-
- public void setExecutionPlanDesc(String executionPlanDesc) {
- this.executionPlanDesc = executionPlanDesc;
- }
-
- public List<StreamPartition> getStreamPartitions() {
- return streamPartitions;
- }
-
- public void setStreamPartitions(List<StreamPartition> streamPartitions) {
- this.streamPartitions = streamPartitions;
- }
-
- public Map<String, List<StreamColumn>> getInputStreams() {
- return inputStreams;
- }
-
- public void setInputStreams(Map<String, List<StreamColumn>> inputStreams) {
- this.inputStreams = inputStreams;
- }
-
- public Map<String, List<StreamColumn>> getOutputStreams() {
- return outputStreams;
- }
-
- public void setOutputStreams(Map<String, List<StreamColumn>> outputStreams) {
- this.outputStreams = outputStreams;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlanner.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlanner.java
deleted file mode 100644
index b8e5e42..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlanner.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.interpreter;
-
-/**
- * Keep PolicyExecutionPlanner as simple and fast as possible (avoid any backend data exchanging).
- */
-interface PolicyExecutionPlanner {
-    /**
-     * Returns the execution plan produced by this planner.
-     *
-     * @return PolicyExecutionPlan.
-     */
-    PolicyExecutionPlan getExecutionPlan();
-
-    /**
-     * Convenience entry point: builds a {@link PolicyExecutionPlannerImpl} for the given source
-     * and returns the plan it produced.
-     *
-     * @param executionPlan execution plan source text
-     * @return the plan obtained from the newly created planner
-     * @throws Exception if constructing the planner fails
-     */
-    static PolicyExecutionPlan parseExecutionPlan(String executionPlan) throws Exception {
-        final PolicyExecutionPlanner planner = new PolicyExecutionPlannerImpl(executionPlan);
-        return planner.getExecutionPlan();
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java
deleted file mode 100644
index 4e6901d..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java
+++ /dev/null
@@ -1,376 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.interpreter;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.collections.ListUtils;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
-import org.apache.eagle.alert.engine.evaluator.impl.SiddhiDefinitionAdapter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.exception.DefinitionNotExistException;
-import org.wso2.siddhi.query.api.ExecutionPlan;
-import org.wso2.siddhi.query.api.exception.DuplicateDefinitionException;
-import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
-import org.wso2.siddhi.query.api.execution.ExecutionElement;
-import org.wso2.siddhi.query.api.execution.query.Query;
-import org.wso2.siddhi.query.api.execution.query.input.handler.StreamHandler;
-import org.wso2.siddhi.query.api.execution.query.input.handler.Window;
-import org.wso2.siddhi.query.api.execution.query.input.state.*;
-import org.wso2.siddhi.query.api.execution.query.input.stream.*;
-import org.wso2.siddhi.query.api.execution.query.output.stream.OutputStream;
-import org.wso2.siddhi.query.api.execution.query.selection.OutputAttribute;
-import org.wso2.siddhi.query.api.execution.query.selection.Selector;
-import org.wso2.siddhi.query.api.expression.Expression;
-import org.wso2.siddhi.query.api.expression.Variable;
-import org.wso2.siddhi.query.api.expression.condition.Compare;
-import org.wso2.siddhi.query.api.expression.constant.IntConstant;
-import org.wso2.siddhi.query.api.expression.constant.LongConstant;
-import org.wso2.siddhi.query.api.expression.constant.TimeConstant;
-import org.wso2.siddhi.query.compiler.SiddhiCompiler;
-
-import java.util.*;
-import java.util.stream.Collectors;
-
-class PolicyExecutionPlannerImpl implements PolicyExecutionPlanner {
-
- private static final Logger LOG = LoggerFactory.getLogger(PolicyExecutionPlannerImpl.class);
-
- /**
- * See https://docs.wso2.com/display/CEP300/Windows#Windows-ExternalTimeWindow.
- */
- private static final String WINDOW_EXTERNAL_TIME = "externalTime";
-
- // Execution plan source text as passed to the constructor; doParse() compiles it.
- private final String executionPlan;
- // Input stream id -> columns; the value is null when the plan carries no definition
- // for that stream.
- private final Map<String,List<StreamColumn>> effectiveInputStreams;
- // Stream reference id -> stream id, collected from the state elements of
- // pattern/sequence queries.
- private final Map<String, String> effectiveInputStreamsAlias;
- // Output stream id -> columns derived from the query's selection list.
- private final Map<String,List<StreamColumn>> effectiveOutputStreams;
- // Stream id -> partition; streams without an entry get a SHUFFLE partition at the
- // end of doParse().
- private final Map<String,StreamPartition> effectivePartitions;
- // Result of doParse(), computed once in the constructor.
- private final PolicyExecutionPlan policyExecutionPlan;
-
-    /**
-     * Creates a planner for the given execution plan source and parses it right away; the
-     * resulting plan is available through {@link #getExecutionPlan()}.
-     *
-     * @param executionPlan execution plan source text
-     * @throws Exception if parsing the execution plan fails
-     */
-    public PolicyExecutionPlannerImpl(String executionPlan) throws Exception {
-        // The collecting maps have to exist before doParse() runs, because parsing fills them.
-        effectiveInputStreams = new HashMap<>();
-        effectiveInputStreamsAlias = new HashMap<>();
-        effectiveOutputStreams = new HashMap<>();
-        effectivePartitions = new HashMap<>();
-        this.executionPlan = executionPlan;
-        policyExecutionPlan = doParse();
-    }
-
-    /**
-     * Returns the plan that was computed when this planner was constructed.
-     *
-     * @return the parsed policy execution plan
-     */
-    @Override
-    public PolicyExecutionPlan getExecutionPlan() {
-        return this.policyExecutionPlan;
-    }
-
-    /**
-     * Compiles the execution plan source with Siddhi and derives the policy execution plan from
-     * its queries: the effective input streams, output streams and stream partitions.
-     *
-     * <p>Per query the input stream decides how partitions are collected:
-     * <ul>
-     * <li>single stream: alias plus the partition found by {@code findStreamPartition};</li>
-     * <li>join (only JOIN / INNER_JOIN): partitions of both sides, plus one GROUPBY partition per
-     * side of an EQUAL join condition;</li>
-     * <li>pattern / sequence: one partition per stream built from the group-by variables.</li>
-     * </ul>
-     * Input streams that end up without a partition get a SHUFFLE partition.
-     *
-     * @return the populated execution plan
-     * @throws Exception any failure while compiling or interpreting the plan; it is logged
-     *                   together with the plan source and rethrown unchanged
-     */
-    private PolicyExecutionPlan doParse() throws Exception {
-        PolicyExecutionPlan policyExecutionPlan = new PolicyExecutionPlan();
-        try {
-            ExecutionPlan executionPlan = SiddhiCompiler.parse(this.executionPlan);
-
-            policyExecutionPlan.setExecutionPlanDesc(executionPlan.toString());
-
-            // Set current execution plan as valid
-            policyExecutionPlan.setExecutionPlanSource(this.executionPlan);
-            policyExecutionPlan.setInternalExecutionPlan(executionPlan);
-
-            // Go through execution element
-            for (ExecutionElement executionElement : executionPlan.getExecutionElementList()) {
-                if (!(executionElement instanceof Query)) {
-                    LOG.warn("Unhandled execution element: {}", executionElement.toString());
-                    continue;
-                }
-
-                // -----------------------
-                // Query Level Variables
-                // -----------------------
-                Query query = (Query) executionElement;
-                InputStream inputStream = query.getInputStream();
-                Selector selector = query.getSelector();
-                Map<String, SingleInputStream> queryLevelAliasToStreamMapping = new HashMap<>();
-
-                // Inputs stream definitions
-                for (String streamId : inputStream.getUniqueStreamIds()) {
-                    if (!effectiveInputStreams.containsKey(streamId)) {
-                        org.wso2.siddhi.query.api.definition.StreamDefinition streamDefinition = executionPlan.getStreamDefinitionMap().get(streamId);
-                        if (streamDefinition != null) {
-                            effectiveInputStreams.put(streamId, SiddhiDefinitionAdapter.convertFromSiddiDefinition(streamDefinition).getColumns());
-                        } else {
-                            effectiveInputStreams.put(streamId, null);
-                        }
-                    }
-                }
-
-                // Window Spec and Partition
-                if (inputStream instanceof SingleInputStream) {
-                    retrieveAliasForQuery((SingleInputStream) inputStream, queryLevelAliasToStreamMapping);
-                    retrievePartition(findStreamPartition((SingleInputStream) inputStream, selector));
-                } else if (inputStream instanceof JoinInputStream) {
-                    JoinInputStream joinInputStream = (JoinInputStream) inputStream;
-                    // Only Support JOIN/INNER_JOIN Now
-                    if (joinInputStream.getType().equals(JoinInputStream.Type.INNER_JOIN) || joinInputStream.getType().equals(JoinInputStream.Type.JOIN)) {
-                        SingleInputStream leftInputStream = (SingleInputStream) joinInputStream.getLeftInputStream();
-                        SingleInputStream rightInputStream = (SingleInputStream) joinInputStream.getRightInputStream();
-
-                        retrievePartition(findStreamPartition(leftInputStream, selector));
-                        retrievePartition(findStreamPartition(rightInputStream, selector));
-                        retrieveAliasForQuery(leftInputStream, queryLevelAliasToStreamMapping);
-                        retrieveAliasForQuery(rightInputStream, queryLevelAliasToStreamMapping);
-
-                    } else {
-                        throw new ExecutionPlanValidationException("Not support " + joinInputStream.getType() + " yet, currently support: INNER JOIN");
-                    }
-
-                    Expression joinCondition = joinInputStream.getOnCompare();
-
-                    if (joinCondition != null) {
-                        if (joinCondition instanceof Compare) {
-                            if (((Compare) joinCondition).getOperator().equals(Compare.Operator.EQUAL)) {
-                                Variable leftExpression = (Variable) ((Compare) joinCondition).getLeftExpression();
-                                retrievePartition(createJoinKeyPartition(leftExpression, queryLevelAliasToStreamMapping));
-
-                                // The right side has to be registered as well; it used to be built
-                                // and then dropped because the left partition was registered twice.
-                                Variable rightExpression = (Variable) ((Compare) joinCondition).getRightExpression();
-                                retrievePartition(createJoinKeyPartition(rightExpression, queryLevelAliasToStreamMapping));
-                            } else {
-                                throw new ExecutionPlanValidationException("Only support \"EQUAL\" condition in INNER JOIN" + joinCondition);
-                            }
-                        } else {
-                            throw new ExecutionPlanValidationException("Only support \"Compare\" on INNER JOIN condition in INNER JOIN: " + joinCondition);
-                        }
-                    }
-                } else if (inputStream instanceof StateInputStream) {
-                    StateInputStream stateInputStream = (StateInputStream) inputStream;
-                    // Group By Spec
-                    // (the former guard "groupBy.size() >= 0" was always true and has been dropped)
-                    List<Variable> groupBy = selector.getGroupByList();
-                    Map<String, List<Variable>> streamGroupBy = new HashMap<>();
-                    for (String streamId : inputStream.getUniqueStreamIds()) {
-                        streamGroupBy.put(streamId, new ArrayList<>());
-                    }
-
-                    collectStreamReferenceIdMapping(stateInputStream.getStateElement());
-
-                    for (Variable variable : groupBy) {
-                        // Not stream not set, then should be all streams' same field
-                        if (variable.getStreamId() == null) {
-                            for (String streamId : inputStream.getUniqueStreamIds()) {
-                                streamGroupBy.get(streamId).add(variable);
-                            }
-                        } else {
-                            String streamId = variable.getStreamId();
-                            if (!this.effectiveInputStreamsAlias.containsKey(streamId)) {
-                                streamId = retrieveStreamId(variable, effectiveInputStreams, queryLevelAliasToStreamMapping);
-                            } else {
-                                streamId = this.effectiveInputStreamsAlias.get(streamId);
-                            }
-                            if (streamGroupBy.containsKey(streamId)) {
-                                streamGroupBy.get(streamId).add(variable);
-                            } else {
-                                throw new DefinitionNotExistException(streamId);
-                            }
-                        }
-                    }
-                    for (Map.Entry<String, List<Variable>> entry : streamGroupBy.entrySet()) {
-                        if (entry.getValue().size() > 0) {
-                            StreamPartition partition = generatePartition(entry.getKey(), null, Arrays.asList(entry.getValue().toArray(new Variable[entry.getValue().size()])));
-                            if (stateInputStream.getStateType().equals(StateInputStream.Type.PATTERN)
-                                || stateInputStream.getStateType().equals(StateInputStream.Type.SEQUENCE)) {
-                                if (effectivePartitions.containsKey(partition.getStreamId())) {
-                                    StreamPartition existingPartition = effectivePartitions.get(partition.getStreamId());
-                                    // same type and columns but otherwise different: carry over the
-                                    // sort spec of the partition that is already registered
-                                    if (!existingPartition.equals(partition)
-                                        && existingPartition.getType().equals(partition.getType())
-                                        && ListUtils.isEqualList(existingPartition.getColumns(), partition.getColumns())) {
-                                        partition.setSortSpec(existingPartition.getSortSpec());
-                                    }
-                                }
-                            }
-                            retrievePartition(partition);
-                        }
-                    }
-                }
-
-                // Output streams
-                OutputStream outputStream = query.getOutputStream();
-                effectiveOutputStreams.put(outputStream.getId(), convertOutputStreamColumns(selector.getSelectionList()));
-            }
-            // Set effective input streams
-            policyExecutionPlan.setInputStreams(effectiveInputStreams);
-
-            // Set effective output streams
-            policyExecutionPlan.setOutputStreams(effectiveOutputStreams);
-
-            // Set Partitions
-            for (String streamId : effectiveInputStreams.keySet()) {
-                // Use shuffle partition by default
-                if (!effectivePartitions.containsKey(streamId)) {
-                    StreamPartition shufflePartition = new StreamPartition();
-                    shufflePartition.setStreamId(streamId);
-                    shufflePartition.setType(StreamPartition.Type.SHUFFLE);
-                    effectivePartitions.put(streamId, shufflePartition);
-                }
-            }
-            policyExecutionPlan.setStreamPartitions(new ArrayList<>(effectivePartitions.values()));
-        } catch (Exception ex) {
-            LOG.error("Got error to parse policy execution plan: \n{}", this.executionPlan, ex);
-            throw ex;
-        }
-        return policyExecutionPlan;
-    }
-
-    /**
-     * Builds the GROUPBY partition for one side of an EQUAL join condition: the partition is
-     * keyed on the attribute referenced by the expression and belongs to the stream the
-     * expression's stream id (or alias) resolves to.
-     */
-    private StreamPartition createJoinKeyPartition(Variable expression, Map<String, SingleInputStream> aliasMap) {
-        Preconditions.checkNotNull(expression.getStreamId());
-        Preconditions.checkNotNull(expression.getAttributeName());
-
-        StreamPartition partition = new StreamPartition();
-        partition.setType(StreamPartition.Type.GROUPBY);
-        partition.setColumns(Collections.singletonList(expression.getAttributeName()));
-        partition.setStreamId(retrieveStreamId(expression, effectiveInputStreams, aliasMap));
-        return partition;
-    }
-
- /**
-  * Recursively walks a state element tree and, for every {@link StreamStateElement}
-  * reached, records its stream reference id -> stream id pair in
-  * {@code effectiveInputStreamsAlias}. Element kinds not listed below are ignored.
-  *
-  * @param stateElement root of the (sub)tree to traverse
-  */
- private void collectStreamReferenceIdMapping(StateElement stateElement) {
-     if (stateElement instanceof LogicalStateElement) {
-         LogicalStateElement logical = (LogicalStateElement) stateElement;
-         collectStreamReferenceIdMapping(logical.getStreamStateElement1());
-         collectStreamReferenceIdMapping(logical.getStreamStateElement2());
-         return;
-     }
-     if (stateElement instanceof CountStateElement) {
-         CountStateElement counted = (CountStateElement) stateElement;
-         collectStreamReferenceIdMapping(counted.getStreamStateElement());
-         return;
-     }
-     if (stateElement instanceof EveryStateElement) {
-         EveryStateElement every = (EveryStateElement) stateElement;
-         collectStreamReferenceIdMapping(every.getStateElement());
-         return;
-     }
-     if (stateElement instanceof NextStateElement) {
-         NextStateElement sequence = (NextStateElement) stateElement;
-         collectStreamReferenceIdMapping(sequence.getStateElement());
-         collectStreamReferenceIdMapping(sequence.getNextStateElement());
-         return;
-     }
-     if (stateElement instanceof StreamStateElement) {
-         // Leaf: remember which stream the reference id (alias) points to.
-         BasicSingleInputStream leafStream = ((StreamStateElement) stateElement).getBasicSingleInputStream();
-         String referenceId = leafStream.getStreamReferenceId();
-         this.effectiveInputStreamsAlias.put(referenceId, leafStream.getStreamId());
-     }
- }
-
- private String retrieveStreamId(Variable variable, Map<String, List<StreamColumn>> streamMap, Map<String, SingleInputStream> aliasMap) {
- Preconditions.checkNotNull(variable.getStreamId(), "streamId");
- if (streamMap.containsKey(variable.getStreamId()) && aliasMap.containsKey(variable.getStreamId())) {
- throw new DuplicateDefinitionException("Duplicated streamId and alias: " + variable.getStreamId());
- } else if (streamMap.containsKey(variable.getStreamId())) {
- return variable.getStreamId();
- } else if (aliasMap.containsKey(variable.getStreamId())) {
- return aliasMap.get(variable.getStreamId()).getStreamId();
- } else {
- throw new DefinitionNotExistException(variable.getStreamId());
- }
- }
-
- /**
-  * Builds the partition required by one input stream of a query from the stream's
-  * window handlers and the selector's group-by list.
-  *
-  * <p>A partition is always produced: {@code generatePartition} falls back to a
-  * SHUFFLE partition when there is no group-by. The previous guard
-  * {@code windows.size() > 0 || groupBy.size() >= 0} was always true, so its
-  * {@code return null} branch could never execute; it has been removed without
-  * changing the result for any non-null group-by list.</p>
-  *
-  * @param inputStream stream whose handlers are scanned for {@link Window}s
-  * @param selector    selector supplying the group-by variables
-  * @return the partition generated for {@code inputStream}; never {@code null}
-  */
- private StreamPartition findStreamPartition(SingleInputStream inputStream, Selector selector) {
-     // Window Spec
-     List<Window> windows = new ArrayList<>();
-     for (StreamHandler streamHandler : inputStream.getStreamHandlers()) {
-         if (streamHandler instanceof Window) {
-             windows.add((Window) streamHandler);
-         }
-     }
-
-     // Group By Spec (generatePartition tolerates a null or empty list)
-     List<Variable> groupBy = selector.getGroupByList();
-     return generatePartition(inputStream.getStreamId(), windows, groupBy);
- }
-
- /**
-  * Merges a partition requirement into {@code effectivePartitions}, keyed by stream id.
-  * <ul>
-  * <li>No partition recorded for the stream yet: the given one is recorded.</li>
-  * <li>An equal partition is already recorded: nothing changes.</li>
-  * <li>Same type and columns with a longer sort window, or the recorded partition is
-  * SHUFFLE: the given partition replaces the recorded one.</li>
-  * <li>Anything else is rejected as incompatible.</li>
-  * </ul>
-  *
-  * <p>{@code generatePartition} leaves the sort spec {@code null} when a query has no
-  * externalTime window, so either side may lack one. A missing sort spec is treated
-  * as a window period of 0 instead of being dereferenced, which previously caused a
-  * {@link NullPointerException}.</p>
-  *
-  * @param partition partition to merge; {@code null} is ignored
-  * @throws ExecutionPlanValidationException if the partition conflicts with the one
-  *         already recorded for the same stream
-  */
- private void retrievePartition(StreamPartition partition) {
-     if (partition == null) {
-         return;
-     }
-
-     if (!effectivePartitions.containsKey(partition.getStreamId())) {
-         effectivePartitions.put(partition.getStreamId(), partition);
-         return;
-     }
-
-     StreamPartition existingPartition = effectivePartitions.get(partition.getStreamId());
-     if (existingPartition.equals(partition)) {
-         return;
-     }
-
-     // Null-safe window periods: no sort spec counts as a zero-length window.
-     StreamSortSpec candidateSpec = partition.getSortSpec();
-     StreamSortSpec existingSpec = existingPartition.getSortSpec();
-     long candidatePeriod = candidateSpec == null ? 0L : candidateSpec.getWindowPeriodMillis();
-     long existingPeriod = existingSpec == null ? 0L : existingSpec.getWindowPeriodMillis();
-
-     boolean sameTypeAndColumns = existingPartition.getType().equals(partition.getType())
-         && ListUtils.isEqualList(existingPartition.getColumns(), partition.getColumns());
-
-     // If same Type & Columns but different sort spec, then use larger;
-     // a recorded SHUFFLE partition is always superseded.
-     if (sameTypeAndColumns && candidatePeriod > existingPeriod
-         || existingPartition.getType().equals(StreamPartition.Type.SHUFFLE)) {
-         effectivePartitions.put(partition.getStreamId(), partition);
-     } else {
-         // Conflicting partitions on the same stream cannot run in distributed mode.
-         throw new ExecutionPlanValidationException("You have incompatible partitions on stream " + partition.getStreamId()
-             + ": [1] " + existingPartition.toString() + " [2] " + partition.toString() + "");
-     }
- }
-
- /**
-  * Registers the input stream under its stream reference id (alias), if it has one.
-  *
-  * @param inputStream        stream whose reference id is registered; streams without
-  *                           a reference id are ignored
-  * @param aliasStreamMapping alias -> input stream registry that is updated in place
-  * @throws ExecutionPlanValidationException if the reference id is already registered
-  */
- private void retrieveAliasForQuery(SingleInputStream inputStream, Map<String, SingleInputStream> aliasStreamMapping) {
-     if (inputStream.getStreamReferenceId() == null) {
-         return;
-     }
-     if (aliasStreamMapping.containsKey(inputStream.getStreamReferenceId())) {
-         throw new ExecutionPlanValidationException("Duplicated stream alias " + inputStream.getStreamId() + " -> " + inputStream);
-     }
-     aliasStreamMapping.put(inputStream.getStreamReferenceId(), inputStream);
- }
-
- /**
-  * Creates the partition descriptor for a stream.
-  *
-  * <p>The sort spec is taken from the stream's externalTime windows (the last one
-  * wins when several are present) with a margin of one fifth of the window period;
-  * it stays {@code null} when there is no such window. The partition type is GROUPBY
-  * on the group-by attribute names when a group-by is present, otherwise SHUFFLE.</p>
-  *
-  * @param streamId id of the stream being partitioned
-  * @param windows  window handlers of the stream; may be {@code null} or empty
-  * @param groupBy  group-by variables of the query; may be {@code null} or empty
-  * @return the newly built partition
-  */
- private StreamPartition generatePartition(String streamId, List<Window> windows, List<Variable> groupBy) {
-     StreamPartition result = new StreamPartition();
-     result.setStreamId(streamId);
-
-     StreamSortSpec sortSpec = null;
-     if (windows != null) {
-         for (Window candidate : windows) {
-             if (!candidate.getFunction().equals(WINDOW_EXTERNAL_TIME)) {
-                 continue;
-             }
-             StreamSortSpec externalTimeSpec = new StreamSortSpec();
-             externalTimeSpec.setWindowPeriodMillis(getExternalTimeWindowSize(candidate));
-             externalTimeSpec.setWindowMargin(externalTimeSpec.getWindowPeriodMillis() / 5);
-             sortSpec = externalTimeSpec;
-         }
-     }
-     result.setSortSpec(sortSpec);
-
-     boolean hasGroupBy = groupBy != null && !groupBy.isEmpty();
-     if (!hasGroupBy) {
-         result.setType(StreamPartition.Type.SHUFFLE);
-         return result;
-     }
-     result.setColumns(groupBy.stream().map(Variable::getAttributeName).collect(Collectors.toList()));
-     result.setType(StreamPartition.Type.GROUPBY);
-     return result;
- }
-
- /**
-  * Reads the size of an externalTime window from the window's second parameter.
-  *
-  * <p>Long-valued sizes are narrowed with {@link Math#toIntExact(long)} so that a
-  * value beyond the {@code int} range fails loudly instead of being silently
-  * truncated to an unrelated (possibly negative) window size.</p>
-  *
-  * @param window window handler whose parameter at index 1 holds the window size
-  * @return the window size as an {@code int}
-  * @throws ArithmeticException            if a long-valued size does not fit in an {@code int}
-  * @throws UnsupportedOperationException if the size is not a time, int or long constant
-  */
- private static int getExternalTimeWindowSize(Window window) {
-     Expression windowSize = window.getParameters()[1];
-     if (windowSize instanceof TimeConstant) {
-         long sizeValue = ((TimeConstant) windowSize).getValue();
-         return Math.toIntExact(sizeValue);
-     } else if (windowSize instanceof IntConstant) {
-         return ((IntConstant) windowSize).getValue();
-     } else if (windowSize instanceof LongConstant) {
-         long sizeValue = ((LongConstant) windowSize).getValue();
-         return Math.toIntExact(sizeValue);
-     } else {
-         throw new UnsupportedOperationException("Illegal type of window size expression:" + windowSize.toString());
-     }
- }
-
- /**
-  * Converts the selector's output attributes into stream columns: the column name is
-  * the attribute's rename and the description is the string form of its expression.
-  *
-  * @param outputAttributeList output attributes of a query's selector
-  * @return one new {@link StreamColumn} per attribute, in the same order
-  */
- private static List<StreamColumn> convertOutputStreamColumns(List<OutputAttribute> outputAttributeList) {
-     List<StreamColumn> columns = new ArrayList<>();
-     for (OutputAttribute attribute : outputAttributeList) {
-         StreamColumn column = new StreamColumn();
-         column.setName(attribute.getRename());
-         column.setDescription(attribute.getExpression().toString());
-         columns.add(column);
-     }
-     return columns;
- }
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreter.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreter.java
deleted file mode 100644
index 4add3ff..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreter.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.interpreter;
-
-import com.google.common.base.Preconditions;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamNotDefinedException;
-import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandlers;
-import org.apache.eagle.alert.engine.evaluator.impl.SiddhiDefinitionAdapter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * PolicyInterpreter Helper Methods:
- * <ul>
- * <li>Parse: parse siddhi query and generate static execution plan</li>
- * <li>Validate: validate policy definition with execution plan and metadata</li>
- * </ul>
- *
- * @see PolicyExecutionPlanner
- * @see <a href="https://docs.wso2.com/display/CEP300/WSO2+Complex+Event+Processor+Documentation">WSO2 Complex Event Processor Documentation</a>
- */
-public class PolicyInterpreter {
- private static final Logger LOG = LoggerFactory.getLogger(PolicyInterpreter.class);
-
- /**
- * See https://docs.wso2.com/display/CEP300/Windows#Windows-ExternalTimeWindow.
- */
- private static final String WINDOW_EXTERNAL_TIME = "externalTime";
-
- public static PolicyParseResult parse(String executionPlanQuery) {
- PolicyParseResult policyParseResult = new PolicyParseResult();
- try {
- policyParseResult.setPolicyExecutionPlan(parseExecutionPlan(executionPlanQuery));
- policyParseResult.setSuccess(true);
- policyParseResult.setMessage("Parsed successfully");
- } catch (Exception exception) {
- LOG.error("Got error to parse policy: {}", executionPlanQuery, exception);
- policyParseResult.setSuccess(false);
- policyParseResult.setMessage(exception.getMessage());
- policyParseResult.setStackTrace(exception);
- }
- return policyParseResult;
- }
-
- /**
- * Quick parseExecutionPlan policy.
- */
- public static PolicyExecutionPlan parseExecutionPlan(String policyDefinition, Map<String, StreamDefinition> inputStreamDefinitions) throws Exception {
- // Validate inputStreams are valid
- Preconditions.checkNotNull(inputStreamDefinitions, "No inputStreams to connect from");
- return parseExecutionPlan(SiddhiDefinitionAdapter.buildSiddhiExecutionPlan(policyDefinition, inputStreamDefinitions));
- }
-
- public static PolicyExecutionPlan parseExecutionPlan(String executionPlanQuery) throws Exception {
- return PolicyExecutionPlanner.parseExecutionPlan(executionPlanQuery);
- }
-
- public static PolicyValidationResult validate(PolicyDefinition policy, Map<String, StreamDefinition> allDefinitions) {
- Map<String, StreamDefinition> inputDefinitions = new HashMap<>();
- PolicyValidationResult policyValidationResult = new PolicyValidationResult();
- policyValidationResult.setPolicyDefinition(policy);
- try {
- if (policy.getInputStreams() != null) {
- for (String streamId : policy.getInputStreams()) {
- if (allDefinitions.containsKey(streamId)) {
- inputDefinitions.put(streamId, allDefinitions.get(streamId));
- } else {
- throw new StreamNotDefinedException(streamId);
- }
- }
- }
-
- PolicyExecutionPlan policyExecutionPlan = null;
- if (PolicyStreamHandlers.SIDDHI_ENGINE.equalsIgnoreCase(policy.getDefinition().getType())) {
- policyExecutionPlan = parseExecutionPlan(policy.getDefinition().getValue(), inputDefinitions);
- // Validate output
- if (policy.getOutputStreams() != null) {
- for (String outputStream : policy.getOutputStreams()) {
- if (!policyExecutionPlan.getOutputStreams().containsKey(outputStream)) {
- throw new StreamNotDefinedException("Output stream " + outputStream + " not defined");
- }
- }
- }
- }
- policyValidationResult.setPolicyExecutionPlan(policyExecutionPlan);
- policyValidationResult.setSuccess(true);
- policyValidationResult.setMessage("Validated successfully");
- } catch (Exception exception) {
- LOG.error("Got error to validate policy definition: {}", policy, exception);
- policyValidationResult.setSuccess(false);
- policyValidationResult.setMessage(exception.getMessage());
- policyValidationResult.setStackTrace(exception);
- }
-
- return policyValidationResult;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyParseResult.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyParseResult.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyParseResult.java
deleted file mode 100644
index a0f3ad2..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyParseResult.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.interpreter;
-
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import org.apache.commons.lang3.exception.ExceptionUtils;
-
-@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-public class PolicyParseResult {
- private boolean success;
- private String message;
- private String exception;
-
- private PolicyExecutionPlan policyExecutionPlan;
-
- public String getException() {
- return exception;
- }
-
- public void setException(String exception) {
- this.exception = exception;
- }
-
- public String getMessage() {
- return message;
- }
-
- public void setMessage(String message) {
- this.message = message;
- }
-
- public void setStackTrace(Throwable throwable) {
- this.setException(ExceptionUtils.getStackTrace(throwable));
- }
-
- public boolean isSuccess() {
- return success;
- }
-
- public void setSuccess(boolean success) {
- this.success = success;
- }
-
- public PolicyExecutionPlan getPolicyExecutionPlan() {
- return policyExecutionPlan;
- }
-
- public void setPolicyExecutionPlan(PolicyExecutionPlan policyExecutionPlan) {
- this.policyExecutionPlan = policyExecutionPlan;
- }
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyValidationResult.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyValidationResult.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyValidationResult.java
deleted file mode 100644
index 17f6091..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyValidationResult.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.interpreter;
-
-
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-
-@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-public class PolicyValidationResult {
- private boolean success;
- private String message;
- private String exception;
-
- private PolicyExecutionPlan policyExecutionPlan;
- private PolicyDefinition policyDefinition;
-
- public String getException() {
- return exception;
- }
-
- public void setException(String exception) {
- this.exception = exception;
- }
-
- public String getMessage() {
- return message;
- }
-
- public void setMessage(String message) {
- this.message = message;
- }
-
- public void setStackTrace(Throwable throwable) {
- this.setException(ExceptionUtils.getStackTrace(throwable));
- }
-
- public boolean isSuccess() {
- return success;
- }
-
- public void setSuccess(boolean success) {
- this.success = success;
- }
-
- public PolicyExecutionPlan getPolicyExecutionPlan() {
- return policyExecutionPlan;
- }
-
- public void setPolicyExecutionPlan(PolicyExecutionPlan policyExecutionPlan) {
- this.policyExecutionPlan = policyExecutionPlan;
- }
-
- public PolicyDefinition getPolicyDefinition() {
- return policyDefinition;
- }
-
- public void setPolicyDefinition(PolicyDefinition policyDefinition) {
- this.policyDefinition = policyDefinition;
- }
-}
\ No newline at end of file