You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/10/19 03:28:05 UTC
[50/50] incubator-eagle git commit: [EAGLE-635] Refactor policy
parser and validator for richer plan details and better performance
[EAGLE-635] Refactor policy parser and validator for richer plan details and better performance
## Changes
* Refactor policy parser and validator for richer plan details and better performance
* Decouple PolicyExecutionPlan and PolicyValidation
## API
* Parse API
~~~
POST /metadata/policies/parse
Accept-Type: text
from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.timeBatch(2 min) select cmd, user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT"
~~~
* Validation API
~~~
POST /metadata/policies/validate
Accept-Type: application/json
{
"name": "hdfsPolicy",
"description": "hdfsPolicy",
"inputStreams": [
"HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX"
],
"outputStreams": [
"HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT"
],
"definition": {
"type": "siddhi",
"value": "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.timeBatch(2 min) select cmd, user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT "
},
"partitionSpec": [
{
"streamId": "hdfs_audit_log_enriched_stream",
"type": "GROUPBY",
"columns" : [
"cmd"
]
}
],
"parallelismHint": 2
}
~~~
Author: Hao Chen <ha...@apache.org>
Closes #529 from haoch/RefactorPolicyValidator.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/6dbdb4f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/6dbdb4f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/6dbdb4f7
Branch: refs/heads/master
Commit: 6dbdb4f72ec5adf89f4f7a13bf766e01e3ff6705
Parents: adc2ba9
Author: Hao Chen <ha...@apache.org>
Authored: Wed Oct 19 10:28:09 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Wed Oct 19 10:28:09 2016 +0800
----------------------------------------------------------------------
.../conf/sandbox-userprofile-scheduler.conf | 2 +-
.../eagle-alert/alert-common/pom.xml | 8 -
.../eagle-alert/alert-engine/pom.xml | 10 +-
.../evaluator/impl/SiddhiDefinitionAdapter.java | 13 +
.../alert-metadata-service/pom.xml | 13 -
.../metadata/resource/MetadataResource.java | 10 +-
.../metadata/resource/PolicyCompiler.java | 235 +++++++++++++++++++
.../metadata/resource/PolicyExecutionPlan.java | 100 ++++++++
.../metadata/resource/PolicyParseResult.java | 65 +++++
.../metadata/resource/PolicyValidation.java | 97 --------
.../resource/PolicyValidationResult.java | 76 ++++++
.../metadata/resource/PolicyValidator.java | 124 ----------
.../metadata/resource/PolicyCompilerTest.java | 195 +++++++++++++++
.../metadata/resource/PolicyValidatorTest.java | 187 ---------------
14 files changed, 699 insertions(+), 436 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6dbdb4f7/eagle-assembly/src/main/conf/sandbox-userprofile-scheduler.conf
----------------------------------------------------------------------
diff --git a/eagle-assembly/src/main/conf/sandbox-userprofile-scheduler.conf b/eagle-assembly/src/main/conf/sandbox-userprofile-scheduler.conf
index f54162d..4593a7e 100644
--- a/eagle-assembly/src/main/conf/sandbox-userprofile-scheduler.conf
+++ b/eagle-assembly/src/main/conf/sandbox-userprofile-scheduler.conf
@@ -63,4 +63,4 @@ akka {
# Filter of log events that is used by the LoggingAdapter before
# publishing log events to the eventStream.
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6dbdb4f7/eagle-core/eagle-alert-parent/eagle-alert/alert-common/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/pom.xml
index 57a0157..fd3eccd 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/pom.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/pom.xml
@@ -63,10 +63,6 @@
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </dependency>
- <dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</dependency>
@@ -82,10 +78,6 @@
<groupId>com.fasterxml.jackson.jaxrs</groupId>
<artifactId>jackson-jaxrs-json-provider</artifactId>
</dependency>
- <!--<dependency>-->
- <!--<groupId>org.codehaus.jackson</groupId>-->
- <!--<artifactId>jackson-jaxrs</artifactId>-->
- <!--</dependency>-->
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6dbdb4f7/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
index 1ea6088..649125c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
@@ -41,15 +41,17 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>${kafka.artifact.id}</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-client</artifactId>
</dependency>
- <!--<dependency>-->
- <!--<groupId>org.codehaus.jackson</groupId>-->
- <!--<artifactId>jackson-jaxrs</artifactId>-->
- <!--</dependency>-->
<dependency>
<groupId>com.netflix.archaius</groupId>
<artifactId>archaius-core</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6dbdb4f7/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java
index 3645dcf..114c40a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java
@@ -102,6 +102,19 @@ public class SiddhiDefinitionAdapter {
return builder.toString();
}
+ public static String buildSiddhiExecutionPlan(String policyDefinition, Map<String, StreamDefinition> inputStreamDefinitions) {
+ StringBuilder builder = new StringBuilder();
+ for (Map.Entry<String,StreamDefinition> entry: inputStreamDefinitions.entrySet()) {
+ builder.append(SiddhiDefinitionAdapter.buildStreamDefinition(entry.getValue()));
+ builder.append("\n");
+ }
+ builder.append(policyDefinition);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Generated siddhi execution plan: {}", builder.toString());
+ }
+ return builder.toString();
+ }
+
/**
* public enum Type {
* STRING, INT, LONG, FLOAT, DOUBLE, BOOL, OBJECT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6dbdb4f7/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/pom.xml
index 0518a15..df2f803 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/pom.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/pom.xml
@@ -63,19 +63,6 @@
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-client</artifactId>
</dependency>
-
- <!--<dependency>-->
- <!--<groupId>org.codehaus.jackson</groupId>-->
- <!--<artifactId>jackson-mapper-asl</artifactId>-->
- <!--</dependency>-->
- <!--<dependency>-->
- <!--<groupId>org.codehaus.jackson</groupId>-->
- <!--<artifactId>jackson-jaxrs</artifactId>-->
- <!--</dependency>-->
- <!--<dependency>-->
- <!--<groupId>org.codehaus.jackson</groupId>-->
- <!--<artifactId>jackson-xc</artifactId>-->
- <!--</dependency>-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6dbdb4f7/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
index 32a978a..eb23362 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
@@ -206,8 +206,14 @@ public class MetadataResource {
@Path("/policies/validate")
@POST
- public PolicyValidation validatePolicy(PolicyDefinition policy) {
- return PolicyValidator.validate(policy,dao);
+ public PolicyValidationResult validatePolicy(PolicyDefinition policy) {
+ return PolicyCompiler.validate(policy,dao);
+ }
+
+ @Path("/policies/parse")
+ @POST
+ public PolicyParseResult parsePolicy(String policyDefinition) {
+ return PolicyCompiler.parse(policyDefinition);
}
@Path("/policies/batch")
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6dbdb4f7/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyCompiler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyCompiler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyCompiler.java
new file mode 100644
index 0000000..6e3938d
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyCompiler.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.metadata.resource;
+
+import com.google.common.base.Preconditions;
+import org.apache.eagle.alert.engine.coordinator.*;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.evaluator.impl.SiddhiDefinitionAdapter;
+import org.apache.eagle.alert.metadata.IMetadataDao;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.query.api.ExecutionPlan;
+import org.wso2.siddhi.query.api.execution.ExecutionElement;
+import org.wso2.siddhi.query.api.execution.query.Query;
+import org.wso2.siddhi.query.api.execution.query.input.handler.StreamHandler;
+import org.wso2.siddhi.query.api.execution.query.input.handler.Window;
+import org.wso2.siddhi.query.api.execution.query.input.stream.InputStream;
+import org.wso2.siddhi.query.api.execution.query.input.stream.JoinInputStream;
+import org.wso2.siddhi.query.api.execution.query.input.stream.SingleInputStream;
+import org.wso2.siddhi.query.api.execution.query.input.stream.StateInputStream;
+import org.wso2.siddhi.query.api.execution.query.output.stream.OutputStream;
+import org.wso2.siddhi.query.api.execution.query.selection.OutputAttribute;
+import org.wso2.siddhi.query.api.execution.query.selection.Selector;
+import org.wso2.siddhi.query.api.expression.Variable;
+import org.wso2.siddhi.query.api.expression.constant.TimeConstant;
+import org.wso2.siddhi.query.compiler.SiddhiCompiler;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class PolicyCompiler {
+ private static final Logger LOG = LoggerFactory.getLogger(PolicyCompiler.class);
+
+
+ public static PolicyParseResult parse(String executionPlanQuery) {
+ PolicyParseResult policyParseResult = new PolicyParseResult();
+ try {
+ policyParseResult.setPolicyExecutionPlan(parseExecutionPlan(executionPlanQuery));
+ policyParseResult.setSuccess(true);
+ policyParseResult.setMessage("Parsed successfully");
+ } catch (Exception exception) {
+ LOG.error("Got error to parse policy: {}", executionPlanQuery, exception);
+ policyParseResult.setSuccess(false);
+ policyParseResult.setMessage(exception.getMessage());
+ policyParseResult.setStackTrace(exception);
+ }
+ return policyParseResult;
+ }
+
+ /**
+ * Quick parseExecutionPlan policy.
+ */
+ public static PolicyExecutionPlan parseExecutionPlan(String policyDefinition, Map<String, StreamDefinition> inputStreamDefinitions) throws Exception {
+ // Validate inputStreams are valid
+ Preconditions.checkNotNull(inputStreamDefinitions, "No inputStreams to connect from");
+ return parseExecutionPlan(SiddhiDefinitionAdapter.buildSiddhiExecutionPlan(policyDefinition, inputStreamDefinitions));
+ }
+
+ public static PolicyExecutionPlan parseExecutionPlan(String executionPlanQuery) throws Exception {
+ PolicyExecutionPlan policyExecutionPlan = new PolicyExecutionPlan();
+ try {
+ ExecutionPlan executionPlan = SiddhiCompiler.parse(executionPlanQuery);
+
+ policyExecutionPlan.setExecutionPlanDesc(executionPlan.toString());
+
+ // Set current execution plan as valid
+ policyExecutionPlan.setExecutionPlanSource(executionPlanQuery);
+ policyExecutionPlan.setInternalExecutionPlan(executionPlan);
+
+ Map<String, List<StreamColumn>> actualInputStreams = new HashMap<>();
+ Map<String, List<StreamColumn>> actualOutputStreams = new HashMap<>();
+ List<StreamPartition> partitions = new ArrayList<>();
+
+ // Go through execution element
+ for (ExecutionElement executionElement : executionPlan.getExecutionElementList()) {
+ if (executionElement instanceof Query) {
+ // -------------
+ // Explain Query
+ // -------------
+
+ // Input streams
+ InputStream inputStream = ((Query) executionElement).getInputStream();
+ Selector selector = ((Query) executionElement).getSelector();
+
+ for (String streamId : inputStream.getUniqueStreamIds()) {
+ if (!actualInputStreams.containsKey(streamId)) {
+ org.wso2.siddhi.query.api.definition.StreamDefinition streamDefinition = executionPlan.getStreamDefinitionMap().get(streamId);
+ if (streamDefinition != null) {
+ actualInputStreams.put(streamId, SiddhiDefinitionAdapter.convertFromSiddiDefinition(streamDefinition).getColumns());
+ } else {
+ actualInputStreams.put(streamId, null);
+ }
+ }
+ }
+
+ // Window Spec and Partition
+ if (inputStream instanceof SingleInputStream) {
+ // Window Spec
+ List<Window> windows = new ArrayList<>();
+ for (StreamHandler streamHandler : ((SingleInputStream) inputStream).getStreamHandlers()) {
+ if (streamHandler instanceof Window) {
+ windows.add((Window) streamHandler);
+ }
+ }
+
+ // Group By Spec
+ List<Variable> groupBy = selector.getGroupByList();
+
+ if (windows.size() > 0 || groupBy.size() >= 0) {
+ partitions.add(convertSingleStreamWindowAndGroupByToPartition(((SingleInputStream) inputStream).getStreamId(), windows, groupBy));
+ }
+ }
+ // else if(inputStream instanceof JoinInputStream) {
+ // // TODO: Parse multiple stream join
+ // } else if(inputStream instanceof StateInputStream) {
+ // // TODO: Parse StateInputStream
+ // }
+
+ // Output streams
+ OutputStream outputStream = ((Query) executionElement).getOutputStream();
+ actualOutputStreams.put(outputStream.getId(), convertOutputStreamColumns(selector.getSelectionList()));
+ } else {
+ LOG.warn("Unhandled execution element: {}", executionElement.toString());
+ }
+ }
+ // Set used input streams
+ policyExecutionPlan.setInputStreams(actualInputStreams);
+
+ // Set Partitions
+ policyExecutionPlan.setStreamPartitions(partitions);
+
+ // Validate outputStreams
+ policyExecutionPlan.setOutputStreams(actualOutputStreams);
+ } catch (Exception ex) {
+ LOG.error("Got error to parseExecutionPlan policy execution plan: \n{}", executionPlanQuery, ex);
+ throw ex;
+ }
+ return policyExecutionPlan;
+ }
+
+ private static StreamPartition convertSingleStreamWindowAndGroupByToPartition(String streamId, List<Window> windows, List<Variable> groupBy) {
+ StreamPartition partition = new StreamPartition();
+ partition.setStreamId(streamId);
+ StreamSortSpec sortSpec = null;
+
+ if (windows.size() > 0) {
+ sortSpec = new StreamSortSpec();
+ for (Window window : windows) {
+ if (window.getFunction().equals("timeBatch")) {
+ sortSpec.setWindowPeriodMillis(((TimeConstant) window.getParameters()[0]).getValue().intValue());
+ sortSpec.setWindowMargin(sortSpec.getWindowPeriodMillis() / 3);
+ }
+ }
+ }
+ partition.setSortSpec(sortSpec);
+ if (groupBy.size() > 0) {
+ partition.setColumns(groupBy.stream().map(Variable::getAttributeName).collect(Collectors.toList()));
+ partition.setType(StreamPartition.Type.GROUPBY);
+ } else {
+ partition.setType(StreamPartition.Type.SHUFFLE);
+ }
+ return partition;
+ }
+
+ public static PolicyValidationResult validate(PolicyDefinition policy, IMetadataDao metadataDao) {
+ Map<String, StreamDefinition> allDefinitions = new HashMap<>();
+ for (StreamDefinition definition : metadataDao.listStreams()) {
+ allDefinitions.put(definition.getStreamId(), definition);
+ }
+ return validate(policy, allDefinitions);
+ }
+
+ public static PolicyValidationResult validate(PolicyDefinition policy, Map<String, StreamDefinition> allDefinitions) {
+ Map<String, StreamDefinition> inputDefinitions = new HashMap<>();
+ PolicyValidationResult policyValidationResult = new PolicyValidationResult();
+ policyValidationResult.setPolicyDefinition(policy);
+ try {
+ if (policy.getInputStreams() != null) {
+ for (String streamId : policy.getInputStreams()) {
+ if (allDefinitions.containsKey(streamId)) {
+ inputDefinitions.put(streamId, allDefinitions.get(streamId));
+ } else {
+ throw new StreamNotDefinedException(streamId);
+ }
+ }
+ }
+
+ PolicyExecutionPlan policyExecutionPlan = parseExecutionPlan(policy.getDefinition().getValue(), inputDefinitions);
+ // Validate output
+ if (policy.getOutputStreams() != null) {
+ for (String outputStream : policy.getOutputStreams()) {
+ if (!policyExecutionPlan.getOutputStreams().containsKey(outputStream)) {
+ throw new StreamNotDefinedException("Output stream " + outputStream + " not defined");
+ }
+ }
+ }
+ policyValidationResult.setPolicyExecutionPlan(policyExecutionPlan);
+ policyValidationResult.setSuccess(true);
+ policyValidationResult.setMessage("Validated successfully");
+ } catch (Exception exception) {
+ LOG.error("Got error to validate policy definition: {}", policy, exception);
+ policyValidationResult.setSuccess(false);
+ policyValidationResult.setMessage(exception.getMessage());
+ policyValidationResult.setStackTrace(exception);
+ }
+
+ return policyValidationResult;
+ }
+
+ private static List<StreamColumn> convertOutputStreamColumns(List<OutputAttribute> outputAttributeList) {
+ return outputAttributeList.stream().map(outputAttribute -> {
+ StreamColumn streamColumn = new StreamColumn();
+ streamColumn.setName(outputAttribute.getRename());
+ streamColumn.setDescription(outputAttribute.getExpression().toString());
+ return streamColumn;
+ }).collect(Collectors.toList());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6dbdb4f7/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyExecutionPlan.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyExecutionPlan.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyExecutionPlan.java
new file mode 100644
index 0000000..f925e3d
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyExecutionPlan.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.metadata.resource;
+
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.wso2.siddhi.query.api.ExecutionPlan;
+
+import java.util.List;
+import java.util.Map;
+
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+public class PolicyExecutionPlan {
+ /**
+ * Actual input streams.
+ */
+ private Map<String, List<StreamColumn>> inputStreams;
+
+ /**
+ * Actual output streams.
+ */
+ private Map<String, List<StreamColumn>> outputStreams;
+
+ /**
+ * Execution plan source.
+ */
+ private String executionPlanSource;
+
+ /**
+ * Execution plan.
+ */
+ private ExecutionPlan internalExecutionPlan;
+
+ private String executionPlanDesc;
+
+ private List<StreamPartition> streamPartitions;
+
+ public String getExecutionPlanSource() {
+ return executionPlanSource;
+ }
+
+ public void setExecutionPlanSource(String executionPlanSource) {
+ this.executionPlanSource = executionPlanSource;
+ }
+
+ public ExecutionPlan getInternalExecutionPlan() {
+ return internalExecutionPlan;
+ }
+
+ public void setInternalExecutionPlan(ExecutionPlan internalExecutionPlan) {
+ this.internalExecutionPlan = internalExecutionPlan;
+ }
+
+ public String getExecutionPlanDesc() {
+ return executionPlanDesc;
+ }
+
+ public void setExecutionPlanDesc(String executionPlanDesc) {
+ this.executionPlanDesc = executionPlanDesc;
+ }
+
+ public List<StreamPartition> getStreamPartitions() {
+ return streamPartitions;
+ }
+
+ public void setStreamPartitions(List<StreamPartition> streamPartitions) {
+ this.streamPartitions = streamPartitions;
+ }
+
+ public Map<String, List<StreamColumn>> getInputStreams() {
+ return inputStreams;
+ }
+
+ public void setInputStreams(Map<String, List<StreamColumn>> inputStreams) {
+ this.inputStreams = inputStreams;
+ }
+
+ public Map<String, List<StreamColumn>> getOutputStreams() {
+ return outputStreams;
+ }
+
+ public void setOutputStreams(Map<String, List<StreamColumn>> outputStreams) {
+ this.outputStreams = outputStreams;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6dbdb4f7/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyParseResult.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyParseResult.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyParseResult.java
new file mode 100644
index 0000000..2522270
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyParseResult.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.metadata.resource;
+
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+public class PolicyParseResult {
+ private boolean success;
+ private String message;
+ private String exception;
+
+ private PolicyExecutionPlan policyExecutionPlan;
+
+ public String getException() {
+ return exception;
+ }
+
+ public void setException(String exception) {
+ this.exception = exception;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+
+ public void setStackTrace(Throwable throwable) {
+ this.setException(ExceptionUtils.getStackTrace(throwable));
+ }
+
+ public boolean isSuccess() {
+ return success;
+ }
+
+ public void setSuccess(boolean success) {
+ this.success = success;
+ }
+
+ public PolicyExecutionPlan getPolicyExecutionPlan() {
+ return policyExecutionPlan;
+ }
+
+ public void setPolicyExecutionPlan(PolicyExecutionPlan policyExecutionPlan) {
+ this.policyExecutionPlan = policyExecutionPlan;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6dbdb4f7/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyValidation.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyValidation.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyValidation.java
deleted file mode 100644
index 4b69a35..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyValidation.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.service.metadata.resource;
-
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import org.apache.commons.lang3.exception.ExceptionUtils;
-
-import java.util.Map;
-
-@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-public class PolicyValidation {
- private boolean success;
- private String message;
- private String exception;
-
- private Map<String, StreamDefinition> validInputStreams;
- private Map<String, StreamDefinition> validOutputStreams;
- private PolicyDefinition policyDefinition;
- private String validExecutionPlan;
-
- public boolean isSuccess() {
- return success;
- }
-
- public void setSuccess(boolean success) {
- this.success = success;
- }
-
- public String getMessage() {
- return message;
- }
-
- public void setMessage(String message) {
- this.message = message;
- }
-
-
- public String getException() {
- return exception;
- }
-
- public void setException(String exception) {
- this.exception = exception;
- }
-
- public void setStackTrace(Throwable throwable) {
- this.exception = ExceptionUtils.getStackTrace(throwable);
- }
-
- public Map<String, StreamDefinition> getValidOutputStreams() {
- return validOutputStreams;
- }
-
- public void setValidOutputStreams(Map<String, StreamDefinition> validOutputStreams) {
- this.validOutputStreams = validOutputStreams;
- }
-
- public Map<String, StreamDefinition> getValidInputStreams() {
- return validInputStreams;
- }
-
- public void setValidInputStreams(Map<String, StreamDefinition> validInputStreams) {
- this.validInputStreams = validInputStreams;
- }
-
- public PolicyDefinition getPolicyDefinition() {
- return policyDefinition;
- }
-
- public void setPolicyDefinition(PolicyDefinition policyDefinition) {
- this.policyDefinition = policyDefinition;
- }
-
- public String getValidExecutionPlan() {
- return validExecutionPlan;
- }
-
- public void setValidExecutionPlan(String validExecutionPlan) {
- this.validExecutionPlan = validExecutionPlan;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6dbdb4f7/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyValidationResult.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyValidationResult.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyValidationResult.java
new file mode 100644
index 0000000..b1912af
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyValidationResult.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.metadata.resource;
+
+
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+public class PolicyValidationResult {
+ private boolean success;
+ private String message;
+ private String exception;
+
+ private PolicyExecutionPlan policyExecutionPlan;
+ private PolicyDefinition policyDefinition;
+
+ public String getException() {
+ return exception;
+ }
+
+ public void setException(String exception) {
+ this.exception = exception;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+
+ public void setStackTrace(Throwable throwable) {
+ this.setException(ExceptionUtils.getStackTrace(throwable));
+ }
+
+ public boolean isSuccess() {
+ return success;
+ }
+
+ public void setSuccess(boolean success) {
+ this.success = success;
+ }
+
+ public PolicyExecutionPlan getPolicyExecutionPlan() {
+ return policyExecutionPlan;
+ }
+
+ public void setPolicyExecutionPlan(PolicyExecutionPlan policyExecutionPlan) {
+ this.policyExecutionPlan = policyExecutionPlan;
+ }
+
+ public PolicyDefinition getPolicyDefinition() {
+ return policyDefinition;
+ }
+
+ public void setPolicyDefinition(PolicyDefinition policyDefinition) {
+ this.policyDefinition = policyDefinition;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6dbdb4f7/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyValidator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyValidator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyValidator.java
deleted file mode 100644
index aef6aa8..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyValidator.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.service.metadata.resource;
-
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamNotDefinedException;
-import org.apache.eagle.alert.engine.evaluator.impl.SiddhiDefinitionAdapter;
-import org.apache.eagle.alert.metadata.IMetadataDao;
-
-import com.google.common.base.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.query.api.definition.AbstractDefinition;
-import org.wso2.siddhi.query.compiler.exception.SiddhiParserException;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class PolicyValidator {
- private static final Logger LOG = LoggerFactory.getLogger(PolicyValidator.class);
-
- public static PolicyValidation validate(PolicyDefinition policy, Map<String, StreamDefinition> allStreamDefinitions) {
- PolicyValidation policyValidation = new PolicyValidation();
- policyValidation.setPolicyDefinition(policy);
-
- SiddhiManager siddhiManager = null;
- ExecutionPlanRuntime executionRuntime = null;
- String executionPlan = null;
-
- try {
- // Validate inputStreams are valid
- Preconditions.checkNotNull(policy.getInputStreams(), "No inputStreams to connect from");
- Map<String, StreamDefinition> currentDefinitions = new HashMap<>();
- for (String streamId : policy.getInputStreams()) {
- if (allStreamDefinitions.containsKey(streamId)) {
- currentDefinitions.put(streamId, allStreamDefinitions.get(streamId));
- } else {
- throw new StreamNotDefinedException(streamId);
- }
- }
-
- // Build final execution plan
- executionPlan = SiddhiDefinitionAdapter.buildSiddhiExecutionPlan(policy, currentDefinitions);
- siddhiManager = new SiddhiManager();
- executionRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
-
- // Set current execution plan as valid
- policyValidation.setValidExecutionPlan(executionPlan);
-
- // Siddhi runtime active stream definitions
- Map<String, AbstractDefinition> definitionMap = executionRuntime.getStreamDefinitionMap();
-
- Map<String, StreamDefinition> validInputStreams = new HashMap<>();
- Map<String, StreamDefinition> validOutputStreams = new HashMap<>();
-
- for (Map.Entry<String, AbstractDefinition> entry : definitionMap.entrySet()) {
- if (currentDefinitions.containsKey(entry.getKey())) {
- validInputStreams.put(entry.getKey(), currentDefinitions.get(entry.getKey()));
- } else {
- validOutputStreams.put(entry.getKey(), SiddhiDefinitionAdapter.convertFromSiddiDefinition(entry.getValue()));
- }
- }
- policyValidation.setValidInputStreams(validInputStreams);
-
- // Validate outputStreams
- policyValidation.setValidOutputStreams(validOutputStreams);
- if (policy.getOutputStreams() != null) {
- for (String outputStream : policy.getOutputStreams()) {
- if (!validOutputStreams.containsKey(outputStream)) {
- throw new StreamNotDefinedException("Output stream " + outputStream + " not defined");
- }
- }
- }
-
- // TODO: Validate partitions
-
- policyValidation.setSuccess(true);
- policyValidation.setMessage("Validation success");
- } catch (SiddhiParserException parserException) {
- LOG.error("Got error to parse policy execution plan: \n{}", executionPlan, parserException);
- policyValidation.setSuccess(false);
- policyValidation.setMessage("Parser Error: " + parserException.getMessage());
- policyValidation.setStackTrace(parserException);
- } catch (Exception exception) {
- LOG.error("Got Error to validate policy definition", exception);
- policyValidation.setSuccess(false);
- policyValidation.setMessage("Validation Error: " + exception.getMessage());
- policyValidation.setStackTrace(exception);
- } finally {
- if (executionRuntime != null) {
- executionRuntime.shutdown();
- }
- if (siddhiManager != null) {
- siddhiManager.shutdown();
- }
- }
- return policyValidation;
- }
-
- public static PolicyValidation validate(PolicyDefinition policy, IMetadataDao metadataDao) {
- Map<String, StreamDefinition> allDefinitions = new HashMap<>();
- for (StreamDefinition definition : metadataDao.listStreams()) {
- allDefinitions.put(definition.getStreamId(), definition);
- }
- return validate(policy, allDefinitions);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6dbdb4f7/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyCompilerTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyCompilerTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyCompilerTest.java
new file mode 100644
index 0000000..682de4c
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyCompilerTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.metadata.resource;
+
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.*;
+
+public class PolicyCompilerTest {
+ @Test
+ public void parseFullPolicyQuery() throws Exception {
+ PolicyExecutionPlan executionPlan = PolicyCompiler.parseExecutionPlan("from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.timeBatch(2 min) "
+ + "select cmd, user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT");
+ Assert.assertEquals("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX", executionPlan.getInputStreams().keySet().toArray()[0]);
+ Assert.assertEquals("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT", executionPlan.getOutputStreams().keySet().toArray()[0]);
+ }
+
+ @Test
+ public void testValidPolicy() {
+ PolicyDefinition policyDefinition = new PolicyDefinition();
+ policyDefinition.setName("test_policy");
+ policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
+ policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
+
+ PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+ definition.setType("siddhi");
+ definition.setValue("from INPUT_STREAM_1#window.timeBatch(2 min) select name, sum(value) as total group by name insert into OUTPUT_STREAM_1 ;");
+ definition.setInputStreams(policyDefinition.getInputStreams());
+ definition.setOutputStreams(policyDefinition.getOutputStreams());
+ policyDefinition.setDefinition(definition);
+
+ PolicyValidationResult validation = PolicyCompiler.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+ {
+ put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
+ put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
+ put("INPUT_STREAM_3", createStreamDefinition("INPUT_STREAM_3"));
+ put("INPUT_STREAM_4", createStreamDefinition("INPUT_STREAM_4"));
+ }
+ });
+ Assert.assertTrue(validation.isSuccess());
+ Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
+ Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size());
+ }
+
+ @Test
+ public void testValidPolicyWithTooManyInputStreams() {
+ PolicyDefinition policyDefinition = new PolicyDefinition();
+ policyDefinition.setName("test_policy");
+ policyDefinition.setInputStreams(Arrays.asList("INPUT_STREAM_1", "INPUT_STREAM_2"));
+ policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
+
+ PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+ definition.setType("siddhi");
+ definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
+ definition.setInputStreams(policyDefinition.getInputStreams());
+ definition.setOutputStreams(policyDefinition.getOutputStreams());
+ policyDefinition.setDefinition(definition);
+
+ PolicyValidationResult validation = PolicyCompiler.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+ {
+ put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
+ put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
+ }
+ });
+ Assert.assertTrue(validation.isSuccess());
+ Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
+ Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size());
+ }
+
+ @Test
+ public void testValidPolicyWithTooFewOutputStreams() {
+ PolicyDefinition policyDefinition = new PolicyDefinition();
+ policyDefinition.setName("test_policy");
+ policyDefinition.setInputStreams(Arrays.asList("INPUT_STREAM_1", "INPUT_STREAM_2"));
+ policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
+
+ PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+ definition.setType("siddhi");
+ definition.setValue(
+ "from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;"
+ + "from INPUT_STREAM_1[value < 90.0] select * group by name insert into OUTPUT_STREAM_2;"
+ );
+ definition.setInputStreams(policyDefinition.getInputStreams());
+ definition.setOutputStreams(policyDefinition.getOutputStreams());
+ policyDefinition.setDefinition(definition);
+
+ PolicyValidationResult validation = PolicyCompiler.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+ {
+ put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
+ put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
+ }
+ });
+ Assert.assertTrue(validation.isSuccess());
+ Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
+ Assert.assertEquals(2, validation.getPolicyExecutionPlan().getOutputStreams().size());
+ }
+
+ @Test
+ public void testInvalidPolicyForSyntaxError() {
+ PolicyDefinition policyDefinition = new PolicyDefinition();
+ policyDefinition.setName("test_policy");
+ policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM"));
+ policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM"));
+
+ PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+ definition.setType("siddhi");
+ definition.setValue("from INPUT_STREAM (value > 90.0) select * group by name insert into OUTPUT_STREAM;");
+ definition.setInputStreams(policyDefinition.getInputStreams());
+ definition.setOutputStreams(policyDefinition.getOutputStreams());
+ policyDefinition.setDefinition(definition);
+
+ PolicyValidationResult validation = PolicyCompiler.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+ {
+ put("INPUT_STREAM", createStreamDefinition("INPUT_STREAM"));
+ }
+ });
+ Assert.assertFalse(validation.isSuccess());
+ }
+
+ @Test
+ public void testInvalidPolicyForNotDefinedInputStream() {
+ PolicyDefinition policyDefinition = new PolicyDefinition();
+ policyDefinition.setName("test_policy");
+ policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
+ policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
+
+ PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+ definition.setType("siddhi");
+ definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
+ definition.setInputStreams(policyDefinition.getInputStreams());
+ definition.setOutputStreams(policyDefinition.getOutputStreams());
+ policyDefinition.setDefinition(definition);
+
+ PolicyValidationResult validation = PolicyCompiler.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+ {
+ put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
+ }
+ });
+ Assert.assertFalse(validation.isSuccess());
+ }
+
+ @Test
+ public void testInvalidPolicyForNotDefinedOutputStream() {
+ PolicyDefinition policyDefinition = new PolicyDefinition();
+ policyDefinition.setName("test_policy");
+ policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
+ policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_2"));
+
+ PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+ definition.setType("siddhi");
+ definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
+ definition.setInputStreams(policyDefinition.getInputStreams());
+ definition.setOutputStreams(policyDefinition.getOutputStreams());
+ policyDefinition.setDefinition(definition);
+
+ PolicyValidationResult validation = PolicyCompiler.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+ {
+ put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
+ }
+ });
+ Assert.assertFalse(validation.isSuccess());
+ }
+
+ // --------------
+ // Helper Methods
+ // --------------
+
+ private static StreamDefinition createStreamDefinition(String streamId) {
+ StreamDefinition streamDefinition = new StreamDefinition();
+ streamDefinition.setStreamId(streamId);
+ List<StreamColumn> columns = new ArrayList<>();
+ columns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
+ columns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
+ streamDefinition.setColumns(columns);
+ return streamDefinition;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6dbdb4f7/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyValidatorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyValidatorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyValidatorTest.java
deleted file mode 100644
index b9a1b23..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyValidatorTest.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.service.metadata.resource;
-
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.*;
-
-public class PolicyValidatorTest {
- @Test
- public void testValidPolicy() {
- PolicyDefinition policyDefinition = new PolicyDefinition();
- policyDefinition.setName("test_policy");
- policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
- policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
-
- PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
- definition.setType("siddhi");
- definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
- definition.setInputStreams(policyDefinition.getInputStreams());
- definition.setOutputStreams(policyDefinition.getOutputStreams());
- policyDefinition.setDefinition(definition);
-
- PolicyValidation validation = PolicyValidator.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
- {
- put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
- put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
- put("INPUT_STREAM_3", createStreamDefinition("INPUT_STREAM_3"));
- put("INPUT_STREAM_4", createStreamDefinition("INPUT_STREAM_4"));
- }
- });
- Assert.assertTrue(validation.isSuccess());
- Assert.assertEquals(1, validation.getValidInputStreams().size());
- Assert.assertEquals(1, validation.getValidOutputStreams().size());
- }
-
- @Test
- public void testValidPolicyWithTooManyInputStreams() {
- PolicyDefinition policyDefinition = new PolicyDefinition();
- policyDefinition.setName("test_policy");
- policyDefinition.setInputStreams(Arrays.asList("INPUT_STREAM_1", "INPUT_STREAM_2"));
- policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
-
- PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
- definition.setType("siddhi");
- definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
- definition.setInputStreams(policyDefinition.getInputStreams());
- definition.setOutputStreams(policyDefinition.getOutputStreams());
- policyDefinition.setDefinition(definition);
-
- PolicyValidation validation = PolicyValidator.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
- {
- put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
- put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
- }
- });
- Assert.assertTrue(validation.isSuccess());
- Assert.assertEquals(2, validation.getValidInputStreams().size());
- Assert.assertEquals(1, validation.getValidOutputStreams().size());
- }
-
- @Test
- public void testValidPolicyWithTooFewOutputStreams() {
- PolicyDefinition policyDefinition = new PolicyDefinition();
- policyDefinition.setName("test_policy");
- policyDefinition.setInputStreams(Arrays.asList("INPUT_STREAM_1", "INPUT_STREAM_2"));
- policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
-
- PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
- definition.setType("siddhi");
- definition.setValue(
- "from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;"
- + "from INPUT_STREAM_1[value < 90.0] select * group by name insert into OUTPUT_STREAM_2;"
- );
- definition.setInputStreams(policyDefinition.getInputStreams());
- definition.setOutputStreams(policyDefinition.getOutputStreams());
- policyDefinition.setDefinition(definition);
-
- PolicyValidation validation = PolicyValidator.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
- {
- put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
- put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
- }
- });
- Assert.assertTrue(validation.isSuccess());
- Assert.assertEquals(2, validation.getValidInputStreams().size());
- Assert.assertEquals(2, validation.getValidOutputStreams().size());
- }
-
- @Test
- public void testInvalidPolicyForSyntaxError() {
- PolicyDefinition policyDefinition = new PolicyDefinition();
- policyDefinition.setName("test_policy");
- policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM"));
- policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM"));
-
- PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
- definition.setType("siddhi");
- definition.setValue("from INPUT_STREAM (value > 90.0) select * group by name insert into OUTPUT_STREAM;");
- definition.setInputStreams(policyDefinition.getInputStreams());
- definition.setOutputStreams(policyDefinition.getOutputStreams());
- policyDefinition.setDefinition(definition);
-
- PolicyValidation validation = PolicyValidator.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
- {
- put("INPUT_STREAM", createStreamDefinition("INPUT_STREAM"));
- }
- });
- Assert.assertFalse(validation.isSuccess());
- }
-
- @Test
- public void testInvalidPolicyForNotDefinedInputStream() {
- PolicyDefinition policyDefinition = new PolicyDefinition();
- policyDefinition.setName("test_policy");
- policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
- policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
-
- PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
- definition.setType("siddhi");
- definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
- definition.setInputStreams(policyDefinition.getInputStreams());
- definition.setOutputStreams(policyDefinition.getOutputStreams());
- policyDefinition.setDefinition(definition);
-
- PolicyValidation validation = PolicyValidator.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
- {
- put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
- }
- });
- Assert.assertFalse(validation.isSuccess());
- }
-
- @Test
- public void testInvalidPolicyForNotDefinedOutputStream() {
- PolicyDefinition policyDefinition = new PolicyDefinition();
- policyDefinition.setName("test_policy");
- policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
- policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_2"));
-
- PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
- definition.setType("siddhi");
- definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
- definition.setInputStreams(policyDefinition.getInputStreams());
- definition.setOutputStreams(policyDefinition.getOutputStreams());
- policyDefinition.setDefinition(definition);
-
- PolicyValidation validation = PolicyValidator.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
- {
- put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
- }
- });
- Assert.assertFalse(validation.isSuccess());
- }
-
- // --------------
- // Helper Methods
- // --------------
-
- private static StreamDefinition createStreamDefinition(String streamId) {
- StreamDefinition streamDefinition = new StreamDefinition();
- streamDefinition.setStreamId(streamId);
- List<StreamColumn> columns = new ArrayList<>();
- columns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
- columns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
- streamDefinition.setColumns(columns);
- return streamDefinition;
- }
-}
\ No newline at end of file