Posted to commits@eagle.apache.org by ha...@apache.org on 2016/10/19 03:28:05 UTC

[50/50] incubator-eagle git commit: [EAGLE-635] Refactor policy parser and validator for richer plan details and better performance

[EAGLE-635] Refactor policy parser and validator for richer plan details and better performance

## Changes

* Refactor policy parser and validator for richer plan details and better performance
* Decouple PolicyExecutionPlan and PolicyValidation

## API
* Parse API (a programmatic sketch of both APIs follows these examples)
~~~
POST /metadata/policies/parse
Content-Type: text/plain

from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.timeBatch(2 min) select cmd, user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT
~~~
* Validation API
~~~
POST /metadata/policies/validate
Content-Type: application/json

{
   "name": "hdfsPolicy",
   "description": "hdfsPolicy",
   "inputStreams": [
      "HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX"
   ],
   "outputStreams": [
      "HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT"
   ],
   "definition": {
      "type": "siddhi",
      "value": "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.timeBatch(2 min) select cmd, user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT "
   },
   "partitionSpec": [
      {
         "streamId": "hdfs_audit_log_enriched_stream",
         "type": "GROUPBY",
         "columns" : [
            "cmd"
         ]
      }
   ],
   "parallelismHint": 2
}

~~~

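Both endpoints delegate to the new PolicyCompiler introduced in this change (see the MetadataResource and PolicyCompiler diffs below). The sketch below exercises the same parse and validate paths programmatically; the stream, column, and policy names are illustrative placeholders, not part of this commit:

~~~
import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
import org.apache.eagle.alert.engine.coordinator.StreamColumn;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.service.metadata.resource.PolicyCompiler;
import org.apache.eagle.service.metadata.resource.PolicyParseResult;
import org.apache.eagle.service.metadata.resource.PolicyValidationResult;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class PolicyCompilerExample {
    public static void main(String[] args) {
        // 1) Parse: inspect the execution plan of a raw Siddhi query (what POST /metadata/policies/parse does).
        PolicyParseResult parsed = PolicyCompiler.parse(
            "from INPUT_STREAM_1#window.timeBatch(2 min) "
                + "select name, sum(value) as total group by name insert into OUTPUT_STREAM_1;");
        System.out.println(parsed.isSuccess() + ": "
            + parsed.getPolicyExecutionPlan().getOutputStreams().keySet() + " "
            + parsed.getPolicyExecutionPlan().getStreamPartitions());

        // 2) Validate: check a full PolicyDefinition against known stream schemas
        //    (what POST /metadata/policies/validate does, with schemas loaded from the metadata store).
        StreamDefinition input = new StreamDefinition();
        input.setStreamId("INPUT_STREAM_1");
        input.setColumns(Arrays.asList(
            new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build(),
            new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build()));

        PolicyDefinition policy = new PolicyDefinition();
        policy.setName("example_policy");
        policy.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
        policy.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));

        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
        definition.setType("siddhi");
        definition.setValue("from INPUT_STREAM_1#window.timeBatch(2 min) "
            + "select name, sum(value) as total group by name insert into OUTPUT_STREAM_1;");
        definition.setInputStreams(policy.getInputStreams());
        definition.setOutputStreams(policy.getOutputStreams());
        policy.setDefinition(definition);

        Map<String, StreamDefinition> streams = new HashMap<>();
        streams.put("INPUT_STREAM_1", input);

        PolicyValidationResult result = PolicyCompiler.validate(policy, streams);
        System.out.println(result.isSuccess() + ": " + result.getMessage());
    }
}
~~~

On failure, both result objects carry success=false plus the exception message and stack trace instead of a populated execution plan.
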
Author: Hao Chen <ha...@apache.org>

Closes #529 from haoch/RefactorPolicyValidator.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/6dbdb4f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/6dbdb4f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/6dbdb4f7

Branch: refs/heads/master
Commit: 6dbdb4f72ec5adf89f4f7a13bf766e01e3ff6705
Parents: adc2ba9
Author: Hao Chen <ha...@apache.org>
Authored: Wed Oct 19 10:28:09 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Wed Oct 19 10:28:09 2016 +0800

----------------------------------------------------------------------
 .../conf/sandbox-userprofile-scheduler.conf     |   2 +-
 .../eagle-alert/alert-common/pom.xml            |   8 -
 .../eagle-alert/alert-engine/pom.xml            |  10 +-
 .../evaluator/impl/SiddhiDefinitionAdapter.java |  13 +
 .../alert-metadata-service/pom.xml              |  13 -
 .../metadata/resource/MetadataResource.java     |  10 +-
 .../metadata/resource/PolicyCompiler.java       | 235 +++++++++++++++++++
 .../metadata/resource/PolicyExecutionPlan.java  | 100 ++++++++
 .../metadata/resource/PolicyParseResult.java    |  65 +++++
 .../metadata/resource/PolicyValidation.java     |  97 --------
 .../resource/PolicyValidationResult.java        |  76 ++++++
 .../metadata/resource/PolicyValidator.java      | 124 ----------
 .../metadata/resource/PolicyCompilerTest.java   | 195 +++++++++++++++
 .../metadata/resource/PolicyValidatorTest.java  | 187 ---------------
 14 files changed, 699 insertions(+), 436 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6dbdb4f7/eagle-assembly/src/main/conf/sandbox-userprofile-scheduler.conf
----------------------------------------------------------------------
diff --git a/eagle-assembly/src/main/conf/sandbox-userprofile-scheduler.conf b/eagle-assembly/src/main/conf/sandbox-userprofile-scheduler.conf
index f54162d..4593a7e 100644
--- a/eagle-assembly/src/main/conf/sandbox-userprofile-scheduler.conf
+++ b/eagle-assembly/src/main/conf/sandbox-userprofile-scheduler.conf
@@ -63,4 +63,4 @@ akka {
   # Filter of log events that is used by the LoggingAdapter before
   # publishing log events to the eventStream.
   logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6dbdb4f7/eagle-core/eagle-alert-parent/eagle-alert/alert-common/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/pom.xml
index 57a0157..fd3eccd 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/pom.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/pom.xml
@@ -63,10 +63,6 @@
             <artifactId>slf4j-api</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-        </dependency>
-        <dependency>
             <groupId>org.apache.curator</groupId>
             <artifactId>curator-framework</artifactId>
         </dependency>
@@ -82,10 +78,6 @@
             <groupId>com.fasterxml.jackson.jaxrs</groupId>
             <artifactId>jackson-jaxrs-json-provider</artifactId>
         </dependency>
-        <!--<dependency>-->
-        <!--<groupId>org.codehaus.jackson</groupId>-->
-        <!--<artifactId>jackson-jaxrs</artifactId>-->
-        <!--</dependency>-->
         <dependency>
             <groupId>joda-time</groupId>
             <artifactId>joda-time</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6dbdb4f7/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
index 1ea6088..649125c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
@@ -41,15 +41,17 @@
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>${kafka.artifact.id}</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>com.sun.jersey</groupId>
             <artifactId>jersey-client</artifactId>
         </dependency>
-        <!--<dependency>-->
-        <!--<groupId>org.codehaus.jackson</groupId>-->
-        <!--<artifactId>jackson-jaxrs</artifactId>-->
-        <!--</dependency>-->
         <dependency>
             <groupId>com.netflix.archaius</groupId>
             <artifactId>archaius-core</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6dbdb4f7/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java
index 3645dcf..114c40a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java
@@ -102,6 +102,19 @@ public class SiddhiDefinitionAdapter {
         return builder.toString();
     }
 
+    public static String buildSiddhiExecutionPlan(String policyDefinition, Map<String, StreamDefinition> inputStreamDefinitions) {
+        StringBuilder builder = new StringBuilder();
+        for (Map.Entry<String,StreamDefinition> entry: inputStreamDefinitions.entrySet()) {
+            builder.append(SiddhiDefinitionAdapter.buildStreamDefinition(entry.getValue()));
+            builder.append("\n");
+        }
+        builder.append(policyDefinition);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Generated siddhi execution plan: {}", builder.toString());
+        }
+        return builder.toString();
+    }
+
     /**
      * public enum Type {
      * STRING, INT, LONG, FLOAT, DOUBLE, BOOL, OBJECT

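For context, the new PolicyCompiler.parseExecutionPlan(policyDefinition, inputStreamDefinitions) uses this helper to concatenate the generated stream definition strings with the policy body before handing the combined plan to the Siddhi compiler. A minimal call-site sketch with an illustrative stream schema (not taken from this commit):

~~~
import org.apache.eagle.alert.engine.coordinator.StreamColumn;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.evaluator.impl.SiddhiDefinitionAdapter;

import java.util.Arrays;
import java.util.Collections;
import java.util.Map;

public class SiddhiPlanExample {
    public static void main(String[] args) {
        // Illustrative stream schema; in Eagle these normally come from the metadata store.
        StreamDefinition stream = new StreamDefinition();
        stream.setStreamId("INPUT_STREAM_1");
        stream.setColumns(Arrays.asList(
            new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build(),
            new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build()));

        Map<String, StreamDefinition> inputs = Collections.singletonMap("INPUT_STREAM_1", stream);
        String policy = "from INPUT_STREAM_1 select name, value insert into OUTPUT_STREAM_1;";

        // The helper appends one generated stream definition per input stream, then the policy itself.
        System.out.println(SiddhiDefinitionAdapter.buildSiddhiExecutionPlan(policy, inputs));
    }
}
~~~
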
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6dbdb4f7/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/pom.xml
index 0518a15..df2f803 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/pom.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/pom.xml
@@ -63,19 +63,6 @@
             <groupId>com.sun.jersey</groupId>
             <artifactId>jersey-client</artifactId>
         </dependency>
-
-        <!--<dependency>-->
-        <!--<groupId>org.codehaus.jackson</groupId>-->
-        <!--<artifactId>jackson-mapper-asl</artifactId>-->
-        <!--</dependency>-->
-        <!--<dependency>-->
-        <!--<groupId>org.codehaus.jackson</groupId>-->
-        <!--<artifactId>jackson-jaxrs</artifactId>-->
-        <!--</dependency>-->
-        <!--<dependency>-->
-        <!--<groupId>org.codehaus.jackson</groupId>-->
-        <!--<artifactId>jackson-xc</artifactId>-->
-        <!--</dependency>-->
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6dbdb4f7/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
index 32a978a..eb23362 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
@@ -206,8 +206,14 @@ public class MetadataResource {
 
     @Path("/policies/validate")
     @POST
-    public PolicyValidation validatePolicy(PolicyDefinition policy) {
-        return PolicyValidator.validate(policy,dao);
+    public PolicyValidationResult validatePolicy(PolicyDefinition policy) {
+        return PolicyCompiler.validate(policy,dao);
+    }
+
+    @Path("/policies/parse")
+    @POST
+    public PolicyParseResult parsePolicy(String policyDefinition) {
+        return PolicyCompiler.parse(policyDefinition);
     }
 
     @Path("/policies/batch")

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6dbdb4f7/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyCompiler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyCompiler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyCompiler.java
new file mode 100644
index 0000000..6e3938d
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyCompiler.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.metadata.resource;
+
+import com.google.common.base.Preconditions;
+import org.apache.eagle.alert.engine.coordinator.*;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.evaluator.impl.SiddhiDefinitionAdapter;
+import org.apache.eagle.alert.metadata.IMetadataDao;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.query.api.ExecutionPlan;
+import org.wso2.siddhi.query.api.execution.ExecutionElement;
+import org.wso2.siddhi.query.api.execution.query.Query;
+import org.wso2.siddhi.query.api.execution.query.input.handler.StreamHandler;
+import org.wso2.siddhi.query.api.execution.query.input.handler.Window;
+import org.wso2.siddhi.query.api.execution.query.input.stream.InputStream;
+import org.wso2.siddhi.query.api.execution.query.input.stream.JoinInputStream;
+import org.wso2.siddhi.query.api.execution.query.input.stream.SingleInputStream;
+import org.wso2.siddhi.query.api.execution.query.input.stream.StateInputStream;
+import org.wso2.siddhi.query.api.execution.query.output.stream.OutputStream;
+import org.wso2.siddhi.query.api.execution.query.selection.OutputAttribute;
+import org.wso2.siddhi.query.api.execution.query.selection.Selector;
+import org.wso2.siddhi.query.api.expression.Variable;
+import org.wso2.siddhi.query.api.expression.constant.TimeConstant;
+import org.wso2.siddhi.query.compiler.SiddhiCompiler;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class PolicyCompiler {
+    private static final Logger LOG = LoggerFactory.getLogger(PolicyCompiler.class);
+
+
+    public static PolicyParseResult parse(String executionPlanQuery) {
+        PolicyParseResult policyParseResult = new PolicyParseResult();
+        try {
+            policyParseResult.setPolicyExecutionPlan(parseExecutionPlan(executionPlanQuery));
+            policyParseResult.setSuccess(true);
+            policyParseResult.setMessage("Parsed successfully");
+        } catch (Exception exception) {
+            LOG.error("Got error to parse policy: {}", executionPlanQuery, exception);
+            policyParseResult.setSuccess(false);
+            policyParseResult.setMessage(exception.getMessage());
+            policyParseResult.setStackTrace(exception);
+        }
+        return policyParseResult;
+    }
+
+    /**
+     * Quick parseExecutionPlan policy.
+     */
+    public static PolicyExecutionPlan parseExecutionPlan(String policyDefinition, Map<String, StreamDefinition> inputStreamDefinitions) throws Exception {
+        // Validate inputStreams are valid
+        Preconditions.checkNotNull(inputStreamDefinitions, "No inputStreams to connect from");
+        return parseExecutionPlan(SiddhiDefinitionAdapter.buildSiddhiExecutionPlan(policyDefinition, inputStreamDefinitions));
+    }
+
+    public static PolicyExecutionPlan parseExecutionPlan(String executionPlanQuery) throws Exception {
+        PolicyExecutionPlan policyExecutionPlan = new PolicyExecutionPlan();
+        try {
+            ExecutionPlan executionPlan = SiddhiCompiler.parse(executionPlanQuery);
+
+            policyExecutionPlan.setExecutionPlanDesc(executionPlan.toString());
+
+            // Set current execution plan as valid
+            policyExecutionPlan.setExecutionPlanSource(executionPlanQuery);
+            policyExecutionPlan.setInternalExecutionPlan(executionPlan);
+
+            Map<String, List<StreamColumn>> actualInputStreams = new HashMap<>();
+            Map<String, List<StreamColumn>> actualOutputStreams = new HashMap<>();
+            List<StreamPartition> partitions = new ArrayList<>();
+
+            // Go through execution element
+            for (ExecutionElement executionElement : executionPlan.getExecutionElementList()) {
+                if (executionElement instanceof Query) {
+                    // -------------
+                    // Explain Query
+                    // -------------
+
+                    // Input streams
+                    InputStream inputStream = ((Query) executionElement).getInputStream();
+                    Selector selector = ((Query) executionElement).getSelector();
+
+                    for (String streamId : inputStream.getUniqueStreamIds()) {
+                        if (!actualInputStreams.containsKey(streamId)) {
+                            org.wso2.siddhi.query.api.definition.StreamDefinition streamDefinition = executionPlan.getStreamDefinitionMap().get(streamId);
+                            if (streamDefinition != null) {
+                                actualInputStreams.put(streamId, SiddhiDefinitionAdapter.convertFromSiddiDefinition(streamDefinition).getColumns());
+                            } else {
+                                actualInputStreams.put(streamId, null);
+                            }
+                        }
+                    }
+
+                    // Window Spec and Partition
+                    if (inputStream instanceof SingleInputStream) {
+                        // Window Spec
+                        List<Window> windows = new ArrayList<>();
+                        for (StreamHandler streamHandler : ((SingleInputStream) inputStream).getStreamHandlers()) {
+                            if (streamHandler instanceof Window) {
+                                windows.add((Window) streamHandler);
+                            }
+                        }
+
+                        // Group By Spec
+                        List<Variable> groupBy = selector.getGroupByList();
+
+                        if (windows.size() > 0 || groupBy.size() >= 0) {
+                            partitions.add(convertSingleStreamWindowAndGroupByToPartition(((SingleInputStream) inputStream).getStreamId(), windows, groupBy));
+                        }
+                    }
+                    //    else if(inputStream instanceof JoinInputStream) {
+                    //        // TODO: Parse multiple stream join
+                    //    } else if(inputStream instanceof StateInputStream) {
+                    //        // TODO: Parse StateInputStream
+                    //    }
+
+                    // Output streams
+                    OutputStream outputStream = ((Query) executionElement).getOutputStream();
+                    actualOutputStreams.put(outputStream.getId(), convertOutputStreamColumns(selector.getSelectionList()));
+                } else {
+                    LOG.warn("Unhandled execution element: {}", executionElement.toString());
+                }
+            }
+            // Set used input streams
+            policyExecutionPlan.setInputStreams(actualInputStreams);
+
+            // Set Partitions
+            policyExecutionPlan.setStreamPartitions(partitions);
+
+            // Validate outputStreams
+            policyExecutionPlan.setOutputStreams(actualOutputStreams);
+        } catch (Exception ex) {
+            LOG.error("Got error to parseExecutionPlan policy execution plan: \n{}", executionPlanQuery, ex);
+            throw ex;
+        }
+        return policyExecutionPlan;
+    }
+
+    private static StreamPartition convertSingleStreamWindowAndGroupByToPartition(String streamId, List<Window> windows, List<Variable> groupBy) {
+        StreamPartition partition = new StreamPartition();
+        partition.setStreamId(streamId);
+        StreamSortSpec sortSpec = null;
+
+        if (windows.size() > 0) {
+            sortSpec = new StreamSortSpec();
+            for (Window window : windows) {
+                if (window.getFunction().equals("timeBatch")) {
+                    sortSpec.setWindowPeriodMillis(((TimeConstant) window.getParameters()[0]).getValue().intValue());
+                    sortSpec.setWindowMargin(sortSpec.getWindowPeriodMillis() / 3);
+                }
+            }
+        }
+        partition.setSortSpec(sortSpec);
+        if (groupBy.size() > 0) {
+            partition.setColumns(groupBy.stream().map(Variable::getAttributeName).collect(Collectors.toList()));
+            partition.setType(StreamPartition.Type.GROUPBY);
+        } else {
+            partition.setType(StreamPartition.Type.SHUFFLE);
+        }
+        return partition;
+    }
+
+    public static PolicyValidationResult validate(PolicyDefinition policy, IMetadataDao metadataDao) {
+        Map<String, StreamDefinition> allDefinitions = new HashMap<>();
+        for (StreamDefinition definition : metadataDao.listStreams()) {
+            allDefinitions.put(definition.getStreamId(), definition);
+        }
+        return validate(policy, allDefinitions);
+    }
+
+    public static PolicyValidationResult validate(PolicyDefinition policy, Map<String, StreamDefinition> allDefinitions) {
+        Map<String, StreamDefinition> inputDefinitions = new HashMap<>();
+        PolicyValidationResult policyValidationResult = new PolicyValidationResult();
+        policyValidationResult.setPolicyDefinition(policy);
+        try {
+            if (policy.getInputStreams() != null) {
+                for (String streamId : policy.getInputStreams()) {
+                    if (allDefinitions.containsKey(streamId)) {
+                        inputDefinitions.put(streamId, allDefinitions.get(streamId));
+                    } else {
+                        throw new StreamNotDefinedException(streamId);
+                    }
+                }
+            }
+
+            PolicyExecutionPlan policyExecutionPlan = parseExecutionPlan(policy.getDefinition().getValue(), inputDefinitions);
+            // Validate output
+            if (policy.getOutputStreams() != null) {
+                for (String outputStream : policy.getOutputStreams()) {
+                    if (!policyExecutionPlan.getOutputStreams().containsKey(outputStream)) {
+                        throw new StreamNotDefinedException("Output stream " + outputStream + " not defined");
+                    }
+                }
+            }
+            policyValidationResult.setPolicyExecutionPlan(policyExecutionPlan);
+            policyValidationResult.setSuccess(true);
+            policyValidationResult.setMessage("Validated successfully");
+        } catch (Exception exception) {
+            LOG.error("Got error to validate policy definition: {}", policy, exception);
+            policyValidationResult.setSuccess(false);
+            policyValidationResult.setMessage(exception.getMessage());
+            policyValidationResult.setStackTrace(exception);
+        }
+
+        return policyValidationResult;
+    }
+
+    private static List<StreamColumn> convertOutputStreamColumns(List<OutputAttribute> outputAttributeList) {
+        return outputAttributeList.stream().map(outputAttribute -> {
+            StreamColumn streamColumn = new StreamColumn();
+            streamColumn.setName(outputAttribute.getRename());
+            streamColumn.setDescription(outputAttribute.getExpression().toString());
+            return streamColumn;
+        }).collect(Collectors.toList());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6dbdb4f7/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyExecutionPlan.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyExecutionPlan.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyExecutionPlan.java
new file mode 100644
index 0000000..f925e3d
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyExecutionPlan.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.metadata.resource;
+
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.wso2.siddhi.query.api.ExecutionPlan;
+
+import java.util.List;
+import java.util.Map;
+
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+public class PolicyExecutionPlan {
+    /**
+     * Actual input streams.
+     */
+    private Map<String, List<StreamColumn>> inputStreams;
+
+    /**
+     * Actual output streams.
+     */
+    private Map<String, List<StreamColumn>> outputStreams;
+
+    /**
+     * Execution plan source.
+     */
+    private String executionPlanSource;
+
+    /**
+     * Execution plan.
+     */
+    private ExecutionPlan internalExecutionPlan;
+
+    private String executionPlanDesc;
+
+    private List<StreamPartition> streamPartitions;
+
+    public String getExecutionPlanSource() {
+        return executionPlanSource;
+    }
+
+    public void setExecutionPlanSource(String executionPlanSource) {
+        this.executionPlanSource = executionPlanSource;
+    }
+
+    public ExecutionPlan getInternalExecutionPlan() {
+        return internalExecutionPlan;
+    }
+
+    public void setInternalExecutionPlan(ExecutionPlan internalExecutionPlan) {
+        this.internalExecutionPlan = internalExecutionPlan;
+    }
+
+    public String getExecutionPlanDesc() {
+        return executionPlanDesc;
+    }
+
+    public void setExecutionPlanDesc(String executionPlanDesc) {
+        this.executionPlanDesc = executionPlanDesc;
+    }
+
+    public List<StreamPartition> getStreamPartitions() {
+        return streamPartitions;
+    }
+
+    public void setStreamPartitions(List<StreamPartition> streamPartitions) {
+        this.streamPartitions = streamPartitions;
+    }
+
+    public Map<String, List<StreamColumn>> getInputStreams() {
+        return inputStreams;
+    }
+
+    public void setInputStreams(Map<String, List<StreamColumn>> inputStreams) {
+        this.inputStreams = inputStreams;
+    }
+
+    public Map<String, List<StreamColumn>> getOutputStreams() {
+        return outputStreams;
+    }
+
+    public void setOutputStreams(Map<String, List<StreamColumn>> outputStreams) {
+        this.outputStreams = outputStreams;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6dbdb4f7/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyParseResult.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyParseResult.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyParseResult.java
new file mode 100644
index 0000000..2522270
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyParseResult.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.metadata.resource;
+
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+public class PolicyParseResult {
+    private boolean success;
+    private String message;
+    private String exception;
+
+    private PolicyExecutionPlan policyExecutionPlan;
+
+    public String getException() {
+        return exception;
+    }
+
+    public void setException(String exception) {
+        this.exception = exception;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
+
+    public void setStackTrace(Throwable throwable) {
+        this.setException(ExceptionUtils.getStackTrace(throwable));
+    }
+
+    public boolean isSuccess() {
+        return success;
+    }
+
+    public void setSuccess(boolean success) {
+        this.success = success;
+    }
+
+    public PolicyExecutionPlan getPolicyExecutionPlan() {
+        return policyExecutionPlan;
+    }
+
+    public void setPolicyExecutionPlan(PolicyExecutionPlan policyExecutionPlan) {
+        this.policyExecutionPlan = policyExecutionPlan;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6dbdb4f7/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyValidation.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyValidation.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyValidation.java
deleted file mode 100644
index 4b69a35..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyValidation.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.service.metadata.resource;
-
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import org.apache.commons.lang3.exception.ExceptionUtils;
-
-import java.util.Map;
-
-@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-public class PolicyValidation {
-    private boolean success;
-    private String message;
-    private String exception;
-
-    private Map<String, StreamDefinition> validInputStreams;
-    private Map<String, StreamDefinition> validOutputStreams;
-    private PolicyDefinition policyDefinition;
-    private String validExecutionPlan;
-
-    public boolean isSuccess() {
-        return success;
-    }
-
-    public void setSuccess(boolean success) {
-        this.success = success;
-    }
-
-    public String getMessage() {
-        return message;
-    }
-
-    public void setMessage(String message) {
-        this.message = message;
-    }
-
-
-    public String getException() {
-        return exception;
-    }
-
-    public void setException(String exception) {
-        this.exception = exception;
-    }
-
-    public void setStackTrace(Throwable throwable) {
-        this.exception = ExceptionUtils.getStackTrace(throwable);
-    }
-
-    public Map<String, StreamDefinition> getValidOutputStreams() {
-        return validOutputStreams;
-    }
-
-    public void setValidOutputStreams(Map<String, StreamDefinition> validOutputStreams) {
-        this.validOutputStreams = validOutputStreams;
-    }
-
-    public Map<String, StreamDefinition> getValidInputStreams() {
-        return validInputStreams;
-    }
-
-    public void setValidInputStreams(Map<String, StreamDefinition> validInputStreams) {
-        this.validInputStreams = validInputStreams;
-    }
-
-    public PolicyDefinition getPolicyDefinition() {
-        return policyDefinition;
-    }
-
-    public void setPolicyDefinition(PolicyDefinition policyDefinition) {
-        this.policyDefinition = policyDefinition;
-    }
-
-    public String getValidExecutionPlan() {
-        return validExecutionPlan;
-    }
-
-    public void setValidExecutionPlan(String validExecutionPlan) {
-        this.validExecutionPlan = validExecutionPlan;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6dbdb4f7/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyValidationResult.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyValidationResult.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyValidationResult.java
new file mode 100644
index 0000000..b1912af
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyValidationResult.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.metadata.resource;
+
+
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+public class PolicyValidationResult {
+    private boolean success;
+    private String message;
+    private String exception;
+
+    private PolicyExecutionPlan policyExecutionPlan;
+    private PolicyDefinition policyDefinition;
+
+    public String getException() {
+        return exception;
+    }
+
+    public void setException(String exception) {
+        this.exception = exception;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
+
+    public void setStackTrace(Throwable throwable) {
+        this.setException(ExceptionUtils.getStackTrace(throwable));
+    }
+
+    public boolean isSuccess() {
+        return success;
+    }
+
+    public void setSuccess(boolean success) {
+        this.success = success;
+    }
+
+    public PolicyExecutionPlan getPolicyExecutionPlan() {
+        return policyExecutionPlan;
+    }
+
+    public void setPolicyExecutionPlan(PolicyExecutionPlan policyExecutionPlan) {
+        this.policyExecutionPlan = policyExecutionPlan;
+    }
+
+    public PolicyDefinition getPolicyDefinition() {
+        return policyDefinition;
+    }
+
+    public void setPolicyDefinition(PolicyDefinition policyDefinition) {
+        this.policyDefinition = policyDefinition;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6dbdb4f7/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyValidator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyValidator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyValidator.java
deleted file mode 100644
index aef6aa8..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyValidator.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.service.metadata.resource;
-
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamNotDefinedException;
-import org.apache.eagle.alert.engine.evaluator.impl.SiddhiDefinitionAdapter;
-import org.apache.eagle.alert.metadata.IMetadataDao;
-
-import com.google.common.base.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.query.api.definition.AbstractDefinition;
-import org.wso2.siddhi.query.compiler.exception.SiddhiParserException;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class PolicyValidator {
-    private static final Logger LOG = LoggerFactory.getLogger(PolicyValidator.class);
-
-    public static PolicyValidation validate(PolicyDefinition policy, Map<String, StreamDefinition> allStreamDefinitions) {
-        PolicyValidation policyValidation = new PolicyValidation();
-        policyValidation.setPolicyDefinition(policy);
-
-        SiddhiManager siddhiManager = null;
-        ExecutionPlanRuntime executionRuntime = null;
-        String executionPlan = null;
-
-        try {
-            // Validate inputStreams are valid
-            Preconditions.checkNotNull(policy.getInputStreams(), "No inputStreams to connect from");
-            Map<String, StreamDefinition> currentDefinitions = new HashMap<>();
-            for (String streamId : policy.getInputStreams()) {
-                if (allStreamDefinitions.containsKey(streamId)) {
-                    currentDefinitions.put(streamId, allStreamDefinitions.get(streamId));
-                } else {
-                    throw new StreamNotDefinedException(streamId);
-                }
-            }
-
-            // Build final execution plan
-            executionPlan = SiddhiDefinitionAdapter.buildSiddhiExecutionPlan(policy, currentDefinitions);
-            siddhiManager = new SiddhiManager();
-            executionRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
-
-            // Set current execution plan as valid
-            policyValidation.setValidExecutionPlan(executionPlan);
-
-            // Siddhi runtime active stream definitions
-            Map<String, AbstractDefinition> definitionMap = executionRuntime.getStreamDefinitionMap();
-
-            Map<String, StreamDefinition> validInputStreams = new HashMap<>();
-            Map<String, StreamDefinition> validOutputStreams = new HashMap<>();
-
-            for (Map.Entry<String, AbstractDefinition> entry : definitionMap.entrySet()) {
-                if (currentDefinitions.containsKey(entry.getKey())) {
-                    validInputStreams.put(entry.getKey(), currentDefinitions.get(entry.getKey()));
-                } else {
-                    validOutputStreams.put(entry.getKey(), SiddhiDefinitionAdapter.convertFromSiddiDefinition(entry.getValue()));
-                }
-            }
-            policyValidation.setValidInputStreams(validInputStreams);
-
-            // Validate outputStreams
-            policyValidation.setValidOutputStreams(validOutputStreams);
-            if (policy.getOutputStreams() != null) {
-                for (String outputStream : policy.getOutputStreams()) {
-                    if (!validOutputStreams.containsKey(outputStream)) {
-                        throw new StreamNotDefinedException("Output stream " + outputStream + " not defined");
-                    }
-                }
-            }
-
-            // TODO: Validate partitions
-
-            policyValidation.setSuccess(true);
-            policyValidation.setMessage("Validation success");
-        } catch (SiddhiParserException parserException) {
-            LOG.error("Got error to parse policy execution plan: \n{}", executionPlan, parserException);
-            policyValidation.setSuccess(false);
-            policyValidation.setMessage("Parser Error: " + parserException.getMessage());
-            policyValidation.setStackTrace(parserException);
-        } catch (Exception exception) {
-            LOG.error("Got Error to validate policy definition", exception);
-            policyValidation.setSuccess(false);
-            policyValidation.setMessage("Validation Error: " + exception.getMessage());
-            policyValidation.setStackTrace(exception);
-        } finally {
-            if (executionRuntime != null) {
-                executionRuntime.shutdown();
-            }
-            if (siddhiManager != null) {
-                siddhiManager.shutdown();
-            }
-        }
-        return policyValidation;
-    }
-
-    public static PolicyValidation validate(PolicyDefinition policy, IMetadataDao metadataDao) {
-        Map<String, StreamDefinition> allDefinitions = new HashMap<>();
-        for (StreamDefinition definition : metadataDao.listStreams()) {
-            allDefinitions.put(definition.getStreamId(), definition);
-        }
-        return validate(policy, allDefinitions);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6dbdb4f7/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyCompilerTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyCompilerTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyCompilerTest.java
new file mode 100644
index 0000000..682de4c
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyCompilerTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.metadata.resource;
+
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.*;
+
+public class PolicyCompilerTest {
+    @Test
+    public void parseFullPolicyQuery() throws Exception {
+        PolicyExecutionPlan executionPlan = PolicyCompiler.parseExecutionPlan("from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.timeBatch(2 min) "
+            + "select cmd, user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT");
+        Assert.assertEquals("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX", executionPlan.getInputStreams().keySet().toArray()[0]);
+        Assert.assertEquals("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT", executionPlan.getOutputStreams().keySet().toArray()[0]);
+    }
+
+    @Test
+    public void testValidPolicy() {
+        PolicyDefinition policyDefinition = new PolicyDefinition();
+        policyDefinition.setName("test_policy");
+        policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
+        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
+
+        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+        definition.setType("siddhi");
+        definition.setValue("from INPUT_STREAM_1#window.timeBatch(2 min) select name, sum(value) as total group by name insert into OUTPUT_STREAM_1 ;");
+        definition.setInputStreams(policyDefinition.getInputStreams());
+        definition.setOutputStreams(policyDefinition.getOutputStreams());
+        policyDefinition.setDefinition(definition);
+
+        PolicyValidationResult validation = PolicyCompiler.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+            {
+                put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
+                put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
+                put("INPUT_STREAM_3", createStreamDefinition("INPUT_STREAM_3"));
+                put("INPUT_STREAM_4", createStreamDefinition("INPUT_STREAM_4"));
+            }
+        });
+        Assert.assertTrue(validation.isSuccess());
+        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
+        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size());
+    }
+
+    @Test
+    public void testValidPolicyWithTooManyInputStreams() {
+        PolicyDefinition policyDefinition = new PolicyDefinition();
+        policyDefinition.setName("test_policy");
+        policyDefinition.setInputStreams(Arrays.asList("INPUT_STREAM_1", "INPUT_STREAM_2"));
+        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
+
+        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+        definition.setType("siddhi");
+        definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
+        definition.setInputStreams(policyDefinition.getInputStreams());
+        definition.setOutputStreams(policyDefinition.getOutputStreams());
+        policyDefinition.setDefinition(definition);
+
+        PolicyValidationResult validation = PolicyCompiler.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+            {
+                put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
+                put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
+            }
+        });
+        Assert.assertTrue(validation.isSuccess());
+        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
+        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size());
+    }
+
+    @Test
+    public void testValidPolicyWithTooFewOutputStreams() {
+        PolicyDefinition policyDefinition = new PolicyDefinition();
+        policyDefinition.setName("test_policy");
+        policyDefinition.setInputStreams(Arrays.asList("INPUT_STREAM_1", "INPUT_STREAM_2"));
+        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
+
+        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+        definition.setType("siddhi");
+        definition.setValue(
+            "from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;"
+                + "from INPUT_STREAM_1[value < 90.0] select * group by name insert into OUTPUT_STREAM_2;"
+        );
+        definition.setInputStreams(policyDefinition.getInputStreams());
+        definition.setOutputStreams(policyDefinition.getOutputStreams());
+        policyDefinition.setDefinition(definition);
+
+        PolicyValidationResult validation = PolicyCompiler.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+            {
+                put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
+                put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
+            }
+        });
+        Assert.assertTrue(validation.isSuccess());
+        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
+        Assert.assertEquals(2, validation.getPolicyExecutionPlan().getOutputStreams().size());
+    }
+
+    @Test
+    public void testInvalidPolicyForSyntaxError() {
+        PolicyDefinition policyDefinition = new PolicyDefinition();
+        policyDefinition.setName("test_policy");
+        policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM"));
+        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM"));
+
+        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+        definition.setType("siddhi");
+        definition.setValue("from INPUT_STREAM (value > 90.0) select * group by name insert into OUTPUT_STREAM;");
+        definition.setInputStreams(policyDefinition.getInputStreams());
+        definition.setOutputStreams(policyDefinition.getOutputStreams());
+        policyDefinition.setDefinition(definition);
+
+        PolicyValidationResult validation = PolicyCompiler.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+            {
+                put("INPUT_STREAM", createStreamDefinition("INPUT_STREAM"));
+            }
+        });
+        Assert.assertFalse(validation.isSuccess());
+    }
+
+    @Test
+    public void testInvalidPolicyForNotDefinedInputStream() {
+        PolicyDefinition policyDefinition = new PolicyDefinition();
+        policyDefinition.setName("test_policy");
+        policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
+        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
+
+        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+        definition.setType("siddhi");
+        definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
+        definition.setInputStreams(policyDefinition.getInputStreams());
+        definition.setOutputStreams(policyDefinition.getOutputStreams());
+        policyDefinition.setDefinition(definition);
+
+        PolicyValidationResult validation = PolicyCompiler.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+            {
+                put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
+            }
+        });
+        Assert.assertFalse(validation.isSuccess());
+    }
+
+    @Test
+    public void testInvalidPolicyForNotDefinedOutputStream() {
+        PolicyDefinition policyDefinition = new PolicyDefinition();
+        policyDefinition.setName("test_policy");
+        policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
+        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_2"));
+
+        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+        definition.setType("siddhi");
+        definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
+        definition.setInputStreams(policyDefinition.getInputStreams());
+        definition.setOutputStreams(policyDefinition.getOutputStreams());
+        policyDefinition.setDefinition(definition);
+
+        PolicyValidationResult validation = PolicyCompiler.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+            {
+                put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
+            }
+        });
+        Assert.assertFalse(validation.isSuccess());
+    }
+
+    // --------------
+    // Helper Methods
+    // --------------
+
+    private static StreamDefinition createStreamDefinition(String streamId) {
+        StreamDefinition streamDefinition = new StreamDefinition();
+        streamDefinition.setStreamId(streamId);
+        List<StreamColumn> columns = new ArrayList<>();
+        columns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
+        columns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
+        streamDefinition.setColumns(columns);
+        return streamDefinition;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6dbdb4f7/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyValidatorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyValidatorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyValidatorTest.java
deleted file mode 100644
index b9a1b23..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyValidatorTest.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.service.metadata.resource;
-
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.*;
-
-public class PolicyValidatorTest {
-    @Test
-    public void testValidPolicy() {
-        PolicyDefinition policyDefinition = new PolicyDefinition();
-        policyDefinition.setName("test_policy");
-        policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
-        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
-
-        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
-        definition.setType("siddhi");
-        definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
-        definition.setInputStreams(policyDefinition.getInputStreams());
-        definition.setOutputStreams(policyDefinition.getOutputStreams());
-        policyDefinition.setDefinition(definition);
-
-        PolicyValidation validation = PolicyValidator.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
-            {
-                put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
-                put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
-                put("INPUT_STREAM_3", createStreamDefinition("INPUT_STREAM_3"));
-                put("INPUT_STREAM_4", createStreamDefinition("INPUT_STREAM_4"));
-            }
-        });
-        Assert.assertTrue(validation.isSuccess());
-        Assert.assertEquals(1, validation.getValidInputStreams().size());
-        Assert.assertEquals(1, validation.getValidOutputStreams().size());
-    }
-
-    @Test
-    public void testValidPolicyWithTooManyInputStreams() {
-        PolicyDefinition policyDefinition = new PolicyDefinition();
-        policyDefinition.setName("test_policy");
-        policyDefinition.setInputStreams(Arrays.asList("INPUT_STREAM_1", "INPUT_STREAM_2"));
-        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
-
-        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
-        definition.setType("siddhi");
-        definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
-        definition.setInputStreams(policyDefinition.getInputStreams());
-        definition.setOutputStreams(policyDefinition.getOutputStreams());
-        policyDefinition.setDefinition(definition);
-
-        PolicyValidation validation = PolicyValidator.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
-            {
-                put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
-                put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
-            }
-        });
-        Assert.assertTrue(validation.isSuccess());
-        Assert.assertEquals(2, validation.getValidInputStreams().size());
-        Assert.assertEquals(1, validation.getValidOutputStreams().size());
-    }
-
-    @Test
-    public void testValidPolicyWithTooFewOutputStreams() {
-        PolicyDefinition policyDefinition = new PolicyDefinition();
-        policyDefinition.setName("test_policy");
-        policyDefinition.setInputStreams(Arrays.asList("INPUT_STREAM_1", "INPUT_STREAM_2"));
-        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
-
-        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
-        definition.setType("siddhi");
-        definition.setValue(
-            "from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;"
-                + "from INPUT_STREAM_1[value < 90.0] select * group by name insert into OUTPUT_STREAM_2;"
-        );
-        definition.setInputStreams(policyDefinition.getInputStreams());
-        definition.setOutputStreams(policyDefinition.getOutputStreams());
-        policyDefinition.setDefinition(definition);
-
-        PolicyValidation validation = PolicyValidator.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
-            {
-                put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
-                put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
-            }
-        });
-        Assert.assertTrue(validation.isSuccess());
-        Assert.assertEquals(2, validation.getValidInputStreams().size());
-        Assert.assertEquals(2, validation.getValidOutputStreams().size());
-    }
-
-    @Test
-    public void testInvalidPolicyForSyntaxError() {
-        PolicyDefinition policyDefinition = new PolicyDefinition();
-        policyDefinition.setName("test_policy");
-        policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM"));
-        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM"));
-
-        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
-        definition.setType("siddhi");
-        definition.setValue("from INPUT_STREAM (value > 90.0) select * group by name insert into OUTPUT_STREAM;");
-        definition.setInputStreams(policyDefinition.getInputStreams());
-        definition.setOutputStreams(policyDefinition.getOutputStreams());
-        policyDefinition.setDefinition(definition);
-
-        PolicyValidation validation = PolicyValidator.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
-            {
-                put("INPUT_STREAM", createStreamDefinition("INPUT_STREAM"));
-            }
-        });
-        Assert.assertFalse(validation.isSuccess());
-    }
-
-    @Test
-    public void testInvalidPolicyForNotDefinedInputStream() {
-        PolicyDefinition policyDefinition = new PolicyDefinition();
-        policyDefinition.setName("test_policy");
-        policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
-        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
-
-        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
-        definition.setType("siddhi");
-        definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
-        definition.setInputStreams(policyDefinition.getInputStreams());
-        definition.setOutputStreams(policyDefinition.getOutputStreams());
-        policyDefinition.setDefinition(definition);
-
-        PolicyValidation validation = PolicyValidator.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
-            {
-                put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
-            }
-        });
-        Assert.assertFalse(validation.isSuccess());
-    }
-
-    @Test
-    public void testInvalidPolicyForNotDefinedOutputStream() {
-        PolicyDefinition policyDefinition = new PolicyDefinition();
-        policyDefinition.setName("test_policy");
-        policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
-        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_2"));
-
-        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
-        definition.setType("siddhi");
-        definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
-        definition.setInputStreams(policyDefinition.getInputStreams());
-        definition.setOutputStreams(policyDefinition.getOutputStreams());
-        policyDefinition.setDefinition(definition);
-
-        PolicyValidation validation = PolicyValidator.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
-            {
-                put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
-            }
-        });
-        Assert.assertFalse(validation.isSuccess());
-    }
-
-    // --------------
-    // Helper Methods
-    // --------------
-
-    private static StreamDefinition createStreamDefinition(String streamId) {
-        StreamDefinition streamDefinition = new StreamDefinition();
-        streamDefinition.setStreamId(streamId);
-        List<StreamColumn> columns = new ArrayList<>();
-        columns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
-        columns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
-        streamDefinition.setColumns(columns);
-        return streamDefinition;
-    }
-}
\ No newline at end of file