You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/10/19 09:15:17 UTC
incubator-eagle git commit: [EAGLE-639] Generate sortSpec only on
externalWindow
Repository: incubator-eagle
Updated Branches:
refs/heads/master 453c3a5fa -> 6f88c30f8
[EAGLE-639] Generate sortSpec only on externalWindow
Generate sortSpec on externalWindow
https://issues.apache.org/jira/browse/EAGLE-639
Author: Hao Chen <ha...@apache.org>
Closes #532 from haoch/EnhancePolicyIntepreter.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/6f88c30f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/6f88c30f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/6f88c30f
Branch: refs/heads/master
Commit: 6f88c30f855249c401a40efc6941090a152670f3
Parents: 453c3a5
Author: Hao Chen <ha...@apache.org>
Authored: Wed Oct 19 17:15:06 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Wed Oct 19 17:15:06 2016 +0800
----------------------------------------------------------------------
.../engine/sorter/StreamSortHandlerTest.java | 7 +-
.../metadata/resource/MetadataResource.java | 4 +-
.../metadata/resource/PolicyCompiler.java | 235 ------------------
.../metadata/resource/PolicyInterpreter.java | 244 +++++++++++++++++++
.../metadata/resource/PolicyCompilerTest.java | 195 ---------------
.../resource/PolicyInterpreterTest.java | 226 +++++++++++++++++
6 files changed, 476 insertions(+), 435 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6f88c30f/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java
index 98657e7..885017a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java
@@ -32,6 +32,7 @@ import org.apache.eagle.alert.engine.sorter.impl.StreamTimeClockInLocalMemory;
import org.apache.eagle.alert.utils.DateTimeUtil;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -135,7 +136,7 @@ public class StreamSortHandlerTest {
*
* @throws InterruptedException
*/
- @Test
+ @Test @Ignore("Igored heavy benchmark test in unit test")
public void testWithUnsortedEventsInLargeWindowBenchmark() throws InterruptedException {
metricReporter.report();
testWithUnsortedEventsIn1hWindow(1000);
@@ -146,8 +147,8 @@ public class StreamSortHandlerTest {
metricReporter.report();
testWithUnsortedEventsIn1hWindow(1000000);
metricReporter.report();
-// testWithUnsortedEventsIn1hWindow(10000000);
-// metricReporter.report();
+ testWithUnsortedEventsIn1hWindow(10000000);
+ metricReporter.report();
}
public void testWithUnsortedEventsIn1hWindow(int count) throws InterruptedException {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6f88c30f/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
index eb23362..190da2a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
@@ -207,13 +207,13 @@ public class MetadataResource {
@Path("/policies/validate")
@POST
public PolicyValidationResult validatePolicy(PolicyDefinition policy) {
- return PolicyCompiler.validate(policy,dao);
+ return PolicyInterpreter.validate(policy,dao);
}
@Path("/policies/parse")
@POST
public PolicyParseResult parsePolicy(String policyDefinition) {
- return PolicyCompiler.parse(policyDefinition);
+ return PolicyInterpreter.parse(policyDefinition);
}
@Path("/policies/batch")
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6f88c30f/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyCompiler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyCompiler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyCompiler.java
deleted file mode 100644
index 6e3938d..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyCompiler.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.service.metadata.resource;
-
-import com.google.common.base.Preconditions;
-import org.apache.eagle.alert.engine.coordinator.*;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.evaluator.impl.SiddhiDefinitionAdapter;
-import org.apache.eagle.alert.metadata.IMetadataDao;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.query.api.ExecutionPlan;
-import org.wso2.siddhi.query.api.execution.ExecutionElement;
-import org.wso2.siddhi.query.api.execution.query.Query;
-import org.wso2.siddhi.query.api.execution.query.input.handler.StreamHandler;
-import org.wso2.siddhi.query.api.execution.query.input.handler.Window;
-import org.wso2.siddhi.query.api.execution.query.input.stream.InputStream;
-import org.wso2.siddhi.query.api.execution.query.input.stream.JoinInputStream;
-import org.wso2.siddhi.query.api.execution.query.input.stream.SingleInputStream;
-import org.wso2.siddhi.query.api.execution.query.input.stream.StateInputStream;
-import org.wso2.siddhi.query.api.execution.query.output.stream.OutputStream;
-import org.wso2.siddhi.query.api.execution.query.selection.OutputAttribute;
-import org.wso2.siddhi.query.api.execution.query.selection.Selector;
-import org.wso2.siddhi.query.api.expression.Variable;
-import org.wso2.siddhi.query.api.expression.constant.TimeConstant;
-import org.wso2.siddhi.query.compiler.SiddhiCompiler;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-public class PolicyCompiler {
- private static final Logger LOG = LoggerFactory.getLogger(PolicyCompiler.class);
-
-
- public static PolicyParseResult parse(String executionPlanQuery) {
- PolicyParseResult policyParseResult = new PolicyParseResult();
- try {
- policyParseResult.setPolicyExecutionPlan(parseExecutionPlan(executionPlanQuery));
- policyParseResult.setSuccess(true);
- policyParseResult.setMessage("Parsed successfully");
- } catch (Exception exception) {
- LOG.error("Got error to parse policy: {}", executionPlanQuery, exception);
- policyParseResult.setSuccess(false);
- policyParseResult.setMessage(exception.getMessage());
- policyParseResult.setStackTrace(exception);
- }
- return policyParseResult;
- }
-
- /**
- * Quick parseExecutionPlan policy.
- */
- public static PolicyExecutionPlan parseExecutionPlan(String policyDefinition, Map<String, StreamDefinition> inputStreamDefinitions) throws Exception {
- // Validate inputStreams are valid
- Preconditions.checkNotNull(inputStreamDefinitions, "No inputStreams to connect from");
- return parseExecutionPlan(SiddhiDefinitionAdapter.buildSiddhiExecutionPlan(policyDefinition, inputStreamDefinitions));
- }
-
- public static PolicyExecutionPlan parseExecutionPlan(String executionPlanQuery) throws Exception {
- PolicyExecutionPlan policyExecutionPlan = new PolicyExecutionPlan();
- try {
- ExecutionPlan executionPlan = SiddhiCompiler.parse(executionPlanQuery);
-
- policyExecutionPlan.setExecutionPlanDesc(executionPlan.toString());
-
- // Set current execution plan as valid
- policyExecutionPlan.setExecutionPlanSource(executionPlanQuery);
- policyExecutionPlan.setInternalExecutionPlan(executionPlan);
-
- Map<String, List<StreamColumn>> actualInputStreams = new HashMap<>();
- Map<String, List<StreamColumn>> actualOutputStreams = new HashMap<>();
- List<StreamPartition> partitions = new ArrayList<>();
-
- // Go through execution element
- for (ExecutionElement executionElement : executionPlan.getExecutionElementList()) {
- if (executionElement instanceof Query) {
- // -------------
- // Explain Query
- // -------------
-
- // Input streams
- InputStream inputStream = ((Query) executionElement).getInputStream();
- Selector selector = ((Query) executionElement).getSelector();
-
- for (String streamId : inputStream.getUniqueStreamIds()) {
- if (!actualInputStreams.containsKey(streamId)) {
- org.wso2.siddhi.query.api.definition.StreamDefinition streamDefinition = executionPlan.getStreamDefinitionMap().get(streamId);
- if (streamDefinition != null) {
- actualInputStreams.put(streamId, SiddhiDefinitionAdapter.convertFromSiddiDefinition(streamDefinition).getColumns());
- } else {
- actualInputStreams.put(streamId, null);
- }
- }
- }
-
- // Window Spec and Partition
- if (inputStream instanceof SingleInputStream) {
- // Window Spec
- List<Window> windows = new ArrayList<>();
- for (StreamHandler streamHandler : ((SingleInputStream) inputStream).getStreamHandlers()) {
- if (streamHandler instanceof Window) {
- windows.add((Window) streamHandler);
- }
- }
-
- // Group By Spec
- List<Variable> groupBy = selector.getGroupByList();
-
- if (windows.size() > 0 || groupBy.size() >= 0) {
- partitions.add(convertSingleStreamWindowAndGroupByToPartition(((SingleInputStream) inputStream).getStreamId(), windows, groupBy));
- }
- }
- // else if(inputStream instanceof JoinInputStream) {
- // // TODO: Parse multiple stream join
- // } else if(inputStream instanceof StateInputStream) {
- // // TODO: Parse StateInputStream
- // }
-
- // Output streams
- OutputStream outputStream = ((Query) executionElement).getOutputStream();
- actualOutputStreams.put(outputStream.getId(), convertOutputStreamColumns(selector.getSelectionList()));
- } else {
- LOG.warn("Unhandled execution element: {}", executionElement.toString());
- }
- }
- // Set used input streams
- policyExecutionPlan.setInputStreams(actualInputStreams);
-
- // Set Partitions
- policyExecutionPlan.setStreamPartitions(partitions);
-
- // Validate outputStreams
- policyExecutionPlan.setOutputStreams(actualOutputStreams);
- } catch (Exception ex) {
- LOG.error("Got error to parseExecutionPlan policy execution plan: \n{}", executionPlanQuery, ex);
- throw ex;
- }
- return policyExecutionPlan;
- }
-
- private static StreamPartition convertSingleStreamWindowAndGroupByToPartition(String streamId, List<Window> windows, List<Variable> groupBy) {
- StreamPartition partition = new StreamPartition();
- partition.setStreamId(streamId);
- StreamSortSpec sortSpec = null;
-
- if (windows.size() > 0) {
- sortSpec = new StreamSortSpec();
- for (Window window : windows) {
- if (window.getFunction().equals("timeBatch")) {
- sortSpec.setWindowPeriodMillis(((TimeConstant) window.getParameters()[0]).getValue().intValue());
- sortSpec.setWindowMargin(sortSpec.getWindowPeriodMillis() / 3);
- }
- }
- }
- partition.setSortSpec(sortSpec);
- if (groupBy.size() > 0) {
- partition.setColumns(groupBy.stream().map(Variable::getAttributeName).collect(Collectors.toList()));
- partition.setType(StreamPartition.Type.GROUPBY);
- } else {
- partition.setType(StreamPartition.Type.SHUFFLE);
- }
- return partition;
- }
-
- public static PolicyValidationResult validate(PolicyDefinition policy, IMetadataDao metadataDao) {
- Map<String, StreamDefinition> allDefinitions = new HashMap<>();
- for (StreamDefinition definition : metadataDao.listStreams()) {
- allDefinitions.put(definition.getStreamId(), definition);
- }
- return validate(policy, allDefinitions);
- }
-
- public static PolicyValidationResult validate(PolicyDefinition policy, Map<String, StreamDefinition> allDefinitions) {
- Map<String, StreamDefinition> inputDefinitions = new HashMap<>();
- PolicyValidationResult policyValidationResult = new PolicyValidationResult();
- policyValidationResult.setPolicyDefinition(policy);
- try {
- if (policy.getInputStreams() != null) {
- for (String streamId : policy.getInputStreams()) {
- if (allDefinitions.containsKey(streamId)) {
- inputDefinitions.put(streamId, allDefinitions.get(streamId));
- } else {
- throw new StreamNotDefinedException(streamId);
- }
- }
- }
-
- PolicyExecutionPlan policyExecutionPlan = parseExecutionPlan(policy.getDefinition().getValue(), inputDefinitions);
- // Validate output
- if (policy.getOutputStreams() != null) {
- for (String outputStream : policy.getOutputStreams()) {
- if (!policyExecutionPlan.getOutputStreams().containsKey(outputStream)) {
- throw new StreamNotDefinedException("Output stream " + outputStream + " not defined");
- }
- }
- }
- policyValidationResult.setPolicyExecutionPlan(policyExecutionPlan);
- policyValidationResult.setSuccess(true);
- policyValidationResult.setMessage("Validated successfully");
- } catch (Exception exception) {
- LOG.error("Got error to validate policy definition: {}", policy, exception);
- policyValidationResult.setSuccess(false);
- policyValidationResult.setMessage(exception.getMessage());
- policyValidationResult.setStackTrace(exception);
- }
-
- return policyValidationResult;
- }
-
- private static List<StreamColumn> convertOutputStreamColumns(List<OutputAttribute> outputAttributeList) {
- return outputAttributeList.stream().map(outputAttribute -> {
- StreamColumn streamColumn = new StreamColumn();
- streamColumn.setName(outputAttribute.getRename());
- streamColumn.setDescription(outputAttribute.getExpression().toString());
- return streamColumn;
- }).collect(Collectors.toList());
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6f88c30f/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyInterpreter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyInterpreter.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyInterpreter.java
new file mode 100644
index 0000000..b97583c
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyInterpreter.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.metadata.resource;
+
+import com.google.common.base.Preconditions;
+import org.apache.eagle.alert.engine.coordinator.*;
+import org.apache.eagle.alert.engine.evaluator.impl.SiddhiDefinitionAdapter;
+import org.apache.eagle.alert.metadata.IMetadataDao;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.query.api.ExecutionPlan;
+import org.wso2.siddhi.query.api.execution.ExecutionElement;
+import org.wso2.siddhi.query.api.execution.query.Query;
+import org.wso2.siddhi.query.api.execution.query.input.handler.StreamHandler;
+import org.wso2.siddhi.query.api.execution.query.input.handler.Window;
+import org.wso2.siddhi.query.api.execution.query.input.stream.InputStream;
+import org.wso2.siddhi.query.api.execution.query.input.stream.SingleInputStream;
+import org.wso2.siddhi.query.api.execution.query.output.stream.OutputStream;
+import org.wso2.siddhi.query.api.execution.query.selection.OutputAttribute;
+import org.wso2.siddhi.query.api.execution.query.selection.Selector;
+import org.wso2.siddhi.query.api.expression.Variable;
+import org.wso2.siddhi.query.api.expression.constant.TimeConstant;
+import org.wso2.siddhi.query.compiler.SiddhiCompiler;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * PolicyInterpreter:
+ * <ul>
+ * <li>Parse: parse siddhi query and generate static execution plan</li>
+ * <li>Validate: validate policy definition with execution plan and metadata</li>
+ * </ul>
+ * @see <a href="https://docs.wso2.com/display/CEP300/WSO2+Complex+Event+Processor+Documentation">WSO2 Complex Event Processor Documentation</a>
+ */
+public class PolicyInterpreter {
+ private static final Logger LOG = LoggerFactory.getLogger(PolicyInterpreter.class);
+
+ /**
+ * See https://docs.wso2.com/display/CEP300/Windows#Windows-ExternalTimeWindow.
+ */
+ private static final String WINDOW_EXTERNAL_TIME = "externalTime";
+
+ /**
+  * Parses a Siddhi execution plan query and reports the outcome in a result object
+  * instead of propagating the failure.
+  *
+  * @param executionPlanQuery Siddhi execution plan source to parse
+  * @return a result carrying the parsed plan on success, or the failure message and
+  *         exception when parsing throws
+  */
+ public static PolicyParseResult parse(String executionPlanQuery) {
+     final PolicyParseResult result = new PolicyParseResult();
+     try {
+         final PolicyExecutionPlan plan = parseExecutionPlan(executionPlanQuery);
+         result.setPolicyExecutionPlan(plan);
+         result.setSuccess(true);
+         result.setMessage("Parsed successfully");
+     } catch (Exception e) {
+         LOG.error("Got error to parse policy: {}", executionPlanQuery, e);
+         result.setSuccess(false);
+         result.setMessage(e.getMessage());
+         result.setStackTrace(e);
+     }
+     return result;
+ }
+
+ /**
+  * Builds a Siddhi execution plan from the policy definition and the given input stream
+  * definitions, then parses the resulting plan.
+  *
+  * @param policyDefinition policy query text
+  * @param inputStreamDefinitions stream definitions handed to the Siddhi definition adapter;
+  *                               must not be null
+  * @return the parsed execution plan
+  * @throws Exception if building or parsing the plan fails
+  */
+ public static PolicyExecutionPlan parseExecutionPlan(String policyDefinition, Map<String, StreamDefinition> inputStreamDefinitions) throws Exception {
+     // Fail fast when there is no stream definition map to build the plan from
+     Preconditions.checkNotNull(inputStreamDefinitions, "No inputStreams to connect from");
+     final String executionPlanQuery = SiddhiDefinitionAdapter.buildSiddhiExecutionPlan(policyDefinition, inputStreamDefinitions);
+     return parseExecutionPlan(executionPlanQuery);
+ }
+
+ /**
+  * Parses a Siddhi execution plan and collects the input streams, output streams and
+  * stream partitions used by its queries.
+  *
+  * <p>Only {@link Query} elements are interpreted; any other execution element is logged and
+  * skipped. One partition is recorded for every query that reads from a single input stream,
+  * regardless of whether the query declares windows or a group-by clause.</p>
+  *
+  * @param executionPlanQuery Siddhi execution plan source
+  * @return the interpreted execution plan
+  * @throws Exception rethrown after logging when parsing or interpretation fails
+  */
+ public static PolicyExecutionPlan parseExecutionPlan(String executionPlanQuery) throws Exception {
+     PolicyExecutionPlan policyExecutionPlan = new PolicyExecutionPlan();
+     try {
+         ExecutionPlan executionPlan = SiddhiCompiler.parse(executionPlanQuery);
+
+         policyExecutionPlan.setExecutionPlanDesc(executionPlan.toString());
+
+         // Set current execution plan as valid
+         policyExecutionPlan.setExecutionPlanSource(executionPlanQuery);
+         policyExecutionPlan.setInternalExecutionPlan(executionPlan);
+
+         Map<String, List<StreamColumn>> actualInputStreams = new HashMap<>();
+         Map<String, List<StreamColumn>> actualOutputStreams = new HashMap<>();
+         List<StreamPartition> partitions = new ArrayList<>();
+
+         // Go through execution element
+         for (ExecutionElement executionElement : executionPlan.getExecutionElementList()) {
+             if (!(executionElement instanceof Query)) {
+                 LOG.warn("Unhandled execution element: {}", executionElement);
+                 continue;
+             }
+
+             // -------------
+             // Explain Query
+             // -------------
+             Query query = (Query) executionElement;
+
+             // Input streams
+             InputStream inputStream = query.getInputStream();
+             Selector selector = query.getSelector();
+
+             for (String streamId : inputStream.getUniqueStreamIds()) {
+                 if (!actualInputStreams.containsKey(streamId)) {
+                     org.wso2.siddhi.query.api.definition.StreamDefinition streamDefinition = executionPlan.getStreamDefinitionMap().get(streamId);
+                     if (streamDefinition != null) {
+                         actualInputStreams.put(streamId, SiddhiDefinitionAdapter.convertFromSiddiDefinition(streamDefinition).getColumns());
+                     } else {
+                         // Stream is referenced by the query but not defined in the plan
+                         actualInputStreams.put(streamId, null);
+                     }
+                 }
+             }
+
+             // Window Spec and Partition
+             if (inputStream instanceof SingleInputStream) {
+                 SingleInputStream singleInputStream = (SingleInputStream) inputStream;
+
+                 // Window Spec
+                 List<Window> windows = new ArrayList<>();
+                 for (StreamHandler streamHandler : singleInputStream.getStreamHandlers()) {
+                     if (streamHandler instanceof Window) {
+                         windows.add((Window) streamHandler);
+                     }
+                 }
+
+                 // Group By Spec
+                 List<Variable> groupBy = selector.getGroupByList();
+
+                 // The former guard "windows.size() > 0 || groupBy.size() >= 0" was always true,
+                 // so a partition is (and always was) created for every single-stream query.
+                 partitions.add(convertSingleStreamWindowAndGroupByToPartition(singleInputStream.getStreamId(), windows, groupBy));
+             }
+             // TODO: Parse multiple stream join (JoinInputStream)
+             // TODO: Parse StateInputStream
+
+             // Output streams
+             OutputStream outputStream = query.getOutputStream();
+             actualOutputStreams.put(outputStream.getId(), convertOutputStreamColumns(selector.getSelectionList()));
+         }
+         // Set used input streams
+         policyExecutionPlan.setInputStreams(actualInputStreams);
+
+         // Set Partitions
+         policyExecutionPlan.setStreamPartitions(partitions);
+
+         // Set output streams produced by the queries
+         policyExecutionPlan.setOutputStreams(actualOutputStreams);
+     } catch (Exception ex) {
+         LOG.error("Got error to parseExecutionPlan policy execution plan: \n{}", executionPlanQuery, ex);
+         throw ex;
+     }
+     return policyExecutionPlan;
+ }
+
+ /**
+  * Derives the partition of a single-stream query from its windows and group-by variables.
+  *
+  * <p>A sort spec is created only for windows whose function is {@code externalTime}; the
+  * window parameter at index 1 is cast to {@link TimeConstant} and used as the window period,
+  * and the margin is set to a third of that period. When several such windows are present the
+  * last one wins. Without any such window the sort spec stays {@code null}.</p>
+  *
+  * @param streamId id of the input stream
+  * @param windows  windows attached to the input stream
+  * @param groupBy  group-by variables of the query selector
+  * @return a GROUPBY partition over the group-by attribute names, or a SHUFFLE partition when
+  *         there is no group-by
+  */
+ private static StreamPartition convertSingleStreamWindowAndGroupByToPartition(String streamId, List<Window> windows, List<Variable> groupBy) {
+     final StreamPartition result = new StreamPartition();
+     result.setStreamId(streamId);
+
+     StreamSortSpec externalTimeSortSpec = null;
+     for (Window candidate : windows) {
+         if (!candidate.getFunction().equals(WINDOW_EXTERNAL_TIME)) {
+             continue;
+         }
+         externalTimeSortSpec = new StreamSortSpec();
+         final TimeConstant windowLength = (TimeConstant) candidate.getParameters()[1];
+         externalTimeSortSpec.setWindowPeriodMillis(windowLength.getValue().intValue());
+         externalTimeSortSpec.setWindowMargin(externalTimeSortSpec.getWindowPeriodMillis() / 3);
+     }
+     result.setSortSpec(externalTimeSortSpec);
+
+     if (groupBy.size() > 0) {
+         final List<String> columnNames = new ArrayList<>();
+         for (Variable groupByVariable : groupBy) {
+             columnNames.add(groupByVariable.getAttributeName());
+         }
+         result.setColumns(columnNames);
+         result.setType(StreamPartition.Type.GROUPBY);
+     } else {
+         result.setType(StreamPartition.Type.SHUFFLE);
+     }
+     return result;
+ }
+
+ /**
+  * Validates a policy against every stream definition listed by the metadata DAO.
+  *
+  * @param policy      policy definition to validate
+  * @param metadataDao source of the known stream definitions
+  * @return the validation result
+  */
+ public static PolicyValidationResult validate(PolicyDefinition policy, IMetadataDao metadataDao) {
+     // Index the known streams by stream id; a later duplicate id replaces an earlier one
+     final Map<String, StreamDefinition> definitionsById = new HashMap<>();
+     for (StreamDefinition stream : metadataDao.listStreams()) {
+         definitionsById.put(stream.getStreamId(), stream);
+     }
+     return validate(policy, definitionsById);
+ }
+
+ /**
+  * Validates a policy against the given stream definitions.
+  *
+  * <p>Every input stream declared by the policy must be present in {@code allDefinitions}, the
+  * policy definition must parse into an execution plan, and every output stream declared by
+  * the policy must be produced by that plan. Any exception raised on the way is logged and
+  * reported through the returned result rather than thrown.</p>
+  *
+  * @param policy         policy definition to validate
+  * @param allDefinitions known stream definitions keyed by stream id
+  * @return the validation result, holding the execution plan on success or the failure
+  *         message and exception otherwise
+  */
+ public static PolicyValidationResult validate(PolicyDefinition policy, Map<String, StreamDefinition> allDefinitions) {
+     final Map<String, StreamDefinition> declaredInputs = new HashMap<>();
+     final PolicyValidationResult result = new PolicyValidationResult();
+     result.setPolicyDefinition(policy);
+     try {
+         // Resolve declared input streams, rejecting any unknown stream id
+         if (policy.getInputStreams() != null) {
+             for (String inputStreamId : policy.getInputStreams()) {
+                 if (!allDefinitions.containsKey(inputStreamId)) {
+                     throw new StreamNotDefinedException(inputStreamId);
+                 }
+                 declaredInputs.put(inputStreamId, allDefinitions.get(inputStreamId));
+             }
+         }
+
+         final PolicyExecutionPlan plan = parseExecutionPlan(policy.getDefinition().getValue(), declaredInputs);
+
+         // Every declared output stream must be produced by the parsed plan
+         if (policy.getOutputStreams() != null) {
+             for (String outputStreamId : policy.getOutputStreams()) {
+                 if (!plan.getOutputStreams().containsKey(outputStreamId)) {
+                     throw new StreamNotDefinedException("Output stream " + outputStreamId + " not defined");
+                 }
+             }
+         }
+
+         result.setPolicyExecutionPlan(plan);
+         result.setSuccess(true);
+         result.setMessage("Validated successfully");
+     } catch (Exception e) {
+         LOG.error("Got error to validate policy definition: {}", policy, e);
+         result.setSuccess(false);
+         result.setMessage(e.getMessage());
+         result.setStackTrace(e);
+     }
+
+     return result;
+ }
+
+ /**
+  * Converts the selection list of a query into output stream columns: each column takes the
+  * attribute's rename as its name and the attribute expression's string form as description.
+  *
+  * @param outputAttributeList selection list of the query
+  * @return one column per output attribute, in selection order
+  */
+ private static List<StreamColumn> convertOutputStreamColumns(List<OutputAttribute> outputAttributeList) {
+     final List<StreamColumn> columns = new ArrayList<>();
+     for (OutputAttribute attribute : outputAttributeList) {
+         final StreamColumn column = new StreamColumn();
+         column.setName(attribute.getRename());
+         column.setDescription(attribute.getExpression().toString());
+         columns.add(column);
+     }
+     return columns;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6f88c30f/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyCompilerTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyCompilerTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyCompilerTest.java
deleted file mode 100644
index 682de4c..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyCompilerTest.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.service.metadata.resource;
-
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.*;
-
-public class PolicyCompilerTest {
- @Test
- public void parseFullPolicyQuery() throws Exception {
- PolicyExecutionPlan executionPlan = PolicyCompiler.parseExecutionPlan("from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.timeBatch(2 min) "
- + "select cmd, user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT");
- Assert.assertEquals("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX", executionPlan.getInputStreams().keySet().toArray()[0]);
- Assert.assertEquals("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT", executionPlan.getOutputStreams().keySet().toArray()[0]);
- }
-
- @Test
- public void testValidPolicy() {
- PolicyDefinition policyDefinition = new PolicyDefinition();
- policyDefinition.setName("test_policy");
- policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
- policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
-
- PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
- definition.setType("siddhi");
- definition.setValue("from INPUT_STREAM_1#window.timeBatch(2 min) select name, sum(value) as total group by name insert into OUTPUT_STREAM_1 ;");
- definition.setInputStreams(policyDefinition.getInputStreams());
- definition.setOutputStreams(policyDefinition.getOutputStreams());
- policyDefinition.setDefinition(definition);
-
- PolicyValidationResult validation = PolicyCompiler.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
- {
- put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
- put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
- put("INPUT_STREAM_3", createStreamDefinition("INPUT_STREAM_3"));
- put("INPUT_STREAM_4", createStreamDefinition("INPUT_STREAM_4"));
- }
- });
- Assert.assertTrue(validation.isSuccess());
- Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
- Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size());
- }
-
- @Test
- public void testValidPolicyWithTooManyInputStreams() {
- PolicyDefinition policyDefinition = new PolicyDefinition();
- policyDefinition.setName("test_policy");
- policyDefinition.setInputStreams(Arrays.asList("INPUT_STREAM_1", "INPUT_STREAM_2"));
- policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
-
- PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
- definition.setType("siddhi");
- definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
- definition.setInputStreams(policyDefinition.getInputStreams());
- definition.setOutputStreams(policyDefinition.getOutputStreams());
- policyDefinition.setDefinition(definition);
-
- PolicyValidationResult validation = PolicyCompiler.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
- {
- put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
- put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
- }
- });
- Assert.assertTrue(validation.isSuccess());
- Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
- Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size());
- }
-
- @Test
- public void testValidPolicyWithTooFewOutputStreams() {
- PolicyDefinition policyDefinition = new PolicyDefinition();
- policyDefinition.setName("test_policy");
- policyDefinition.setInputStreams(Arrays.asList("INPUT_STREAM_1", "INPUT_STREAM_2"));
- policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
-
- PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
- definition.setType("siddhi");
- definition.setValue(
- "from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;"
- + "from INPUT_STREAM_1[value < 90.0] select * group by name insert into OUTPUT_STREAM_2;"
- );
- definition.setInputStreams(policyDefinition.getInputStreams());
- definition.setOutputStreams(policyDefinition.getOutputStreams());
- policyDefinition.setDefinition(definition);
-
- PolicyValidationResult validation = PolicyCompiler.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
- {
- put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
- put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
- }
- });
- Assert.assertTrue(validation.isSuccess());
- Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
- Assert.assertEquals(2, validation.getPolicyExecutionPlan().getOutputStreams().size());
- }
-
- @Test
- public void testInvalidPolicyForSyntaxError() {
- PolicyDefinition policyDefinition = new PolicyDefinition();
- policyDefinition.setName("test_policy");
- policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM"));
- policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM"));
-
- PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
- definition.setType("siddhi");
- definition.setValue("from INPUT_STREAM (value > 90.0) select * group by name insert into OUTPUT_STREAM;");
- definition.setInputStreams(policyDefinition.getInputStreams());
- definition.setOutputStreams(policyDefinition.getOutputStreams());
- policyDefinition.setDefinition(definition);
-
- PolicyValidationResult validation = PolicyCompiler.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
- {
- put("INPUT_STREAM", createStreamDefinition("INPUT_STREAM"));
- }
- });
- Assert.assertFalse(validation.isSuccess());
- }
-
- @Test
- public void testInvalidPolicyForNotDefinedInputStream() {
- PolicyDefinition policyDefinition = new PolicyDefinition();
- policyDefinition.setName("test_policy");
- policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
- policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
-
- PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
- definition.setType("siddhi");
- definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
- definition.setInputStreams(policyDefinition.getInputStreams());
- definition.setOutputStreams(policyDefinition.getOutputStreams());
- policyDefinition.setDefinition(definition);
-
- PolicyValidationResult validation = PolicyCompiler.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
- {
- put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
- }
- });
- Assert.assertFalse(validation.isSuccess());
- }
-
- @Test
- public void testInvalidPolicyForNotDefinedOutputStream() {
- PolicyDefinition policyDefinition = new PolicyDefinition();
- policyDefinition.setName("test_policy");
- policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
- policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_2"));
-
- PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
- definition.setType("siddhi");
- definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
- definition.setInputStreams(policyDefinition.getInputStreams());
- definition.setOutputStreams(policyDefinition.getOutputStreams());
- policyDefinition.setDefinition(definition);
-
- PolicyValidationResult validation = PolicyCompiler.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
- {
- put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
- }
- });
- Assert.assertFalse(validation.isSuccess());
- }
-
- // --------------
- // Helper Methods
- // --------------
-
- private static StreamDefinition createStreamDefinition(String streamId) {
- StreamDefinition streamDefinition = new StreamDefinition();
- streamDefinition.setStreamId(streamId);
- List<StreamColumn> columns = new ArrayList<>();
- columns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
- columns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
- streamDefinition.setColumns(columns);
- return streamDefinition;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6f88c30f/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyInterpreterTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyInterpreterTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyInterpreterTest.java
new file mode 100644
index 0000000..48ac9eb
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyInterpreterTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.metadata.resource;
+
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.*;
+
+public class PolicyInterpreterTest { // Exercises PolicyInterpreter.parseExecutionPlan and PolicyInterpreter.validate with "siddhi" policy queries.
+ @Test
+ public void parseFullPolicyQuery() throws Exception { // externalTime window query: plan exposes input/output stream ids, one partition, and a non-null sortSpec
+ PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan("from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.externalTime(timestamp, 2 min) "
+ + "select cmd, user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT");
+ Assert.assertEquals("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX", executionPlan.getInputStreams().keySet().toArray()[0]);
+ Assert.assertEquals("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT", executionPlan.getOutputStreams().keySet().toArray()[0]);
+ Assert.assertEquals(1, executionPlan.getStreamPartitions().size());
+ Assert.assertNotNull(executionPlan.getStreamPartitions().get(0).getSortSpec());
+ }
+
+ @Test
+ public void testValidPolicyWithExternalTimeWindow() { // externalTime window: validation succeeds and the single partition carries a sortSpec
+ PolicyDefinition policyDefinition = new PolicyDefinition();
+ policyDefinition.setName("test_policy");
+ policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
+ policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
+
+ PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+ definition.setType("siddhi");
+ definition.setValue("from INPUT_STREAM_1#window.externalTime(timestamp, 2 min) select name, sum(value) as total group by name insert into OUTPUT_STREAM_1 ;");
+ definition.setInputStreams(policyDefinition.getInputStreams());
+ definition.setOutputStreams(policyDefinition.getOutputStreams());
+ policyDefinition.setDefinition(definition);
+
+ PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() { // anonymous HashMap subclass filled by the instance initializer below
+ {
+ put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
+ put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
+ put("INPUT_STREAM_3", createStreamDefinition("INPUT_STREAM_3"));
+ put("INPUT_STREAM_4", createStreamDefinition("INPUT_STREAM_4"));
+ }
+ });
+ Assert.assertTrue(validation.isSuccess());
+ Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size()); // extra stream definitions in the map do not show up in the plan
+ Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size());
+ Assert.assertEquals(1, validation.getPolicyExecutionPlan().getStreamPartitions().size());
+ Assert.assertNotNull(validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec());
+ }
+
+ @Test
+ public void testValidPolicyWithTimeWindow() { // plain time window: validation succeeds and the single partition has a null sortSpec
+ PolicyDefinition policyDefinition = new PolicyDefinition();
+ policyDefinition.setName("test_policy");
+ policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
+ policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
+
+ PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+ definition.setType("siddhi");
+ definition.setValue("from INPUT_STREAM_1#window.time(2 min) select name, sum(value) as total group by name insert into OUTPUT_STREAM_1 ;");
+ definition.setInputStreams(policyDefinition.getInputStreams());
+ definition.setOutputStreams(policyDefinition.getOutputStreams());
+ policyDefinition.setDefinition(definition);
+
+ PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+ {
+ put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
+ }
+ });
+ Assert.assertTrue(validation.isSuccess());
+ Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
+ Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size());
+ Assert.assertEquals(1, validation.getPolicyExecutionPlan().getStreamPartitions().size());
+ Assert.assertNull(validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec()); // sortSpec is expected only for the externalTime window case
+ }
+
+ @Test
+ public void testValidPolicyWithTooManyInputStreams() { // policy declares two input streams but the query reads one: still valid, plan lists 1 input stream
+ PolicyDefinition policyDefinition = new PolicyDefinition();
+ policyDefinition.setName("test_policy");
+ policyDefinition.setInputStreams(Arrays.asList("INPUT_STREAM_1", "INPUT_STREAM_2"));
+ policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
+
+ PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+ definition.setType("siddhi");
+ definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
+ definition.setInputStreams(policyDefinition.getInputStreams());
+ definition.setOutputStreams(policyDefinition.getOutputStreams());
+ policyDefinition.setDefinition(definition);
+
+ PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+ {
+ put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
+ put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
+ }
+ });
+ Assert.assertTrue(validation.isSuccess());
+ Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
+ Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size());
+ }
+
+ @Test
+ public void testValidPolicyWithTooFewOutputStreams() { // query also writes OUTPUT_STREAM_2, which the policy does not declare: still valid, plan lists 2 output streams
+ PolicyDefinition policyDefinition = new PolicyDefinition();
+ policyDefinition.setName("test_policy");
+ policyDefinition.setInputStreams(Arrays.asList("INPUT_STREAM_1", "INPUT_STREAM_2"));
+ policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
+
+ PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+ definition.setType("siddhi");
+ definition.setValue(
+ "from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;"
+ + "from INPUT_STREAM_1[value < 90.0] select * group by name insert into OUTPUT_STREAM_2;"
+ );
+ definition.setInputStreams(policyDefinition.getInputStreams());
+ definition.setOutputStreams(policyDefinition.getOutputStreams());
+ policyDefinition.setDefinition(definition);
+
+ PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+ {
+ put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
+ put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
+ }
+ });
+ Assert.assertTrue(validation.isSuccess());
+ Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
+ Assert.assertEquals(2, validation.getPolicyExecutionPlan().getOutputStreams().size());
+ }
+
+ @Test
+ public void testInvalidPolicyForSyntaxError() { // query is expected to be rejected - presumably because the filter uses "(...)" where the other queries use "[...]"
+ PolicyDefinition policyDefinition = new PolicyDefinition();
+ policyDefinition.setName("test_policy");
+ policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM"));
+ policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM"));
+
+ PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+ definition.setType("siddhi");
+ definition.setValue("from INPUT_STREAM (value > 90.0) select * group by name insert into OUTPUT_STREAM;");
+ definition.setInputStreams(policyDefinition.getInputStreams());
+ definition.setOutputStreams(policyDefinition.getOutputStreams());
+ policyDefinition.setDefinition(definition);
+
+ PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+ {
+ put("INPUT_STREAM", createStreamDefinition("INPUT_STREAM"));
+ }
+ });
+ Assert.assertFalse(validation.isSuccess());
+ }
+
+ @Test
+ public void testInvalidPolicyForNotDefinedInputStream() { // query reads INPUT_STREAM_1 but only INPUT_STREAM_2 is in the stream-definition map: validation must fail
+ PolicyDefinition policyDefinition = new PolicyDefinition();
+ policyDefinition.setName("test_policy");
+ policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
+ policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
+
+ PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+ definition.setType("siddhi");
+ definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
+ definition.setInputStreams(policyDefinition.getInputStreams());
+ definition.setOutputStreams(policyDefinition.getOutputStreams());
+ policyDefinition.setDefinition(definition);
+
+ PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+ {
+ put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
+ }
+ });
+ Assert.assertFalse(validation.isSuccess());
+ }
+
+ @Test
+ public void testInvalidPolicyForNotDefinedOutputStream() { // query writes OUTPUT_STREAM_1 but the policy declares only OUTPUT_STREAM_2: validation must fail
+ PolicyDefinition policyDefinition = new PolicyDefinition();
+ policyDefinition.setName("test_policy");
+ policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
+ policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_2"));
+
+ PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+ definition.setType("siddhi");
+ definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
+ definition.setInputStreams(policyDefinition.getInputStreams());
+ definition.setOutputStreams(policyDefinition.getOutputStreams());
+ policyDefinition.setDefinition(definition);
+
+ PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+ {
+ put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
+ }
+ });
+ Assert.assertFalse(validation.isSuccess());
+ }
+
+ // --------------
+ // Helper Methods
+ // --------------
+
+ private static StreamDefinition createStreamDefinition(String streamId) { // builds a definition with the given id and columns name:STRING, value:DOUBLE, timestamp:LONG
+ StreamDefinition streamDefinition = new StreamDefinition();
+ streamDefinition.setStreamId(streamId);
+ List<StreamColumn> columns = new ArrayList<>();
+ columns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
+ columns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
+ columns.add(new StreamColumn.Builder().name("timestamp").type(StreamColumn.Type.LONG).build()); // the column named in the externalTime(timestamp, 2 min) test queries
+ streamDefinition.setColumns(columns);
+ return streamDefinition;
+ }
+}
\ No newline at end of file