You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by mw...@apache.org on 2016/07/25 09:36:54 UTC
[17/47] incubator-eagle git commit: EAGLE-294 fix default value
fillup issue If a policy metadata field is not set,
add null attributes to input stream for SiddhiCEP
EAGLE-294 fix default value fillup issue
If a policy metadata field is not set, add null attributes to input stream for SiddhiCEP
Author: Huizhi Lu, ihuizhi.lu@gmail.com
Reviewer: Hao Chen, Yong Zhang
Closes #196
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/9ad4b635
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/9ad4b635
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/9ad4b635
Branch: refs/heads/master
Commit: 9ad4b635d1bad7d7db9470b7ab48d99d1e4aa3e1
Parents: e7433a3
Author: yonzhang <yo...@gmail.com>
Authored: Mon May 23 18:17:38 2016 -0700
Committer: yonzhang <yo...@gmail.com>
Committed: Mon May 23 18:17:38 2016 -0700
----------------------------------------------------------------------
.../src/main/resources/application.conf | 73 ++++++++++++++++++++
.../src/main/resources/log4j.properties | 40 +++++++++++
.../policy/siddhi/SiddhiPolicyEvaluator.java | 57 ++++++++++-----
3 files changed, 152 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9ad4b635/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/application.conf b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/application.conf
new file mode 100644
index 0000000..72c2ae5
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/application.conf
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+ "envContextConfig" : {
+ "env" : "storm",
+ "mode" : "local",
+ "topologyName" : "SpadesMonitorTopology",
+ "stormConfigFile" : "spades-monitor-storm.yaml",
+ "parallelismConfig" : {
+ "SpadesMonitorStream" : 1,
+ "SpadesMonitorExecutor*" : 1
+ }
+ },
+ "dataSourceConfig": {
+ "topic" : "spades_monitor_sandbox",
+ "zkConnection" : "sandbox.hortonworks.com:2181",
+ "zkConnectionTimeoutMS" : 15000,
+ "consumerGroupId" : "eagle.consumer",
+ "fetchSize" : 1048586,
+ "deserializerClass" : "org.apache.eagle.datastream.storm.JsonMessageDeserializer",
+ "transactionZKServers" : "sandbox.hortonworks.com",
+ "transactionZKPort" : 2181,
+ "transactionZKRoot" : "/consumers",
+ "transactionStateUpdateMS" : 2000
+ },
+ "alertExecutorConfigs" : {
+ "SpadesMonitorExecutor" : {
+ "parallelism" : 1,
+ "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
+ "needValidation" : "true"
+ }
+ },
+ "eagleProps" : {
+ "site" : "sandbox",
+ "application": "SpadesMonitor",
+ "dataJoinPollIntervalSec" : 30,
+ "mailHost" : "mailHost.com",
+ "mailSmtpPort":"25",
+ "mailDebug" : "true",
+ "balancePartitionEnabled" : true,
+ #"partitionRefreshIntervalInMin" : 60,
+ #"kafkaStatisticRangeInMin" : 60,
+ "eagleService": {
+ "host": "localhost",
+ "port": 9099,
+ "username": "admin",
+ "password": "secret"
+ }
+ "readHdfsUserCommandPatternFrom" : "file"
+ },
+ "dynamicConfigSource" : {
+ "enabled" : true,
+ "initDelayMillis" : 0,
+ "delayMillis" : 30000
+ },
+ "eagleNotificationProps" : {
+ "eagleStoreEnabled": true,
+ "kafka_broker":"127.0.0.1:6667"
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9ad4b635/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/log4j.properties b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/log4j.properties
new file mode 100644
index 0000000..ae58e6b
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/log4j.properties
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=info, stdout, DRFA
+
+eagle.log.dir=./logs
+eagle.log.file=eagle.log
+
+
+#log4j.logger.org.apache.eagle.security.auditlog.IPZoneDataJoinExecutor=DEBUG
+#log4j.logger.org.apache.eagle.security.auditlog.FileSensitivityDataJoinExecutor=DEBUG
+log4j.logger.org.apache.eagle.security.auditlog.HdfsUserCommandReassembler=DEBUG
+#log4j.logger.org.apache.eagle.executor.AlertExecutor=DEBUG
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
+
+# Daily Rolling File Appender
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
+log4j.appender.DRFA.DatePattern=yyyy-MM-dd
+## 30-day backup
+# log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9ad4b635/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java
index c289d08..cbee286 100644
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java
@@ -207,17 +207,25 @@ public class SiddhiPolicyEvaluator<T extends AbstractPolicyDefinitionEntity, K>
@Override
public void evaluate(ValuesArray data) throws Exception {
if (!siddhiRuntime.markdownEnabled) {
- if (LOG.isDebugEnabled()) LOG.debug("Siddhi policy evaluator consumers data :" + data);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Siddhi policy evaluator consumers data :" + data);
+ }
Collector outputCollector = (Collector) data.get(0);
String streamName = (String) data.get(1);
- SortedMap map = (SortedMap) data.get(2);
- validateEventInRuntime(streamName, map);
+ SortedMap dataMap = (SortedMap) data.get(2);
+
+ // Get metadata keyset for the stream.
+ Set<String> metadataKeys = StreamMetadataManager.getInstance()
+ .getMetadataEntityMapForStream(streamName).keySet();
+
+ validateEventInRuntime(streamName, dataMap, metadataKeys);
+
synchronized (siddhiRuntime) {
// retain the collector in the context. This assignment is idempotent
context.outputCollector = outputCollector;
- List<Object> input = new ArrayList<>();
- putAttrsIntoInputStream(input, streamName, map);
+ List<Object> input = new ArrayList<Object>();
+ putAttrsIntoInputStream(input, streamName, metadataKeys, dataMap);
siddhiRuntime.siddhiInputHandlers.get(streamName).send(input.toArray(new Object[0]));
}
}
@@ -232,28 +240,41 @@ public class SiddhiPolicyEvaluator<T extends AbstractPolicyDefinitionEntity, K>
* @param data input event
* @see <a href="https://issues.apache.org/jira/browse/EAGLE-49">https://issues.apache.org/jira/browse/EAGLE-49</a>
*/
- private void validateEventInRuntime(String sourceStream, SortedMap data) {
- if (!needValidation)
+ private void validateEventInRuntime(String sourceStream, SortedMap data, Set<String> metadataKeys) {
+ if (!needValidation) {
return;
- SortedMap<String, AlertStreamSchemaEntity> map = StreamMetadataManager.getInstance().getMetadataEntityMapForStream(sourceStream);
- if (!map.keySet().equals(data.keySet())) {
+ }
+
+ if (!metadataKeys.equals(data.keySet())) {
Set<Object> badKeys = new TreeSet<>();
- for (Object key : data.keySet()) if (!map.containsKey(key)) badKeys.add(key);
- LOG.warn(String.format("Ignore invalid fields %s in event: %s from stream: %s, valid fields are: %s", badKeys.toString(), data.toString(), sourceStream, map.keySet().toString()));
- for (Object key : badKeys) data.remove(key);
+ for (Object key : data.keySet()) {
+ if (!metadataKeys.contains(key)) {
+ badKeys.add(key);
+ }
+ }
+ LOG.warn(String.format("Ignore invalid fields %s in event: %s from stream: %s, valid fields are: %s",
+ badKeys.toString(), data.toString(), sourceStream, metadataKeys.toString()));
+
+ for (Object key : badKeys) {
+ data.remove(key);
+ }
}
}
- private void putAttrsIntoInputStream(List<Object> input, String streamName, SortedMap map) {
+ private void putAttrsIntoInputStream(List<Object> input, String streamName, Set<String> metadataKeys, SortedMap dataMap) {
if (!needValidation) {
- input.addAll(map.values());
+ input.addAll(dataMap.values());
return;
}
- for (Object key : map.keySet()) {
- Object value = map.get(key);
+
+ // If a metadata field is not set, we put null for the field's value.
+ for (String key : metadataKeys) {
+ Object value = dataMap.get(key);
if (value == null) {
- input.add(SiddhiStreamMetadataUtils.getAttrDefaultValue(streamName, (String) key));
- } else input.add(value);
+ input.add(SiddhiStreamMetadataUtils.getAttrDefaultValue(streamName, key));
+ } else {
+ input.add(value);
+ }
}
}