You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by mw...@apache.org on 2016/07/25 09:36:54 UTC

[17/47] incubator-eagle git commit: EAGLE-294 fix default value fillup issue If a policy metadata field is not set, add null attributes to input stream for SiddhiCEP

EAGLE-294 fix default value fillup issue
 If a policy metadata field is not set, add null attributes to input stream for SiddhiCEP

Author: Huizhi Lu, ihuizhi.lu@gmail.com
Reviewer: Hao Chen, Yong Zhang

Closes #196


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/9ad4b635
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/9ad4b635
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/9ad4b635

Branch: refs/heads/master
Commit: 9ad4b635d1bad7d7db9470b7ab48d99d1e4aa3e1
Parents: e7433a3
Author: yonzhang <yo...@gmail.com>
Authored: Mon May 23 18:17:38 2016 -0700
Committer: yonzhang <yo...@gmail.com>
Committed: Mon May 23 18:17:38 2016 -0700

----------------------------------------------------------------------
 .../src/main/resources/application.conf         | 73 ++++++++++++++++++++
 .../src/main/resources/log4j.properties         | 40 +++++++++++
 .../policy/siddhi/SiddhiPolicyEvaluator.java    | 57 ++++++++++-----
 3 files changed, 152 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9ad4b635/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/application.conf b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/application.conf
