Posted to commits@eagle.apache.org by ha...@apache.org on 2017/04/03 11:54:34 UTC

[26/84] [partial] eagle git commit: Clean repo for eagle site

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
deleted file mode 100755
index 628b2e4..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.impl;
-
-import org.apache.eagle.alert.engine.Collector;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamNotDefinedException;
-import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
-import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-
-import java.util.List;
-import java.util.Map;
-
-public class SiddhiPolicyHandler implements PolicyStreamHandler {
-    private static final Logger LOG = LoggerFactory.getLogger(SiddhiPolicyHandler.class);
-    private ExecutionPlanRuntime executionRuntime;
-    private SiddhiManager siddhiManager;
-    private Map<String, StreamDefinition> sds;
-    private PolicyDefinition policy;
-    private PolicyHandlerContext context;
-
-    private int currentIndex = 0; // the index of current definition statement inside the policy definition
-
-    public SiddhiPolicyHandler(Map<String, StreamDefinition> sds, int index) {
-        this.sds = sds;
-        this.currentIndex = index;
-    }
-
-    protected String generateExecutionPlan(PolicyDefinition policyDefinition, Map<String, StreamDefinition> sds) throws StreamNotDefinedException {
-        return SiddhiDefinitionAdapter.buildSiddhiExecutionPlan(policyDefinition,sds);
-    }
-
-    @Override
-    public void prepare(final Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception {
-        LOG.info("Initializing handler for policy {}", context.getPolicyDefinition());
-        this.policy = context.getPolicyDefinition();
-        this.siddhiManager = new SiddhiManager();
-        String plan = generateExecutionPlan(policy, sds);
-        try {
-            this.executionRuntime = siddhiManager.createExecutionPlanRuntime(plan);
-            LOG.info("Created siddhi runtime {}", executionRuntime.getName());
-        } catch (Exception parserException) {
-            LOG.error("Failed to create siddhi runtime for policy: {}, siddhi plan: \n\n{}\n", context.getPolicyDefinition().getName(), plan, parserException);
-            throw parserException;
-        }
-
-        // add output stream callback
-        List<String> outputStreams = getOutputStreams(policy);
-        for (final String outputStream : outputStreams) {
-            if (executionRuntime.getStreamDefinitionMap().containsKey(outputStream)) {
-                StreamDefinition streamDefinition = SiddhiDefinitionAdapter.convertFromSiddiDefinition(executionRuntime.getStreamDefinitionMap().get(outputStream));
-                this.executionRuntime.addCallback(outputStream,
-                    new AlertStreamCallback(outputStream, streamDefinition,
-                        collector, context, currentIndex));
-            } else {
-                throw new IllegalStateException("Undefined output stream " + outputStream);
-            }
-        }
-        this.executionRuntime.start();
-        this.context = context;
-        LOG.info("Initialized policy handler for policy: {}", policy.getName());
-    }
-
-    protected List<String> getOutputStreams(PolicyDefinition policy) {
-        return policy.getOutputStreams().isEmpty() ? policy.getDefinition().getOutputStreams() : policy.getOutputStreams();
-    }
-
-    public void send(StreamEvent event) throws Exception {
-        context.getPolicyCounter().incr(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "receive_count"));
-        String streamId = event.getStreamId();
-        InputHandler inputHandler = executionRuntime.getInputHandler(streamId);
-        if (inputHandler != null) {
-            context.getPolicyCounter().incr(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "eval_count"));
-            inputHandler.send(event.getTimestamp(), event.getData());
-
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("sent event to siddhi stream {} ", streamId);
-            }
-        } else {
-            context.getPolicyCounter().incr(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "drop_count"));
-            LOG.warn("No input handler found for stream {}", streamId);
-        }
-    }
-
-    public void close() throws Exception {
-        LOG.info("Closing handler for policy {}", this.policy.getName());
-        this.executionRuntime.shutdown();
-        LOG.info("Shutdown siddhi runtime {}", this.executionRuntime.getName());
-        this.siddhiManager.shutdown();
-        LOG.info("Shutdown siddhi manager {}", this.siddhiManager);
-        LOG.info("Closed handler for policy {}", this.policy.getName());
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder("SiddhiPolicyHandler for policy: ");
-        sb.append(this.policy == null ? "" : this.policy.getName());
-        return sb.toString();
-    }
-
-}
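
For orientation, a rough standalone sketch of the Siddhi 3.x lifecycle that SiddhiPolicyHandler wraps: build a plan string, create a runtime, register a stream callback on the output stream, push events through an input handler, and shut everything down. The plan text, stream names and callback body are illustrative; only API calls that also appear in the class above (createExecutionPlanRuntime, addCallback, getInputHandler, send, start, shutdown) are used.

    import org.wso2.siddhi.core.ExecutionPlanRuntime;
    import org.wso2.siddhi.core.SiddhiManager;
    import org.wso2.siddhi.core.event.Event;
    import org.wso2.siddhi.core.stream.input.InputHandler;
    import org.wso2.siddhi.core.stream.output.StreamCallback;

    public class SiddhiLifecycleSketch {
        public static void main(String[] args) throws Exception {
            // Illustrative plan: one input stream and one query writing to an output stream.
            String plan = "define stream syslogStream (timestamp long, host string, msg string); "
                + "from syslogStream[msg == 'ERROR'] select host, msg insert into alertStream;";

            SiddhiManager siddhiManager = new SiddhiManager();
            ExecutionPlanRuntime runtime = siddhiManager.createExecutionPlanRuntime(plan);

            // SiddhiPolicyHandler attaches an AlertStreamCallback per output stream at this point.
            runtime.addCallback("alertStream", new StreamCallback() {
                @Override
                public void receive(Event[] events) {
                    for (Event e : events) {
                        System.out.println(e);
                    }
                }
            });
            runtime.start();

            // send(StreamEvent) resolves the input handler by stream id and forwards timestamp + data.
            InputHandler input = runtime.getInputHandler("syslogStream");
            input.send(System.currentTimeMillis(),
                new Object[] {System.currentTimeMillis(), "host1", "ERROR"});

            Thread.sleep(100); // give the asynchronous callback a moment to fire
            runtime.shutdown();
            siddhiManager.shutdown();
        }
    }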

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyStateHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyStateHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyStateHandler.java
deleted file mode 100644
index 141c819..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyStateHandler.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.impl;
-
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamNotDefinedException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Created on 7/27/16.
- */
-public class SiddhiPolicyStateHandler extends SiddhiPolicyHandler {
-
-    private static final Logger LOG = LoggerFactory.getLogger(SiddhiPolicyStateHandler.class);
-
-    public SiddhiPolicyStateHandler(Map<String, StreamDefinition> sds, int index) {
-        super(sds, index);
-    }
-
-    @Override
-    protected String generateExecutionPlan(PolicyDefinition policyDefinition, Map<String, StreamDefinition> sds) throws StreamNotDefinedException {
-        StringBuilder builder = new StringBuilder();
-        PolicyDefinition.Definition stateDefiniton = policyDefinition.getStateDefinition();
-        List<String> inputStreams = stateDefiniton.getInputStreams();
-        for (String inputStream : inputStreams) { // the state stream follows the output stream of the policy definition
-            builder.append(SiddhiDefinitionAdapter.buildStreamDefinition(sds.get(inputStream)));
-            builder.append("\n");
-        }
-        builder.append(stateDefiniton.value);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Generated siddhi state execution plan: {} from definition: {}", builder.toString(), stateDefiniton);
-        }
-        return builder.toString();
-    }
-
-    @Override
-    protected List<String> getOutputStreams(PolicyDefinition policy) {
-        return policy.getStateDefinition().getOutputStreams();
-    }
-
-    // more validation on prepare
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeBatchWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeBatchWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeBatchWindow.java
deleted file mode 100644
index ef806fb..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeBatchWindow.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.nodata;
-
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-public class DistinctValuesInTimeBatchWindow {
-
-    private static final Logger LOG = LoggerFactory.getLogger(DistinctValuesInTimeBatchWindow.class);
-
-    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
-
-    // wisb (what it should be) set for the expected full set of values over multiple columns
-    @SuppressWarnings("rawtypes")
-    private volatile Set wisb = new HashSet();
-
-    private NoDataPolicyTimeBatchHandler handler;
-
-    /**
-     * map from value to max timestamp for this value.
-     */
-    private Map<Object, Long> valueMaxTimeMap = new HashMap<>();
-
-    private long startTime = -1;
-    private long nextEmitTime = -1;
-    private long timeInMilliSeconds;
-
-    public DistinctValuesInTimeBatchWindow(NoDataPolicyTimeBatchHandler handler,
-                                           long timeInMilliSeconds, @SuppressWarnings("rawtypes") Set wisb) {
-        this.handler = handler;
-        this.timeInMilliSeconds = timeInMilliSeconds;
-        if (wisb != null) {
-            this.wisb = wisb;
-        }
-    }
-
-    public Map<Object, Long> distinctValues() {
-        return valueMaxTimeMap;
-    }
-
-    public void send(StreamEvent event, Object value, long timestamp) {
-        synchronized (this) {
-            if (startTime < 0) {
-                startTime = System.currentTimeMillis();
-
-                scheduler.scheduleAtFixedRate(new Runnable() {
-
-                    @SuppressWarnings( {"unchecked", "rawtypes"})
-                    @Override
-                    public void run() {
-                        try {
-                            LOG.info("{}/{}: {}", startTime, nextEmitTime, valueMaxTimeMap.keySet());
-                            synchronized (valueMaxTimeMap) {
-                                boolean sendAlerts = false;
-
-                                if (nextEmitTime < 0) {
-                                    nextEmitTime = startTime + timeInMilliSeconds;
-                                }
-
-                                if (System.currentTimeMillis() > nextEmitTime) {
-                                    startTime = nextEmitTime;
-                                    nextEmitTime += timeInMilliSeconds;
-                                    sendAlerts = true;
-                                } else {
-                                    sendAlerts = false;
-                                }
-
-                                if (sendAlerts) {
-                                    // alert
-                                    handler.compareAndEmit(wisb, distinctValues().keySet(), event);
-                                    LOG.info("alert for wiri: {} compares to wisb: {}", distinctValues().keySet(), wisb);
-
-                                    if (distinctValues().keySet().size() > 0) {
-                                        wisb = new HashSet(distinctValues().keySet());
-                                    }
-                                    valueMaxTimeMap.clear();
-                                    LOG.info("Clear wiri & update wisb to {}", wisb);
-                                }
-                            }
-                        } catch (Throwable t) {
-                            LOG.error("failed to run batch window for gap alert", t);
-                        }
-                    }
-
-                }, 0, timeInMilliSeconds / 2, TimeUnit.MILLISECONDS);
-            }
-        }
-
-        if (valueMaxTimeMap.containsKey(value)) {
-            // a timestamp is already recorded for this value; compare against it
-            long oldTime = valueMaxTimeMap.get(value);
-            if (oldTime >= timestamp) {
-                // no effect as the new timestamp is equal to or less than the
-                // old timestamp
-                return;
-            }
-        }
-        // update new timestamp in valueMaxTimeMap
-        valueMaxTimeMap.put(value, timestamp);
-
-        LOG.info("sent: {} with start: {}/next: {}", valueMaxTimeMap.keySet(), startTime, nextEmitTime);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java
deleted file mode 100644
index 4aae040..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.nodata;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-
-import java.util.*;
-
-/**
- * Since 6/28/16.
- * Gets distinct values within a specified time window.
- * valueMaxTimeMap : each distinct value is associated with the max timestamp it has ever had
- * timeSortedMap : map sorted by timestamp first and then by value
- * With the above 2 data structures, we can maintain distinct values in O(log N) per update.
- */
-public class DistinctValuesInTimeWindow {
-    public static class ValueAndTime {
-        Object value;
-        long timestamp;
-
-        public ValueAndTime(Object value, long timestamp) {
-            this.value = value;
-            this.timestamp = timestamp;
-        }
-
-        public String toString() {
-            return "[" + value + "," + timestamp + "]";
-        }
-
-        public int hashCode() {
-            return new HashCodeBuilder().append(value).append(timestamp).toHashCode();
-        }
-
-        public boolean equals(Object that) {
-            if (!(that instanceof ValueAndTime)) {
-                return false;
-            }
-            ValueAndTime another = (ValueAndTime) that;
-            return another.timestamp == this.timestamp && another.value.equals(this.value);
-        }
-    }
-
-    public static class ValueAndTimeComparator implements Comparator<ValueAndTime> {
-        @Override
-        public int compare(ValueAndTime o1, ValueAndTime o2) {
-            if (o1.timestamp != o2.timestamp) {
-                return (o1.timestamp > o2.timestamp) ? 1 : -1;
-            }
-            if (o1.value.equals(o2.value)) {
-                return 0;
-            } else {
-                // this is not strictly correct, but I don't want to write too many comparators here :-)
-                if (o1.hashCode() > o2.hashCode()) {
-                    return 1;
-                } else {
-                    return -1;
-                }
-            }
-        }
-    }
-
-    /**
-     * map from value to max timestamp for this value.
-     */
-    private Map<Object, Long> valueMaxTimeMap = new HashMap<>();
-    /**
-     * map sorted by time(max timestamp for the value) and then value.
-     */
-    private SortedMap<ValueAndTime, ValueAndTime> timeSortedMap = new TreeMap<>(new ValueAndTimeComparator());
-    private long maxTimestamp = 0L;
-    private long window;
-    private boolean windowSlided;
-
-    /**
-     * @param window - milliseconds.
-     */
-    public DistinctValuesInTimeWindow(long window) {
-        this.window = window;
-    }
-
-    public void send(Object value, long timestamp) {
-        ValueAndTime vt = new ValueAndTime(value, timestamp);
-
-        // TODO: think about out-of-order timestamps
-        if (valueMaxTimeMap.containsKey(value)) {
-            // remove that entry with old timestamp in timeSortedMap
-            long oldTime = valueMaxTimeMap.get(value);
-            if (oldTime >= timestamp) {
-                // no effect as the new timestamp is equal to or less than the old timestamp
-                return;
-            }
-            timeSortedMap.remove(new ValueAndTime(value, oldTime));
-        }
-        // insert entry with new timestamp in timeSortedMap
-        timeSortedMap.put(vt, vt);
-        // update new timestamp in valueMaxTimeMap
-        valueMaxTimeMap.put(value, timestamp);
-
-        // evict old entries
-        // store max timestamp if possible
-        maxTimestamp = Math.max(maxTimestamp, timestamp);
-
-        // check if some values should be evicted because of time window
-        Iterator<Map.Entry<ValueAndTime, ValueAndTime>> it = timeSortedMap.entrySet().iterator();
-        while (it.hasNext()) {
-            Map.Entry<ValueAndTime, ValueAndTime> entry = it.next();
-            if (entry.getKey().timestamp < maxTimestamp - window) {
-                // should remove the entry in valueMaxTimeMap and timeSortedMap
-                valueMaxTimeMap.remove(entry.getKey().value);
-                windowSlided = true;
-
-                it.remove();
-            } else {
-                break;
-            }
-        }
-    }
-
-    public Map<Object, Long> distinctValues() {
-        return valueMaxTimeMap;
-    }
-
-    public boolean windowSlided() {
-        return windowSlided;
-    }
-}
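
A small usage sketch of the sliding-window class above, assuming a 60-second window; the values and timestamps are illustrative. Per the eviction loop in send(), entries whose last timestamp falls behind maxTimestamp - window are dropped and the window is marked as slided.

    import org.apache.eagle.alert.engine.evaluator.nodata.DistinctValuesInTimeWindow;

    public class DistinctValuesInTimeWindowExample {
        public static void main(String[] args) {
            DistinctValuesInTimeWindow window = new DistinctValuesInTimeWindow(60_000L);

            window.send("host1", 1_000L);
            window.send("host2", 2_000L);
            // host1 re-appears much later; host2's last timestamp (2,000) now falls
            // outside [70,000 - 60,000, 70,000] and is evicted.
            window.send("host1", 70_000L);

            System.out.println(window.distinctValues()); // {host1=70000}
            System.out.println(window.windowSlided());   // true
        }
    }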

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
deleted file mode 100644
index ec6e6e9..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.nodata;
-
-import org.apache.eagle.alert.engine.Collector;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
-import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.utils.TimePeriodUtils;
-import org.apache.commons.collections.CollectionUtils;
-import org.joda.time.Period;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-/**
- * Since 6/28/16.
- * No-data policy engine, based on the following information:
- * 1. stream definition: group-by columns
- * 2. timestamp field: timestamp column
- * 3. wiri safe time window: how long a window is good for a full set of wiri
- * 4. wisb: the expected full set
- * A no-data policy definition includes fixed fields and dynamic fields:
- * fixed fields are the leading fields: windowPeriod, type, numOfFields, f1_name, f2_name;
- * dynamic fields depend on the wisb type.
- * A policy would look like:
- * {
- * "name": "noDataAlertPolicy",
- * "description": "noDataAlertPolicy",
- * "inputStreams": [
- * "noDataAlertStream"
- * ],
- * "outputStreams": [
- * "noDataAlertStream_out"
- * ],
- * "definition": {
- * "type": "nodataalert",
- * "value": "PT1M,plain,1,host,host1,host2"   // or "value": "PT1M,dynamic,1,host"
- * },
- * "partitionSpec": [
- * {
- * "streamId": "noDataAlertStream",
- * "type": "GROUPBY"
- * }
- * ],
- * "parallelismHint": 2
- * }
- * "name": "noDataAlertPolicy",
- * "description": "noDataAlertPolicy",
- * "inputStreams": [
- * "noDataAlertStream"
- * ],
- * "outputStreams": [
- * "noDataAlertStream_out"
- * ],
- * "definition": {
- * "type": "nodataalert",
- * "value": "PT1M,plain,1,host,host1,host2"   // or "value": "PT1M,dynamic,1,host"
- * },
- * "partitionSpec": [
- * {
- * "streamId": "noDataAlertStream",
- * "type": "GROUPBY"
- * }
- * ],
- * "parallelismHint": 2
- * }
- */
-public class NoDataPolicyHandler implements PolicyStreamHandler {
-    private static final Logger LOG = LoggerFactory.getLogger(NoDataPolicyHandler.class);
-    private Map<String, StreamDefinition> sds;
-
-    // wisb (what it should be) set for the expected full set of values over multiple columns
-    @SuppressWarnings("rawtypes")
-    private volatile Set wisbValues = null;
-    private volatile List<Integer> wisbFieldIndices = new ArrayList<>();
-    // reuse the PolicyDefinition.definition.value field to store the full set of values separated by commas
-    private volatile PolicyDefinition policyDef;
-    private volatile Collector<AlertStreamEvent> collector;
-    private volatile PolicyHandlerContext context;
-    private volatile NoDataWisbType wisbType;
-    private volatile DistinctValuesInTimeWindow distinctWindow;
-
-    public NoDataPolicyHandler(Map<String, StreamDefinition> sds) {
-        this.sds = sds;
-    }
-
-    @Override
-    public void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception {
-        this.collector = collector;
-        this.context = context;
-        this.policyDef = context.getPolicyDefinition();
-        List<String> inputStreams = policyDef.getInputStreams();
-        // validate inputStreams has to contain only one stream
-        if (inputStreams.size() != 1) {
-            throw new IllegalArgumentException("policy inputStream size has to be 1 for no data alert");
-        }
-        // validate outputStream has to contain only one stream
-        if (policyDef.getOutputStreams().size() != 1) {
-            throw new IllegalArgumentException("policy outputStream size has to be 1 for no data alert");
-        }
-
-        String is = inputStreams.get(0);
-        StreamDefinition sd = sds.get(is);
-
-        String policyValue = policyDef.getDefinition().getValue();
-        // assume that the no data alert policy value consists of "windowPeriod, type, numOfFields, f1_name, f2_name, f1_value, f2_value, f1_value, f2_value"
-        String[] segments = policyValue.split(",");
-        long windowPeriod = TimePeriodUtils.getMillisecondsOfPeriod(Period.parse(segments[0]));
-        distinctWindow = new DistinctValuesInTimeWindow(windowPeriod);
-        this.wisbType = NoDataWisbType.valueOf(segments[1]);
-        // provided wisb values need to be parsed; dynamic wisb values are computed through a window
-        if (wisbType == NoDataWisbType.provided) {
-            wisbValues = new NoDataWisbProvidedParser().parse(segments);
-        }
-        // populate wisb field names
-        int numOfFields = Integer.parseInt(segments[2]);
-        for (int i = 3; i < 3 + numOfFields; i++) {
-            String fn = segments[i];
-            wisbFieldIndices.add(sd.getColumnIndex(fn));
-        }
-    }
-
-    @SuppressWarnings( {"rawtypes", "unchecked"})
-    @Override
-    public void send(StreamEvent event) throws Exception {
-        Object[] data = event.getData();
-        List<Object> columnValues = new ArrayList<>();
-        for (int i = 0; i < wisbFieldIndices.size(); i++) {
-            Object o = data[wisbFieldIndices.get(i)];
-            // convert value to string
-            columnValues.add(o.toString());
-        }
-        distinctWindow.send(columnValues, event.getTimestamp());
-        Set wiriValues = distinctWindow.distinctValues().keySet();
-
-        LOG.debug("window slided: {}, with wiri: {}", distinctWindow.windowSlided(), distinctWindow.distinctValues());
-
-        if (distinctWindow.windowSlided()) {
-            compareAndEmit(wisbValues, wiriValues, event);
-        }
-
-        if (wisbType == NoDataWisbType.dynamic) {
-            // deep copy
-            wisbValues = new HashSet<>(wiriValues);
-        }
-    }
-
-    @SuppressWarnings("rawtypes")
-    private void compareAndEmit(Set wisb, Set wiri, StreamEvent event) {
-        // compare with wisbValues if wisbValues are already there for dynamic type
-        Collection noDataValues = CollectionUtils.subtract(wisb, wiri);
-        LOG.debug("nodatavalues:" + noDataValues + ", wisb: " + wisb + ", wiri: " + wiri);
-        if (noDataValues != null && noDataValues.size() > 0) {
-            LOG.info("No data alert is triggered with no data values {} and wisb {}", noDataValues, wisbValues);
-            AlertStreamEvent alertEvent = createAlertEvent(event.getTimestamp(), event.getData());
-            collector.emit(alertEvent);
-        }
-    }
-
-    private AlertStreamEvent createAlertEvent(long timestamp, Object[] triggerEvent) {
-        String is = policyDef.getInputStreams().get(0);
-        final StreamDefinition sd = sds.get(is);
-
-        AlertStreamEvent event = new AlertStreamEvent();
-        event.setTimestamp(timestamp);
-        event.setData(triggerEvent);
-        event.setStreamId(policyDef.getOutputStreams().get(0));
-        event.setPolicyId(context.getPolicyDefinition().getName());
-        if (this.context.getPolicyEvaluator() != null) {
-            event.setCreatedBy(context.getPolicyEvaluator().getName());
-        }
-        event.setCreatedTime(System.currentTimeMillis());
-        event.setSchema(sd);
-        return event;
-    }
-
-    @Override
-    public void close() throws Exception {
-
-    }
-}
\ No newline at end of file
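
The heart of compareAndEmit above is a set difference between wisb (what it should be) and wiri (what it really is); whatever is expected but missing triggers the no-data alert. A standalone sketch with illustrative host tuples, using the same commons-collections call as the handler:

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    import org.apache.commons.collections.CollectionUtils;

    public class WisbWiriDiffExample {
        public static void main(String[] args) {
            Set<List<String>> wisb = new HashSet<>(Arrays.asList(
                Arrays.asList("host1"), Arrays.asList("host2"), Arrays.asList("host3")));
            Set<List<String>> wiri = new HashSet<>(Arrays.asList(
                Arrays.asList("host1"), Arrays.asList("host3")));

            // Values expected in the window but never observed trigger the alert.
            Collection noDataValues = CollectionUtils.subtract(wisb, wiri);
            System.out.println(noDataValues); // [[host2]]
        }
    }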

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyTimeBatchHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyTimeBatchHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyTimeBatchHandler.java
deleted file mode 100644
index b1e32bd..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyTimeBatchHandler.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.nodata;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.eagle.alert.engine.Collector;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
-import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.utils.TimePeriodUtils;
-import org.joda.time.Period;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Joiner;
-
-public class NoDataPolicyTimeBatchHandler implements PolicyStreamHandler {
-
-    private static final Logger LOG = LoggerFactory.getLogger(NoDataPolicyTimeBatchHandler.class);
-    private Map<String, StreamDefinition> sds;
-
-    private volatile List<Integer> wisbFieldIndices = new ArrayList<>();
-    // reuse the PolicyDefinition.definition.value field to store the full set of values
-    // separated by comma
-    private volatile PolicyDefinition policyDef;
-    private volatile Collector<AlertStreamEvent> collector;
-    private volatile PolicyHandlerContext context;
-    private volatile NoDataWisbType wisbType;
-    private volatile DistinctValuesInTimeBatchWindow distinctWindow;
-
-    public NoDataPolicyTimeBatchHandler(Map<String, StreamDefinition> sds) {
-        this.sds = sds;
-    }
-
-    @Override
-    public void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception {
-        this.collector = collector;
-        this.context = context;
-        this.policyDef = context.getPolicyDefinition();
-        List<String> inputStreams = policyDef.getInputStreams();
-        // validate inputStreams has to contain only one stream
-        if (inputStreams.size() != 1) {
-            throw new IllegalArgumentException("policy inputStream size has to be 1 for no data alert");
-        }
-        // validate outputStream has to contain only one stream
-        if (policyDef.getOutputStreams().size() != 1) {
-            throw new IllegalArgumentException("policy outputStream size has to be 1 for no data alert");
-        }
-
-        String policyValue = policyDef.getDefinition().getValue();
-        // assume that the no data alert policy value consists of "windowPeriod,
-        // type, v1, v2, ..." where the values after the type form the provided
-        // wisb set; the monitored column name comes from the nodataColumnName property
-        String[] segments = policyValue.split(",");
-        this.wisbType = NoDataWisbType.valueOf(segments[1]);
-        // provided wisb values need to be parsed; dynamic wisb values are
-        // computed through a window
-        Set<String> wisbValues = new HashSet<String>();
-        if (wisbType == NoDataWisbType.provided) {
-            for (int i = 2; i < segments.length; i++) {
-                wisbValues.add(segments[i]);
-            }
-        }
-        
-        long windowPeriod = TimePeriodUtils.getMillisecondsOfPeriod(Period.parse(segments[0]));
-        distinctWindow = new DistinctValuesInTimeBatchWindow(this, windowPeriod, wisbValues);
-        // populate wisb field names
-        String is = inputStreams.get(0);
-        StreamDefinition sd = sds.get(is);
-        String nodataColumnNameKey = "nodataColumnName";
-        if (!policyDef.getDefinition().getProperties().containsKey(nodataColumnNameKey)) {
-            throw new IllegalArgumentException("policy nodata column name has to be defined for no data alert");
-        }
-        wisbFieldIndices.add(sd.getColumnIndex((String) policyDef.getDefinition().getProperties().get(nodataColumnNameKey)));
-    }
-
-    @Override
-    public void send(StreamEvent event) throws Exception {
-        Object[] data = event.getData();
-
-        List<Object> columnValues = new ArrayList<>();
-        for (int i = 0; i < wisbFieldIndices.size(); i++) {
-            Object o = data[wisbFieldIndices.get(i)];
-            // convert value to string
-            columnValues.add(o.toString());
-        }
-        // use local timestamp rather than event timestamp
-        distinctWindow.send(event, columnValues, System.currentTimeMillis());
-        LOG.debug("event sent to window with wiri: {}", distinctWindow.distinctValues());
-    }
-
-    @SuppressWarnings("rawtypes")
-    public void compareAndEmit(Set wisb, Set wiri, StreamEvent event) {
-        // compare with wisbValues if wisbValues are already there for dynamic
-        // type
-        Collection noDataValues = CollectionUtils.subtract(wisb, wiri);
-        LOG.debug("nodatavalues:" + noDataValues + ", wisb: " + wisb + ", wiri: " + wiri);
-        if (noDataValues != null && noDataValues.size() > 0) {
-            LOG.info("No data alert is triggered with no data values {} and wisb {}", noDataValues, wisb);
-
-            String is = policyDef.getOutputStreams().get(0);
-            StreamDefinition sd = sds.get(is);
-            int timestampIndex = sd.getColumnIndex("timestamp");
-            int hostIndex = sd.getColumnIndex("host");
-            int originalStreamNameIndex = sd.getColumnIndex("originalStreamName");
-
-            for (Object one : noDataValues) {
-                Object[] triggerEvent = new Object[sd.getColumns().size()];
-                for (int i = 0; i < sd.getColumns().size(); i++) {
-                    if (i == timestampIndex) {
-                        triggerEvent[i] = System.currentTimeMillis();
-                    } else if (i == hostIndex) {
-                        triggerEvent[hostIndex] = ((List) one).get(0);
-                    } else if (i == originalStreamNameIndex) {
-                        triggerEvent[originalStreamNameIndex] = event.getStreamId();
-                    } else if (sd.getColumns().size() < i) {
-                        LOG.error("stream event data has a different length than the column definition!");
-                    } else {
-                        triggerEvent[i] = sd.getColumns().get(i).getDefaultValue();
-                    }
-                }
-                AlertStreamEvent alertEvent = createAlertEvent(sd, event.getTimestamp(), triggerEvent);
-                LOG.info(String.format("Nodata alert %s generated and will be emitted", Joiner.on(",").join(triggerEvent)));
-                collector.emit(alertEvent);
-            }
-
-        }
-    }
-
-    private AlertStreamEvent createAlertEvent(StreamDefinition sd, long timestamp, Object[] triggerEvent) {
-        AlertStreamEvent event = new AlertStreamEvent();
-        event.setTimestamp(timestamp);
-        event.setData(triggerEvent);
-        event.setStreamId(policyDef.getOutputStreams().get(0));
-        event.setPolicyId(context.getPolicyDefinition().getName());
-        if (this.context.getPolicyEvaluator() != null) {
-            event.setCreatedBy(context.getPolicyEvaluator().getName());
-        }
-        event.setCreatedTime(System.currentTimeMillis());
-        event.setSchema(sd);
-        return event;
-    }
-
-    @Override
-    public void close() throws Exception {
-
-    }
-
-}
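
In this time-batch variant the policy value is parsed positionally: the period first, then the wisb type, then (for the provided type) the expected values, while the monitored column name is read from the nodataColumnName property. A small parsing sketch with an illustrative policy value:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    import org.apache.eagle.alert.engine.evaluator.nodata.NoDataWisbType;
    import org.apache.eagle.alert.utils.TimePeriodUtils;
    import org.joda.time.Period;

    public class NoDataPolicyValueParsingExample {
        public static void main(String[] args) {
            String policyValue = "PT5M,provided,host1,host2"; // illustrative value

            String[] segments = policyValue.split(",");
            long windowMs = TimePeriodUtils.getMillisecondsOfPeriod(Period.parse(segments[0]));
            NoDataWisbType wisbType = NoDataWisbType.valueOf(segments[1]);

            Set<String> wisbValues = new HashSet<>();
            if (wisbType == NoDataWisbType.provided) {
                wisbValues.addAll(Arrays.asList(segments).subList(2, segments.length));
            }

            System.out.println(windowMs);   // 300000
            System.out.println(wisbValues); // e.g. [host1, host2]
        }
    }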

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbParser.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbParser.java
deleted file mode 100644
index fa27108..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbParser.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.nodata;
-
-import java.util.List;
-import java.util.Set;
-
-/**
- * Since 6/29/16.
- */
-public interface NoDataWisbParser {
-    /**
-     * Parses the policy definition and returns WISB values for one or more fields.
-     * For example, if host and data center are the 2 fields for a no data alert, then each WISB entry is a list of two values.
-     *
-     * @param args information parsed from the policy definition
-     * @return set of lists of field values
-     */
-    Set<List<String>> parse(String[] args);
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbProvidedParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbProvidedParser.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbProvidedParser.java
deleted file mode 100644
index 4f54358..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbProvidedParser.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.nodata;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Since 6/29/16.
- */
-public class NoDataWisbProvidedParser implements NoDataWisbParser {
-    /**
-     * policy value consists of "windowPeriod, type, numOfFields, f1_name, f2_name, f1_value, f2_value, f1_value, f2_value".
-     */
-    @Override
-    public Set<List<String>> parse(String[] args) {
-        int numOfFields = Integer.parseInt(args[2]);
-        Set<List<String>> wisbValues = new HashSet<>();
-        int i = 3 + numOfFields;
-        while (i < args.length) {
-            List<String> fields = new ArrayList<>();
-            for (int j = 0; j < numOfFields; j++) {
-                fields.add(args[i + j]);
-            }
-            wisbValues.add(fields);
-            i += numOfFields;
-        }
-        return wisbValues;
-    }
-}
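
A usage sketch of the parser above: with numOfFields = 2 (host, datacenter), values start at index 3 + numOfFields and are grouped into tuples of numOfFields entries. The input string is illustrative:

    import java.util.List;
    import java.util.Set;

    import org.apache.eagle.alert.engine.evaluator.nodata.NoDataWisbParser;
    import org.apache.eagle.alert.engine.evaluator.nodata.NoDataWisbProvidedParser;

    public class NoDataWisbProvidedParserExample {
        public static void main(String[] args) {
            String policyValue = "PT1M,provided,2,host,datacenter,host1,dc1,host2,dc2";
            NoDataWisbParser parser = new NoDataWisbProvidedParser();
            Set<List<String>> wisb = parser.parse(policyValue.split(","));
            System.out.println(wisb); // e.g. [[host1, dc1], [host2, dc2]]
        }
    }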

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbType.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbType.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbType.java
deleted file mode 100644
index 887d099..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbType.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.nodata;
-
-/**
- * Since 6/29/16.
- */
-public enum NoDataWisbType {
-    provided,
-    dynamic
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlan.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlan.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlan.java
deleted file mode 100644
index 7ecc36f..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlan.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.interpreter;
-
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.wso2.siddhi.query.api.ExecutionPlan;
-
-import java.util.List;
-import java.util.Map;
-
-@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-public class PolicyExecutionPlan {
-    /**
-     * Actual input streams.
-     */
-    private Map<String, List<StreamColumn>> inputStreams;
-
-    /**
-     * Actual output streams.
-     */
-    private Map<String, List<StreamColumn>> outputStreams;
-
-    /**
-     * Execution plan source.
-     */
-    private String executionPlanSource;
-
-    /**
-     * Execution plan.
-     */
-    private ExecutionPlan internalExecutionPlan;
-
-    private String executionPlanDesc;
-
-    private List<StreamPartition> streamPartitions;
-
-    public String getExecutionPlanSource() {
-        return executionPlanSource;
-    }
-
-    public void setExecutionPlanSource(String executionPlanSource) {
-        this.executionPlanSource = executionPlanSource;
-    }
-
-    public ExecutionPlan getInternalExecutionPlan() {
-        return internalExecutionPlan;
-    }
-
-    public void setInternalExecutionPlan(ExecutionPlan internalExecutionPlan) {
-        this.internalExecutionPlan = internalExecutionPlan;
-    }
-
-    public String getExecutionPlanDesc() {
-        return executionPlanDesc;
-    }
-
-    public void setExecutionPlanDesc(String executionPlanDesc) {
-        this.executionPlanDesc = executionPlanDesc;
-    }
-
-    public List<StreamPartition> getStreamPartitions() {
-        return streamPartitions;
-    }
-
-    public void setStreamPartitions(List<StreamPartition> streamPartitions) {
-        this.streamPartitions = streamPartitions;
-    }
-
-    public Map<String, List<StreamColumn>> getInputStreams() {
-        return inputStreams;
-    }
-
-    public void setInputStreams(Map<String, List<StreamColumn>> inputStreams) {
-        this.inputStreams = inputStreams;
-    }
-
-    public Map<String, List<StreamColumn>> getOutputStreams() {
-        return outputStreams;
-    }
-
-    public void setOutputStreams(Map<String, List<StreamColumn>> outputStreams) {
-        this.outputStreams = outputStreams;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlanner.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlanner.java
deleted file mode 100644
index b8e5e42..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlanner.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.interpreter;
-
-/**
- * Keep PolicyExecutionPlanner as simple and fast as possible (avoid any backend data exchange).
- */
-interface PolicyExecutionPlanner {
-    /**
-     * @return PolicyExecutionPlan.
-     */
-    PolicyExecutionPlan getExecutionPlan();
-
-    static PolicyExecutionPlan parseExecutionPlan(String executionPlan) throws Exception {
-        return new PolicyExecutionPlannerImpl(executionPlan).getExecutionPlan();
-    }
-}
\ No newline at end of file
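
A usage sketch of the planner interface above; since the interface is package-private, this would live in org.apache.eagle.alert.engine.interpreter. The Siddhi query text and the expected output are illustrative:

    package org.apache.eagle.alert.engine.interpreter;

    public class PolicyExecutionPlannerExample {
        public static void main(String[] args) throws Exception {
            String siddhiQl = "define stream syslogStream (timestamp long, host string, msg string); "
                + "from syslogStream[msg == 'ERROR'] select host, msg insert into alertStream;";

            PolicyExecutionPlan plan = PolicyExecutionPlanner.parseExecutionPlan(siddhiQl);
            System.out.println(plan.getInputStreams().keySet());  // expected: [syslogStream]
            System.out.println(plan.getOutputStreams().keySet()); // expected: [alertStream]
            System.out.println(plan.getExecutionPlanDesc());
        }
    }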

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java
deleted file mode 100644
index 4e6901d..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java
+++ /dev/null
@@ -1,376 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.interpreter;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.collections.ListUtils;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
-import org.apache.eagle.alert.engine.evaluator.impl.SiddhiDefinitionAdapter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.exception.DefinitionNotExistException;
-import org.wso2.siddhi.query.api.ExecutionPlan;
-import org.wso2.siddhi.query.api.exception.DuplicateDefinitionException;
-import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
-import org.wso2.siddhi.query.api.execution.ExecutionElement;
-import org.wso2.siddhi.query.api.execution.query.Query;
-import org.wso2.siddhi.query.api.execution.query.input.handler.StreamHandler;
-import org.wso2.siddhi.query.api.execution.query.input.handler.Window;
-import org.wso2.siddhi.query.api.execution.query.input.state.*;
-import org.wso2.siddhi.query.api.execution.query.input.stream.*;
-import org.wso2.siddhi.query.api.execution.query.output.stream.OutputStream;
-import org.wso2.siddhi.query.api.execution.query.selection.OutputAttribute;
-import org.wso2.siddhi.query.api.execution.query.selection.Selector;
-import org.wso2.siddhi.query.api.expression.Expression;
-import org.wso2.siddhi.query.api.expression.Variable;
-import org.wso2.siddhi.query.api.expression.condition.Compare;
-import org.wso2.siddhi.query.api.expression.constant.IntConstant;
-import org.wso2.siddhi.query.api.expression.constant.LongConstant;
-import org.wso2.siddhi.query.api.expression.constant.TimeConstant;
-import org.wso2.siddhi.query.compiler.SiddhiCompiler;
-
-import java.util.*;
-import java.util.stream.Collectors;
-
-class PolicyExecutionPlannerImpl implements PolicyExecutionPlanner {
-
-    private static final Logger LOG = LoggerFactory.getLogger(PolicyExecutionPlannerImpl.class);
-
-    /**
-     * See https://docs.wso2.com/display/CEP300/Windows#Windows-ExternalTimeWindow.
-     */
-    private static final String WINDOW_EXTERNAL_TIME = "externalTime";
-
-    private final String executionPlan;
-    private final Map<String,List<StreamColumn>> effectiveInputStreams;
-    private final Map<String, String> effectiveInputStreamsAlias;
-    private final Map<String,List<StreamColumn>> effectiveOutputStreams;
-    private final Map<String,StreamPartition> effectivePartitions;
-    private final PolicyExecutionPlan policyExecutionPlan;
-
-    public PolicyExecutionPlannerImpl(String executionPlan) throws Exception {
-        this.executionPlan = executionPlan;
-        this.effectiveInputStreams = new HashMap<>();
-        this.effectiveInputStreamsAlias = new HashMap<>();
-        this.effectiveOutputStreams = new HashMap<>();
-        this.effectivePartitions = new HashMap<>();
-        this.policyExecutionPlan = doParse();
-    }
-
-    @Override
-    public PolicyExecutionPlan getExecutionPlan() {
-        return policyExecutionPlan;
-    }
-
-    private PolicyExecutionPlan doParse()  throws Exception {
-        PolicyExecutionPlan policyExecutionPlan = new PolicyExecutionPlan();
-        try {
-            ExecutionPlan executionPlan = SiddhiCompiler.parse(this.executionPlan);
-
-            policyExecutionPlan.setExecutionPlanDesc(executionPlan.toString());
-
-            // Record the execution plan source and the parsed internal plan
-            policyExecutionPlan.setExecutionPlanSource(this.executionPlan);
-            policyExecutionPlan.setInternalExecutionPlan(executionPlan);
-
-
-            // Go through execution element
-            for (ExecutionElement executionElement : executionPlan.getExecutionElementList()) {
-                // -------------
-                // Explain Query
-                // -------------
-                if (executionElement instanceof Query) {
-                    // -----------------------
-                    // Query Level Variables
-                    // -----------------------
-                    InputStream inputStream = ((Query) executionElement).getInputStream();
-                    Selector selector = ((Query) executionElement).getSelector();
-                    Map<String, SingleInputStream> queryLevelAliasToStreamMapping = new HashMap<>();
-
-                    // Input stream definitions
-                    for (String streamId : inputStream.getUniqueStreamIds()) {
-                        if (!effectiveInputStreams.containsKey(streamId)) {
-                            org.wso2.siddhi.query.api.definition.StreamDefinition streamDefinition = executionPlan.getStreamDefinitionMap().get(streamId);
-                            if (streamDefinition != null) {
-                                effectiveInputStreams.put(streamId, SiddhiDefinitionAdapter.convertFromSiddiDefinition(streamDefinition).getColumns());
-                            } else {
-                                effectiveInputStreams.put(streamId, null);
-                            }
-                        }
-                    }
-
-                    // Window Spec and Partition
-                    if (inputStream instanceof SingleInputStream) {
-                        retrieveAliasForQuery((SingleInputStream) inputStream, queryLevelAliasToStreamMapping);
-                        retrievePartition(findStreamPartition((SingleInputStream) inputStream, selector));
-                    } else {
-                        if (inputStream instanceof JoinInputStream) {
-                            // Only JOIN/INNER_JOIN are supported for now
-                            if (((JoinInputStream) inputStream).getType().equals(JoinInputStream.Type.INNER_JOIN) || ((JoinInputStream) inputStream).getType().equals(JoinInputStream.Type.JOIN)) {
-                                SingleInputStream leftInputStream = (SingleInputStream) ((JoinInputStream) inputStream).getLeftInputStream();
-                                SingleInputStream rightInputStream = (SingleInputStream) ((JoinInputStream) inputStream).getRightInputStream();
-
-                                retrievePartition(findStreamPartition(leftInputStream, selector));
-                                retrievePartition(findStreamPartition(rightInputStream, selector));
-                                retrieveAliasForQuery(leftInputStream, queryLevelAliasToStreamMapping);
-                                retrieveAliasForQuery(rightInputStream, queryLevelAliasToStreamMapping);
-
-                            } else {
-                                throw new ExecutionPlanValidationException("Join type " + ((JoinInputStream) inputStream).getType() + " is not supported yet, currently only INNER JOIN is supported");
-                            }
-
-                            Expression joinCondition = ((JoinInputStream) inputStream).getOnCompare();
-
-                            if (joinCondition != null) {
-                                if (joinCondition instanceof Compare) {
-                                    if (((Compare) joinCondition).getOperator().equals(Compare.Operator.EQUAL)) {
-                                        Variable leftExpression = (Variable) ((Compare) joinCondition).getLeftExpression();
-                                        Preconditions.checkNotNull(leftExpression.getStreamId());
-                                        Preconditions.checkNotNull(leftExpression.getAttributeName());
-
-                                        StreamPartition leftPartition = new StreamPartition();
-                                        leftPartition.setType(StreamPartition.Type.GROUPBY);
-                                        leftPartition.setColumns(Collections.singletonList(leftExpression.getAttributeName()));
-                                        leftPartition.setStreamId(retrieveStreamId(leftExpression, effectiveInputStreams,queryLevelAliasToStreamMapping));
-                                        retrievePartition(leftPartition);
-
-                                        Variable rightExpression = (Variable) ((Compare) joinCondition).getRightExpression();
-                                        Preconditions.checkNotNull(rightExpression.getStreamId());
-                                        Preconditions.checkNotNull(rightExpression.getAttributeName());
-                                        StreamPartition rightPartition = new StreamPartition();
-                                        rightPartition.setType(StreamPartition.Type.GROUPBY);
-                                        rightPartition.setColumns(Collections.singletonList(rightExpression.getAttributeName()));
-                                        rightPartition.setStreamId(retrieveStreamId(rightExpression, effectiveInputStreams,queryLevelAliasToStreamMapping));
-                                        retrievePartition(rightPartition);
-                                    } else {
-                                        throw new ExecutionPlanValidationException("Only the \"EQUAL\" condition is supported in INNER JOIN: " + joinCondition);
-                                    }
-                                } else {
-                                    throw new ExecutionPlanValidationException("Only a \"Compare\" expression is supported as the INNER JOIN condition: " + joinCondition);
-                                }
-                            }
-                        } else if (inputStream instanceof StateInputStream) {
-                            // Group By Spec
-                            List<Variable> groupBy = selector.getGroupByList();
-                            if (groupBy.size() >= 0) {
-                                Map<String, List<Variable>> streamGroupBy = new HashMap<>();
-                                for (String streamId : inputStream.getUniqueStreamIds()) {
-                                    streamGroupBy.put(streamId, new ArrayList<>());
-                                }
-
-                                collectStreamReferenceIdMapping(((StateInputStream)inputStream).getStateElement());
-
-                                for (Variable variable : groupBy) {
-                                    // If the stream is not set, the variable should refer to the same field on all streams
-                                    if (variable.getStreamId() == null) {
-                                        for (String streamId : inputStream.getUniqueStreamIds()) {
-                                            streamGroupBy.get(streamId).add(variable);
-                                        }
-                                    } else {
-                                        String streamId = variable.getStreamId();
-                                        if (!this.effectiveInputStreamsAlias.containsKey(streamId)) {
-                                            streamId = retrieveStreamId(variable, effectiveInputStreams,queryLevelAliasToStreamMapping);
-                                        } else {
-                                            streamId = this.effectiveInputStreamsAlias.get(streamId);
-                                        }
-                                        if (streamGroupBy.containsKey(streamId)) {
-                                            streamGroupBy.get(streamId).add(variable);
-                                        } else {
-                                            throw new DefinitionNotExistException(streamId);
-                                        }
-                                    }
-                                }
-                                for (Map.Entry<String, List<Variable>> entry : streamGroupBy.entrySet()) {
-                                    if (entry.getValue().size() > 0) {
-                                        StreamPartition partition = generatePartition(entry.getKey(), null, Arrays.asList(entry.getValue().toArray(new Variable[entry.getValue().size()])));
-                                        if (((StateInputStream) inputStream).getStateType().equals(StateInputStream.Type.PATTERN)
-                                                || ((StateInputStream) inputStream).getStateType().equals(StateInputStream.Type.SEQUENCE)) {
-                                            if (effectivePartitions.containsKey(partition.getStreamId())) {
-                                                StreamPartition existingPartition = effectivePartitions.get(partition.getStreamId());
-                                                if (!existingPartition.equals(partition)
-                                                        && existingPartition.getType().equals(partition.getType())
-                                                        && ListUtils.isEqualList(existingPartition.getColumns(), partition.getColumns())) {
-                                                    partition.setSortSpec(existingPartition.getSortSpec());
-                                                }
-                                            }
-                                        }
-                                        retrievePartition(partition);
-                                    }
-                                }
-                            }
-                        }
-                    }
-
-                    // Output streams
-                    OutputStream outputStream = ((Query) executionElement).getOutputStream();
-                    effectiveOutputStreams.put(outputStream.getId(), convertOutputStreamColumns(selector.getSelectionList()));
-                } else {
-                    LOG.warn("Unhandled execution element: {}", executionElement.toString());
-                }
-            }
-            // Set effective input streams
-            policyExecutionPlan.setInputStreams(effectiveInputStreams);
-
-            // Set effective output streams
-            policyExecutionPlan.setOutputStreams(effectiveOutputStreams);
-
-            // Set Partitions
-            for (String streamId : effectiveInputStreams.keySet()) {
-                // Use shuffle partition by default
-                if (!effectivePartitions.containsKey(streamId)) {
-                    StreamPartition shufflePartition = new StreamPartition();
-                    shufflePartition.setStreamId(streamId);
-                    shufflePartition.setType(StreamPartition.Type.SHUFFLE);
-                    effectivePartitions.put(streamId, shufflePartition);
-                }
-            }
-            policyExecutionPlan.setStreamPartitions(new ArrayList<>(effectivePartitions.values()));
-        } catch (Exception ex) {
-            LOG.error("Failed to parse policy execution plan: \n{}", this.executionPlan, ex);
-            throw ex;
-        }
-        return policyExecutionPlan;
-    }
-
-    private void collectStreamReferenceIdMapping(StateElement stateElement) {
-        if (stateElement instanceof LogicalStateElement) {
-            collectStreamReferenceIdMapping(((LogicalStateElement) stateElement).getStreamStateElement1());
-            collectStreamReferenceIdMapping(((LogicalStateElement) stateElement).getStreamStateElement2());
-        } else if (stateElement instanceof CountStateElement) {
-            collectStreamReferenceIdMapping(((CountStateElement) stateElement).getStreamStateElement());
-        } else if (stateElement instanceof EveryStateElement) {
-            collectStreamReferenceIdMapping(((EveryStateElement) stateElement).getStateElement());
-        } else if (stateElement instanceof NextStateElement) {
-            collectStreamReferenceIdMapping(((NextStateElement) stateElement).getStateElement());
-            collectStreamReferenceIdMapping(((NextStateElement) stateElement).getNextStateElement());
-        } else if (stateElement instanceof StreamStateElement) {
-            BasicSingleInputStream basicSingleInputStream = ((StreamStateElement) stateElement).getBasicSingleInputStream();
-            this.effectiveInputStreamsAlias.put(basicSingleInputStream.getStreamReferenceId(), basicSingleInputStream.getStreamId());
-        }
-    }
-
-    private String retrieveStreamId(Variable variable, Map<String, List<StreamColumn>> streamMap, Map<String, SingleInputStream> aliasMap) {
-        Preconditions.checkNotNull(variable.getStreamId(), "streamId");
-        if (streamMap.containsKey(variable.getStreamId()) && aliasMap.containsKey(variable.getStreamId())) {
-            throw new DuplicateDefinitionException("Duplicated streamId and alias: " + variable.getStreamId());
-        } else if (streamMap.containsKey(variable.getStreamId())) {
-            return variable.getStreamId();
-        } else if (aliasMap.containsKey(variable.getStreamId())) {
-            return aliasMap.get(variable.getStreamId()).getStreamId();
-        } else {
-            throw new DefinitionNotExistException(variable.getStreamId());
-        }
-    }
-
-    private StreamPartition findStreamPartition(SingleInputStream inputStream, Selector selector) {
-        // Window Spec
-        List<Window> windows = new ArrayList<>();
-        for (StreamHandler streamHandler : inputStream.getStreamHandlers()) {
-            if (streamHandler instanceof Window) {
-                windows.add((Window) streamHandler);
-            }
-        }
-
-        // Group By Spec
-        List<Variable> groupBy = selector.getGroupByList();
-        if (windows.size() > 0 || groupBy.size() >= 0) {
-            return generatePartition(inputStream.getStreamId(), windows, groupBy);
-        } else {
-            return null;
-        }
-    }
-
-    private void retrievePartition(StreamPartition partition) {
-        if (partition == null) {
-            return;
-        }
-
-        if (!effectivePartitions.containsKey(partition.getStreamId())) {
-            effectivePartitions.put(partition.getStreamId(), partition);
-        } else if (!effectivePartitions.get(partition.getStreamId()).equals(partition)) {
-            StreamPartition existingPartition = effectivePartitions.get(partition.getStreamId());
-            // If same Type & Columns but different sort spec, then use larger
-            if (existingPartition.getType().equals(partition.getType())
-                && ListUtils.isEqualList(existingPartition.getColumns(), partition.getColumns())
-                && partition.getSortSpec().getWindowPeriodMillis() > existingPartition.getSortSpec().getWindowPeriodMillis()
-                || existingPartition.getType().equals(StreamPartition.Type.SHUFFLE)) {
-                effectivePartitions.put(partition.getStreamId(), partition);
-            } else {
-                // Throw an exception, as conflicting partitions on the same stream cannot run in distributed mode
-                throw new ExecutionPlanValidationException("You have incompatible partitions on stream " + partition.getStreamId()
-                    + ": [1] " + effectivePartitions.get(partition.getStreamId()).toString() + " [2] " + partition.toString());
-            }
-        }
-    }
-
-    private void retrieveAliasForQuery(SingleInputStream inputStream, Map<String, SingleInputStream> aliasStreamMapping) {
-        if (inputStream.getStreamReferenceId() != null) {
-            if (aliasStreamMapping.containsKey(inputStream.getStreamReferenceId())) {
-                throw new ExecutionPlanValidationException("Duplicated stream alias " + inputStream.getStreamReferenceId() + " -> " + inputStream);
-            } else {
-                aliasStreamMapping.put(inputStream.getStreamReferenceId(), inputStream);
-            }
-        }
-    }
-
-    private StreamPartition generatePartition(String streamId, List<Window> windows, List<Variable> groupBy) {
-        StreamPartition partition = new StreamPartition();
-        partition.setStreamId(streamId);
-        StreamSortSpec sortSpec = null;
-        if (windows != null && windows.size() > 0) {
-            for (Window window : windows) {
-                if (window.getFunction().equals(WINDOW_EXTERNAL_TIME)) {
-                    sortSpec = new StreamSortSpec();
-                    sortSpec.setWindowPeriodMillis(getExternalTimeWindowSize(window));
-                    sortSpec.setWindowMargin(sortSpec.getWindowPeriodMillis() / 5);
-                }
-            }
-        }
-        partition.setSortSpec(sortSpec);
-        if (groupBy != null && groupBy.size() > 0) {
-            partition.setColumns(groupBy.stream().map(Variable::getAttributeName).collect(Collectors.toList()));
-            partition.setType(StreamPartition.Type.GROUPBY);
-        } else {
-            partition.setType(StreamPartition.Type.SHUFFLE);
-        }
-        return partition;
-    }
-
-    private static int getExternalTimeWindowSize(Window window) {
-        Expression windowSize = window.getParameters()[1];
-        if (windowSize instanceof TimeConstant) {
-            return ((TimeConstant) windowSize).getValue().intValue();
-        } else if (windowSize instanceof IntConstant) {
-            return ((IntConstant) windowSize).getValue();
-        } else if (windowSize instanceof LongConstant) {
-            return ((LongConstant) windowSize).getValue().intValue();
-        } else {
-            throw new UnsupportedOperationException("Illegal type of window size expression: " + windowSize.toString());
-        }
-    }
-
-    private static List<StreamColumn> convertOutputStreamColumns(List<OutputAttribute> outputAttributeList) {
-        return outputAttributeList.stream().map(outputAttribute -> {
-            StreamColumn streamColumn = new StreamColumn();
-            streamColumn.setName(outputAttribute.getRename());
-            streamColumn.setDescription(outputAttribute.getExpression().toString());
-            return streamColumn;
-        }).collect(Collectors.toList());
-    }
-}
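
For reference, a minimal usage sketch of the planner deleted above. It assumes read accessors on PolicyExecutionPlan (getStreamPartitions, getOutputStreams) that mirror the setters used in PolicyExecutionPlannerImpl, and the Siddhi query is only illustrative:

    import org.apache.eagle.alert.engine.interpreter.PolicyExecutionPlan;
    import org.apache.eagle.alert.engine.interpreter.PolicyExecutionPlanner;

    public class ExecutionPlannerSketch {
        public static void main(String[] args) throws Exception {
            // An externalTime window plus "group by host" should yield a GROUPBY
            // partition on syslog_stream with a one-minute sort spec.
            String plan = "define stream syslog_stream (timestamp long, host string, severity string); "
                + "from syslog_stream#window.externalTime(timestamp, 1 min) "
                + "select host, count() as severityCount group by host "
                + "insert into syslog_severity_count_out;";

            PolicyExecutionPlan executionPlan = PolicyExecutionPlanner.parseExecutionPlan(plan);

            // getStreamPartitions()/getOutputStreams() are assumed accessors here.
            executionPlan.getStreamPartitions().forEach(partition ->
                System.out.println(partition.getStreamId() + " -> " + partition.getType()));
            System.out.println("Output streams: " + executionPlan.getOutputStreams().keySet());
        }
    }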

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreter.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreter.java
deleted file mode 100644
index 4add3ff..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreter.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.interpreter;
-
-import com.google.common.base.Preconditions;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamNotDefinedException;
-import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandlers;
-import org.apache.eagle.alert.engine.evaluator.impl.SiddhiDefinitionAdapter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * PolicyInterpreter Helper Methods:
- * <ul>
- * <li>Parse: parse a Siddhi query and generate a static execution plan</li>
- * <li>Validate: validate a policy definition against its execution plan and stream metadata</li>
- * </ul>
- *
- * @see PolicyExecutionPlanner
- * @see <a href="https://docs.wso2.com/display/CEP300/WSO2+Complex+Event+Processor+Documentation">WSO2 Complex Event Processor Documentation</a>
- */
-public class PolicyInterpreter {
-    private static final Logger LOG = LoggerFactory.getLogger(PolicyInterpreter.class);
-
-    /**
-     * See https://docs.wso2.com/display/CEP300/Windows#Windows-ExternalTimeWindow.
-     */
-    private static final String WINDOW_EXTERNAL_TIME = "externalTime";
-
-    public static PolicyParseResult parse(String executionPlanQuery) {
-        PolicyParseResult policyParseResult = new PolicyParseResult();
-        try {
-            policyParseResult.setPolicyExecutionPlan(parseExecutionPlan(executionPlanQuery));
-            policyParseResult.setSuccess(true);
-            policyParseResult.setMessage("Parsed successfully");
-        } catch (Exception exception) {
-            LOG.error("Failed to parse policy: {}", executionPlanQuery, exception);
-            policyParseResult.setSuccess(false);
-            policyParseResult.setMessage(exception.getMessage());
-            policyParseResult.setStackTrace(exception);
-        }
-        return policyParseResult;
-    }
-
-    /**
-     * Quickly parse an execution plan from a policy definition and its input stream definitions.
-     */
-    public static PolicyExecutionPlan parseExecutionPlan(String policyDefinition, Map<String, StreamDefinition> inputStreamDefinitions) throws Exception {
-        // Ensure input stream definitions are provided
-        Preconditions.checkNotNull(inputStreamDefinitions, "No inputStreams to connect from");
-        return parseExecutionPlan(SiddhiDefinitionAdapter.buildSiddhiExecutionPlan(policyDefinition, inputStreamDefinitions));
-    }
-
-    public static PolicyExecutionPlan parseExecutionPlan(String executionPlanQuery) throws Exception {
-        return PolicyExecutionPlanner.parseExecutionPlan(executionPlanQuery);
-    }
-
-    public static PolicyValidationResult validate(PolicyDefinition policy, Map<String, StreamDefinition> allDefinitions) {
-        Map<String, StreamDefinition> inputDefinitions = new HashMap<>();
-        PolicyValidationResult policyValidationResult = new PolicyValidationResult();
-        policyValidationResult.setPolicyDefinition(policy);
-        try {
-            if (policy.getInputStreams() != null) {
-                for (String streamId : policy.getInputStreams()) {
-                    if (allDefinitions.containsKey(streamId)) {
-                        inputDefinitions.put(streamId, allDefinitions.get(streamId));
-                    } else {
-                        throw new StreamNotDefinedException(streamId);
-                    }
-                }
-            }
-
-            PolicyExecutionPlan policyExecutionPlan = null;
-            if (PolicyStreamHandlers.SIDDHI_ENGINE.equalsIgnoreCase(policy.getDefinition().getType())) {
-                policyExecutionPlan = parseExecutionPlan(policy.getDefinition().getValue(), inputDefinitions);
-                // Validate output
-                if (policy.getOutputStreams() != null) {
-                    for (String outputStream : policy.getOutputStreams()) {
-                        if (!policyExecutionPlan.getOutputStreams().containsKey(outputStream)) {
-                            throw new StreamNotDefinedException("Output stream " + outputStream + " not defined");
-                        }
-                    }
-                }
-            }
-            policyValidationResult.setPolicyExecutionPlan(policyExecutionPlan);
-            policyValidationResult.setSuccess(true);
-            policyValidationResult.setMessage("Validated successfully");
-        } catch (Exception exception) {
-            LOG.error("Failed to validate policy definition: {}", policy, exception);
-            policyValidationResult.setSuccess(false);
-            policyValidationResult.setMessage(exception.getMessage());
-            policyValidationResult.setStackTrace(exception);
-        }
-
-        return policyValidationResult;
-    }
-}
\ No newline at end of file
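
For reference, a hedged sketch of how the parse helper deleted above could be driven; the Siddhi filter query is only illustrative, and the accessors used come from PolicyParseResult and PolicyExecutionPlan as shown elsewhere in this commit:

    import org.apache.eagle.alert.engine.interpreter.PolicyInterpreter;
    import org.apache.eagle.alert.engine.interpreter.PolicyParseResult;

    public class PolicyParseSketch {
        public static void main(String[] args) {
            String query = "define stream syslog_stream (timestamp long, host string, severity string); "
                + "from syslog_stream[severity == 'FATAL'] select * insert into fatal_out;";

            PolicyParseResult result = PolicyInterpreter.parse(query);
            if (result.isSuccess()) {
                // getOutputStreams() keys are the streams inferred from "insert into"
                System.out.println("Parsed, output streams: "
                    + result.getPolicyExecutionPlan().getOutputStreams().keySet());
            } else {
                System.err.println("Parse failed: " + result.getMessage());
            }
        }
    }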

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyParseResult.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyParseResult.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyParseResult.java
deleted file mode 100644
index a0f3ad2..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyParseResult.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.interpreter;
-
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import org.apache.commons.lang3.exception.ExceptionUtils;
-
-@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-public class PolicyParseResult {
-    private boolean success;
-    private String message;
-    private String exception;
-
-    private PolicyExecutionPlan policyExecutionPlan;
-
-    public String getException() {
-        return exception;
-    }
-
-    public void setException(String exception) {
-        this.exception = exception;
-    }
-
-    public String getMessage() {
-        return message;
-    }
-
-    public void setMessage(String message) {
-        this.message = message;
-    }
-
-    public void setStackTrace(Throwable throwable) {
-        this.setException(ExceptionUtils.getStackTrace(throwable));
-    }
-
-    public boolean isSuccess() {
-        return success;
-    }
-
-    public void setSuccess(boolean success) {
-        this.success = success;
-    }
-
-    public PolicyExecutionPlan getPolicyExecutionPlan() {
-        return policyExecutionPlan;
-    }
-
-    public void setPolicyExecutionPlan(PolicyExecutionPlan policyExecutionPlan) {
-        this.policyExecutionPlan = policyExecutionPlan;
-    }
-}
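
Since the result bean above is annotated with Inclusion.NON_NULL, null fields are dropped when it is serialized; a small sketch with a plain Jackson ObjectMapper (nothing Eagle-specific assumed beyond the bean itself):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.eagle.alert.engine.interpreter.PolicyParseResult;

    public class ParseResultJsonSketch {
        public static void main(String[] args) throws Exception {
            PolicyParseResult result = new PolicyParseResult();
            result.setSuccess(false);
            result.setMessage("Parse failed");
            // exception and policyExecutionPlan remain null, so the output is
            // roughly: {"success":false,"message":"Parse failed"}
            System.out.println(new ObjectMapper().writeValueAsString(result));
        }
    }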

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyValidationResult.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyValidationResult.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyValidationResult.java
deleted file mode 100644
index 17f6091..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyValidationResult.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.interpreter;
-
-
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-
-@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-public class PolicyValidationResult {
-    private boolean success;
-    private String message;
-    private String exception;
-
-    private PolicyExecutionPlan policyExecutionPlan;
-    private PolicyDefinition policyDefinition;
-
-    public String getException() {
-        return exception;
-    }
-
-    public void setException(String exception) {
-        this.exception = exception;
-    }
-
-    public String getMessage() {
-        return message;
-    }
-
-    public void setMessage(String message) {
-        this.message = message;
-    }
-
-    public void setStackTrace(Throwable throwable) {
-        this.setException(ExceptionUtils.getStackTrace(throwable));
-    }
-
-    public boolean isSuccess() {
-        return success;
-    }
-
-    public void setSuccess(boolean success) {
-        this.success = success;
-    }
-
-    public PolicyExecutionPlan getPolicyExecutionPlan() {
-        return policyExecutionPlan;
-    }
-
-    public void setPolicyExecutionPlan(PolicyExecutionPlan policyExecutionPlan) {
-        this.policyExecutionPlan = policyExecutionPlan;
-    }
-
-    public PolicyDefinition getPolicyDefinition() {
-        return policyDefinition;
-    }
-
-    public void setPolicyDefinition(PolicyDefinition policyDefinition) {
-        this.policyDefinition = policyDefinition;
-    }
-}
\ No newline at end of file