You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/01/20 12:16:58 UTC

incubator-eagle git commit: EAGLE-130 Fix Dynamical Pipeline Aggregation Problem

Repository: incubator-eagle
Updated Branches:
  refs/heads/master c1485aac5 -> a979b0f2a


EAGLE-130 Fix Dynamical Pipeline Aggregation Problem

https://issues.apache.org/jira/browse/EAGLE-130

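* Fix the dynamic pipeline aggregation problem: pass the upstream stream names through AggregateExecutorFactory to SimpleAggregateExecutor, so the policy evaluator is created with the actual source streams instead of only the hard-coded default stream name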

Author: @haoch <ha...@apache.org>
Reviewer: @haoch <ha...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/a979b0f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/a979b0f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/a979b0f2

Branch: refs/heads/master
Commit: a979b0f2a19d2bd8f1f5313baad021ee4efd2821
Parents: c1485aa
Author: Hao Chen <ha...@apache.org>
Authored: Wed Jan 20 19:13:56 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Wed Jan 20 19:13:56 2016 +0800

----------------------------------------------------------------------
 .../src/test/resources/pipeline_1.conf           |  2 +-
 .../src/test/resources/pipeline_4.conf           | 19 +++++++++++--------
 .../impl/aggregate/AggregateExecutorFactory.java |  4 ++--
 .../impl/aggregate/SimpleAggregateExecutor.java  | 12 ++++++++++--
 .../core/StreamAggregateExpansion.scala          |  2 +-
 .../datastream/core/StreamAlertExpansion.scala   |  9 ++++++---
 .../policy/siddhi/SiddhiPolicyDefinition.java    | 10 +++++-----
 .../policy/siddhi/SiddhiPolicyEvaluator.java     | 15 ++++++++++++---
 .../policy/siddhi/SiddhiStreamMetadataUtils.java |  2 +-
 9 files changed, 49 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a979b0f2/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_1.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_1.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_1.conf
index 8bd4fd3..d7a634c 100644
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_1.conf
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_1.conf
@@ -112,7 +112,7 @@
 		}
 
 		Alert.alert {
-			upStreamNames = [metricStream_1,metricStream_2]
+//			upStreamNames = [metricStream_1,metricStream_2]
 			alertExecutorId = defaultAlertExecutor
 		}
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a979b0f2/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf
index 9e297ee..609e905 100644
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf
@@ -96,14 +96,17 @@
 			// alertExecutorId = defaultAlertExecutor
 		}
 
-//		Aggregator.Aggregator{ sql = """
-//				@info("query")
-//				from JmxStreamOne[value > 100.0] select * insert into OutputStream;
-//			"""
-//		}
-//		JmxStreamOne -> Aggregator{
-//			grouping = shuffle
-//		}
+		Aggregator.Aggregator{ sql = """
+				define stream JmxStreamOne(eagleAlertContext object, timestamp long, metric string, value double);
+				@info(name = "query")
+				from JmxStreamOne[value > 100.0] select * insert into outputStream;
+		"""}
+
+
+		JmxStreamOne -> Aggregator {}
+
+		Aggregator -> printer {}
+
 //		Aggregator -> aggregatedSink{
 //			grouping = shuffle
 //		}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a979b0f2/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java
index 48c832a..d92cde8 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java
@@ -44,12 +44,12 @@ public class AggregateExecutorFactory {
 	public static final AggregateExecutorFactory Instance = new AggregateExecutorFactory();
 
 
-	public IPolicyExecutor[] createExecutors(String cql) throws Exception {
+	public IPolicyExecutor[] createExecutors(String cql,List<String> upStreamNames) throws Exception {
 		int numPartitions = 1; //loadExecutorConfig(config, executorId, partitionerCls);
 
 		IPolicyExecutor[] executors = new IPolicyExecutor[numPartitions];
 		for (int i = 0; i < numPartitions ; i++ ) {
-			executors[i] = new SimpleAggregateExecutor(cql, "siddhiCEPEngine", i, numPartitions);
+			executors[i] = new SimpleAggregateExecutor(cql, "siddhiCEPEngine", i, numPartitions,upStreamNames);
 		}
 
 		return executors;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a979b0f2/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java
index e42fc48..6593b32 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java
@@ -53,6 +53,7 @@ public class SimpleAggregateExecutor
     private final String cql;
     private final int partitionSeq;
     private final int totalPartitionNum;
+    private final String[] sourceStreams;
 
     private String policyId;
     private String executorId;
@@ -60,10 +61,17 @@ public class SimpleAggregateExecutor
     private AggregateDefinitionAPIEntity aggDef;
     private PolicyEvaluator<AggregateDefinitionAPIEntity> evaluator;
 
-    public SimpleAggregateExecutor(String cql, String policyType, int partitionSeq, int totalPartitionNum) {
+    public SimpleAggregateExecutor(String cql, String policyType, int partitionSeq, int totalPartitionNum,List<String> sourceStreams) {
         this.cql = cql;
         this.partitionSeq = partitionSeq;
         this.totalPartitionNum = totalPartitionNum;
+
+        if(sourceStreams == null){
+            this.sourceStreams = new String[]{Constants.EAGLE_DEFAULT_POLICY_NAME};
+        }else{
+            this.sourceStreams = sourceStreams.toArray(new String[sourceStreams.size()]);
+        }
+
         // create an fixed definition policy api entity, and indicate it has full definition
         aggDef = new AggregateDefinitionAPIEntity();
         aggDef.setTags(new HashMap<String, String>());
@@ -128,7 +136,7 @@ public class SimpleAggregateExecutor
             // Create evaluator instances
             pe = (PolicyEvaluator<AggregateDefinitionAPIEntity>) evalCls
                     .getConstructor(Config.class, String.class, AbstractPolicyDefinition.class, String[].class, boolean.class)
-                    .newInstance(config, alertDef.getTags().get(Constants.POLICY_ID), policyDef, new String[]{Constants.EAGLE_DEFAULT_POLICY_NAME}, false);
+                    .newInstance(config, alertDef.getTags().get(Constants.POLICY_ID), policyDef, sourceStreams, false);
         } catch (Exception ex) {
             LOG.error("Fail creating new policyEvaluator", ex);
             LOG.warn("Broken policy definition and stop running : " + alertDef.getPolicyDef());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a979b0f2/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala
index ffb4b9e..c5acab6 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala
@@ -43,7 +43,7 @@ class StreamAggregateExpansion(config: Config) extends StreamAlertExpansion(conf
         val newStreamProducers = rewriteWithStreamOutputWrapper(current, dag, toBeAddedEdges, toBeRemovedVertex, upStreamNames)
 
         val analyzeExecutors = if (cepQl != null) {
-          AggregateExecutorFactory.Instance.createExecutors(cepQl)
+          AggregateExecutorFactory.Instance.createExecutors(cepQl,upStreamNames)
         } else {
           AggregateExecutorFactory.Instance.createExecutors(config, upStreamNames, analyzerId)
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a979b0f2/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
index 1ef57cc..700e21d 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
@@ -175,10 +175,13 @@ case class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(confi
       }
       case _: MapperProducer[Any,Any] => {
         val mapper = current.asInstanceOf[MapperProducer[Any,Any]].fn
-        val newfun: (Any => Any) = {
-          a => mapper(a) match {
+        val newfun: (Any => Any) = { a =>
+          val result = mapper(a)
+          result match {
+            case scala.Tuple1(x1) => (null, upStreamName, x1)
             case scala.Tuple2(x1, x2) => (x1, upStreamName, x2)
-            case _ => throw new IllegalArgumentException
+            case scala.Tuple3(_, _, _) => result
+            case _ => throw new IllegalArgumentException(s"Illegal message :$result, Tuple1/Tuple2/Tuple3 are supported")
           }
         }
         current match {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a979b0f2/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyDefinition.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyDefinition.java
index 421a464..639c15b 100644
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyDefinition.java
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyDefinition.java
@@ -32,14 +32,14 @@ import org.apache.eagle.policy.config.AbstractPolicyDefinition;
 public class SiddhiPolicyDefinition extends AbstractPolicyDefinition {
 	private String expression;
 
-	private boolean containsDefintion;
+	private boolean containsDefinition;
 
-	public boolean isContainsDefintion() {
-		return containsDefintion;
+	public boolean isContainsDefinition() {
+		return containsDefinition;
 	}
 
-	public void setContainsDefintion(boolean containsDefintion) {
-		this.containsDefintion = containsDefintion;
+	public void setContainsDefinition(boolean containsDefinition) {
+		this.containsDefinition = containsDefinition;
 	}
 
 	public String getExpression() {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a979b0f2/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java
index 13db689..69d65d5 100644
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java
@@ -33,6 +33,7 @@ import org.wso2.siddhi.core.query.output.callback.QueryCallback;
 import org.wso2.siddhi.core.stream.input.InputHandler;
 import org.wso2.siddhi.query.api.execution.query.Query;
 import org.wso2.siddhi.query.api.execution.query.selection.OutputAttribute;
+import org.wso2.siddhi.query.compiler.exception.SiddhiParserException;
 
 import java.lang.reflect.Field;
 import java.util.*;
@@ -107,7 +108,7 @@ public class SiddhiPolicyEvaluator<T extends AbstractPolicyDefinitionEntity, K>
 
 		// compose execution plan sql
 		String executionPlan = policyDef.getExpression();
-		if (!policyDef.isContainsDefintion()) {
+		if (!policyDef.isContainsDefinition()) {
 			StringBuilder sb = new StringBuilder();
 			for (String sourceStream : sourceStreams) {
 				String streamDef = SiddhiStreamMetadataUtils.convertToStreamDef(sourceStream);
@@ -119,7 +120,13 @@ public class SiddhiPolicyEvaluator<T extends AbstractPolicyDefinitionEntity, K>
 			executionPlan = sb.toString() + " @info(name = '" + EXECUTION_PLAN_NAME + "') " + expression;
 		}
 
-		ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
+		ExecutionPlanRuntime executionPlanRuntime = null;
+		try {
+				executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
+		}catch (SiddhiParserException ex){
+			LOG.error("Failed to parse: "+executionPlan,ex);
+			throw ex;
+		}
 
 		for(String sourceStream : sourceStreams){
 			siddhiInputHandlers.put(sourceStream, executionPlanRuntime.getInputHandler(sourceStream));
@@ -177,7 +184,9 @@ public class SiddhiPolicyEvaluator<T extends AbstractPolicyDefinitionEntity, K>
 			// input.add(streamName);
 			putAttrsIntoInputStream(input, streamName, map);
             try {
-                siddhiRuntime.siddhiInputHandlers.get(streamName).send(input.toArray(new Object[0]));
+                InputHandler inputHandler = siddhiRuntime.siddhiInputHandlers.get(streamName);
+				if(inputHandler == null) throw new NullPointerException("InputHandler for stream ["+streamName+"] is not found");
+				inputHandler.send(input.toArray(new Object[input.size()]));
             }catch (InterruptedException ex){
                 LOG.error("Got exception "+ex.getMessage(),ex);
             }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a979b0f2/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiStreamMetadataUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiStreamMetadataUtils.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiStreamMetadataUtils.java
index d14cf83..33c7bc9 100644
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiStreamMetadataUtils.java
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiStreamMetadataUtils.java
@@ -35,7 +35,7 @@ public class SiddhiStreamMetadataUtils {
 	public static SortedMap<String, AlertStreamSchemaEntity> getAttrMap(String streamName) {
 		SortedMap<String, AlertStreamSchemaEntity> map = StreamMetadataManager.getInstance().getMetadataEntityMapForStream(streamName);
 		if(map == null || map.size() == 0){
-			throw new IllegalStateException("Alert stream schema should never be empty");
+			throw new IllegalStateException("Alert stream schema ["+streamName+"] should never be empty");
 		}
 		return map;
 	}