You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/10/20 08:08:49 UTC
[1/2] incubator-eagle git commit: [EAGLE-647] Support Policy
Execution Interpreter and Planner to compile siddhi query to distributed
execution plan
Repository: incubator-eagle
Updated Branches:
refs/heads/master 8991b61e2 -> 64fce8f80
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/64fce8f8/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyValidationResult.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyValidationResult.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyValidationResult.java
deleted file mode 100644
index b1912af..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyValidationResult.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.service.metadata.resource;
-
-
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-
-@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-public class PolicyValidationResult {
- private boolean success;
- private String message;
- private String exception;
-
- private PolicyExecutionPlan policyExecutionPlan;
- private PolicyDefinition policyDefinition;
-
- public String getException() {
- return exception;
- }
-
- public void setException(String exception) {
- this.exception = exception;
- }
-
- public String getMessage() {
- return message;
- }
-
- public void setMessage(String message) {
- this.message = message;
- }
-
- public void setStackTrace(Throwable throwable) {
- this.setException(ExceptionUtils.getStackTrace(throwable));
- }
-
- public boolean isSuccess() {
- return success;
- }
-
- public void setSuccess(boolean success) {
- this.success = success;
- }
-
- public PolicyExecutionPlan getPolicyExecutionPlan() {
- return policyExecutionPlan;
- }
-
- public void setPolicyExecutionPlan(PolicyExecutionPlan policyExecutionPlan) {
- this.policyExecutionPlan = policyExecutionPlan;
- }
-
- public PolicyDefinition getPolicyDefinition() {
- return policyDefinition;
- }
-
- public void setPolicyDefinition(PolicyDefinition policyDefinition) {
- this.policyDefinition = policyDefinition;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/64fce8f8/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyInterpreterTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyInterpreterTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyInterpreterTest.java
deleted file mode 100644
index 48ac9eb..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyInterpreterTest.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.service.metadata.resource;
-
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.*;
-
-public class PolicyInterpreterTest {
- @Test
- public void parseFullPolicyQuery() throws Exception {
- PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan("from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.externalTime(timestamp, 2 min) "
- + "select cmd, user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT");
- Assert.assertEquals("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX", executionPlan.getInputStreams().keySet().toArray()[0]);
- Assert.assertEquals("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT", executionPlan.getOutputStreams().keySet().toArray()[0]);
- Assert.assertEquals(1, executionPlan.getStreamPartitions().size());
- Assert.assertNotNull(executionPlan.getStreamPartitions().get(0).getSortSpec());
- }
-
- @Test
- public void testValidPolicyWithExternalTimeWindow() {
- PolicyDefinition policyDefinition = new PolicyDefinition();
- policyDefinition.setName("test_policy");
- policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
- policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
-
- PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
- definition.setType("siddhi");
- definition.setValue("from INPUT_STREAM_1#window.externalTime(timestamp, 2 min) select name, sum(value) as total group by name insert into OUTPUT_STREAM_1 ;");
- definition.setInputStreams(policyDefinition.getInputStreams());
- definition.setOutputStreams(policyDefinition.getOutputStreams());
- policyDefinition.setDefinition(definition);
-
- PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
- {
- put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
- put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
- put("INPUT_STREAM_3", createStreamDefinition("INPUT_STREAM_3"));
- put("INPUT_STREAM_4", createStreamDefinition("INPUT_STREAM_4"));
- }
- });
- Assert.assertTrue(validation.isSuccess());
- Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
- Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size());
- Assert.assertEquals(1, validation.getPolicyExecutionPlan().getStreamPartitions().size());
- Assert.assertNotNull(validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec());
- }
-
- @Test
- public void testValidPolicyWithTimeWindow() {
- PolicyDefinition policyDefinition = new PolicyDefinition();
- policyDefinition.setName("test_policy");
- policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
- policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
-
- PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
- definition.setType("siddhi");
- definition.setValue("from INPUT_STREAM_1#window.time(2 min) select name, sum(value) as total group by name insert into OUTPUT_STREAM_1 ;");
- definition.setInputStreams(policyDefinition.getInputStreams());
- definition.setOutputStreams(policyDefinition.getOutputStreams());
- policyDefinition.setDefinition(definition);
-
- PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
- {
- put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
- }
- });
- Assert.assertTrue(validation.isSuccess());
- Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
- Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size());
- Assert.assertEquals(1, validation.getPolicyExecutionPlan().getStreamPartitions().size());
- Assert.assertNull(validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec());
- }
-
- @Test
- public void testValidPolicyWithTooManyInputStreams() {
- PolicyDefinition policyDefinition = new PolicyDefinition();
- policyDefinition.setName("test_policy");
- policyDefinition.setInputStreams(Arrays.asList("INPUT_STREAM_1", "INPUT_STREAM_2"));
- policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
-
- PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
- definition.setType("siddhi");
- definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
- definition.setInputStreams(policyDefinition.getInputStreams());
- definition.setOutputStreams(policyDefinition.getOutputStreams());
- policyDefinition.setDefinition(definition);
-
- PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
- {
- put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
- put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
- }
- });
- Assert.assertTrue(validation.isSuccess());
- Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
- Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size());
- }
-
- @Test
- public void testValidPolicyWithTooFewOutputStreams() {
- PolicyDefinition policyDefinition = new PolicyDefinition();
- policyDefinition.setName("test_policy");
- policyDefinition.setInputStreams(Arrays.asList("INPUT_STREAM_1", "INPUT_STREAM_2"));
- policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
-
- PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
- definition.setType("siddhi");
- definition.setValue(
- "from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;"
- + "from INPUT_STREAM_1[value < 90.0] select * group by name insert into OUTPUT_STREAM_2;"
- );
- definition.setInputStreams(policyDefinition.getInputStreams());
- definition.setOutputStreams(policyDefinition.getOutputStreams());
- policyDefinition.setDefinition(definition);
-
- PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
- {
- put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
- put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
- }
- });
- Assert.assertTrue(validation.isSuccess());
- Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
- Assert.assertEquals(2, validation.getPolicyExecutionPlan().getOutputStreams().size());
- }
-
- @Test
- public void testInvalidPolicyForSyntaxError() {
- PolicyDefinition policyDefinition = new PolicyDefinition();
- policyDefinition.setName("test_policy");
- policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM"));
- policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM"));
-
- PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
- definition.setType("siddhi");
- definition.setValue("from INPUT_STREAM (value > 90.0) select * group by name insert into OUTPUT_STREAM;");
- definition.setInputStreams(policyDefinition.getInputStreams());
- definition.setOutputStreams(policyDefinition.getOutputStreams());
- policyDefinition.setDefinition(definition);
-
- PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
- {
- put("INPUT_STREAM", createStreamDefinition("INPUT_STREAM"));
- }
- });
- Assert.assertFalse(validation.isSuccess());
- }
-
- @Test
- public void testInvalidPolicyForNotDefinedInputStream() {
- PolicyDefinition policyDefinition = new PolicyDefinition();
- policyDefinition.setName("test_policy");
- policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
- policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
-
- PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
- definition.setType("siddhi");
- definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
- definition.setInputStreams(policyDefinition.getInputStreams());
- definition.setOutputStreams(policyDefinition.getOutputStreams());
- policyDefinition.setDefinition(definition);
-
- PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
- {
- put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
- }
- });
- Assert.assertFalse(validation.isSuccess());
- }
-
- @Test
- public void testInvalidPolicyForNotDefinedOutputStream() {
- PolicyDefinition policyDefinition = new PolicyDefinition();
- policyDefinition.setName("test_policy");
- policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
- policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_2"));
-
- PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
- definition.setType("siddhi");
- definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
- definition.setInputStreams(policyDefinition.getInputStreams());
- definition.setOutputStreams(policyDefinition.getOutputStreams());
- policyDefinition.setDefinition(definition);
-
- PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
- {
- put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
- }
- });
- Assert.assertFalse(validation.isSuccess());
- }
-
- // --------------
- // Helper Methods
- // --------------
-
- private static StreamDefinition createStreamDefinition(String streamId) {
- StreamDefinition streamDefinition = new StreamDefinition();
- streamDefinition.setStreamId(streamId);
- List<StreamColumn> columns = new ArrayList<>();
- columns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
- columns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
- columns.add(new StreamColumn.Builder().name("timestamp").type(StreamColumn.Type.LONG).build());
- streamDefinition.setColumns(columns);
- return streamDefinition;
- }
-}
\ No newline at end of file
[2/2] incubator-eagle git commit: [EAGLE-647] Support Policy
Execution Interpreter and Planner to compile siddhi query to distributed
execution plan
Posted by ha...@apache.org.
[EAGLE-647] Support Policy Execution Interpreter and Planner to compile siddhi query to distributed execution plan
Support Policy Execution Interpreter and Planner to compile siddhi query to distributed execution plan
* Support parse siddhi pattern and join query as distributed execution
* Support alias in inner join condition
* Refactor PolicyIntepreter to eagle-alert-engine and decoupel PolicyExecutionPlanner
* Fix factory method for PolicyExecutionPlanner
Author: Hao Chen <ha...@apache.org>
Closes #536 from haoch/SupportPatternAndJoin.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/64fce8f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/64fce8f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/64fce8f8
Branch: refs/heads/master
Commit: 64fce8f80745d6180ff04b6950364a802d73723e
Parents: 8991b61
Author: Hao Chen <ha...@apache.org>
Authored: Thu Oct 20 16:08:33 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Thu Oct 20 16:08:33 2016 +0800
----------------------------------------------------------------------
.../engine/interpreter/PolicyExecutionPlan.java | 100 +++++
.../interpreter/PolicyExecutionPlanner.java | 28 ++
.../interpreter/PolicyExecutionPlannerImpl.java | 339 +++++++++++++++++
.../engine/interpreter/PolicyInterpreter.java | 112 ++++++
.../engine/interpreter/PolicyParseResult.java | 65 ++++
.../interpreter/PolicyValidationResult.java | 76 ++++
.../interpreter/PolicyInterpreterTest.java | 381 +++++++++++++++++++
.../metadata/resource/MetadataResource.java | 9 +-
.../metadata/resource/PolicyExecutionPlan.java | 100 -----
.../metadata/resource/PolicyInterpreter.java | 244 ------------
.../metadata/resource/PolicyParseResult.java | 65 ----
.../resource/PolicyValidationResult.java | 76 ----
.../resource/PolicyInterpreterTest.java | 226 -----------
13 files changed, 1109 insertions(+), 712 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/64fce8f8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlan.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlan.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlan.java
new file mode 100644
index 0000000..7ecc36f
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlan.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.interpreter;
+
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.wso2.siddhi.query.api.ExecutionPlan;
+
+import java.util.List;
+import java.util.Map;
+
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+public class PolicyExecutionPlan {
+ /**
+ * Actual input streams.
+ */
+ private Map<String, List<StreamColumn>> inputStreams;
+
+ /**
+ * Actual output streams.
+ */
+ private Map<String, List<StreamColumn>> outputStreams;
+
+ /**
+ * Execution plan source.
+ */
+ private String executionPlanSource;
+
+ /**
+ * Execution plan.
+ */
+ private ExecutionPlan internalExecutionPlan;
+
+ private String executionPlanDesc;
+
+ private List<StreamPartition> streamPartitions;
+
+ public String getExecutionPlanSource() {
+ return executionPlanSource;
+ }
+
+ public void setExecutionPlanSource(String executionPlanSource) {
+ this.executionPlanSource = executionPlanSource;
+ }
+
+ public ExecutionPlan getInternalExecutionPlan() {
+ return internalExecutionPlan;
+ }
+
+ public void setInternalExecutionPlan(ExecutionPlan internalExecutionPlan) {
+ this.internalExecutionPlan = internalExecutionPlan;
+ }
+
+ public String getExecutionPlanDesc() {
+ return executionPlanDesc;
+ }
+
+ public void setExecutionPlanDesc(String executionPlanDesc) {
+ this.executionPlanDesc = executionPlanDesc;
+ }
+
+ public List<StreamPartition> getStreamPartitions() {
+ return streamPartitions;
+ }
+
+ public void setStreamPartitions(List<StreamPartition> streamPartitions) {
+ this.streamPartitions = streamPartitions;
+ }
+
+ public Map<String, List<StreamColumn>> getInputStreams() {
+ return inputStreams;
+ }
+
+ public void setInputStreams(Map<String, List<StreamColumn>> inputStreams) {
+ this.inputStreams = inputStreams;
+ }
+
+ public Map<String, List<StreamColumn>> getOutputStreams() {
+ return outputStreams;
+ }
+
+ public void setOutputStreams(Map<String, List<StreamColumn>> outputStreams) {
+ this.outputStreams = outputStreams;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/64fce8f8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlanner.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlanner.java
new file mode 100644
index 0000000..9e8f9f1
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlanner.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.interpreter;
+
+interface PolicyExecutionPlanner {
+ /**
+ * @return PolicyExecutionPlan.
+ */
+ PolicyExecutionPlan getExecutionPlan();
+
+ static PolicyExecutionPlan parseExecutionPlan(String executionPlan) throws Exception {
+ return new PolicyExecutionPlannerImpl(executionPlan).getExecutionPlan();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/64fce8f8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java
new file mode 100644
index 0000000..82bb64f
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.interpreter;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.collections.ListUtils;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.apache.eagle.alert.engine.evaluator.impl.SiddhiDefinitionAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.exception.DefinitionNotExistException;
+import org.wso2.siddhi.query.api.ExecutionPlan;
+import org.wso2.siddhi.query.api.exception.DuplicateDefinitionException;
+import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
+import org.wso2.siddhi.query.api.execution.ExecutionElement;
+import org.wso2.siddhi.query.api.execution.query.Query;
+import org.wso2.siddhi.query.api.execution.query.input.handler.StreamHandler;
+import org.wso2.siddhi.query.api.execution.query.input.handler.Window;
+import org.wso2.siddhi.query.api.execution.query.input.stream.InputStream;
+import org.wso2.siddhi.query.api.execution.query.input.stream.JoinInputStream;
+import org.wso2.siddhi.query.api.execution.query.input.stream.SingleInputStream;
+import org.wso2.siddhi.query.api.execution.query.input.stream.StateInputStream;
+import org.wso2.siddhi.query.api.execution.query.output.stream.OutputStream;
+import org.wso2.siddhi.query.api.execution.query.selection.OutputAttribute;
+import org.wso2.siddhi.query.api.execution.query.selection.Selector;
+import org.wso2.siddhi.query.api.expression.Expression;
+import org.wso2.siddhi.query.api.expression.Variable;
+import org.wso2.siddhi.query.api.expression.condition.Compare;
+import org.wso2.siddhi.query.api.expression.constant.IntConstant;
+import org.wso2.siddhi.query.api.expression.constant.LongConstant;
+import org.wso2.siddhi.query.api.expression.constant.TimeConstant;
+import org.wso2.siddhi.query.compiler.SiddhiCompiler;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+class PolicyExecutionPlannerImpl implements PolicyExecutionPlanner {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PolicyExecutionPlannerImpl.class);
+
+ /**
+ * See https://docs.wso2.com/display/CEP300/Windows#Windows-ExternalTimeWindow.
+ */
+ private static final String WINDOW_EXTERNAL_TIME = "externalTime";
+
+ private final String executionPlan;
+ private final Map<String,List<StreamColumn>> effectiveInputStreams;
+ private final Map<String,List<StreamColumn>> effectiveOutputStreams;
+ private final Map<String,StreamPartition> effectivePartitions;
+ private final PolicyExecutionPlan policyExecutionPlan;
+
+ public PolicyExecutionPlannerImpl(String executionPlan) throws Exception {
+ this.executionPlan = executionPlan;
+ this.effectiveInputStreams = new HashMap<>();
+ this.effectiveOutputStreams = new HashMap<>();
+ this.effectivePartitions = new HashMap<>();
+ this.policyExecutionPlan = doParse();
+ }
+
+ @Override
+ public PolicyExecutionPlan getExecutionPlan() {
+ return policyExecutionPlan;
+ }
+
+ private PolicyExecutionPlan doParse() throws Exception {
+ PolicyExecutionPlan policyExecutionPlan = new PolicyExecutionPlan();
+ try {
+ ExecutionPlan executionPlan = SiddhiCompiler.parse(this.executionPlan);
+
+ policyExecutionPlan.setExecutionPlanDesc(executionPlan.toString());
+
+ // Set current execution plan as valid
+ policyExecutionPlan.setExecutionPlanSource(this.executionPlan);
+ policyExecutionPlan.setInternalExecutionPlan(executionPlan);
+
+
+ // Go through execution element
+ for (ExecutionElement executionElement : executionPlan.getExecutionElementList()) {
+ // -------------
+ // Explain Query
+ // -------------
+ if (executionElement instanceof Query) {
+ // -----------------------
+ // Query Level Variables
+ // -----------------------
+ InputStream inputStream = ((Query) executionElement).getInputStream();
+ Selector selector = ((Query) executionElement).getSelector();
+ Map<String, SingleInputStream> queryLevelAliasToStreamMapping = new HashMap<>();
+
+ // Inputs stream definitions
+ for (String streamId : inputStream.getUniqueStreamIds()) {
+ if (!effectiveInputStreams.containsKey(streamId)) {
+ org.wso2.siddhi.query.api.definition.StreamDefinition streamDefinition = executionPlan.getStreamDefinitionMap().get(streamId);
+ if (streamDefinition != null) {
+ effectiveInputStreams.put(streamId, SiddhiDefinitionAdapter.convertFromSiddiDefinition(streamDefinition).getColumns());
+ } else {
+ effectiveInputStreams.put(streamId, null);
+ }
+ }
+ }
+
+ // Window Spec and Partition
+ if (inputStream instanceof SingleInputStream) {
+ retrieveAliasForQuery((SingleInputStream) inputStream, queryLevelAliasToStreamMapping);
+ retrievePartition(findStreamPartition((SingleInputStream) inputStream, selector));
+ } else {
+ if (inputStream instanceof JoinInputStream) {
+ // Only Support JOIN/INNER_JOIN Now
+ if (((JoinInputStream) inputStream).getType().equals(JoinInputStream.Type.INNER_JOIN) || ((JoinInputStream) inputStream).getType().equals(JoinInputStream.Type.JOIN)) {
+ SingleInputStream leftInputStream = (SingleInputStream) ((JoinInputStream) inputStream).getLeftInputStream();
+ SingleInputStream rightInputStream = (SingleInputStream) ((JoinInputStream) inputStream).getRightInputStream();
+
+ retrievePartition(findStreamPartition(leftInputStream, selector));
+ retrievePartition(findStreamPartition(rightInputStream, selector));
+ retrieveAliasForQuery(leftInputStream, queryLevelAliasToStreamMapping);
+ retrieveAliasForQuery(rightInputStream, queryLevelAliasToStreamMapping);
+
+ } else {
+ throw new ExecutionPlanValidationException("Not support " + ((JoinInputStream) inputStream).getType() + " yet, currently support: INNER JOIN");
+ }
+
+ Expression joinCondition = ((JoinInputStream) inputStream).getOnCompare();
+
+ if (joinCondition != null) {
+ if (joinCondition instanceof Compare) {
+ if (((Compare) joinCondition).getOperator().equals(Compare.Operator.EQUAL)) {
+ Variable leftExpression = (Variable) ((Compare) joinCondition).getLeftExpression();
+ Preconditions.checkNotNull(leftExpression.getStreamId());
+ Preconditions.checkNotNull(leftExpression.getAttributeName());
+
+ StreamPartition leftPartition = new StreamPartition();
+ leftPartition.setType(StreamPartition.Type.GROUPBY);
+ leftPartition.setColumns(Collections.singletonList(leftExpression.getAttributeName()));
+ leftPartition.setStreamId(retrieveStreamId(leftExpression, effectiveInputStreams,queryLevelAliasToStreamMapping));
+ retrievePartition(leftPartition);
+
+ Variable rightExpression = (Variable) ((Compare) joinCondition).getRightExpression();
+ Preconditions.checkNotNull(rightExpression.getStreamId());
+ Preconditions.checkNotNull(rightExpression.getAttributeName());
+ StreamPartition rightPartition = new StreamPartition();
+ rightPartition.setType(StreamPartition.Type.GROUPBY);
+ rightPartition.setColumns(Collections.singletonList(rightExpression.getAttributeName()));
+ rightPartition.setStreamId(retrieveStreamId(rightExpression, effectiveInputStreams,queryLevelAliasToStreamMapping));
+ retrievePartition(leftPartition);
+ } else {
+ throw new ExecutionPlanValidationException("Only support \"EQUAL\" condition in INNER JOIN" + joinCondition);
+ }
+ } else {
+ throw new ExecutionPlanValidationException("Only support \"Compare\" on INNER JOIN condition in INNER JOIN: " + joinCondition);
+ }
+ }
+ } else if (inputStream instanceof StateInputStream) {
+ // Group By Spec
+ List<Variable> groupBy = selector.getGroupByList();
+ if (groupBy.size() >= 0) {
+ Map<String, List<Variable>> streamGroupBy = new HashMap<>();
+ for (String streamId : inputStream.getUniqueStreamIds()) {
+ streamGroupBy.put(streamId, new ArrayList<>());
+ }
+ for (Variable variable : groupBy) {
+ // Not stream not set, then should be all streams' same field
+ if (variable.getStreamId() == null) {
+ for (String streamId : inputStream.getUniqueStreamIds()) {
+ streamGroupBy.get(streamId).add(variable);
+ }
+ } else {
+ String streamId = retrieveStreamId(variable, effectiveInputStreams,queryLevelAliasToStreamMapping);
+ if (streamGroupBy.containsKey(streamId)) {
+ streamGroupBy.get(streamId).add(variable);
+ } else {
+ throw new DefinitionNotExistException(streamId);
+ }
+ }
+ }
+ for (Map.Entry<String, List<Variable>> entry : streamGroupBy.entrySet()) {
+ if (entry.getValue().size() > 0) {
+ retrievePartition(generatePartition(entry.getKey(), null, Arrays.asList(entry.getValue().toArray(new Variable[entry.getValue().size()]))));
+ }
+ }
+ }
+ }
+ }
+
+ // Output streams
+ OutputStream outputStream = ((Query) executionElement).getOutputStream();
+ effectiveOutputStreams.put(outputStream.getId(), convertOutputStreamColumns(selector.getSelectionList()));
+ } else {
+ LOG.warn("Unhandled execution element: {}", executionElement.toString());
+ }
+ }
+ // Set effective input streams
+ policyExecutionPlan.setInputStreams(effectiveInputStreams);
+
+ // Set effective output streams
+ policyExecutionPlan.setOutputStreams(effectiveOutputStreams);
+
+ // Set Partitions
+ for (String streamId : effectiveInputStreams.keySet()) {
+ // Use shuffle partition by default
+ if (!effectivePartitions.containsKey(streamId)) {
+ StreamPartition shufflePartition = new StreamPartition();
+ shufflePartition.setStreamId(streamId);
+ shufflePartition.setType(StreamPartition.Type.SHUFFLE);
+ effectivePartitions.put(streamId, shufflePartition);
+ }
+ }
+ policyExecutionPlan.setStreamPartitions(new ArrayList<>(effectivePartitions.values()));
+ } catch (Exception ex) {
+ LOG.error("Got error to parse policy execution plan: \n{}", this.executionPlan, ex);
+ throw ex;
+ }
+ return policyExecutionPlan;
+ }
+
+ private String retrieveStreamId(Variable variable, Map<String, List<StreamColumn>> streamMap, Map<String, SingleInputStream> aliasMap) {
+ Preconditions.checkNotNull(variable.getStreamId(), "streamId");
+ if (streamMap.containsKey(variable.getStreamId()) && aliasMap.containsKey(variable.getStreamId())) {
+ throw new DuplicateDefinitionException("Duplicated streamId and alias: " + variable.getStreamId());
+ } else if (streamMap.containsKey(variable.getStreamId())) {
+ return variable.getStreamId();
+ } else if (aliasMap.containsKey(variable.getStreamId())) {
+ return aliasMap.get(variable.getStreamId()).getStreamId();
+ } else {
+ throw new DefinitionNotExistException(variable.getStreamId());
+ }
+ }
+
+ private StreamPartition findStreamPartition(SingleInputStream inputStream, Selector selector) {
+ // Window Spec
+ List<Window> windows = new ArrayList<>();
+ for (StreamHandler streamHandler : inputStream.getStreamHandlers()) {
+ if (streamHandler instanceof Window) {
+ windows.add((Window) streamHandler);
+ }
+ }
+
+ // Group By Spec
+ List<Variable> groupBy = selector.getGroupByList();
+ if (windows.size() > 0 || groupBy.size() >= 0) {
+ return generatePartition(inputStream.getStreamId(), windows, groupBy);
+ } else {
+ return null;
+ }
+ }
+
+ private void retrievePartition(StreamPartition partition) {
+ if (partition == null) {
+ return;
+ }
+
+ if (!effectivePartitions.containsKey(partition.getStreamId())) {
+ effectivePartitions.put(partition.getStreamId(), partition);
+ } else if (!effectivePartitions.get(partition.getStreamId()).equals(partition)) {
+ StreamPartition existingPartition = effectivePartitions.get(partition.getStreamId());
+ // If same Type & Columns but different sort spec, then use larger
+ if (existingPartition.getType().equals(partition.getType())
+ && ListUtils.isEqualList(existingPartition.getColumns(), partition.getColumns())
+ && partition.getSortSpec().getWindowPeriodMillis() > existingPartition.getSortSpec().getWindowPeriodMillis()
+ || existingPartition.getType().equals(StreamPartition.Type.SHUFFLE)) {
+ effectivePartitions.put(partition.getStreamId(), partition);
+ } else {
+ // Throw exception as it unable to conflict effectivePartitions on same stream will not be able to run in distributed mode
+ throw new ExecutionPlanValidationException("You have incompatible partitions on stream " + partition.getStreamId()
+ + ": [1] " + effectivePartitions.get(partition.getStreamId()).toString() + " [2] " + partition.toString() + "");
+ }
+ }
+ }
+
+ private void retrieveAliasForQuery(SingleInputStream inputStream, Map<String, SingleInputStream> aliasStreamMapping) {
+ if (inputStream.getStreamReferenceId() != null) {
+ if (aliasStreamMapping.containsKey(inputStream.getStreamReferenceId())) {
+ throw new ExecutionPlanValidationException("Duplicated stream alias " + inputStream.getStreamId() + " -> " + inputStream);
+ } else {
+ aliasStreamMapping.put(inputStream.getStreamReferenceId(), inputStream);
+ }
+ }
+ }
+
+ private StreamPartition generatePartition(String streamId, List<Window> windows, List<Variable> groupBy) {
+ StreamPartition partition = new StreamPartition();
+ partition.setStreamId(streamId);
+ StreamSortSpec sortSpec = null;
+ if (windows != null && windows.size() > 0) {
+ for (Window window : windows) {
+ if (window.getFunction().equals(WINDOW_EXTERNAL_TIME)) {
+ sortSpec = new StreamSortSpec();
+ sortSpec.setWindowPeriodMillis(getExternalTimeWindowSize(window));
+ sortSpec.setWindowMargin(sortSpec.getWindowPeriodMillis() / 5);
+ }
+ }
+ }
+ partition.setSortSpec(sortSpec);
+ if (groupBy != null && groupBy.size() > 0) {
+ partition.setColumns(groupBy.stream().map(Variable::getAttributeName).collect(Collectors.toList()));
+ partition.setType(StreamPartition.Type.GROUPBY);
+ } else {
+ partition.setType(StreamPartition.Type.SHUFFLE);
+ }
+ return partition;
+ }
+
+ private static int getExternalTimeWindowSize(Window window) {
+ Expression windowSize = window.getParameters()[1];
+ if (windowSize instanceof TimeConstant) {
+ return ((TimeConstant) windowSize).getValue().intValue();
+ } else if (windowSize instanceof IntConstant) {
+ return ((IntConstant) windowSize).getValue();
+ } else if (windowSize instanceof LongConstant) {
+ return ((LongConstant) windowSize).getValue().intValue();
+ } else {
+ throw new UnsupportedOperationException("Illegal type of window size expression:" + windowSize.toString());
+ }
+ }
+
+ private static List<StreamColumn> convertOutputStreamColumns(List<OutputAttribute> outputAttributeList) {
+ return outputAttributeList.stream().map(outputAttribute -> {
+ StreamColumn streamColumn = new StreamColumn();
+ streamColumn.setName(outputAttribute.getRename());
+ streamColumn.setDescription(outputAttribute.getExpression().toString());
+ return streamColumn;
+ }).collect(Collectors.toList());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/64fce8f8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreter.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreter.java
new file mode 100644
index 0000000..8aec294
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreter.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.interpreter;
+
+import com.google.common.base.Preconditions;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamNotDefinedException;
+import org.apache.eagle.alert.engine.evaluator.impl.SiddhiDefinitionAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * PolicyInterpreter Helper Methods:
+ * <ul>
+ * <li>Parse: parse siddhi query and generate static execution plan</li>
+ * <li>Validate: validate policy definition with execution plan and metadata</li>
+ * </ul>
+ *
+ * @see PolicyExecutionPlanner
+ * @see <a href="https://docs.wso2.com/display/CEP300/WSO2+Complex+Event+Processor+Documentation">WSO2 Complex Event Processor Documentation</a>
+ */
+public class PolicyInterpreter {
+ private static final Logger LOG = LoggerFactory.getLogger(PolicyInterpreter.class);
+
+ /**
+ * See https://docs.wso2.com/display/CEP300/Windows#Windows-ExternalTimeWindow.
+ */
+ private static final String WINDOW_EXTERNAL_TIME = "externalTime";
+
+ public static PolicyParseResult parse(String executionPlanQuery) {
+ PolicyParseResult policyParseResult = new PolicyParseResult();
+ try {
+ policyParseResult.setPolicyExecutionPlan(parseExecutionPlan(executionPlanQuery));
+ policyParseResult.setSuccess(true);
+ policyParseResult.setMessage("Parsed successfully");
+ } catch (Exception exception) {
+ LOG.error("Got error to parse policy: {}", executionPlanQuery, exception);
+ policyParseResult.setSuccess(false);
+ policyParseResult.setMessage(exception.getMessage());
+ policyParseResult.setStackTrace(exception);
+ }
+ return policyParseResult;
+ }
+
+ /**
+ * Quick parseExecutionPlan policy.
+ */
+ public static PolicyExecutionPlan parseExecutionPlan(String policyDefinition, Map<String, StreamDefinition> inputStreamDefinitions) throws Exception {
+ // Validate inputStreams are valid
+ Preconditions.checkNotNull(inputStreamDefinitions, "No inputStreams to connect from");
+ return parseExecutionPlan(SiddhiDefinitionAdapter.buildSiddhiExecutionPlan(policyDefinition, inputStreamDefinitions));
+ }
+
+ public static PolicyExecutionPlan parseExecutionPlan(String executionPlanQuery) throws Exception {
+ return PolicyExecutionPlanner.parseExecutionPlan(executionPlanQuery);
+ }
+
+ public static PolicyValidationResult validate(PolicyDefinition policy, Map<String, StreamDefinition> allDefinitions) {
+ Map<String, StreamDefinition> inputDefinitions = new HashMap<>();
+ PolicyValidationResult policyValidationResult = new PolicyValidationResult();
+ policyValidationResult.setPolicyDefinition(policy);
+ try {
+ if (policy.getInputStreams() != null) {
+ for (String streamId : policy.getInputStreams()) {
+ if (allDefinitions.containsKey(streamId)) {
+ inputDefinitions.put(streamId, allDefinitions.get(streamId));
+ } else {
+ throw new StreamNotDefinedException(streamId);
+ }
+ }
+ }
+
+ PolicyExecutionPlan policyExecutionPlan = parseExecutionPlan(policy.getDefinition().getValue(), inputDefinitions);
+ // Validate output
+ if (policy.getOutputStreams() != null) {
+ for (String outputStream : policy.getOutputStreams()) {
+ if (!policyExecutionPlan.getOutputStreams().containsKey(outputStream)) {
+ throw new StreamNotDefinedException("Output stream " + outputStream + " not defined");
+ }
+ }
+ }
+ policyValidationResult.setPolicyExecutionPlan(policyExecutionPlan);
+ policyValidationResult.setSuccess(true);
+ policyValidationResult.setMessage("Validated successfully");
+ } catch (Exception exception) {
+ LOG.error("Got error to validate policy definition: {}", policy, exception);
+ policyValidationResult.setSuccess(false);
+ policyValidationResult.setMessage(exception.getMessage());
+ policyValidationResult.setStackTrace(exception);
+ }
+
+ return policyValidationResult;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/64fce8f8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyParseResult.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyParseResult.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyParseResult.java
new file mode 100644
index 0000000..a0f3ad2
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyParseResult.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.interpreter;
+
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+public class PolicyParseResult {
+ private boolean success;
+ private String message;
+ private String exception;
+
+ private PolicyExecutionPlan policyExecutionPlan;
+
+ public String getException() {
+ return exception;
+ }
+
+ public void setException(String exception) {
+ this.exception = exception;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+
+ public void setStackTrace(Throwable throwable) {
+ this.setException(ExceptionUtils.getStackTrace(throwable));
+ }
+
+ public boolean isSuccess() {
+ return success;
+ }
+
+ public void setSuccess(boolean success) {
+ this.success = success;
+ }
+
+ public PolicyExecutionPlan getPolicyExecutionPlan() {
+ return policyExecutionPlan;
+ }
+
+ public void setPolicyExecutionPlan(PolicyExecutionPlan policyExecutionPlan) {
+ this.policyExecutionPlan = policyExecutionPlan;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/64fce8f8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyValidationResult.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyValidationResult.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyValidationResult.java
new file mode 100644
index 0000000..17f6091
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyValidationResult.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.interpreter;
+
+
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+public class PolicyValidationResult {
+ private boolean success;
+ private String message;
+ private String exception;
+
+ private PolicyExecutionPlan policyExecutionPlan;
+ private PolicyDefinition policyDefinition;
+
+ public String getException() {
+ return exception;
+ }
+
+ public void setException(String exception) {
+ this.exception = exception;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+
+ public void setStackTrace(Throwable throwable) {
+ this.setException(ExceptionUtils.getStackTrace(throwable));
+ }
+
+ public boolean isSuccess() {
+ return success;
+ }
+
+ public void setSuccess(boolean success) {
+ this.success = success;
+ }
+
+ public PolicyExecutionPlan getPolicyExecutionPlan() {
+ return policyExecutionPlan;
+ }
+
+ public void setPolicyExecutionPlan(PolicyExecutionPlan policyExecutionPlan) {
+ this.policyExecutionPlan = policyExecutionPlan;
+ }
+
+ public PolicyDefinition getPolicyDefinition() {
+ return policyDefinition;
+ }
+
+ public void setPolicyDefinition(PolicyDefinition policyDefinition) {
+ this.policyDefinition = policyDefinition;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/64fce8f8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java
new file mode 100644
index 0000000..cf1a7e9
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.interpreter;
+
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.interpreter.PolicyExecutionPlan;
+import org.apache.eagle.alert.engine.interpreter.PolicyInterpreter;
+import org.apache.eagle.alert.engine.interpreter.PolicyValidationResult;
+import org.junit.Assert;
+import org.junit.Test;
+import org.wso2.siddhi.core.exception.DefinitionNotExistException;
+import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
+
+import java.util.*;
+
+public class PolicyInterpreterTest {
+ // -------------------------
+ // Single Stream Test Cases
+ // -------------------------
+ @Test
+ public void testParseSingleStreamPolicyQuery() throws Exception {
+ PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan("from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.externalTime(timestamp, 2 min) "
+ + "select cmd, user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT");
+ Assert.assertEquals("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX", executionPlan.getInputStreams().keySet().toArray()[0]);
+ Assert.assertEquals("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT", executionPlan.getOutputStreams().keySet().toArray()[0]);
+ Assert.assertEquals(1, executionPlan.getStreamPartitions().size());
+ Assert.assertEquals(2*60*1000,executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis());
+ }
+
+ @Test
+ public void testParseSingleStreamPolicyWithPattern() throws Exception {
+ PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan(
+ "from e1=Stream1[price >= 20] -> e2=Stream2[price >= e1.price] \n"
+ + "select e1.symbol as symbol, e2.price as price, e1.price+e2.price as total_price \n"
+ + "group by symbol, company insert into OutStream");
+ Assert.assertTrue(executionPlan.getInputStreams().containsKey("Stream1"));
+ Assert.assertTrue(executionPlan.getInputStreams().containsKey("Stream2"));
+ Assert.assertTrue(executionPlan.getOutputStreams().containsKey("OutStream"));
+ Assert.assertEquals(StreamPartition.Type.GROUPBY,executionPlan.getStreamPartitions().get(0).getType());
+ Assert.assertArrayEquals(new String[]{"symbol","company"},executionPlan.getStreamPartitions().get(0).getColumns().toArray());
+ Assert.assertEquals(StreamPartition.Type.GROUPBY,executionPlan.getStreamPartitions().get(1).getType());
+ Assert.assertArrayEquals(new String[]{"symbol","company"},executionPlan.getStreamPartitions().get(1).getColumns().toArray());
+ }
+
+ @Test
+ public void testParseSingleStreamPolicyQueryWithMultiplePartitionUsingLargerWindow() throws Exception {
+ PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan(
+ "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.externalTime(timestamp, 1 min) "
+ + "select cmd,user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1;"
+ + "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.externalTime(timestamp, 1 hour) "
+ + "select cmd,user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2;"
+ );
+ Assert.assertEquals("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX", executionPlan.getInputStreams().keySet().toArray()[0]);
+ Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1"));
+ Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2"));
+ Assert.assertEquals(1, executionPlan.getStreamPartitions().size());
+ Assert.assertEquals(60*60*1000, executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis());
+ }
+
+ @Test(expected = ExecutionPlanValidationException.class)
+ public void testParseSingleStreamPolicyQueryWithConflictPartition() throws Exception {
+ PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan(
+ "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.externalTime(timestamp, 5 min) "
+ + "select cmd, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1;"
+ + "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.externalTime(timestamp, 2 min) "
+ + "select user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2;"
+ );
+ Assert.assertEquals("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX", executionPlan.getInputStreams().keySet().toArray()[0]);
+ Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1"));
+ Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2"));
+ Assert.assertEquals(2, executionPlan.getStreamPartitions().size());
+ Assert.assertEquals(5*60*1000, executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis());
+ }
+
+ @Test
+ public void testValidPolicyWithExternalTimeWindow() {
+ PolicyDefinition policyDefinition = new PolicyDefinition();
+ policyDefinition.setName("test_policy");
+ policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
+ policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
+
+ PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+ definition.setType("siddhi");
+ definition.setValue("from INPUT_STREAM_1#window.externalTime(timestamp, 2 min) select name, sum(value) as total group by name insert into OUTPUT_STREAM_1 ;");
+ definition.setInputStreams(policyDefinition.getInputStreams());
+ definition.setOutputStreams(policyDefinition.getOutputStreams());
+ policyDefinition.setDefinition(definition);
+
+ PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+ {
+ put("INPUT_STREAM_1", mockStreamDefinition("INPUT_STREAM_1"));
+ put("INPUT_STREAM_2", mockStreamDefinition("INPUT_STREAM_2"));
+ put("INPUT_STREAM_3", mockStreamDefinition("INPUT_STREAM_3"));
+ put("INPUT_STREAM_4", mockStreamDefinition("INPUT_STREAM_4"));
+ }
+ });
+ Assert.assertTrue(validation.isSuccess());
+ Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
+ Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size());
+ Assert.assertEquals(1, validation.getPolicyExecutionPlan().getStreamPartitions().size());
+ Assert.assertNotNull(validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec());
+ }
+
+ @Test
+ public void testValidPolicyWithTimeWindow() {
+ PolicyDefinition policyDefinition = new PolicyDefinition();
+ policyDefinition.setName("test_policy");
+ policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
+ policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
+
+ PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+ definition.setType("siddhi");
+ definition.setValue("from INPUT_STREAM_1#window.time(2 min) select name, sum(value) as total group by name insert into OUTPUT_STREAM_1 ;");
+ definition.setInputStreams(policyDefinition.getInputStreams());
+ definition.setOutputStreams(policyDefinition.getOutputStreams());
+ policyDefinition.setDefinition(definition);
+
+ PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+ {
+ put("INPUT_STREAM_1", mockStreamDefinition("INPUT_STREAM_1"));
+ }
+ });
+ Assert.assertTrue(validation.isSuccess());
+ Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
+ Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size());
+ Assert.assertEquals(1, validation.getPolicyExecutionPlan().getStreamPartitions().size());
+ Assert.assertNull(validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec());
+ }
+
+ @Test
+ public void testValidPolicyWithTooManyInputStreams() {
+ PolicyDefinition policyDefinition = new PolicyDefinition();
+ policyDefinition.setName("test_policy");
+ policyDefinition.setInputStreams(Arrays.asList("INPUT_STREAM_1", "INPUT_STREAM_2"));
+ policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
+
+ PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+ definition.setType("siddhi");
+ definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
+ definition.setInputStreams(policyDefinition.getInputStreams());
+ definition.setOutputStreams(policyDefinition.getOutputStreams());
+ policyDefinition.setDefinition(definition);
+
+ PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+ {
+ put("INPUT_STREAM_1", mockStreamDefinition("INPUT_STREAM_1"));
+ put("INPUT_STREAM_2", mockStreamDefinition("INPUT_STREAM_2"));
+ }
+ });
+ Assert.assertTrue(validation.isSuccess());
+ Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
+ Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size());
+ }
+
+ @Test
+ public void testValidPolicyWithTooFewOutputStreams() {
+ PolicyDefinition policyDefinition = new PolicyDefinition();
+ policyDefinition.setName("test_policy");
+ policyDefinition.setInputStreams(Arrays.asList("INPUT_STREAM_1", "INPUT_STREAM_2"));
+ policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
+
+ PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+ definition.setType("siddhi");
+ definition.setValue(
+ "from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;"
+ + "from INPUT_STREAM_1[value < 90.0] select * group by name insert into OUTPUT_STREAM_2;"
+ );
+ definition.setInputStreams(policyDefinition.getInputStreams());
+ definition.setOutputStreams(policyDefinition.getOutputStreams());
+ policyDefinition.setDefinition(definition);
+
+ PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+ {
+ put("INPUT_STREAM_1", mockStreamDefinition("INPUT_STREAM_1"));
+ put("INPUT_STREAM_2", mockStreamDefinition("INPUT_STREAM_2"));
+ }
+ });
+ Assert.assertTrue(validation.isSuccess());
+ Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
+ Assert.assertEquals(2, validation.getPolicyExecutionPlan().getOutputStreams().size());
+ }
+
+ @Test
+ public void testInvalidPolicyForSyntaxError() {
+ PolicyDefinition policyDefinition = new PolicyDefinition();
+ policyDefinition.setName("test_policy");
+ policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM"));
+ policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM"));
+
+ PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+ definition.setType("siddhi");
+ definition.setValue("from INPUT_STREAM (value > 90.0) select * group by name insert into OUTPUT_STREAM;");
+ definition.setInputStreams(policyDefinition.getInputStreams());
+ definition.setOutputStreams(policyDefinition.getOutputStreams());
+ policyDefinition.setDefinition(definition);
+
+ PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+ {
+ put("INPUT_STREAM", mockStreamDefinition("INPUT_STREAM"));
+ }
+ });
+ Assert.assertFalse(validation.isSuccess());
+ }
+
+ @Test
+ public void testInvalidPolicyForNotDefinedInputStream() {
+ PolicyDefinition policyDefinition = new PolicyDefinition();
+ policyDefinition.setName("test_policy");
+ policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
+ policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
+
+ PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+ definition.setType("siddhi");
+ definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
+ definition.setInputStreams(policyDefinition.getInputStreams());
+ definition.setOutputStreams(policyDefinition.getOutputStreams());
+ policyDefinition.setDefinition(definition);
+
+ PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+ {
+ put("INPUT_STREAM_2", mockStreamDefinition("INPUT_STREAM_2"));
+ }
+ });
+ Assert.assertFalse(validation.isSuccess());
+ }
+
+ @Test
+ public void testInvalidPolicyForNotDefinedOutputStream() {
+ PolicyDefinition policyDefinition = new PolicyDefinition();
+ policyDefinition.setName("test_policy");
+ policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
+ policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_2"));
+
+ PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+ definition.setType("siddhi");
+ definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
+ definition.setInputStreams(policyDefinition.getInputStreams());
+ definition.setOutputStreams(policyDefinition.getOutputStreams());
+ policyDefinition.setDefinition(definition);
+
+ PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+ {
+ put("INPUT_STREAM_1", mockStreamDefinition("INPUT_STREAM_1"));
+ }
+ });
+ Assert.assertFalse(validation.isSuccess());
+ }
+
+ // ---------------------
+ // Two Stream Test Cases
+ // ---------------------
+
+ @Test
+ public void testParseTwoStreamPolicyQueryWithMultiplePartition() throws Exception {
+ PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan(
+ "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_1#window.externalTime(timestamp, 1 min) "
+ + "select cmd,user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1;"
+ + "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_2#window.externalTime(timestamp, 1 hour) "
+ + "select cmd,user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2;"
+ );
+ Assert.assertTrue(executionPlan.getInputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_1"));
+ Assert.assertTrue(executionPlan.getInputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_2"));
+ Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1"));
+ Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2"));
+ Assert.assertEquals(2, executionPlan.getStreamPartitions().size());
+ Assert.assertEquals(60*1000, executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis());
+ Assert.assertEquals(60*60*1000, executionPlan.getStreamPartitions().get(1).getSortSpec().getWindowPeriodMillis());
+ }
+
+ @Test
+ public void testParseTwoStreamPolicyQueryWithSinglePartition() throws Exception {
+ PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan(
+ "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_1#window.externalTime(timestamp, 1 min) "
+ + "select cmd,user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1;"
+ + "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_2 select * insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2;"
+ );
+ Assert.assertTrue(executionPlan.getInputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_1"));
+ Assert.assertTrue(executionPlan.getInputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_2"));
+ Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1"));
+ Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2"));
+ Assert.assertEquals(2, executionPlan.getStreamPartitions().size());
+ Assert.assertEquals(60*1000, executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis());
+ Assert.assertEquals(StreamPartition.Type.SHUFFLE, executionPlan.getStreamPartitions().get(1).getType());
+ }
+
+
+ @Test
+ public void testParseTwoStreamPolicyQueryInnerJoin() throws Exception {
+ PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan(
+ "from TickEvent[symbol=='EBAY']#window.length(2000) " +
+ "join NewsEvent#window.externalTime(timestamp, 1000 sec) \n" +
+ "select * insert into JoinStream"
+ );
+ Assert.assertTrue(executionPlan.getInputStreams().containsKey("TickEvent"));
+ Assert.assertTrue(executionPlan.getInputStreams().containsKey("NewsEvent"));
+ Assert.assertTrue(executionPlan.getOutputStreams().containsKey("JoinStream"));
+ Assert.assertEquals(StreamPartition.Type.SHUFFLE, executionPlan.getStreamPartitions().get(0).getType());
+ Assert.assertNotNull(executionPlan.getStreamPartitions().get(0).getSortSpec());
+ Assert.assertEquals(1000*1000, executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis());
+ Assert.assertEquals(StreamPartition.Type.SHUFFLE, executionPlan.getStreamPartitions().get(1).getType());
+ Assert.assertNull(executionPlan.getStreamPartitions().get(1).getSortSpec());
+ }
+
+ @Test
+ public void testParseTwoStreamPolicyQueryInnerJoinWithCondition() throws Exception {
+ PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan(
+ "from TickEvent[symbol=='EBAY']#window.length(2000) as t unidirectional \n" +
+ "join NewsEvent#window.externalTime(timestamp, 1000 sec) as n \n" +
+ "on TickEvent.symbol == NewsEvent.company \n" +
+ "insert into JoinStream "
+ );
+ Assert.assertTrue(executionPlan.getInputStreams().containsKey("TickEvent"));
+ Assert.assertTrue(executionPlan.getInputStreams().containsKey("NewsEvent"));
+ Assert.assertTrue(executionPlan.getOutputStreams().containsKey("JoinStream"));
+ Assert.assertEquals(StreamPartition.Type.SHUFFLE, executionPlan.getStreamPartitions().get(0).getType());
+ Assert.assertNotNull(executionPlan.getStreamPartitions().get(0).getSortSpec());
+ Assert.assertEquals(1000*1000, executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis());
+ Assert.assertEquals(StreamPartition.Type.GROUPBY, executionPlan.getStreamPartitions().get(1).getType());
+ Assert.assertNull(executionPlan.getStreamPartitions().get(1).getSortSpec());
+ }
+
+ @Test
+ public void testParseTwoStreamPolicyQueryInnerJoinWithConditionHavingAlias() throws Exception {
+ PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan(
+ "from TickEvent[symbol=='EBAY']#window.length(2000) as t unidirectional \n" +
+ "join NewsEvent#window.externalTime(timestamp, 1000 sec) as n \n" +
+ "on t.symbol == n.company \n" +
+ "insert into JoinStream "
+ );
+ Assert.assertTrue(executionPlan.getInputStreams().containsKey("TickEvent"));
+ Assert.assertTrue(executionPlan.getInputStreams().containsKey("NewsEvent"));
+ Assert.assertTrue(executionPlan.getOutputStreams().containsKey("JoinStream"));
+ Assert.assertEquals(StreamPartition.Type.SHUFFLE, executionPlan.getStreamPartitions().get(0).getType());
+ Assert.assertNotNull(executionPlan.getStreamPartitions().get(0).getSortSpec());
+ Assert.assertEquals(1000*1000, executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis());
+ Assert.assertEquals(StreamPartition.Type.GROUPBY, executionPlan.getStreamPartitions().get(1).getType());
+ Assert.assertNull(executionPlan.getStreamPartitions().get(1).getSortSpec());
+ }
+
+ @Test(expected = DefinitionNotExistException.class)
+ public void testParseTwoStreamPolicyQueryInnerJoinWithConditionHavingNotFoundAlias() throws Exception {
+ PolicyInterpreter.parseExecutionPlan(
+ "from TickEvent[symbol=='EBAY']#window.length(2000) as t unidirectional \n" +
+ "join NewsEvent#window.externalTime(timestamp, 1000 sec) as n \n" +
+ "on t.symbol == NOT_EXIST_ALIAS.company \n" +
+ "insert into JoinStream "
+ );
+ }
+
+ // --------------
+ // Helper Methods
+ // --------------
+
+ private static StreamDefinition mockStreamDefinition(String streamId) {
+ StreamDefinition streamDefinition = new StreamDefinition();
+ streamDefinition.setStreamId(streamId);
+ List<StreamColumn> columns = new ArrayList<>();
+ columns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
+ columns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
+ columns.add(new StreamColumn.Builder().name("timestamp").type(StreamColumn.Type.LONG).build());
+ streamDefinition.setColumns(columns);
+ return streamDefinition;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/64fce8f8/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
index 190da2a..3368517 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
@@ -23,6 +23,9 @@ import org.apache.eagle.alert.coordination.model.ScheduleState;
import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
import org.apache.eagle.alert.coordination.model.internal.Topology;
import org.apache.eagle.alert.engine.coordinator.*;
+import org.apache.eagle.alert.engine.interpreter.PolicyInterpreter;
+import org.apache.eagle.alert.engine.interpreter.PolicyParseResult;
+import org.apache.eagle.alert.engine.interpreter.PolicyValidationResult;
import org.apache.eagle.alert.metadata.IMetadataDao;
import org.apache.eagle.alert.metadata.impl.MetadataDaoFactory;
import org.apache.eagle.alert.metadata.resource.Models;
@@ -207,7 +210,11 @@ public class MetadataResource {
@Path("/policies/validate")
@POST
public PolicyValidationResult validatePolicy(PolicyDefinition policy) {
- return PolicyInterpreter.validate(policy,dao);
+ Map<String, StreamDefinition> allDefinitions = new HashMap<>();
+ for (StreamDefinition definition : dao.listStreams()) {
+ allDefinitions.put(definition.getStreamId(), definition);
+ }
+ return PolicyInterpreter.validate(policy, allDefinitions);
}
@Path("/policies/parse")
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/64fce8f8/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyExecutionPlan.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyExecutionPlan.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyExecutionPlan.java
deleted file mode 100644
index f925e3d..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyExecutionPlan.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.service.metadata.resource;
-
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.wso2.siddhi.query.api.ExecutionPlan;
-
-import java.util.List;
-import java.util.Map;
-
-@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-public class PolicyExecutionPlan {
- /**
- * Actual input streams.
- */
- private Map<String, List<StreamColumn>> inputStreams;
-
- /**
- * Actual output streams.
- */
- private Map<String, List<StreamColumn>> outputStreams;
-
- /**
- * Execution plan source.
- */
- private String executionPlanSource;
-
- /**
- * Execution plan.
- */
- private ExecutionPlan internalExecutionPlan;
-
- private String executionPlanDesc;
-
- private List<StreamPartition> streamPartitions;
-
- public String getExecutionPlanSource() {
- return executionPlanSource;
- }
-
- public void setExecutionPlanSource(String executionPlanSource) {
- this.executionPlanSource = executionPlanSource;
- }
-
- public ExecutionPlan getInternalExecutionPlan() {
- return internalExecutionPlan;
- }
-
- public void setInternalExecutionPlan(ExecutionPlan internalExecutionPlan) {
- this.internalExecutionPlan = internalExecutionPlan;
- }
-
- public String getExecutionPlanDesc() {
- return executionPlanDesc;
- }
-
- public void setExecutionPlanDesc(String executionPlanDesc) {
- this.executionPlanDesc = executionPlanDesc;
- }
-
- public List<StreamPartition> getStreamPartitions() {
- return streamPartitions;
- }
-
- public void setStreamPartitions(List<StreamPartition> streamPartitions) {
- this.streamPartitions = streamPartitions;
- }
-
- public Map<String, List<StreamColumn>> getInputStreams() {
- return inputStreams;
- }
-
- public void setInputStreams(Map<String, List<StreamColumn>> inputStreams) {
- this.inputStreams = inputStreams;
- }
-
- public Map<String, List<StreamColumn>> getOutputStreams() {
- return outputStreams;
- }
-
- public void setOutputStreams(Map<String, List<StreamColumn>> outputStreams) {
- this.outputStreams = outputStreams;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/64fce8f8/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyInterpreter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyInterpreter.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyInterpreter.java
deleted file mode 100644
index b97583c..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyInterpreter.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.service.metadata.resource;
-
-import com.google.common.base.Preconditions;
-import org.apache.eagle.alert.engine.coordinator.*;
-import org.apache.eagle.alert.engine.evaluator.impl.SiddhiDefinitionAdapter;
-import org.apache.eagle.alert.metadata.IMetadataDao;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.query.api.ExecutionPlan;
-import org.wso2.siddhi.query.api.execution.ExecutionElement;
-import org.wso2.siddhi.query.api.execution.query.Query;
-import org.wso2.siddhi.query.api.execution.query.input.handler.StreamHandler;
-import org.wso2.siddhi.query.api.execution.query.input.handler.Window;
-import org.wso2.siddhi.query.api.execution.query.input.stream.InputStream;
-import org.wso2.siddhi.query.api.execution.query.input.stream.SingleInputStream;
-import org.wso2.siddhi.query.api.execution.query.output.stream.OutputStream;
-import org.wso2.siddhi.query.api.execution.query.selection.OutputAttribute;
-import org.wso2.siddhi.query.api.execution.query.selection.Selector;
-import org.wso2.siddhi.query.api.expression.Variable;
-import org.wso2.siddhi.query.api.expression.constant.TimeConstant;
-import org.wso2.siddhi.query.compiler.SiddhiCompiler;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-/**
- * PolicyInterpreter:
- * <ul>
- * <li>Parse: parse siddhi query and generate static execution plan</li>
- * <li>Validate: validate policy definition with execution plan and metadata</li>
- * </ul>
- * @see <a href="https://docs.wso2.com/display/CEP300/WSO2+Complex+Event+Processor+Documentation">WSO2 Complex Event Processor Documentation</a>
- */
-public class PolicyInterpreter {
- private static final Logger LOG = LoggerFactory.getLogger(PolicyInterpreter.class);
-
- /**
- * See https://docs.wso2.com/display/CEP300/Windows#Windows-ExternalTimeWindow.
- */
- private static final String WINDOW_EXTERNAL_TIME = "externalTime";
-
- public static PolicyParseResult parse(String executionPlanQuery) {
- PolicyParseResult policyParseResult = new PolicyParseResult();
- try {
- policyParseResult.setPolicyExecutionPlan(parseExecutionPlan(executionPlanQuery));
- policyParseResult.setSuccess(true);
- policyParseResult.setMessage("Parsed successfully");
- } catch (Exception exception) {
- LOG.error("Got error to parse policy: {}", executionPlanQuery, exception);
- policyParseResult.setSuccess(false);
- policyParseResult.setMessage(exception.getMessage());
- policyParseResult.setStackTrace(exception);
- }
- return policyParseResult;
- }
-
- /**
- * Quick parseExecutionPlan policy.
- */
- public static PolicyExecutionPlan parseExecutionPlan(String policyDefinition, Map<String, StreamDefinition> inputStreamDefinitions) throws Exception {
- // Validate inputStreams are valid
- Preconditions.checkNotNull(inputStreamDefinitions, "No inputStreams to connect from");
- return parseExecutionPlan(SiddhiDefinitionAdapter.buildSiddhiExecutionPlan(policyDefinition, inputStreamDefinitions));
- }
-
- public static PolicyExecutionPlan parseExecutionPlan(String executionPlanQuery) throws Exception {
- PolicyExecutionPlan policyExecutionPlan = new PolicyExecutionPlan();
- try {
- ExecutionPlan executionPlan = SiddhiCompiler.parse(executionPlanQuery);
-
- policyExecutionPlan.setExecutionPlanDesc(executionPlan.toString());
-
- // Set current execution plan as valid
- policyExecutionPlan.setExecutionPlanSource(executionPlanQuery);
- policyExecutionPlan.setInternalExecutionPlan(executionPlan);
-
- Map<String, List<StreamColumn>> actualInputStreams = new HashMap<>();
- Map<String, List<StreamColumn>> actualOutputStreams = new HashMap<>();
- List<StreamPartition> partitions = new ArrayList<>();
-
- // Go through execution element
- for (ExecutionElement executionElement : executionPlan.getExecutionElementList()) {
- if (executionElement instanceof Query) {
- // -------------
- // Explain Query
- // -------------
-
- // Input streams
- InputStream inputStream = ((Query) executionElement).getInputStream();
- Selector selector = ((Query) executionElement).getSelector();
-
- for (String streamId : inputStream.getUniqueStreamIds()) {
- if (!actualInputStreams.containsKey(streamId)) {
- org.wso2.siddhi.query.api.definition.StreamDefinition streamDefinition = executionPlan.getStreamDefinitionMap().get(streamId);
- if (streamDefinition != null) {
- actualInputStreams.put(streamId, SiddhiDefinitionAdapter.convertFromSiddiDefinition(streamDefinition).getColumns());
- } else {
- actualInputStreams.put(streamId, null);
- }
- }
- }
-
- // Window Spec and Partition
- if (inputStream instanceof SingleInputStream) {
- // Window Spec
- List<Window> windows = new ArrayList<>();
- for (StreamHandler streamHandler : ((SingleInputStream) inputStream).getStreamHandlers()) {
- if (streamHandler instanceof Window) {
- windows.add((Window) streamHandler);
- }
- }
-
- // Group By Spec
- List<Variable> groupBy = selector.getGroupByList();
-
- if (windows.size() > 0 || groupBy.size() >= 0) {
- partitions.add(convertSingleStreamWindowAndGroupByToPartition(((SingleInputStream) inputStream).getStreamId(), windows, groupBy));
- }
- }
- // else if(inputStream instanceof JoinInputStream) {
- // // TODO: Parse multiple stream join
- // } else if(inputStream instanceof StateInputStream) {
- // // TODO: Parse StateInputStream
- // }
-
- // Output streams
- OutputStream outputStream = ((Query) executionElement).getOutputStream();
- actualOutputStreams.put(outputStream.getId(), convertOutputStreamColumns(selector.getSelectionList()));
- } else {
- LOG.warn("Unhandled execution element: {}", executionElement.toString());
- }
- }
- // Set used input streams
- policyExecutionPlan.setInputStreams(actualInputStreams);
-
- // Set Partitions
- policyExecutionPlan.setStreamPartitions(partitions);
-
- // Validate outputStreams
- policyExecutionPlan.setOutputStreams(actualOutputStreams);
- } catch (Exception ex) {
- LOG.error("Got error to parseExecutionPlan policy execution plan: \n{}", executionPlanQuery, ex);
- throw ex;
- }
- return policyExecutionPlan;
- }
-
- private static StreamPartition convertSingleStreamWindowAndGroupByToPartition(String streamId, List<Window> windows, List<Variable> groupBy) {
- StreamPartition partition = new StreamPartition();
- partition.setStreamId(streamId);
- StreamSortSpec sortSpec = null;
-
- if (windows.size() > 0) {
- for (Window window : windows) {
- if (window.getFunction().equals(WINDOW_EXTERNAL_TIME)) {
- sortSpec = new StreamSortSpec();
- sortSpec.setWindowPeriodMillis(((TimeConstant) window.getParameters()[1]).getValue().intValue());
- sortSpec.setWindowMargin(sortSpec.getWindowPeriodMillis() / 3);
- }
- }
- }
- partition.setSortSpec(sortSpec);
- if (groupBy.size() > 0) {
- partition.setColumns(groupBy.stream().map(Variable::getAttributeName).collect(Collectors.toList()));
- partition.setType(StreamPartition.Type.GROUPBY);
- } else {
- partition.setType(StreamPartition.Type.SHUFFLE);
- }
- return partition;
- }
-
- public static PolicyValidationResult validate(PolicyDefinition policy, IMetadataDao metadataDao) {
- Map<String, StreamDefinition> allDefinitions = new HashMap<>();
- for (StreamDefinition definition : metadataDao.listStreams()) {
- allDefinitions.put(definition.getStreamId(), definition);
- }
- return validate(policy, allDefinitions);
- }
-
- public static PolicyValidationResult validate(PolicyDefinition policy, Map<String, StreamDefinition> allDefinitions) {
- Map<String, StreamDefinition> inputDefinitions = new HashMap<>();
- PolicyValidationResult policyValidationResult = new PolicyValidationResult();
- policyValidationResult.setPolicyDefinition(policy);
- try {
- if (policy.getInputStreams() != null) {
- for (String streamId : policy.getInputStreams()) {
- if (allDefinitions.containsKey(streamId)) {
- inputDefinitions.put(streamId, allDefinitions.get(streamId));
- } else {
- throw new StreamNotDefinedException(streamId);
- }
- }
- }
-
- PolicyExecutionPlan policyExecutionPlan = parseExecutionPlan(policy.getDefinition().getValue(), inputDefinitions);
- // Validate output
- if (policy.getOutputStreams() != null) {
- for (String outputStream : policy.getOutputStreams()) {
- if (!policyExecutionPlan.getOutputStreams().containsKey(outputStream)) {
- throw new StreamNotDefinedException("Output stream " + outputStream + " not defined");
- }
- }
- }
- policyValidationResult.setPolicyExecutionPlan(policyExecutionPlan);
- policyValidationResult.setSuccess(true);
- policyValidationResult.setMessage("Validated successfully");
- } catch (Exception exception) {
- LOG.error("Got error to validate policy definition: {}", policy, exception);
- policyValidationResult.setSuccess(false);
- policyValidationResult.setMessage(exception.getMessage());
- policyValidationResult.setStackTrace(exception);
- }
-
- return policyValidationResult;
- }
-
- private static List<StreamColumn> convertOutputStreamColumns(List<OutputAttribute> outputAttributeList) {
- return outputAttributeList.stream().map(outputAttribute -> {
- StreamColumn streamColumn = new StreamColumn();
- streamColumn.setName(outputAttribute.getRename());
- streamColumn.setDescription(outputAttribute.getExpression().toString());
- return streamColumn;
- }).collect(Collectors.toList());
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/64fce8f8/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyParseResult.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyParseResult.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyParseResult.java
deleted file mode 100644
index 2522270..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyParseResult.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.service.metadata.resource;
-
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import org.apache.commons.lang3.exception.ExceptionUtils;
-
-@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-public class PolicyParseResult {
- private boolean success;
- private String message;
- private String exception;
-
- private PolicyExecutionPlan policyExecutionPlan;
-
- public String getException() {
- return exception;
- }
-
- public void setException(String exception) {
- this.exception = exception;
- }
-
- public String getMessage() {
- return message;
- }
-
- public void setMessage(String message) {
- this.message = message;
- }
-
- public void setStackTrace(Throwable throwable) {
- this.setException(ExceptionUtils.getStackTrace(throwable));
- }
-
- public boolean isSuccess() {
- return success;
- }
-
- public void setSuccess(boolean success) {
- this.success = success;
- }
-
- public PolicyExecutionPlan getPolicyExecutionPlan() {
- return policyExecutionPlan;
- }
-
- public void setPolicyExecutionPlan(PolicyExecutionPlan policyExecutionPlan) {
- this.policyExecutionPlan = policyExecutionPlan;
- }
-}