new file mode 100644
index 0000000..72c2ae5
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/application.conf
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+  "envContextConfig" : {
+    "env" : "storm",
+    "mode" : "local",
+    "topologyName" : "SpadesMonitorTopology",
+    "stormConfigFile" : "spades-monitor-storm.yaml",
+    "parallelismConfig" : {
+      "SpadesMonitorStream" : 1,
+      "SpadesMonitorExecutor*" : 1
+    }
+  },
+  "dataSourceConfig": {
+    "topic" : "spades_monitor_sandbox",
+    "zkConnection" : "sandbox.hortonworks.com:2181",
+    "zkConnectionTimeoutMS" : 15000,
+    "consumerGroupId" : "eagle.consumer",
+    "fetchSize" : 1048586,
+    "deserializerClass" : "org.apache.eagle.datastream.storm.JsonMessageDeserializer",
+    "transactionZKServers" : "sandbox.hortonworks.com",
+    "transactionZKPort" : 2181,
+    "transactionZKRoot" : "/consumers",
+    "transactionStateUpdateMS" : 2000
+  },
+  "alertExecutorConfigs" : {
+    "SpadesMonitorExecutor" : {
+      "parallelism" : 1,
+      "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
+      "needValidation" : "true"
+    }
+  },
+  "eagleProps" : {
+    "site" : "sandbox",
+    "application": "SpadesMonitor",
+    "dataJoinPollIntervalSec" : 30,
+    "mailHost" : "mailHost.com",
+    "mailSmtpPort":"25",
+    "mailDebug" : "true",
+    "balancePartitionEnabled" : true,
+    #"partitionRefreshIntervalInMin" : 60,
+    #"kafkaStatisticRangeInMin" : 60,
+    "eagleService": {
+      "host": "localhost",
+      "port": 9099,
+      "username": "admin",
+      "password": "secret"
+    }
+    "readHdfsUserCommandPatternFrom" : "file"
+  },
+  "dynamicConfigSource" : {
+    "enabled" : true,
+    "initDelayMillis" : 0,
+    "delayMillis" : 30000
+  },
+  "eagleNotificationProps" : {
+    "eagleStoreEnabled": true,
+    "kafka_broker":"127.0.0.1:6667"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9ad4b635/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/log4j.properties b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/log4j.properties
new file mode 100644
index 0000000..ae58e6b
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/log4j.properties
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=info, stdout, DRFA
+
+eagle.log.dir=./logs
+eagle.log.file=eagle.log
+
+
+#log4j.logger.org.apache.eagle.security.auditlog.IPZoneDataJoinExecutor=DEBUG
+#log4j.logger.org.apache.eagle.security.auditlog.FileSensitivityDataJoinExecutor=DEBUG
+log4j.logger.org.apache.eagle.security.auditlog.HdfsUserCommandReassembler=DEBUG
+#log4j.logger.org.apache.eagle.executor.AlertExecutor=DEBUG
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
+
+# Daily Rolling File Appender
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
+log4j.appender.DRFA.DatePattern=yyyy-MM-dd
+## 30-day backup
+# log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9ad4b635/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java
index c289d08..cbee286 100644
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java
@@ -207,17 +207,25 @@ public class SiddhiPolicyEvaluator<T extends AbstractPolicyDefinitionEntity, K>
     @Override
     public void evaluate(ValuesArray data) throws Exception {
         if (!siddhiRuntime.markdownEnabled) {
-            if (LOG.isDebugEnabled()) LOG.debug("Siddhi policy evaluator consumers data :" + data);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Siddhi policy evaluator consumers data :" + data);
+            }
             Collector outputCollector = (Collector) data.get(0);
             String streamName = (String) data.get(1);
-            SortedMap map = (SortedMap) data.get(2);
-            validateEventInRuntime(streamName, map);
+            SortedMap dataMap = (SortedMap) data.get(2);
+
+            // Get metadata keyset for the stream.
+            Set<String> metadataKeys = StreamMetadataManager.getInstance()
+                    .getMetadataEntityMapForStream(streamName).keySet();
+
+            validateEventInRuntime(streamName, dataMap, metadataKeys);
+
             synchronized (siddhiRuntime) {
                 // retain the collector in the context. This assignment is idempotent
                 context.outputCollector = outputCollector;
 
-                List<Object> input = new ArrayList<>();
-                putAttrsIntoInputStream(input, streamName, map);
+                List<Object> input = new ArrayList<Object>();
+                putAttrsIntoInputStream(input, streamName, metadataKeys, dataMap);
                 siddhiRuntime.siddhiInputHandlers.get(streamName).send(input.toArray(new Object[0]));
             }
         }
@@ -232,28 +240,41 @@ public class SiddhiPolicyEvaluator<T extends AbstractPolicyDefinitionEntity, K>
      * @param data         input event
      * @see <a href="https://issues.apache.org/jira/browse/EAGLE-49">https://issues.apache.org/jira/browse/EAGLE-49</a>
      */
-    private void validateEventInRuntime(String sourceStream, SortedMap data) {
-        if (!needValidation)
+    private void validateEventInRuntime(String sourceStream, SortedMap data, Set<String> metadataKeys) {
+        if (!needValidation) {
             return;
-        SortedMap<String, AlertStreamSchemaEntity> map = StreamMetadataManager.getInstance().getMetadataEntityMapForStream(sourceStream);
-        if (!map.keySet().equals(data.keySet())) {
+        }
+
+        if (!metadataKeys.equals(data.keySet())) {
             Set<Object> badKeys = new TreeSet<>();
-            for (Object key : data.keySet()) if (!map.containsKey(key)) badKeys.add(key);
-            LOG.warn(String.format("Ignore invalid fields %s in event: %s from stream: %s, valid fields are: %s", badKeys.toString(), data.toString(), sourceStream, map.keySet().toString()));
-            for (Object key : badKeys) data.remove(key);
+            for (Object key : data.keySet()) {
+                if (!metadataKeys.contains(key)) {
+                    badKeys.add(key);
+                }
+            }
+            LOG.warn(String.format("Ignore invalid fields %s in event: %s from stream: %s, valid fields are: %s",
+                    badKeys.toString(), data.toString(), sourceStream, metadataKeys.toString()));
+
+            for (Object key : badKeys) {
+                data.remove(key);
+            }
         }
     }
 
-    private void putAttrsIntoInputStream(List<Object> input, String streamName, SortedMap map) {
+    private void putAttrsIntoInputStream(List<Object> input, String streamName, Set<String> metadataKeys, SortedMap dataMap) {
         if (!needValidation) {
-            input.addAll(map.values());
+            input.addAll(dataMap.values());
             return;
         }
-        for (Object key : map.keySet()) {
-            Object value = map.get(key);
+
+        // If a metadata field is not set, we put null for the field's value.
+        for (String key : metadataKeys) {
+            Object value = dataMap.get(key);
             if (value == null) {
-                input.add(SiddhiStreamMetadataUtils.getAttrDefaultValue(streamName, (String) key));
-            } else input.add(value);
+                input.add(SiddhiStreamMetadataUtils.getAttrDefaultValue(streamName, key));
+            } else {
+                input.add(value);
+            }
         }
     }