You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/01/20 12:16:58 UTC
incubator-eagle git commit: EAGLE-130 Fix Dynamical Pipeline
Aggregation Problem
Repository: incubator-eagle
Updated Branches:
refs/heads/master c1485aac5 -> a979b0f2a
EAGLE-130 Fix Dynamical Pipeline Aggregation Problem
https://issues.apache.org/jira/browse/EAGLE-130
* Fix Dynamical Pipeline Aggregation Problem
Author: @haoch <ha...@apache.org>
Reviewer: @haoch <ha...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/a979b0f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/a979b0f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/a979b0f2
Branch: refs/heads/master
Commit: a979b0f2a19d2bd8f1f5313baad021ee4efd2821
Parents: c1485aa
Author: Hao Chen <ha...@apache.org>
Authored: Wed Jan 20 19:13:56 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Wed Jan 20 19:13:56 2016 +0800
----------------------------------------------------------------------
.../src/test/resources/pipeline_1.conf | 2 +-
.../src/test/resources/pipeline_4.conf | 19 +++++++++++--------
.../impl/aggregate/AggregateExecutorFactory.java | 4 ++--
.../impl/aggregate/SimpleAggregateExecutor.java | 12 ++++++++++--
.../core/StreamAggregateExpansion.scala | 2 +-
.../datastream/core/StreamAlertExpansion.scala | 9 ++++++---
.../policy/siddhi/SiddhiPolicyDefinition.java | 10 +++++-----
.../policy/siddhi/SiddhiPolicyEvaluator.java | 15 ++++++++++++---
.../policy/siddhi/SiddhiStreamMetadataUtils.java | 2 +-
9 files changed, 49 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a979b0f2/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_1.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_1.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_1.conf
index 8bd4fd3..d7a634c 100644
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_1.conf
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_1.conf
@@ -112,7 +112,7 @@
}
Alert.alert {
- upStreamNames = [metricStream_1,metricStream_2]
+// upStreamNames = [metricStream_1,metricStream_2]
alertExecutorId = defaultAlertExecutor
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a979b0f2/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf
index 9e297ee..609e905 100644
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf
@@ -96,14 +96,17 @@
// alertExecutorId = defaultAlertExecutor
}
-// Aggregator.Aggregator{ sql = """
-// @info("query")
-// from JmxStreamOne[value > 100.0] select * insert into OutputStream;
-// """
-// }
-// JmxStreamOne -> Aggregator{
-// grouping = shuffle
-// }
+ Aggregator.Aggregator{ sql = """
+ define stream JmxStreamOne(eagleAlertContext object, timestamp long, metric string, value double);
+ @info(name = "query")
+ from JmxStreamOne[value > 100.0] select * insert into outputStream;
+ """}
+
+
+ JmxStreamOne -> Aggregator {}
+
+ Aggregator -> printer {}
+
// Aggregator -> aggregatedSink{
// grouping = shuffle
// }
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a979b0f2/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java
index 48c832a..d92cde8 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java
@@ -44,12 +44,12 @@ public class AggregateExecutorFactory {
public static final AggregateExecutorFactory Instance = new AggregateExecutorFactory();
- public IPolicyExecutor[] createExecutors(String cql) throws Exception {
+ public IPolicyExecutor[] createExecutors(String cql,List<String> upStreamNames) throws Exception {
int numPartitions = 1; //loadExecutorConfig(config, executorId, partitionerCls);
IPolicyExecutor[] executors = new IPolicyExecutor[numPartitions];
for (int i = 0; i < numPartitions ; i++ ) {
- executors[i] = new SimpleAggregateExecutor(cql, "siddhiCEPEngine", i, numPartitions);
+ executors[i] = new SimpleAggregateExecutor(cql, "siddhiCEPEngine", i, numPartitions,upStreamNames);
}
return executors;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a979b0f2/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java
index e42fc48..6593b32 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java
@@ -53,6 +53,7 @@ public class SimpleAggregateExecutor
private final String cql;
private final int partitionSeq;
private final int totalPartitionNum;
+ private final String[] sourceStreams;
private String policyId;
private String executorId;
@@ -60,10 +61,17 @@ public class SimpleAggregateExecutor
private AggregateDefinitionAPIEntity aggDef;
private PolicyEvaluator<AggregateDefinitionAPIEntity> evaluator;
- public SimpleAggregateExecutor(String cql, String policyType, int partitionSeq, int totalPartitionNum) {
+ public SimpleAggregateExecutor(String cql, String policyType, int partitionSeq, int totalPartitionNum,List<String> sourceStreams) {
this.cql = cql;
this.partitionSeq = partitionSeq;
this.totalPartitionNum = totalPartitionNum;
+
+ if(sourceStreams == null){
+ this.sourceStreams = new String[]{Constants.EAGLE_DEFAULT_POLICY_NAME};
+ }else{
+ this.sourceStreams = sourceStreams.toArray(new String[sourceStreams.size()]);
+ }
+
// create an fixed definition policy api entity, and indicate it has full definition
aggDef = new AggregateDefinitionAPIEntity();
aggDef.setTags(new HashMap<String, String>());
@@ -128,7 +136,7 @@ public class SimpleAggregateExecutor
// Create evaluator instances
pe = (PolicyEvaluator<AggregateDefinitionAPIEntity>) evalCls
.getConstructor(Config.class, String.class, AbstractPolicyDefinition.class, String[].class, boolean.class)
- .newInstance(config, alertDef.getTags().get(Constants.POLICY_ID), policyDef, new String[]{Constants.EAGLE_DEFAULT_POLICY_NAME}, false);
+ .newInstance(config, alertDef.getTags().get(Constants.POLICY_ID), policyDef, sourceStreams, false);
} catch (Exception ex) {
LOG.error("Fail creating new policyEvaluator", ex);
LOG.warn("Broken policy definition and stop running : " + alertDef.getPolicyDef());
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a979b0f2/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala
index ffb4b9e..c5acab6 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala
@@ -43,7 +43,7 @@ class StreamAggregateExpansion(config: Config) extends StreamAlertExpansion(conf
val newStreamProducers = rewriteWithStreamOutputWrapper(current, dag, toBeAddedEdges, toBeRemovedVertex, upStreamNames)
val analyzeExecutors = if (cepQl != null) {
- AggregateExecutorFactory.Instance.createExecutors(cepQl)
+ AggregateExecutorFactory.Instance.createExecutors(cepQl,upStreamNames)
} else {
AggregateExecutorFactory.Instance.createExecutors(config, upStreamNames, analyzerId)
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a979b0f2/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
index 1ef57cc..700e21d 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
@@ -175,10 +175,13 @@ case class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(confi
}
case _: MapperProducer[Any,Any] => {
val mapper = current.asInstanceOf[MapperProducer[Any,Any]].fn
- val newfun: (Any => Any) = {
- a => mapper(a) match {
+ val newfun: (Any => Any) = { a =>
+ val result = mapper(a)
+ result match {
+ case scala.Tuple1(x1) => (null, upStreamName, x1)
case scala.Tuple2(x1, x2) => (x1, upStreamName, x2)
- case _ => throw new IllegalArgumentException
+ case scala.Tuple3(_, _, _) => result
+ case _ => throw new IllegalArgumentException(s"Illegal message :$result, Tuple1/Tuple2/Tuple3 are supported")
}
}
current match {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a979b0f2/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyDefinition.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyDefinition.java
index 421a464..639c15b 100644
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyDefinition.java
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyDefinition.java
@@ -32,14 +32,14 @@ import org.apache.eagle.policy.config.AbstractPolicyDefinition;
public class SiddhiPolicyDefinition extends AbstractPolicyDefinition {
private String expression;
- private boolean containsDefintion;
+ private boolean containsDefinition;
- public boolean isContainsDefintion() {
- return containsDefintion;
+ public boolean isContainsDefinition() {
+ return containsDefinition;
}
- public void setContainsDefintion(boolean containsDefintion) {
- this.containsDefintion = containsDefintion;
+ public void setContainsDefinition(boolean containsDefinition) {
+ this.containsDefinition = containsDefinition;
}
public String getExpression() {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a979b0f2/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java
index 13db689..69d65d5 100644
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java
@@ -33,6 +33,7 @@ import org.wso2.siddhi.core.query.output.callback.QueryCallback;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.query.api.execution.query.Query;
import org.wso2.siddhi.query.api.execution.query.selection.OutputAttribute;
+import org.wso2.siddhi.query.compiler.exception.SiddhiParserException;
import java.lang.reflect.Field;
import java.util.*;
@@ -107,7 +108,7 @@ public class SiddhiPolicyEvaluator<T extends AbstractPolicyDefinitionEntity, K>
// compose execution plan sql
String executionPlan = policyDef.getExpression();
- if (!policyDef.isContainsDefintion()) {
+ if (!policyDef.isContainsDefinition()) {
StringBuilder sb = new StringBuilder();
for (String sourceStream : sourceStreams) {
String streamDef = SiddhiStreamMetadataUtils.convertToStreamDef(sourceStream);
@@ -119,7 +120,13 @@ public class SiddhiPolicyEvaluator<T extends AbstractPolicyDefinitionEntity, K>
executionPlan = sb.toString() + " @info(name = '" + EXECUTION_PLAN_NAME + "') " + expression;
}
- ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
+ ExecutionPlanRuntime executionPlanRuntime = null;
+ try {
+ executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
+ }catch (SiddhiParserException ex){
+ LOG.error("Failed to parse: "+executionPlan,ex);
+ throw ex;
+ }
for(String sourceStream : sourceStreams){
siddhiInputHandlers.put(sourceStream, executionPlanRuntime.getInputHandler(sourceStream));
@@ -177,7 +184,9 @@ public class SiddhiPolicyEvaluator<T extends AbstractPolicyDefinitionEntity, K>
// input.add(streamName);
putAttrsIntoInputStream(input, streamName, map);
try {
- siddhiRuntime.siddhiInputHandlers.get(streamName).send(input.toArray(new Object[0]));
+ InputHandler inputHandler = siddhiRuntime.siddhiInputHandlers.get(streamName);
+ if(inputHandler == null) throw new NullPointerException("InputHandler for stream ["+streamName+"] is not found");
+ inputHandler.send(input.toArray(new Object[input.size()]));
}catch (InterruptedException ex){
LOG.error("Got exception "+ex.getMessage(),ex);
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a979b0f2/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiStreamMetadataUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiStreamMetadataUtils.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiStreamMetadataUtils.java
index d14cf83..33c7bc9 100644
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiStreamMetadataUtils.java
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiStreamMetadataUtils.java
@@ -35,7 +35,7 @@ public class SiddhiStreamMetadataUtils {
public static SortedMap<String, AlertStreamSchemaEntity> getAttrMap(String streamName) {
SortedMap<String, AlertStreamSchemaEntity> map = StreamMetadataManager.getInstance().getMetadataEntityMapForStream(streamName);
if(map == null || map.size() == 0){
- throw new IllegalStateException("Alert stream schema should never be empty");
+ throw new IllegalStateException("Alert stream schema ["+streamName+"] should never be empty");
}
return map;
}