Posted to commits@eagle.apache.org by ha...@apache.org on 2016/10/19 09:15:17 UTC

incubator-eagle git commit: [EAGLE-639] Generate sortSpec only on externalWindow

Repository: incubator-eagle
Updated Branches:
  refs/heads/master 453c3a5fa -> 6f88c30f8


[EAGLE-639] Generate sortSpec only on externalWindow

Generate the StreamSortSpec only when the query uses an externalTime window; time/timeBatch windows no longer produce a sortSpec.

https://issues.apache.org/jira/browse/EAGLE-639
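
For illustration, a minimal sketch of the behavior after this change, mirroring the new PolicyInterpreterTest cases (not part of the commit itself): an externalTime window produces a StreamSortSpec on the stream partition, while a plain time window leaves it null.

    // Sketch based on PolicyInterpreterTest#parseFullPolicyQuery and
    // testValidPolicyWithTimeWindow; assumes the JUnit Assert import from those tests.
    PolicyExecutionPlan plan = PolicyInterpreter.parseExecutionPlan(
        "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.externalTime(timestamp, 2 min) "
        + "select cmd, user, count() as total_count group by cmd,user "
        + "insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT");
    // externalTime window => a sortSpec is generated on the partition
    Assert.assertNotNull(plan.getStreamPartitions().get(0).getSortSpec());
    // a window.time(2 min) query, by contrast, now yields a partition whose sortSpec is null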

Author: Hao Chen <ha...@apache.org>

Closes #532 from haoch/EnhancePolicyIntepreter.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/6f88c30f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/6f88c30f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/6f88c30f

Branch: refs/heads/master
Commit: 6f88c30f855249c401a40efc6941090a152670f3
Parents: 453c3a5
Author: Hao Chen <ha...@apache.org>
Authored: Wed Oct 19 17:15:06 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Wed Oct 19 17:15:06 2016 +0800

----------------------------------------------------------------------
 .../engine/sorter/StreamSortHandlerTest.java    |   7 +-
 .../metadata/resource/MetadataResource.java     |   4 +-
 .../metadata/resource/PolicyCompiler.java       | 235 ------------------
 .../metadata/resource/PolicyInterpreter.java    | 244 +++++++++++++++++++
 .../metadata/resource/PolicyCompilerTest.java   | 195 ---------------
 .../resource/PolicyInterpreterTest.java         | 226 +++++++++++++++++
 6 files changed, 476 insertions(+), 435 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6f88c30f/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java
index 98657e7..885017a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java
@@ -32,6 +32,7 @@ import org.apache.eagle.alert.engine.sorter.impl.StreamTimeClockInLocalMemory;
 import org.apache.eagle.alert.utils.DateTimeUtil;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -135,7 +136,7 @@ public class StreamSortHandlerTest {
      *
      * @throws InterruptedException
      */
-    @Test
+    @Test @Ignore("Ignored heavy benchmark test in unit test")
     public void testWithUnsortedEventsInLargeWindowBenchmark() throws InterruptedException {
         metricReporter.report();
         testWithUnsortedEventsIn1hWindow(1000);
@@ -146,8 +147,8 @@ public class StreamSortHandlerTest {
         metricReporter.report();
         testWithUnsortedEventsIn1hWindow(1000000);
         metricReporter.report();
-//        testWithUnsortedEventsIn1hWindow(10000000);
-//        metricReporter.report();
+        testWithUnsortedEventsIn1hWindow(10000000);
+        metricReporter.report();
     }
 
     public void testWithUnsortedEventsIn1hWindow(int count) throws InterruptedException {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6f88c30f/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
index eb23362..190da2a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
@@ -207,13 +207,13 @@ public class MetadataResource {
     @Path("/policies/validate")
     @POST
     public PolicyValidationResult validatePolicy(PolicyDefinition policy) {
-        return PolicyCompiler.validate(policy,dao);
+        return PolicyInterpreter.validate(policy,dao);
     }
 
     @Path("/policies/parse")
     @POST
     public PolicyParseResult parsePolicy(String policyDefinition) {
-        return PolicyCompiler.parse(policyDefinition);
+        return PolicyInterpreter.parse(policyDefinition);
     }
 
     @Path("/policies/batch")

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6f88c30f/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyCompiler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyCompiler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyCompiler.java
deleted file mode 100644
index 6e3938d..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyCompiler.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.service.metadata.resource;
-
-import com.google.common.base.Preconditions;
-import org.apache.eagle.alert.engine.coordinator.*;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.evaluator.impl.SiddhiDefinitionAdapter;
-import org.apache.eagle.alert.metadata.IMetadataDao;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.query.api.ExecutionPlan;
-import org.wso2.siddhi.query.api.execution.ExecutionElement;
-import org.wso2.siddhi.query.api.execution.query.Query;
-import org.wso2.siddhi.query.api.execution.query.input.handler.StreamHandler;
-import org.wso2.siddhi.query.api.execution.query.input.handler.Window;
-import org.wso2.siddhi.query.api.execution.query.input.stream.InputStream;
-import org.wso2.siddhi.query.api.execution.query.input.stream.JoinInputStream;
-import org.wso2.siddhi.query.api.execution.query.input.stream.SingleInputStream;
-import org.wso2.siddhi.query.api.execution.query.input.stream.StateInputStream;
-import org.wso2.siddhi.query.api.execution.query.output.stream.OutputStream;
-import org.wso2.siddhi.query.api.execution.query.selection.OutputAttribute;
-import org.wso2.siddhi.query.api.execution.query.selection.Selector;
-import org.wso2.siddhi.query.api.expression.Variable;
-import org.wso2.siddhi.query.api.expression.constant.TimeConstant;
-import org.wso2.siddhi.query.compiler.SiddhiCompiler;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-public class PolicyCompiler {
-    private static final Logger LOG = LoggerFactory.getLogger(PolicyCompiler.class);
-
-
-    public static PolicyParseResult parse(String executionPlanQuery) {
-        PolicyParseResult policyParseResult = new PolicyParseResult();
-        try {
-            policyParseResult.setPolicyExecutionPlan(parseExecutionPlan(executionPlanQuery));
-            policyParseResult.setSuccess(true);
-            policyParseResult.setMessage("Parsed successfully");
-        } catch (Exception exception) {
-            LOG.error("Got error to parse policy: {}", executionPlanQuery, exception);
-            policyParseResult.setSuccess(false);
-            policyParseResult.setMessage(exception.getMessage());
-            policyParseResult.setStackTrace(exception);
-        }
-        return policyParseResult;
-    }
-
-    /**
-     * Quick parseExecutionPlan policy.
-     */
-    public static PolicyExecutionPlan parseExecutionPlan(String policyDefinition, Map<String, StreamDefinition> inputStreamDefinitions) throws Exception {
-        // Validate inputStreams are valid
-        Preconditions.checkNotNull(inputStreamDefinitions, "No inputStreams to connect from");
-        return parseExecutionPlan(SiddhiDefinitionAdapter.buildSiddhiExecutionPlan(policyDefinition, inputStreamDefinitions));
-    }
-
-    public static PolicyExecutionPlan parseExecutionPlan(String executionPlanQuery) throws Exception {
-        PolicyExecutionPlan policyExecutionPlan = new PolicyExecutionPlan();
-        try {
-            ExecutionPlan executionPlan = SiddhiCompiler.parse(executionPlanQuery);
-
-            policyExecutionPlan.setExecutionPlanDesc(executionPlan.toString());
-
-            // Set current execution plan as valid
-            policyExecutionPlan.setExecutionPlanSource(executionPlanQuery);
-            policyExecutionPlan.setInternalExecutionPlan(executionPlan);
-
-            Map<String, List<StreamColumn>> actualInputStreams = new HashMap<>();
-            Map<String, List<StreamColumn>> actualOutputStreams = new HashMap<>();
-            List<StreamPartition> partitions = new ArrayList<>();
-
-            // Go through execution element
-            for (ExecutionElement executionElement : executionPlan.getExecutionElementList()) {
-                if (executionElement instanceof Query) {
-                    // -------------
-                    // Explain Query
-                    // -------------
-
-                    // Input streams
-                    InputStream inputStream = ((Query) executionElement).getInputStream();
-                    Selector selector = ((Query) executionElement).getSelector();
-
-                    for (String streamId : inputStream.getUniqueStreamIds()) {
-                        if (!actualInputStreams.containsKey(streamId)) {
-                            org.wso2.siddhi.query.api.definition.StreamDefinition streamDefinition = executionPlan.getStreamDefinitionMap().get(streamId);
-                            if (streamDefinition != null) {
-                                actualInputStreams.put(streamId, SiddhiDefinitionAdapter.convertFromSiddiDefinition(streamDefinition).getColumns());
-                            } else {
-                                actualInputStreams.put(streamId, null);
-                            }
-                        }
-                    }
-
-                    // Window Spec and Partition
-                    if (inputStream instanceof SingleInputStream) {
-                        // Window Spec
-                        List<Window> windows = new ArrayList<>();
-                        for (StreamHandler streamHandler : ((SingleInputStream) inputStream).getStreamHandlers()) {
-                            if (streamHandler instanceof Window) {
-                                windows.add((Window) streamHandler);
-                            }
-                        }
-
-                        // Group By Spec
-                        List<Variable> groupBy = selector.getGroupByList();
-
-                        if (windows.size() > 0 || groupBy.size() >= 0) {
-                            partitions.add(convertSingleStreamWindowAndGroupByToPartition(((SingleInputStream) inputStream).getStreamId(), windows, groupBy));
-                        }
-                    }
-                    //    else if(inputStream instanceof JoinInputStream) {
-                    //        // TODO: Parse multiple stream join
-                    //    } else if(inputStream instanceof StateInputStream) {
-                    //        // TODO: Parse StateInputStream
-                    //    }
-
-                    // Output streams
-                    OutputStream outputStream = ((Query) executionElement).getOutputStream();
-                    actualOutputStreams.put(outputStream.getId(), convertOutputStreamColumns(selector.getSelectionList()));
-                } else {
-                    LOG.warn("Unhandled execution element: {}", executionElement.toString());
-                }
-            }
-            // Set used input streams
-            policyExecutionPlan.setInputStreams(actualInputStreams);
-
-            // Set Partitions
-            policyExecutionPlan.setStreamPartitions(partitions);
-
-            // Validate outputStreams
-            policyExecutionPlan.setOutputStreams(actualOutputStreams);
-        } catch (Exception ex) {
-            LOG.error("Got error to parseExecutionPlan policy execution plan: \n{}", executionPlanQuery, ex);
-            throw ex;
-        }
-        return policyExecutionPlan;
-    }
-
-    private static StreamPartition convertSingleStreamWindowAndGroupByToPartition(String streamId, List<Window> windows, List<Variable> groupBy) {
-        StreamPartition partition = new StreamPartition();
-        partition.setStreamId(streamId);
-        StreamSortSpec sortSpec = null;
-
-        if (windows.size() > 0) {
-            sortSpec = new StreamSortSpec();
-            for (Window window : windows) {
-                if (window.getFunction().equals("timeBatch")) {
-                    sortSpec.setWindowPeriodMillis(((TimeConstant) window.getParameters()[0]).getValue().intValue());
-                    sortSpec.setWindowMargin(sortSpec.getWindowPeriodMillis() / 3);
-                }
-            }
-        }
-        partition.setSortSpec(sortSpec);
-        if (groupBy.size() > 0) {
-            partition.setColumns(groupBy.stream().map(Variable::getAttributeName).collect(Collectors.toList()));
-            partition.setType(StreamPartition.Type.GROUPBY);
-        } else {
-            partition.setType(StreamPartition.Type.SHUFFLE);
-        }
-        return partition;
-    }
-
-    public static PolicyValidationResult validate(PolicyDefinition policy, IMetadataDao metadataDao) {
-        Map<String, StreamDefinition> allDefinitions = new HashMap<>();
-        for (StreamDefinition definition : metadataDao.listStreams()) {
-            allDefinitions.put(definition.getStreamId(), definition);
-        }
-        return validate(policy, allDefinitions);
-    }
-
-    public static PolicyValidationResult validate(PolicyDefinition policy, Map<String, StreamDefinition> allDefinitions) {
-        Map<String, StreamDefinition> inputDefinitions = new HashMap<>();
-        PolicyValidationResult policyValidationResult = new PolicyValidationResult();
-        policyValidationResult.setPolicyDefinition(policy);
-        try {
-            if (policy.getInputStreams() != null) {
-                for (String streamId : policy.getInputStreams()) {
-                    if (allDefinitions.containsKey(streamId)) {
-                        inputDefinitions.put(streamId, allDefinitions.get(streamId));
-                    } else {
-                        throw new StreamNotDefinedException(streamId);
-                    }
-                }
-            }
-
-            PolicyExecutionPlan policyExecutionPlan = parseExecutionPlan(policy.getDefinition().getValue(), inputDefinitions);
-            // Validate output
-            if (policy.getOutputStreams() != null) {
-                for (String outputStream : policy.getOutputStreams()) {
-                    if (!policyExecutionPlan.getOutputStreams().containsKey(outputStream)) {
-                        throw new StreamNotDefinedException("Output stream " + outputStream + " not defined");
-                    }
-                }
-            }
-            policyValidationResult.setPolicyExecutionPlan(policyExecutionPlan);
-            policyValidationResult.setSuccess(true);
-            policyValidationResult.setMessage("Validated successfully");
-        } catch (Exception exception) {
-            LOG.error("Got error to validate policy definition: {}", policy, exception);
-            policyValidationResult.setSuccess(false);
-            policyValidationResult.setMessage(exception.getMessage());
-            policyValidationResult.setStackTrace(exception);
-        }
-
-        return policyValidationResult;
-    }
-
-    private static List<StreamColumn> convertOutputStreamColumns(List<OutputAttribute> outputAttributeList) {
-        return outputAttributeList.stream().map(outputAttribute -> {
-            StreamColumn streamColumn = new StreamColumn();
-            streamColumn.setName(outputAttribute.getRename());
-            streamColumn.setDescription(outputAttribute.getExpression().toString());
-            return streamColumn;
-        }).collect(Collectors.toList());
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6f88c30f/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyInterpreter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyInterpreter.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyInterpreter.java
new file mode 100644
index 0000000..b97583c
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyInterpreter.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.metadata.resource;
+
+import com.google.common.base.Preconditions;
+import org.apache.eagle.alert.engine.coordinator.*;
+import org.apache.eagle.alert.engine.evaluator.impl.SiddhiDefinitionAdapter;
+import org.apache.eagle.alert.metadata.IMetadataDao;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.query.api.ExecutionPlan;
+import org.wso2.siddhi.query.api.execution.ExecutionElement;
+import org.wso2.siddhi.query.api.execution.query.Query;
+import org.wso2.siddhi.query.api.execution.query.input.handler.StreamHandler;
+import org.wso2.siddhi.query.api.execution.query.input.handler.Window;
+import org.wso2.siddhi.query.api.execution.query.input.stream.InputStream;
+import org.wso2.siddhi.query.api.execution.query.input.stream.SingleInputStream;
+import org.wso2.siddhi.query.api.execution.query.output.stream.OutputStream;
+import org.wso2.siddhi.query.api.execution.query.selection.OutputAttribute;
+import org.wso2.siddhi.query.api.execution.query.selection.Selector;
+import org.wso2.siddhi.query.api.expression.Variable;
+import org.wso2.siddhi.query.api.expression.constant.TimeConstant;
+import org.wso2.siddhi.query.compiler.SiddhiCompiler;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * PolicyInterpreter:
+ * <ul>
+ * <li>Parse: parse siddhi query and generate static execution plan</li>
+ * <li>Validate: validate policy definition with execution plan and metadata</li>
+ * </ul>
+ * @see <a href="https://docs.wso2.com/display/CEP300/WSO2+Complex+Event+Processor+Documentation">WSO2 Complex Event Processor Documentation</a>
+ */
+public class PolicyInterpreter {
+    private static final Logger LOG = LoggerFactory.getLogger(PolicyInterpreter.class);
+
+    /**
+     * See https://docs.wso2.com/display/CEP300/Windows#Windows-ExternalTimeWindow.
+     */
+    private static final String WINDOW_EXTERNAL_TIME = "externalTime";
+
+    public static PolicyParseResult parse(String executionPlanQuery) {
+        PolicyParseResult policyParseResult = new PolicyParseResult();
+        try {
+            policyParseResult.setPolicyExecutionPlan(parseExecutionPlan(executionPlanQuery));
+            policyParseResult.setSuccess(true);
+            policyParseResult.setMessage("Parsed successfully");
+        } catch (Exception exception) {
+            LOG.error("Got error to parse policy: {}", executionPlanQuery, exception);
+            policyParseResult.setSuccess(false);
+            policyParseResult.setMessage(exception.getMessage());
+            policyParseResult.setStackTrace(exception);
+        }
+        return policyParseResult;
+    }
+
+    /**
+     * Quickly parse the policy execution plan from a policy definition and its input stream definitions.
+     */
+    public static PolicyExecutionPlan parseExecutionPlan(String policyDefinition, Map<String, StreamDefinition> inputStreamDefinitions) throws Exception {
+        // Validate inputStreams are valid
+        Preconditions.checkNotNull(inputStreamDefinitions, "No inputStreams to connect from");
+        return parseExecutionPlan(SiddhiDefinitionAdapter.buildSiddhiExecutionPlan(policyDefinition, inputStreamDefinitions));
+    }
+
+    public static PolicyExecutionPlan parseExecutionPlan(String executionPlanQuery) throws Exception {
+        PolicyExecutionPlan policyExecutionPlan = new PolicyExecutionPlan();
+        try {
+            ExecutionPlan executionPlan = SiddhiCompiler.parse(executionPlanQuery);
+
+            policyExecutionPlan.setExecutionPlanDesc(executionPlan.toString());
+
+            // Set current execution plan as valid
+            policyExecutionPlan.setExecutionPlanSource(executionPlanQuery);
+            policyExecutionPlan.setInternalExecutionPlan(executionPlan);
+
+            Map<String, List<StreamColumn>> actualInputStreams = new HashMap<>();
+            Map<String, List<StreamColumn>> actualOutputStreams = new HashMap<>();
+            List<StreamPartition> partitions = new ArrayList<>();
+
+            // Go through execution element
+            for (ExecutionElement executionElement : executionPlan.getExecutionElementList()) {
+                if (executionElement instanceof Query) {
+                    // -------------
+                    // Explain Query
+                    // -------------
+
+                    // Input streams
+                    InputStream inputStream = ((Query) executionElement).getInputStream();
+                    Selector selector = ((Query) executionElement).getSelector();
+
+                    for (String streamId : inputStream.getUniqueStreamIds()) {
+                        if (!actualInputStreams.containsKey(streamId)) {
+                            org.wso2.siddhi.query.api.definition.StreamDefinition streamDefinition = executionPlan.getStreamDefinitionMap().get(streamId);
+                            if (streamDefinition != null) {
+                                actualInputStreams.put(streamId, SiddhiDefinitionAdapter.convertFromSiddiDefinition(streamDefinition).getColumns());
+                            } else {
+                                actualInputStreams.put(streamId, null);
+                            }
+                        }
+                    }
+
+                    // Window Spec and Partition
+                    if (inputStream instanceof SingleInputStream) {
+                        // Window Spec
+                        List<Window> windows = new ArrayList<>();
+                        for (StreamHandler streamHandler : ((SingleInputStream) inputStream).getStreamHandlers()) {
+                            if (streamHandler instanceof Window) {
+                                windows.add((Window) streamHandler);
+                            }
+                        }
+
+                        // Group By Spec
+                        List<Variable> groupBy = selector.getGroupByList();
+
+                        if (windows.size() > 0 || groupBy.size() >= 0) {
+                            partitions.add(convertSingleStreamWindowAndGroupByToPartition(((SingleInputStream) inputStream).getStreamId(), windows, groupBy));
+                        }
+                    }
+                    //    else if(inputStream instanceof JoinInputStream) {
+                    //        // TODO: Parse multiple stream join
+                    //    } else if(inputStream instanceof StateInputStream) {
+                    //        // TODO: Parse StateInputStream
+                    //    }
+
+                    // Output streams
+                    OutputStream outputStream = ((Query) executionElement).getOutputStream();
+                    actualOutputStreams.put(outputStream.getId(), convertOutputStreamColumns(selector.getSelectionList()));
+                } else {
+                    LOG.warn("Unhandled execution element: {}", executionElement.toString());
+                }
+            }
+            // Set used input streams
+            policyExecutionPlan.setInputStreams(actualInputStreams);
+
+            // Set Partitions
+            policyExecutionPlan.setStreamPartitions(partitions);
+
+            // Validate outputStreams
+            policyExecutionPlan.setOutputStreams(actualOutputStreams);
+        } catch (Exception ex) {
+            LOG.error("Got error to parseExecutionPlan policy execution plan: \n{}", executionPlanQuery, ex);
+            throw ex;
+        }
+        return policyExecutionPlan;
+    }
+
+    private static StreamPartition convertSingleStreamWindowAndGroupByToPartition(String streamId, List<Window> windows, List<Variable> groupBy) {
+        StreamPartition partition = new StreamPartition();
+        partition.setStreamId(streamId);
+        StreamSortSpec sortSpec = null;
+
+        if (windows.size() > 0) {
+            for (Window window : windows) {
+                if (window.getFunction().equals(WINDOW_EXTERNAL_TIME)) {
+                    sortSpec = new StreamSortSpec();
+                    sortSpec.setWindowPeriodMillis(((TimeConstant) window.getParameters()[1]).getValue().intValue());
+                    sortSpec.setWindowMargin(sortSpec.getWindowPeriodMillis() / 3);
+                }
+            }
+        }
+        partition.setSortSpec(sortSpec);
+        if (groupBy.size() > 0) {
+            partition.setColumns(groupBy.stream().map(Variable::getAttributeName).collect(Collectors.toList()));
+            partition.setType(StreamPartition.Type.GROUPBY);
+        } else {
+            partition.setType(StreamPartition.Type.SHUFFLE);
+        }
+        return partition;
+    }
+
+    public static PolicyValidationResult validate(PolicyDefinition policy, IMetadataDao metadataDao) {
+        Map<String, StreamDefinition> allDefinitions = new HashMap<>();
+        for (StreamDefinition definition : metadataDao.listStreams()) {
+            allDefinitions.put(definition.getStreamId(), definition);
+        }
+        return validate(policy, allDefinitions);
+    }
+
+    public static PolicyValidationResult validate(PolicyDefinition policy, Map<String, StreamDefinition> allDefinitions) {
+        Map<String, StreamDefinition> inputDefinitions = new HashMap<>();
+        PolicyValidationResult policyValidationResult = new PolicyValidationResult();
+        policyValidationResult.setPolicyDefinition(policy);
+        try {
+            if (policy.getInputStreams() != null) {
+                for (String streamId : policy.getInputStreams()) {
+                    if (allDefinitions.containsKey(streamId)) {
+                        inputDefinitions.put(streamId, allDefinitions.get(streamId));
+                    } else {
+                        throw new StreamNotDefinedException(streamId);
+                    }
+                }
+            }
+
+            PolicyExecutionPlan policyExecutionPlan = parseExecutionPlan(policy.getDefinition().getValue(), inputDefinitions);
+            // Validate output
+            if (policy.getOutputStreams() != null) {
+                for (String outputStream : policy.getOutputStreams()) {
+                    if (!policyExecutionPlan.getOutputStreams().containsKey(outputStream)) {
+                        throw new StreamNotDefinedException("Output stream " + outputStream + " not defined");
+                    }
+                }
+            }
+            policyValidationResult.setPolicyExecutionPlan(policyExecutionPlan);
+            policyValidationResult.setSuccess(true);
+            policyValidationResult.setMessage("Validated successfully");
+        } catch (Exception exception) {
+            LOG.error("Got error to validate policy definition: {}", policy, exception);
+            policyValidationResult.setSuccess(false);
+            policyValidationResult.setMessage(exception.getMessage());
+            policyValidationResult.setStackTrace(exception);
+        }
+
+        return policyValidationResult;
+    }
+
+    private static List<StreamColumn> convertOutputStreamColumns(List<OutputAttribute> outputAttributeList) {
+        return outputAttributeList.stream().map(outputAttribute -> {
+            StreamColumn streamColumn = new StreamColumn();
+            streamColumn.setName(outputAttribute.getRename());
+            streamColumn.setDescription(outputAttribute.getExpression().toString());
+            return streamColumn;
+        }).collect(Collectors.toList());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6f88c30f/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyCompilerTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyCompilerTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyCompilerTest.java
deleted file mode 100644
index 682de4c..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyCompilerTest.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.service.metadata.resource;
-
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.*;
-
-public class PolicyCompilerTest {
-    @Test
-    public void parseFullPolicyQuery() throws Exception {
-        PolicyExecutionPlan executionPlan = PolicyCompiler.parseExecutionPlan("from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.timeBatch(2 min) "
-            + "select cmd, user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT");
-        Assert.assertEquals("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX", executionPlan.getInputStreams().keySet().toArray()[0]);
-        Assert.assertEquals("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT", executionPlan.getOutputStreams().keySet().toArray()[0]);
-    }
-
-    @Test
-    public void testValidPolicy() {
-        PolicyDefinition policyDefinition = new PolicyDefinition();
-        policyDefinition.setName("test_policy");
-        policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
-        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
-
-        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
-        definition.setType("siddhi");
-        definition.setValue("from INPUT_STREAM_1#window.timeBatch(2 min) select name, sum(value) as total group by name insert into OUTPUT_STREAM_1 ;");
-        definition.setInputStreams(policyDefinition.getInputStreams());
-        definition.setOutputStreams(policyDefinition.getOutputStreams());
-        policyDefinition.setDefinition(definition);
-
-        PolicyValidationResult validation = PolicyCompiler.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
-            {
-                put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
-                put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
-                put("INPUT_STREAM_3", createStreamDefinition("INPUT_STREAM_3"));
-                put("INPUT_STREAM_4", createStreamDefinition("INPUT_STREAM_4"));
-            }
-        });
-        Assert.assertTrue(validation.isSuccess());
-        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
-        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size());
-    }
-
-    @Test
-    public void testValidPolicyWithTooManyInputStreams() {
-        PolicyDefinition policyDefinition = new PolicyDefinition();
-        policyDefinition.setName("test_policy");
-        policyDefinition.setInputStreams(Arrays.asList("INPUT_STREAM_1", "INPUT_STREAM_2"));
-        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
-
-        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
-        definition.setType("siddhi");
-        definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
-        definition.setInputStreams(policyDefinition.getInputStreams());
-        definition.setOutputStreams(policyDefinition.getOutputStreams());
-        policyDefinition.setDefinition(definition);
-
-        PolicyValidationResult validation = PolicyCompiler.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
-            {
-                put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
-                put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
-            }
-        });
-        Assert.assertTrue(validation.isSuccess());
-        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
-        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size());
-    }
-
-    @Test
-    public void testValidPolicyWithTooFewOutputStreams() {
-        PolicyDefinition policyDefinition = new PolicyDefinition();
-        policyDefinition.setName("test_policy");
-        policyDefinition.setInputStreams(Arrays.asList("INPUT_STREAM_1", "INPUT_STREAM_2"));
-        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
-
-        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
-        definition.setType("siddhi");
-        definition.setValue(
-            "from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;"
-                + "from INPUT_STREAM_1[value < 90.0] select * group by name insert into OUTPUT_STREAM_2;"
-        );
-        definition.setInputStreams(policyDefinition.getInputStreams());
-        definition.setOutputStreams(policyDefinition.getOutputStreams());
-        policyDefinition.setDefinition(definition);
-
-        PolicyValidationResult validation = PolicyCompiler.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
-            {
-                put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
-                put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
-            }
-        });
-        Assert.assertTrue(validation.isSuccess());
-        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
-        Assert.assertEquals(2, validation.getPolicyExecutionPlan().getOutputStreams().size());
-    }
-
-    @Test
-    public void testInvalidPolicyForSyntaxError() {
-        PolicyDefinition policyDefinition = new PolicyDefinition();
-        policyDefinition.setName("test_policy");
-        policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM"));
-        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM"));
-
-        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
-        definition.setType("siddhi");
-        definition.setValue("from INPUT_STREAM (value > 90.0) select * group by name insert into OUTPUT_STREAM;");
-        definition.setInputStreams(policyDefinition.getInputStreams());
-        definition.setOutputStreams(policyDefinition.getOutputStreams());
-        policyDefinition.setDefinition(definition);
-
-        PolicyValidationResult validation = PolicyCompiler.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
-            {
-                put("INPUT_STREAM", createStreamDefinition("INPUT_STREAM"));
-            }
-        });
-        Assert.assertFalse(validation.isSuccess());
-    }
-
-    @Test
-    public void testInvalidPolicyForNotDefinedInputStream() {
-        PolicyDefinition policyDefinition = new PolicyDefinition();
-        policyDefinition.setName("test_policy");
-        policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
-        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
-
-        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
-        definition.setType("siddhi");
-        definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
-        definition.setInputStreams(policyDefinition.getInputStreams());
-        definition.setOutputStreams(policyDefinition.getOutputStreams());
-        policyDefinition.setDefinition(definition);
-
-        PolicyValidationResult validation = PolicyCompiler.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
-            {
-                put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
-            }
-        });
-        Assert.assertFalse(validation.isSuccess());
-    }
-
-    @Test
-    public void testInvalidPolicyForNotDefinedOutputStream() {
-        PolicyDefinition policyDefinition = new PolicyDefinition();
-        policyDefinition.setName("test_policy");
-        policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
-        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_2"));
-
-        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
-        definition.setType("siddhi");
-        definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
-        definition.setInputStreams(policyDefinition.getInputStreams());
-        definition.setOutputStreams(policyDefinition.getOutputStreams());
-        policyDefinition.setDefinition(definition);
-
-        PolicyValidationResult validation = PolicyCompiler.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
-            {
-                put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
-            }
-        });
-        Assert.assertFalse(validation.isSuccess());
-    }
-
-    // --------------
-    // Helper Methods
-    // --------------
-
-    private static StreamDefinition createStreamDefinition(String streamId) {
-        StreamDefinition streamDefinition = new StreamDefinition();
-        streamDefinition.setStreamId(streamId);
-        List<StreamColumn> columns = new ArrayList<>();
-        columns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
-        columns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
-        streamDefinition.setColumns(columns);
-        return streamDefinition;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6f88c30f/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyInterpreterTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyInterpreterTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyInterpreterTest.java
new file mode 100644
index 0000000..48ac9eb
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyInterpreterTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.metadata.resource;
+
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.*;
+
+public class PolicyInterpreterTest {
+    @Test
+    public void parseFullPolicyQuery() throws Exception {
+        PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan("from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.externalTime(timestamp, 2 min) "
+            + "select cmd, user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT");
+        Assert.assertEquals("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX", executionPlan.getInputStreams().keySet().toArray()[0]);
+        Assert.assertEquals("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT", executionPlan.getOutputStreams().keySet().toArray()[0]);
+        Assert.assertEquals(1, executionPlan.getStreamPartitions().size());
+        Assert.assertNotNull(executionPlan.getStreamPartitions().get(0).getSortSpec());
+    }
+
+    @Test
+    public void testValidPolicyWithExternalTimeWindow() {
+        PolicyDefinition policyDefinition = new PolicyDefinition();
+        policyDefinition.setName("test_policy");
+        policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
+        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
+
+        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+        definition.setType("siddhi");
+        definition.setValue("from INPUT_STREAM_1#window.externalTime(timestamp, 2 min) select name, sum(value) as total group by name insert into OUTPUT_STREAM_1 ;");
+        definition.setInputStreams(policyDefinition.getInputStreams());
+        definition.setOutputStreams(policyDefinition.getOutputStreams());
+        policyDefinition.setDefinition(definition);
+
+        PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+            {
+                put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
+                put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
+                put("INPUT_STREAM_3", createStreamDefinition("INPUT_STREAM_3"));
+                put("INPUT_STREAM_4", createStreamDefinition("INPUT_STREAM_4"));
+            }
+        });
+        Assert.assertTrue(validation.isSuccess());
+        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
+        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size());
+        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getStreamPartitions().size());
+        Assert.assertNotNull(validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec());
+    }
+
+    @Test
+    public void testValidPolicyWithTimeWindow() {
+        PolicyDefinition policyDefinition = new PolicyDefinition();
+        policyDefinition.setName("test_policy");
+        policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
+        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
+
+        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+        definition.setType("siddhi");
+        definition.setValue("from INPUT_STREAM_1#window.time(2 min) select name, sum(value) as total group by name insert into OUTPUT_STREAM_1 ;");
+        definition.setInputStreams(policyDefinition.getInputStreams());
+        definition.setOutputStreams(policyDefinition.getOutputStreams());
+        policyDefinition.setDefinition(definition);
+
+        PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+            {
+                put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
+            }
+        });
+        Assert.assertTrue(validation.isSuccess());
+        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
+        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size());
+        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getStreamPartitions().size());
+        Assert.assertNull(validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec());
+    }
+
+    @Test
+    public void testValidPolicyWithTooManyInputStreams() {
+        PolicyDefinition policyDefinition = new PolicyDefinition();
+        policyDefinition.setName("test_policy");
+        policyDefinition.setInputStreams(Arrays.asList("INPUT_STREAM_1", "INPUT_STREAM_2"));
+        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
+
+        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+        definition.setType("siddhi");
+        definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
+        definition.setInputStreams(policyDefinition.getInputStreams());
+        definition.setOutputStreams(policyDefinition.getOutputStreams());
+        policyDefinition.setDefinition(definition);
+
+        PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+            {
+                put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
+                put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
+            }
+        });
+        Assert.assertTrue(validation.isSuccess());
+        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
+        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size());
+    }
+
+    @Test
+    public void testValidPolicyWithTooFewOutputStreams() {
+        PolicyDefinition policyDefinition = new PolicyDefinition();
+        policyDefinition.setName("test_policy");
+        policyDefinition.setInputStreams(Arrays.asList("INPUT_STREAM_1", "INPUT_STREAM_2"));
+        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
+
+        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+        definition.setType("siddhi");
+        definition.setValue(
+            "from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;"
+                + "from INPUT_STREAM_1[value < 90.0] select * group by name insert into OUTPUT_STREAM_2;"
+        );
+        definition.setInputStreams(policyDefinition.getInputStreams());
+        definition.setOutputStreams(policyDefinition.getOutputStreams());
+        policyDefinition.setDefinition(definition);
+
+        PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+            {
+                put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
+                put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
+            }
+        });
+        Assert.assertTrue(validation.isSuccess());
+        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
+        Assert.assertEquals(2, validation.getPolicyExecutionPlan().getOutputStreams().size());
+    }
+
+    @Test
+    public void testInvalidPolicyForSyntaxError() {
+        PolicyDefinition policyDefinition = new PolicyDefinition();
+        policyDefinition.setName("test_policy");
+        policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM"));
+        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM"));
+
+        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+        definition.setType("siddhi");
+        definition.setValue("from INPUT_STREAM (value > 90.0) select * group by name insert into OUTPUT_STREAM;");
+        definition.setInputStreams(policyDefinition.getInputStreams());
+        definition.setOutputStreams(policyDefinition.getOutputStreams());
+        policyDefinition.setDefinition(definition);
+
+        PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+            {
+                put("INPUT_STREAM", createStreamDefinition("INPUT_STREAM"));
+            }
+        });
+        Assert.assertFalse(validation.isSuccess());
+    }
+
+    @Test
+    public void testInvalidPolicyForNotDefinedInputStream() {
+        PolicyDefinition policyDefinition = new PolicyDefinition();
+        policyDefinition.setName("test_policy");
+        policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
+        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
+
+        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+        definition.setType("siddhi");
+        definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
+        definition.setInputStreams(policyDefinition.getInputStreams());
+        definition.setOutputStreams(policyDefinition.getOutputStreams());
+        policyDefinition.setDefinition(definition);
+
+        PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+            {
+                put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
+            }
+        });
+        Assert.assertFalse(validation.isSuccess());
+    }
+
+    @Test
+    public void testInvalidPolicyForNotDefinedOutputStream() {
+        PolicyDefinition policyDefinition = new PolicyDefinition();
+        policyDefinition.setName("test_policy");
+        policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
+        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_2"));
+
+        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+        definition.setType("siddhi");
+        definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
+        definition.setInputStreams(policyDefinition.getInputStreams());
+        definition.setOutputStreams(policyDefinition.getOutputStreams());
+        policyDefinition.setDefinition(definition);
+
+        PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+            {
+                put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
+            }
+        });
+        Assert.assertFalse(validation.isSuccess());
+    }
+
+    // --------------
+    // Helper Methods
+    // --------------
+
+    private static StreamDefinition createStreamDefinition(String streamId) {
+        StreamDefinition streamDefinition = new StreamDefinition();
+        streamDefinition.setStreamId(streamId);
+        List<StreamColumn> columns = new ArrayList<>();
+        columns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
+        columns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
+        columns.add(new StreamColumn.Builder().name("timestamp").type(StreamColumn.Type.LONG).build());
+        streamDefinition.setColumns(columns);
+        return streamDefinition;
+    }
+}
\ No newline at end of file