You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/08/14 06:23:11 UTC
[11/13] incubator-eagle git commit: EAGLE-341 clean inner process
alert engine code clean inner process alert engine code
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertDeduplicationExecutorBase.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertDeduplicationExecutorBase.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertDeduplicationExecutorBase.java
deleted file mode 100644
index 25e5cff..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertDeduplicationExecutorBase.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.dedup;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.alert.config.DeduplicatorConfig;
-import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
-import org.apache.eagle.alert.entity.AlertAPIEntity;
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.policy.DynamicPolicyLoader;
-import org.apache.eagle.policy.PolicyLifecycleMethods;
-import org.apache.eagle.common.config.EagleConfigConstants;
-import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
-import org.apache.eagle.datastream.JavaStormStreamExecutor2;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.sun.jersey.client.impl.CopyOnWriteHashMap;
-import com.typesafe.config.Config;
-import scala.Tuple2;
-
-public abstract class AlertDeduplicationExecutorBase extends JavaStormStreamExecutor2<String, AlertAPIEntity> implements PolicyLifecycleMethods<AlertDefinitionAPIEntity> {
- private static final long serialVersionUID = 1L;
- private static final Logger LOG = LoggerFactory.getLogger(AlertDeduplicationExecutorBase.class);
- protected Config config;
- protected DEDUP_TYPE dedupType;
-
- private List<String> alertExecutorIdList;
- private volatile CopyOnWriteHashMap<String, DefaultDeduplicator> alertDedups;
- private PolicyDefinitionDAO<AlertDefinitionAPIEntity> dao;
-
- public enum DEDUP_TYPE {
- ENTITY,
- EMAIL
- }
-
- public AlertDeduplicationExecutorBase(List<String> alertExecutorIdList, DEDUP_TYPE dedupType, PolicyDefinitionDAO<AlertDefinitionAPIEntity> dao){
- this.alertExecutorIdList = alertExecutorIdList;
- this.dedupType = dedupType;
- this.dao = dao;
- }
-
- @Override
- public void prepareConfig(Config config) {
- this.config = config;
- }
-
- public DefaultDeduplicator createAlertDedup(AlertDefinitionAPIEntity alertDef) {
- DeduplicatorConfig dedupConfig = null;
- try {
- dedupConfig = JsonSerDeserUtils.deserialize(alertDef.getDedupeDef(), DeduplicatorConfig.class);
- }
- catch (Exception ex) {
- LOG.warn("Initial dedup Config error, " + ex.getMessage());
- }
-
- if (dedupConfig != null) {
- return new DefaultDeduplicator(dedupConfig.getAlertDedupIntervalMin(), dedupConfig.getFields());
- }
-
- return null;
- }
-
- @Override
- public void init() {
- String site = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.SITE);
- String dataSource = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.APPLICATION);
- Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs;
- try {
- initialAlertDefs = dao.findActivePoliciesGroupbyExecutorId(site, dataSource);
- }
- catch (Exception ex) {
- LOG.error("fail to initialize initialAlertDefs: ", ex);
- throw new IllegalStateException("fail to initialize initialAlertDefs: ", ex);
- }
- Map<String, DefaultDeduplicator> tmpDeduplicators = new HashMap<>();
- if(initialAlertDefs == null || initialAlertDefs.isEmpty()){
- LOG.warn("No alert definitions was found for site: "+site+", dataSource: "+dataSource);
- } else {
- for (String alertExecutorId: alertExecutorIdList) {
- if(initialAlertDefs.containsKey(alertExecutorId)){
- for(AlertDefinitionAPIEntity alertDef : initialAlertDefs.get(alertExecutorId).values()){
- try {
- DefaultDeduplicator deduplicator = createAlertDedup(alertDef);
- if (deduplicator != null)
- tmpDeduplicators.put(alertDef.getTags().get(Constants.POLICY_ID), deduplicator);
- else LOG.warn("The dedup interval is not set, alertDef: " + alertDef);
- }
- catch (Throwable t) {
- LOG.error("Got an exception when initial dedup config, probably dedup config is not set: " + t.getMessage() + "," + alertDef);
- }
- }
- } else {
- LOG.info(String.format("No alert definitions found for site: %s, dataSource: %s, alertExecutorId: %s",site,dataSource,alertExecutorId));
- }
- }
- }
-
- alertDedups = new CopyOnWriteHashMap<>();
- alertDedups.putAll(tmpDeduplicators);
- DynamicPolicyLoader<AlertDefinitionAPIEntity> policyLoader = DynamicPolicyLoader.getInstanceOf(AlertDefinitionAPIEntity.class);
- policyLoader.init(initialAlertDefs, dao, config);
- for (String alertExecutorId : alertExecutorIdList) {
- policyLoader.addPolicyChangeListener(alertExecutorId, this);
- }
- }
-
- @Override
- public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, AlertAPIEntity>> outputCollector){
- String policyId = (String) input.get(0);
- AlertAPIEntity alertEntity = (AlertAPIEntity) input.get(1);
- DefaultDeduplicator dedup;
- synchronized(alertDedups) {
- dedup = alertDedups.get(policyId);
- }
-
- List<AlertAPIEntity> ret = Arrays.asList(alertEntity);
- if (dedup == null) {
- LOG.warn("Dedup config for policyId " + policyId + " is not set or is not a valid config");
- } else {
- if (dedup.getDedupIntervalMin() == -1) {
- LOG.warn("the dedup interval is set as -1, which mean all alerts should be deduped(skipped)");
- return;
- }
- ret = dedup.dedup(ret);
- }
- for (AlertAPIEntity entity : ret) {
- outputCollector.collect(new Tuple2(policyId, entity));
- }
- }
-
- public void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> added) {
- if(LOG.isDebugEnabled()) LOG.debug("Alert dedup config to be added : " + added);
- for(AlertDefinitionAPIEntity alertDef : added.values()){
- LOG.info("Alert dedup config really added " + alertDef);
- DefaultDeduplicator dedup = createAlertDedup(alertDef);
- if (dedup != null) {
- synchronized(alertDedups) {
- alertDedups.put(alertDef.getTags().get(Constants.POLICY_ID), dedup);
- }
- }
- }
- }
-
- public void onPolicyChanged(Map<String, AlertDefinitionAPIEntity> changed) {
- LOG.info("Alert dedup config changed : " + changed);
- for(AlertDefinitionAPIEntity alertDef : changed.values()){
- LOG.info("Alert dedup config really changed " + alertDef);
- DefaultDeduplicator dedup = createAlertDedup(alertDef);
- if (dedup != null) {
- synchronized(alertDedups) {
- alertDedups.put(alertDef.getTags().get(Constants.POLICY_ID), dedup);
- }
- }
- }
- }
-
- public void onPolicyDeleted(Map<String, AlertDefinitionAPIEntity> deleted) {
- LOG.info("alert dedup config deleted : " + deleted);
- for(AlertDefinitionAPIEntity alertDef : deleted.values()){
- LOG.info("alert dedup config deleted " + alertDef);
- // no cleanup to do, just remove it
- synchronized(alertDedups) {
- alertDedups.remove(alertDef.getTags().get(Constants.POLICY_ID));
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java
deleted file mode 100644
index 8947d2c..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.dedup;
-
-import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
-
-import java.util.List;
-
-public class AlertEmailDeduplicationExecutor extends AlertDeduplicationExecutorBase {
-
- private static final long serialVersionUID = 1L;
-
- public AlertEmailDeduplicationExecutor(List<String> alertExecutorIdList, PolicyDefinitionDAO dao){
- super(alertExecutorIdList, DEDUP_TYPE.EMAIL, dao);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java
deleted file mode 100644
index b30dbda..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.dedup;
-
-import java.util.List;
-
-import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-
-public class AlertEntityDeduplicationExecutor extends AlertDeduplicationExecutorBase {
-
- private static final long serialVersionUID = 1L;
-
- public AlertEntityDeduplicationExecutor(List<String> alertExecutorIdList, PolicyDefinitionDAO<AlertDefinitionAPIEntity> dao){
- super(alertExecutorIdList, DEDUP_TYPE.ENTITY, dao);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/DefaultDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/DefaultDeduplicator.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/DefaultDeduplicator.java
deleted file mode 100644
index 1d79f9f..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/DefaultDeduplicator.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.dedup;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.commons.lang.time.DateUtils;
-import org.apache.eagle.alert.entity.AlertAPIEntity;
-import org.apache.eagle.common.metric.AlertContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DefaultDeduplicator implements EntityDeduplicator {
- protected long dedupIntervalMin;
- protected List<String> fields;
- protected Map<EntityDedupKey, Long> entites = new HashMap<>();
- public static Logger LOG = LoggerFactory.getLogger(DefaultDeduplicator.class);
-
- public static enum AlertDeduplicationStatus{
- NEW,
- DUPLICATED,
- IGNORED
- }
-
- public DefaultDeduplicator() {
- this.dedupIntervalMin = 0;
- fields = null;
- }
-
- public DefaultDeduplicator(long intervalMin, List<String> fields) {
- this.dedupIntervalMin = intervalMin;
- this.fields = fields;
- }
-
- public void clearOldCache() {
- List<EntityDedupKey> removedkeys = new ArrayList<>();
- for (Entry<EntityDedupKey, Long> entry : entites.entrySet()) {
- EntityDedupKey entity = entry.getKey();
- if (System.currentTimeMillis() - 7 * DateUtils.MILLIS_PER_DAY > entity.createdTime) {
- removedkeys.add(entry.getKey());
- }
- }
- for (EntityDedupKey alertKey : removedkeys) {
- entites.remove(alertKey);
- }
- }
-
- public AlertDeduplicationStatus checkDedup(EntityDedupKey key){
- long current = key.timestamp;
- if(!entites.containsKey(key)){
- entites.put(key, current);
- return AlertDeduplicationStatus.NEW;
- }
-
- long last = entites.get(key);
- if(current - last >= dedupIntervalMin * DateUtils.MILLIS_PER_MINUTE){
- entites.put(key, current);
- return AlertDeduplicationStatus.DUPLICATED;
- }
-
- return AlertDeduplicationStatus.IGNORED;
- }
-
- private List<String> getKeyList(AlertAPIEntity entity) {
- List<String> keys = new ArrayList<>(entity.getTags().values());
- if(fields != null && !fields.isEmpty()) {
- for (String field: fields) {
- AlertContext context = entity.getWrappedAlertContext();
- keys.add(context.getProperty(field));
- }
- }
- return keys;
- }
-
- public List<AlertAPIEntity> dedup(List<AlertAPIEntity> list) {
- clearOldCache();
- List<AlertAPIEntity> dedupList = new ArrayList<>();
- int totalCount = list.size();
- int dedupedCount = 0;
- for(AlertAPIEntity entity: list) {
- if (entity.getTags() == null) {
- if(LOG.isDebugEnabled()) LOG.debug("Tags is null, don't know how to deduplicate, do nothing");
- } else {
- AlertDeduplicationStatus status = checkDedup(new EntityDedupKey(getKeyList(entity), entity.getTimestamp()));
- if (!status.equals(AlertDeduplicationStatus.IGNORED)) {
- dedupList.add(entity);
- } else {
- dedupedCount++;
- if (LOG.isDebugEnabled())
- LOG.debug(String.format("Entity is skipped because it's duplicated: " + entity.toString()));
- }
- }
- }
-
- if(dedupedCount>0){
- LOG.info(String.format("Skipped %s of %s alerts because they are duplicated", dedupedCount, totalCount));
- }else if(LOG.isDebugEnabled()){
- LOG.debug(String.format("Skipped %s of %s duplicated alerts",dedupedCount,totalCount));
- }
-
- return dedupList;
- }
-
- public EntityDeduplicator setDedupIntervalMin(long dedupIntervalMin) {
- this.dedupIntervalMin = dedupIntervalMin;
- return this;
- }
-
- public long getDedupIntervalMin() {
- return dedupIntervalMin;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityDedupKey.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityDedupKey.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityDedupKey.java
deleted file mode 100644
index 36b83e1..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityDedupKey.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.dedup;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-
-public class EntityDedupKey {
- public List<String> values;
- public Long timestamp; // entity's timestamp
- public long createdTime; // entityTagsUniq's created time, for cache removal;
-
- private static final Logger LOG = LoggerFactory.getLogger(EntityDedupKey.class);
-
- public EntityDedupKey(List<String> values, long timestamp) {
- this.values = new ArrayList<>(values);
- this.timestamp = timestamp;
- this.createdTime = System.currentTimeMillis();
- }
-
- public boolean equals(Object obj) {
- if (obj instanceof EntityDedupKey) {
- EntityDedupKey key = (EntityDedupKey) obj;
- if (key == null || key.values.size() != values.size()) {
- return false;
- }
- return values.equals(key.values);
- }
- return false;
- }
-
- public int hashCode() {
- HashCodeBuilder builder = new HashCodeBuilder();
- for (String value : values) {
- builder.append(value);
- }
- return builder.build();
- }
-}
-
-
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityDeduplicator.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityDeduplicator.java
deleted file mode 100644
index 85dd19a..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityDeduplicator.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.dedup;
-
-import java.util.List;
-
-import org.apache.eagle.alert.entity.AlertAPIEntity;
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-
-/**
- * Dedup Eagle entities.
- */
-public interface EntityDeduplicator {
-
- EntityDeduplicator setDedupIntervalMin(long intervalMin);
-
- long getDedupIntervalMin();
-
- List dedup(List<AlertAPIEntity> list);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityTagsUniq.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityTagsUniq.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityTagsUniq.java
deleted file mode 100644
index 81c8ba6..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityTagsUniq.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- *
- */
-package org.apache.eagle.alert.dedup;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @since Mar 19, 2015
- */
-public class EntityTagsUniq {
- public Map<String, String> tags;
- public Long timestamp; // entity's timestamp
- public long createdTime; // entityTagsUniq's created time, for cache removal;
-
- private static final Logger LOG = LoggerFactory.getLogger(EntityTagsUniq.class);
-
- public EntityTagsUniq(Map<String, String> tags, long timestamp) {
- this.tags = new HashMap<String, String>(tags);
- this.timestamp = timestamp;
- this.createdTime = System.currentTimeMillis();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof EntityTagsUniq) {
- EntityTagsUniq au = (EntityTagsUniq) obj;
- if (tags.size() != au.tags.size()) return false;
- for (Entry<String, String> keyValue : au.tags.entrySet()) {
- boolean keyExist = tags.containsKey(keyValue.getKey());
- // sanity check
- if (tags.get(keyValue.getKey()) == null || keyValue.getValue() == null) {
- return true;
- }
- if ( !keyExist || !tags.get(keyValue.getKey()).equals(keyValue.getValue())) {
- return false;
- }
- }
- return true;
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- int hashCode = 0;
- for (Map.Entry<String,String> entry : tags.entrySet()) {
- if(entry.getValue() == null) {
- LOG.warn("Tag value for key ["+entry.getKey()+"] is null, skipped for hash code");
- }else {
- try {
- hashCode ^= entry.getValue().hashCode();
- } catch (Throwable t) {
- LOG.info("Got exception because of entry: " + entry, t);
- }
- }
- }
- return hashCode;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutor.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutor.java
deleted file mode 100644
index 2eee6c5..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutor.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.executor;
-
-import org.apache.eagle.policy.ResultRender;
-import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
-import org.apache.eagle.alert.entity.AlertAPIEntity;
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.policy.PolicyPartitioner;
-import org.apache.eagle.policy.executor.PolicyProcessExecutor;
-import org.apache.eagle.alert.siddhi.SiddhiAlertAPIEntityRender;
-
-public class AlertExecutor extends PolicyProcessExecutor<AlertDefinitionAPIEntity, AlertAPIEntity> {
-
- private final SiddhiAlertAPIEntityRender resultRender = new SiddhiAlertAPIEntityRender();
-
- public AlertExecutor(String alertExecutorId, PolicyPartitioner partitioner, int numPartitions, int partitionSeq,
- PolicyDefinitionDAO<AlertDefinitionAPIEntity> alertDefinitionDao, String[] sourceStreams) {
- super(alertExecutorId, partitioner, numPartitions, partitionSeq, alertDefinitionDao, sourceStreams,
- AlertDefinitionAPIEntity.class);
- }
-
- @Override
- public ResultRender<AlertDefinitionAPIEntity, AlertAPIEntity> getResultRender() {
- return resultRender;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutorCreationUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutorCreationUtils.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutorCreationUtils.java
deleted file mode 100644
index 8ab290e..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutorCreationUtils.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.executor;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.policy.DefaultPolicyPartitioner;
-import org.apache.eagle.policy.PolicyPartitioner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValue;
-
-/**
- * Create alert executors and provide callback for programmer to link alert executor to immediate parent executors
- *
- * <br/><br/>
- * Explanations for programId, alertExecutorId and policy<br/><br/>
- * - programId - distributed or single-process program for example one storm topology<br/>
- * - alertExecutorId - one process/thread which executes multiple policies<br/>
- * - policy - some rules to be evaluated<br/>
- *
- * <br/>
- *
- * Normally the mapping is like following:
- * <pre>
- * programId (1:N) alertExecutorId
- * alertExecutorId (1:N) policy
- * </pre>
- */
-public class AlertExecutorCreationUtils {
- private final static Logger LOG = LoggerFactory.getLogger(AlertExecutorCreationUtils.class);
-
-
- /**
- * Build DAG Tasks based on persisted alert definition and schemas from eagle store.
- *
- * <h3>Require configuration:</h3>
- *
- * <ul>
- * <li>eagleProps.site: program site id.</li>
- * <li>eagleProps.dataSource: program data source.</li>
- * <li>alertExecutorConfigs: only configured executor will be built into execution tasks.</li>
- * </ul>
- *
- * <h3>Steps:</h3>
- *
- * <ol>
- * <li>(upstreamTasks) => Map[streamName:String,upstreamTask:Task]</li>
- * <li>(dataSource) => Map[alertExecutorId:String,streamName:List[String]]</li>
- * <li>(site,dataSource) => Map[alertExecutorId,Map[policyId,alertDefinition]]</li>
- * <li>(config["alertExecutorConfigs"]) => AlertExecutor(alertExecutorID, partitioner, numPartitions, partitionSeq, alertDefs, alertDefDAO, sourceStreams)[]</li>
- * </ol>
- */
- public static AlertExecutor[] createAlertExecutors(Config config, PolicyDefinitionDAO<AlertDefinitionAPIEntity> alertDefDAO,
- List<String> streamNames, String alertExecutorId) throws Exception{
- // Read `alertExecutorConfigs` from configuration and get config for this alertExecutorId
- int numPartitions =1;
- String partitionerCls = DefaultPolicyPartitioner.class.getCanonicalName();
- String alertExecutorConfigsKey = "alertExecutorConfigs";
- if(config.hasPath(alertExecutorConfigsKey)) {
- Map<String, ConfigValue> alertExecutorConfigs = config.getObject(alertExecutorConfigsKey);
- if(alertExecutorConfigs !=null && alertExecutorConfigs.containsKey(alertExecutorId)) {
- Map<String, Object> alertExecutorConfig = (Map<String, Object>) alertExecutorConfigs.get(alertExecutorId).unwrapped();
- int parts = 0;
- if(alertExecutorConfig.containsKey("parallelism")) parts = (int) (alertExecutorConfig.get("parallelism"));
- numPartitions = parts == 0 ? 1 : parts;
- if(alertExecutorConfig.containsKey("partitioner")) partitionerCls = (String) alertExecutorConfig.get("partitioner");
- }
- }
-
- return createAlertExecutors(alertDefDAO, streamNames, alertExecutorId, numPartitions, partitionerCls);
- }
-
- /**
- * Build alert executors and assign alert definitions between these executors by partitioner (alertExecutorConfigs["${alertExecutorId}"]["partitioner"])
- */
- public static AlertExecutor[] createAlertExecutors(PolicyDefinitionDAO alertDefDAO, List<String> sourceStreams,
- String alertExecutorID, int numPartitions, String partitionerCls) throws Exception{
- LOG.info("Creating alert executors with alertExecutorID: " + alertExecutorID + ", numPartitions: " + numPartitions + ", Partition class is: "+ partitionerCls);
-
- // TODO: Create sourceStreams with alertExecutorID into AlertExecutorService
-
- PolicyPartitioner partitioner = (PolicyPartitioner)Class.forName(partitionerCls).newInstance();
- AlertExecutor[] alertExecutors = new AlertExecutor[numPartitions];
- String[] _sourceStreams = sourceStreams.toArray(new String[0]);
-
- for(int i = 0; i < numPartitions; i++){
- alertExecutors[i] = new AlertExecutor(alertExecutorID, partitioner, numPartitions, i, alertDefDAO,_sourceStreams);
- }
- return alertExecutors;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGenerator.java
deleted file mode 100644
index af42dd3..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGenerator.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- *
- */
-package org.apache.eagle.alert.notification;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.*;
-
-import org.apache.eagle.common.metric.AlertContext;
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.alert.common.AlertEmailSender;
-import org.apache.eagle.alert.email.AlertEmailComponent;
-import org.apache.eagle.alert.email.AlertEmailContext;
-import org.apache.eagle.alert.entity.AlertAPIEntity;
-import com.typesafe.config.ConfigObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Deprecated
-public class AlertEmailGenerator{
- private String tplFile;
- private String sender;
- private String recipients;
- private String subject;
- private ConfigObject eagleProps;
-
- private ThreadPoolExecutor executorPool;
-
- private final static Logger LOG = LoggerFactory.getLogger(AlertEmailGenerator.class);
-
- private final static long MAX_TIMEOUT_MS =60000;
-
- public void sendAlertEmail(AlertAPIEntity entity) {
- sendAlertEmail(entity, recipients, null);
- }
-
- public void sendAlertEmail(AlertAPIEntity entity, String recipients) {
- sendAlertEmail(entity, recipients, null);
- }
-
- public void sendAlertEmail(AlertAPIEntity entity, String recipients, String cc) {
- AlertEmailContext email = new AlertEmailContext();
-
- AlertEmailComponent component = new AlertEmailComponent();
- component.setAlertContext(AlertContext.fromJsonString(entity.getAlertContext()));
- List<AlertEmailComponent> components = new ArrayList<AlertEmailComponent>();
- components.add(component);
- email.setComponents(components);
- if (AlertContext.fromJsonString(entity.getAlertContext()).getProperty(Constants.SUBJECT) != null) {
- email.setSubject(AlertContext.fromJsonString(entity.getAlertContext()).getProperty(Constants.SUBJECT));
- }
- else email.setSubject(subject);
- email.setVelocityTplFile(tplFile);
- email.setRecipients(recipients);
- email.setCc(cc);
- email.setSender(sender);
-
- /** asynchronized email sending */
- @SuppressWarnings("rawtypes")
- AlertEmailSender thread = new AlertEmailSender(email, eagleProps);
-
- if(this.executorPool == null) throw new IllegalStateException("Invoking thread executor pool but it's is not set yet");
-
- LOG.info("Sending email in asynchronous to: "+recipients+", cc: "+cc);
- Future future = this.executorPool.submit(thread);
- try {
- future.get(MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS);
- LOG.info(String.format("Successfully send email to %s", recipients));
- } catch (InterruptedException | ExecutionException e) {
- LOG.error(String.format("Failed to send email to %s, due to:%s",recipients,e),e);
- } catch (TimeoutException e) {
- LOG.error(String.format("Failed to send email to %s due to timeout exception, max timeout: %s ms ",recipients, MAX_TIMEOUT_MS),e);
- }
- }
-
- public String getTplFile() {
- return tplFile;
- }
-
- public void setTplFile(String tplFile) {
- this.tplFile = tplFile;
- }
-
- public String getSender() {
- return sender;
- }
-
- public void setSender(String sender) {
- this.sender = sender;
- }
-
- public String getRecipients() {
- return recipients;
- }
-
- public void setRecipients(String recipients) {
- this.recipients = recipients;
- }
-
- public String getSubject() {
- return subject;
- }
-
- public void setSubject(String subject) {
- this.subject = subject;
- }
-
- public ConfigObject getEagleProps() {
- return eagleProps;
- }
-
- public void setEagleProps(ConfigObject eagleProps) {
- this.eagleProps = eagleProps;
- }
-
- public void setExecutorPool(ThreadPoolExecutor executorPool) {
- this.executorPool = executorPool;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java
deleted file mode 100644
index b024c39..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.notification;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import org.apache.eagle.notification.plugin.NotificationPluginManagerImpl;
-import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
-import org.apache.eagle.alert.entity.AlertAPIEntity;
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.policy.DynamicPolicyLoader;
-import org.apache.eagle.policy.PolicyLifecycleMethods;
-import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.datastream.JavaStormStreamExecutor1;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.typesafe.config.Config;
-import scala.Tuple1;
-
-/**
- * notify alert by email, kafka message, storage or other means
- */
-public class AlertNotificationExecutor extends JavaStormStreamExecutor1<String> implements PolicyLifecycleMethods<AlertDefinitionAPIEntity> {
- private static final long serialVersionUID = 1690354365435407034L;
- private static final Logger LOG = LoggerFactory.getLogger(AlertNotificationExecutor.class);
- private Config config;
- /** Notification Manager - Responsible for forward and invoke configured Notification Plugin **/
- private NotificationPluginManagerImpl notificationManager;
-
- private List<String> alertExecutorIdList;
- private PolicyDefinitionDAO dao;
-
-
- public AlertNotificationExecutor(List<String> alertExecutorIdList, PolicyDefinitionDAO dao){
- this.alertExecutorIdList = alertExecutorIdList;
- this.dao = dao;
- }
-
- @Override
- public void init() {
- String site = config.getString("eagleProps.site");
- String application = config.getString("eagleProps.application");
- Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs;
- try {
- initialAlertDefs = dao.findActivePoliciesGroupbyExecutorId( site, application );
- }
- catch (Exception ex) {
- LOG.error("fail to initialize initialAlertDefs: ", ex);
- throw new IllegalStateException("fail to initialize initialAlertDefs: ", ex);
- }
-
- if(initialAlertDefs == null || initialAlertDefs.isEmpty()){
- LOG.warn("No alert definitions found for site: "+site+", application: "+ application);
- }
- try{
- notificationManager = new NotificationPluginManagerImpl(config);
- }catch (Exception ex ){
- LOG.error("Fail to initialize NotificationManager: ", ex);
- throw new IllegalStateException("Fail to initialize NotificationManager: ", ex);
- }
-
- DynamicPolicyLoader<AlertDefinitionAPIEntity> policyLoader = DynamicPolicyLoader.getInstanceOf(AlertDefinitionAPIEntity.class);
- policyLoader.init(initialAlertDefs, dao, config);
- for (String alertExecutorId : alertExecutorIdList) {
- policyLoader.addPolicyChangeListener(alertExecutorId, this);
- }
- }
-
- @Override
- public void prepareConfig(Config config) {
- this.config = config;
- }
-
- @Override
- public void flatMap(java.util.List<Object> input, Collector<Tuple1<String>> outputCollector){
- AlertAPIEntity alertEntity = (AlertAPIEntity) input.get(1);
- processAlerts(Arrays.asList(alertEntity));
- }
-
- private void processAlerts(List<AlertAPIEntity> list) {
- for (AlertAPIEntity entity : list) {
- notificationManager.notifyAlert(entity);
- }
- }
-
- @Override
- public void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> added) {
- if(LOG.isDebugEnabled()) LOG.debug(" alert notification config changed : " + added);
- for(AlertDefinitionAPIEntity alertDef : added.values()){
- LOG.info("alert notification config really changed " + alertDef);
- notificationManager.updateNotificationPlugins( alertDef , false );
- }
- }
-
- @Override
- public void onPolicyChanged(Map<String, AlertDefinitionAPIEntity> changed) {
- if(LOG.isDebugEnabled()) LOG.debug("alert notification config to be added : " + changed);
- for(AlertDefinitionAPIEntity alertDef : changed.values()){
- LOG.info("alert notification config really added " + alertDef);
- notificationManager.updateNotificationPlugins( alertDef , false );
- }
- }
-
- @Override
- public void onPolicyDeleted(Map<String, AlertDefinitionAPIEntity> deleted) {
- if(LOG.isDebugEnabled()) LOG.debug("alert notification config to be deleted : " + deleted);
- for(AlertDefinitionAPIEntity alertDef : deleted.values()){
- LOG.info("alert notification config really deleted " + alertDef);
- notificationManager.updateNotificationPlugins( alertDef , true );
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/AlertPersistExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/AlertPersistExecutor.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/AlertPersistExecutor.java
deleted file mode 100644
index 61bb7dc..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/AlertPersistExecutor.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.persist;
-
-import org.apache.eagle.alert.entity.AlertAPIEntity;
-import com.typesafe.config.Config;
-import org.apache.eagle.common.config.EagleConfigConstants;
-import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.datastream.JavaStormStreamExecutor1;
-import scala.Tuple1;
-
-import java.util.Arrays;
-
-public class AlertPersistExecutor extends JavaStormStreamExecutor1<String> {
-
- private static final long serialVersionUID = 1L;
- private Config config;
- private EaglePersist persist;
-
- public AlertPersistExecutor(){
- }
- @Override
- public void prepareConfig(Config config) {
- this.config = config;
- }
-
- @Override
- public void init() {
- String host = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
- int port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
- String username = config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME)
- ? config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME) : null;
- String password = config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD)
- ? config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD) : null;
- this.persist = new EaglePersist(host, port, username, password);
- }
-
- @Override
- public void flatMap(java.util.List<Object> input, Collector<Tuple1<String>> outputCollector){
- persist.doPersist(Arrays.asList((AlertAPIEntity)(input.get(1))));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/EaglePersist.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/EaglePersist.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/EaglePersist.java
deleted file mode 100644
index ebba518..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/EaglePersist.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- *
- */
-package org.apache.eagle.alert.persist;
-
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.service.client.IEagleServiceClient;
-import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-public class EaglePersist {
-
- private static Logger LOG = LoggerFactory.getLogger(EaglePersist.class);
- private String eagleServiceHost;
- private int eagleServicePort;
- private String username;
- private String password;
-
- public EaglePersist(String eagleServiceHost, int eagleServicePort) {
- this(eagleServiceHost, eagleServicePort, null, null);
- }
-
- public EaglePersist(String eagleServiceHost, int eagleServicePort, String username, String password) {
- this.eagleServiceHost = eagleServiceHost;
- this.eagleServicePort = eagleServicePort;
- this.username = username;
- this.password = password;
- }
-
- public boolean doPersist(List<? extends TaggedLogAPIEntity> list) {
- if (list.isEmpty()) return false;
- LOG.info("Going to persist entities, type: " + " " + list.get(0).getClass().getSimpleName() + ", list size: " + list.size());
- try {
- IEagleServiceClient client = new EagleServiceClientImpl(eagleServiceHost, eagleServicePort, username, password);
- GenericServiceAPIResponseEntity<String> response = client.create(list);
- client.close();
- if (response.isSuccess()) {
- LOG.info("Successfully create entities " + list.toString());
- return true;
- }
- else {
- LOG.error("Fail to create entities");
- return false;
- }
- }
- catch (Exception ex) {
- LOG.error("Got an exception in persisting entities" + ex.getMessage(), ex);
- return false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertAPIEntityRender.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertAPIEntityRender.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertAPIEntityRender.java
deleted file mode 100644
index c7ff74c..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertAPIEntityRender.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.siddhi;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.alert.entity.AlertAPIEntity;
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
-import org.apache.eagle.common.DateTimeUtil;
-import org.apache.eagle.common.config.EagleConfigConstants;
-import org.apache.eagle.common.metric.AlertContext;
-import org.apache.eagle.policy.PolicyEvaluationContext;
-import org.apache.eagle.policy.ResultRender;
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.policy.common.UrlBuilder;
-import org.apache.eagle.policy.siddhi.SiddhiPolicyEvaluator;
-import org.apache.eagle.policy.siddhi.SiddhiQueryCallbackImpl;
-import org.apache.eagle.policy.siddhi.StreamMetadataManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.lang.management.ManagementFactory;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-public class SiddhiAlertAPIEntityRender implements ResultRender<AlertDefinitionAPIEntity, AlertAPIEntity>, Serializable {
-
- public static final Logger LOG = LoggerFactory.getLogger(SiddhiAlertAPIEntityRender.class);
- public static final String source = ManagementFactory.getRuntimeMXBean().getName();
-
- @Override
- @SuppressWarnings("unchecked")
- public AlertAPIEntity render(Config config, List<Object> results, PolicyEvaluationContext<AlertDefinitionAPIEntity, AlertAPIEntity> siddhiAlertContext, long timestamp) {
- List<String> rets = SiddhiQueryCallbackImpl.convertToString(results);
- SiddhiPolicyEvaluator<AlertDefinitionAPIEntity, AlertAPIEntity> evaluator = (SiddhiPolicyEvaluator<AlertDefinitionAPIEntity, AlertAPIEntity>) siddhiAlertContext.evaluator;
- String alertExecutorId = siddhiAlertContext.alertExecutor.getExecutorId();
- AlertAPIEntity entity = new AlertAPIEntity();
- AlertContext context = new AlertContext();
- String sourceStreams = evaluator.getAdditionalContext().get(Constants.SOURCE_STREAMS);
- String[] sourceStreamsArr = sourceStreams.split(",");
- List<String> attrRenameList = evaluator.getOutputStreamAttrNameList();
- Map<String, String> tags = new HashMap<String, String>();
- for (String sourceStream : sourceStreamsArr) {
- List<AlertStreamSchemaEntity> list = StreamMetadataManager.getInstance().getMetadataEntitiesForStream(sourceStream.trim());
- for (AlertStreamSchemaEntity alertStream : list) {
- if (alertStream.getUsedAsTag() != null && alertStream.getUsedAsTag() == true) {
- String attrName = alertStream.getTags().get(Constants.ATTR_NAME);
- tags.put(attrName, rets.get(attrRenameList.indexOf(attrName)));
- }
- }
- }
-
- for (int index = 0; index < rets.size(); index++) {
- //attrRenameList.get(0) -> "eagleAlertContext". We need to skip "eagleAlertContext", index is from 1 for attRenameList.
- context.addProperty(attrRenameList.get(index), rets.get(index));
- }
-
- StringBuilder sb = new StringBuilder();
- for (Entry<String, String> entry : context.getProperties().entrySet()) {
- String key = entry.getKey();
- String value = entry.getValue();
- sb.append(key + "=\"" + value + "\" ");
- }
- context.addAll(evaluator.getAdditionalContext());
- String policyId = context.getProperty(Constants.POLICY_ID);
- String alertMessage = "The Policy \"" + policyId + "\" has been detected with the below information: " + sb.toString() ;
- String site = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.SITE);
- String application = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.APPLICATION);
- String host = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
- Integer port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
-
- context.addProperty(Constants.ALERT_EVENT, sb.toString());
- context.addProperty(Constants.ALERT_MESSAGE, alertMessage);
- context.addProperty(Constants.ALERT_TIMESTAMP_PROPERTY, DateTimeUtil.millisecondsToHumanDateWithSeconds(System.currentTimeMillis()));
- context.addProperty(EagleConfigConstants.APPLICATION, application);
- context.addProperty(EagleConfigConstants.SITE, site);
- entity.setTimestamp(timestamp);
- /** If we need to add severity tag, we should add severity filed in AbstractpolicyDefinition, and pass it down **/
- tags.put(EagleConfigConstants.SITE, site);
- tags.put(EagleConfigConstants.APPLICATION, application);
- tags.put(Constants.SOURCE_STREAMS, context.getProperty(Constants.SOURCE_STREAMS));
- tags.put(Constants.POLICY_ID, context.getProperty(Constants.POLICY_ID));
- tags.put(Constants.ALERT_SOURCE, source);
- tags.put(Constants.ALERT_EXECUTOR_ID, alertExecutorId);
- entity.setTags(tags);
-
- context.addProperty(Constants.POLICY_DETAIL_URL, UrlBuilder.buiildPolicyDetailUrl(host, port, tags));
- context.addProperty(Constants.ALERT_DETAIL_URL, UrlBuilder.buildAlertDetailUrl(host, port, entity));
- entity.setAlertContext(context.toJsonString());
- return entity;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
deleted file mode 100644
index eac2bfd..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.eagle.policy.siddhi.SiddhiPolicyEvaluatorServiceProviderImpl
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/resources/table-create.script
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/resources/table-create.script b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/resources/table-create.script
deleted file mode 100644
index d4d3795..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/resources/table-create.script
+++ /dev/null
@@ -1,24 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-##### create alert related tables
-create 'eagle_metric', {NAME => 'f', VERSIONS => '3', BLOOMFILTER => 'ROW', COMPRESSION => 'SNAPPY'}
-create 'alertdetail', {NAME => 'f', VERSIONS => '3', BLOOMFILTER => 'ROW', COMPRESSION => 'SNAPPY'}
-
-create 'alertDataSource', {NAME => 'f', VERSIONS => '3', BLOOMFILTER => 'ROW', COMPRESSION => 'SNAPPY'}
-create 'alertStream', {NAME => 'f', VERSIONS => '3', BLOOMFILTER => 'ROW', COMPRESSION => 'SNAPPY'}
-create 'alertExecutor', {NAME => 'f', VERSIONS => '3', BLOOMFILTER => 'ROW', COMPRESSION => 'SNAPPY'}
-create 'alertStreamSchema', {NAME => 'f', VERSIONS => '3', BLOOMFILTER => 'ROW', COMPRESSION => 'SNAPPY'}
-create 'alertdef', {NAME => 'f', VERSIONS => '3', BLOOMFILTER => 'ROW', COMPRESSION => 'SNAPPY'}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/META-INF/MANIFEST.MF
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/META-INF/MANIFEST.MF b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/META-INF/MANIFEST.MF
deleted file mode 100644
index edfb15f..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/META-INF/MANIFEST.MF
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-Manifest-Version: 1.0
-Class-Path:
-
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
deleted file mode 100644
index 20afc95..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.cep;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-import org.apache.eagle.alert.entity.AlertAPIEntity;
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
-import org.apache.eagle.alert.executor.AlertExecutor;
-import org.apache.eagle.alert.siddhi.SiddhiAlertAPIEntityRender;
-import org.apache.eagle.dataproc.core.ValuesArray;
-import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.policy.PolicyEvaluationContext;
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.policy.dao.*;
-import org.apache.eagle.policy.siddhi.SiddhiPolicyDefinition;
-import org.apache.eagle.policy.siddhi.SiddhiPolicyEvaluator;
-import org.apache.eagle.policy.siddhi.StreamMetadataManager;
-import org.apache.eagle.service.client.EagleServiceConnector;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import scala.Tuple2;
-
-import java.util.*;
-import java.util.concurrent.Semaphore;
-
-public class TestSiddhiEvaluator {
- private AlertStreamSchemaEntity createStreamMetaEntity(String attrName, String type) {
- AlertStreamSchemaEntity entity = new AlertStreamSchemaEntity();
- Map<String, String> tags = new HashMap<String, String>();
- tags.put("application", "hdfsAuditLog");
- tags.put("streamName", "hdfsAuditLogEventStream");
- tags.put("attrName", attrName);
- entity.setTags(tags);
- entity.setAttrType(type);
- return entity;
- }
-
- private int alertCount;
- private Semaphore semaphore;
- @Before
- public void before(){
- alertCount = 0;
- semaphore = new Semaphore(0);
- }
-
- @Test
- public void test() throws Exception{
- Config config = ConfigFactory.load("unittest.conf");
- AlertStreamSchemaDAO streamDao = new AlertStreamSchemaDAOImpl(null, null) {
- @Override
- public List<AlertStreamSchemaEntity> findAlertStreamSchemaByApplication(String dataSource) throws Exception {
- List<AlertStreamSchemaEntity> list = new ArrayList<AlertStreamSchemaEntity>();
- list.add(createStreamMetaEntity("cmd", "string"));
- list.add(createStreamMetaEntity("dst", "string"));
- list.add(createStreamMetaEntity("src", "string"));
- list.add(createStreamMetaEntity("host", "string"));
- list.add(createStreamMetaEntity("user", "string"));
- list.add(createStreamMetaEntity("timestamp", "long"));
- list.add(createStreamMetaEntity("securityZone", "string"));
- list.add(createStreamMetaEntity("sensitivityType", "string"));
- list.add(createStreamMetaEntity("allowed", "string"));
- return list;
- }
- };
- StreamMetadataManager.getInstance().reset();
- StreamMetadataManager.getInstance().init(config, streamDao);
-
- Map<String, Object> data1 = new TreeMap<String, Object>(){{
- put("cmd", "open");
- put("dst", "");
- put("src", "");
- put("host", "");
- put("user", "");
- put("timestamp", String.valueOf(System.currentTimeMillis()));
- put("securityZone", "");
- put("sensitivityType", "");
- put("allowed", "true");
- }};
- final SiddhiPolicyDefinition policyDef = new SiddhiPolicyDefinition();
- policyDef.setType("siddhiCEPEngine");
- String expression = "from hdfsAuditLogEventStream[cmd=='open'] " +
- "select * " +
- "insert into outputStream ;";
- policyDef.setExpression(expression);
-
- PolicyDefinitionDAO alertDao = new PolicyDefinitionEntityDAOImpl(new EagleServiceConnector(null, null),
- Constants.ALERT_DEFINITION_SERVICE_ENDPOINT_NAME) {
- @Override
- public Map<String, Map<String, AlertDefinitionAPIEntity>> findActivePoliciesGroupbyExecutorId(String site, String dataSource) throws Exception {
- return null;
- }
-
- @Override
- public void updatePolicyDetails(AbstractPolicyDefinitionEntity entity) { /* do nothing */ }
- };
-
- AlertExecutor alertExecutor = new AlertExecutor("alertExecutorId", null, 3, 1, alertDao, new String[]{"hdfsAuditLogEventStream"}) {
- @Override
- protected Map<String, String> getDimensions(String policyId) {
- return new HashMap<>();
- }
- };
- alertExecutor.prepareConfig(config);
- alertExecutor.init();
-
- PolicyEvaluationContext<AlertDefinitionAPIEntity, AlertAPIEntity> context = new PolicyEvaluationContext<>();
- context.alertExecutor = alertExecutor;
- context.policyId = "testPolicy";
- context.resultRender = new SiddhiAlertAPIEntityRender();
- context.outputCollector = (Collector<Tuple2<String, AlertAPIEntity>>) (stringAlertAPIEntityTuple2) -> {
- alertCount++;
- semaphore.release();
- };
- SiddhiPolicyEvaluator<AlertDefinitionAPIEntity, AlertAPIEntity> evaluator =
- new SiddhiPolicyEvaluator<>(config, context, policyDef, new String[]{"hdfsAuditLogEventStream"}, false);
- evaluator.evaluate(new ValuesArray(context.outputCollector, "hdfsAuditLogEventStream", data1));
- Thread.sleep(2 * 1000);
- semaphore.acquire();
- Assert.assertEquals(alertCount, 1);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/config/TestAlertDedup.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/config/TestAlertDedup.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/config/TestAlertDedup.java
deleted file mode 100644
index f6d6a63..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/config/TestAlertDedup.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.config;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
-
-public class TestAlertDedup {
-
- @Test
- public void test() throws Exception{
- String alertDef = "{\"alertDedupIntervalMin\":\"10\",\"fields\":[\"key1\",\"key2\",\"key3\"]}";
- DeduplicatorConfig dedupConfig = JsonSerDeserUtils.deserialize(alertDef, DeduplicatorConfig.class);
- Assert.assertEquals(dedupConfig.getAlertDedupIntervalMin(), 10);
- Assert.assertEquals(dedupConfig.getFields().size(), 3);
-
- alertDef = "null";
- dedupConfig = JsonSerDeserUtils.deserialize(alertDef, DeduplicatorConfig.class);
- Assert.assertEquals(dedupConfig, null);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java
deleted file mode 100644
index f7dcdde..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.dao;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.common.config.EagleConfigConstants;
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
-import org.apache.eagle.policy.dao.PolicyDefinitionEntityDAOImpl;
-import org.apache.eagle.service.client.EagleServiceConnector;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class TestAlertDefinitionDAOImpl {
-
- public AlertDefinitionAPIEntity buildTestAlertDefEntity(String site, String programId, String alertExecutorId, String policyId, String policyType) {
- AlertDefinitionAPIEntity entity = new AlertDefinitionAPIEntity();
- entity.setEnabled(true);
- Map<String, String> tags = new HashMap<String, String>();
- tags.put("site", site);
- tags.put("programId", programId);
- tags.put("alertExecutorId", alertExecutorId);
- tags.put("policyId", policyId);
- tags.put("policyType", policyType);
- entity.setTags(tags);
- return entity;
- }
-
- @Test
- public void test() throws Exception{
- Config config = ConfigFactory.load();
- String eagleServiceHost = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
- int eagleServicePort = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
-
- String site = "sandbox";
- String dataSource = "UnitTest";
- PolicyDefinitionDAO dao = new PolicyDefinitionEntityDAOImpl(new EagleServiceConnector(eagleServiceHost, eagleServicePort),
- Constants.ALERT_DEFINITION_SERVICE_ENDPOINT_NAME) {
- @Override
- public List<AlertDefinitionAPIEntity> findActivePolicies(String site, String dataSource) throws Exception {
- List<AlertDefinitionAPIEntity> list = new ArrayList<AlertDefinitionAPIEntity>();
- list.add(buildTestAlertDefEntity(site, dataSource, "TestExecutor1", "TestPolicyIDA", "TestPolicyTypeA"));
- list.add(buildTestAlertDefEntity(site, dataSource, "TestExecutor1", "TestPolicyIDB", "TestPolicyTypeB"));
- list.add(buildTestAlertDefEntity(site, dataSource, "TestExecutor2", "TestPolicyIDC", "TestPolicyTypeC"));
- list.add(buildTestAlertDefEntity(site, dataSource, "TestExecutor2", "TestPolicyIDD", "TestPolicyTypeD"));
- return list;
- }
-
- @Override
- public void updatePolicyDetails(AbstractPolicyDefinitionEntity entity) { /* do nothing */ }
- };
-
- Map<String, Map<String, AlertDefinitionAPIEntity>> retMap = dao.findActivePoliciesGroupbyExecutorId(site, dataSource);
-
- Assert.assertEquals(2, retMap.size());
- Assert.assertEquals(2, retMap.get("TestExecutor1").size());
- Assert.assertEquals(2, retMap.get("TestExecutor2").size());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java
deleted file mode 100644
index d703214..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.dao;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
-import org.apache.eagle.policy.dao.AlertStreamSchemaDAO;
-import org.apache.eagle.policy.siddhi.SiddhiStreamMetadataUtils;
-import org.apache.eagle.policy.siddhi.StreamMetadataManager;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-
-public class TestSiddhiStreamMetadataUtils {
- @Test
- public void test() throws Exception {
- Config config = ConfigFactory.load();
- StreamMetadataManager.getInstance().reset();
- StreamMetadataManager.getInstance().init(config, new AlertStreamSchemaDAO() {
- @Override
- public List<AlertStreamSchemaEntity> findAlertStreamSchemaByApplication(
- String application) {
- return Arrays.asList(generateStreamMetadataAPIEntity("attrName1", "STRING"),
- generateStreamMetadataAPIEntity("attrName2", "LONG")
- );
- }
- });
- String siddhiStreamDef = SiddhiStreamMetadataUtils.convertToStreamDef("testStreamName");
- Assert.assertEquals("define stream " + "testStreamName" + "(attrName1 string,attrName2 long);", siddhiStreamDef);
- }
-
- private AlertStreamSchemaEntity generateStreamMetadataAPIEntity(final String attrName, String attrType){
- AlertStreamSchemaEntity entity = new AlertStreamSchemaEntity();
- entity.setTags(new HashMap<String, String>(){{
- put("programId", "testProgramId");
- put("streamName", "testStreamName");
- put("attrName", attrName);
- }});
- entity.setAttrType(attrType);
- return entity;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestStreamDefinitionDAOImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestStreamDefinitionDAOImpl.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestStreamDefinitionDAOImpl.java
deleted file mode 100644
index 0bbfc4a..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestStreamDefinitionDAOImpl.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.dao;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.policy.dao.AlertStreamSchemaDAO;
-import org.apache.eagle.policy.dao.AlertStreamSchemaDAOImpl;
-import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
-import org.apache.eagle.policy.siddhi.StreamMetadataManager;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class TestStreamDefinitionDAOImpl {
-
- public AlertStreamSchemaEntity buildTestStreamDefEntity(String programId, String streamName, String attrName) {
- AlertStreamSchemaEntity entity = new AlertStreamSchemaEntity();
- entity.setAttrType("String");
- entity.setAttrValueResolver("DefaultAttrValueResolver");
- entity.setCategory("SimpleType");
- Map<String, String> tags = new HashMap<String, String>();
- tags.put("programId", programId);
- tags.put("streamName", streamName);
- tags.put("attrName", attrName);
- entity.setTags(tags);
- return entity;
- }
-
- @Test
- public void test() throws Exception{
- Config config = ConfigFactory.load();
- AlertStreamSchemaDAO dao = new AlertStreamSchemaDAOImpl(null, null) {
- public List<AlertStreamSchemaEntity> findAlertStreamSchemaByApplication(String application) throws Exception {
- List<AlertStreamSchemaEntity> list = new ArrayList<AlertStreamSchemaEntity>();
- String programId = "UnitTest";
- list.add(buildTestStreamDefEntity(programId, "TestStream", "Attr1"));
- list.add(buildTestStreamDefEntity(programId, "TestStream", "Attr2"));
- list.add(buildTestStreamDefEntity(programId, "TestStream", "Attr3"));
- list.add(buildTestStreamDefEntity(programId, "TestStream", "Attr4"));
- return list;
- }
- };
- StreamMetadataManager.getInstance().reset();
- StreamMetadataManager.getInstance().init(config, dao);
- Map<String, List<AlertStreamSchemaEntity>> retMap = StreamMetadataManager.getInstance().getMetadataEntitiesForAllStreams();
- Assert.assertTrue(retMap.get("TestStream").size() == 4);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/executor/TestPolicyExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/executor/TestPolicyExecutor.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/executor/TestPolicyExecutor.java
deleted file mode 100644
index bb94908..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/executor/TestPolicyExecutor.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.executor;
-
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.policy.config.AbstractPolicyDefinition;
-import org.apache.eagle.policy.dao.AlertStreamSchemaDAOImpl;
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.policy.PolicyEvaluator;
-import org.apache.eagle.policy.PolicyManager;
-import org.apache.eagle.policy.siddhi.SiddhiPolicyDefinition;
-import org.apache.eagle.policy.siddhi.StreamMetadataManager;
-import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-import org.junit.Assert;
-
-/**
- * @since Dec 18, 2015
- *
- */
-public class TestPolicyExecutor {
-
- public static class T2 extends AbstractPolicyDefinitionEntity {
- @Override
- public String getPolicyDef() {
- return null;
- }
- @Override
- public boolean isEnabled() {
- return false;
- }
- }
-
- // not feasible to Unit test, it requires the local service.
- @Ignore
- @Test
- public void testReflectCreatePolicyEvaluator() throws Exception {
- System.setProperty("config.resource", "/unittest.conf");
- String policyType = Constants.policyType.siddhiCEPEngine.name();
- Class<? extends PolicyEvaluator> evalCls = PolicyManager.getInstance().getPolicyEvaluator(policyType);
- Config config = ConfigFactory.load();
-
- String def = "{\"expression\":\"from hdfsAuditLogEventStream select * insert into outputStream;\",\"type\":\"siddhiCEPEngine\"}";
- // test1 : test json deserialization
- AbstractPolicyDefinition policyDef = null;
- policyDef = JsonSerDeserUtils.deserialize(def, AbstractPolicyDefinition.class,
- PolicyManager.getInstance().getPolicyModules(policyType));
- // Assert conversion succeed
- Assert.assertEquals(SiddhiPolicyDefinition.class, policyDef.getClass());
-
- // make sure meta data manager initialized
- StreamMetadataManager.getInstance().init(config, new AlertStreamSchemaDAOImpl(config));
-
- String[] sourceStreams = new String[] { "hdfsAuditLogEventStream" };
- // test2 : test evaluator
- PolicyEvaluator pe = evalCls.getConstructor(Config.class, String.class, AbstractPolicyDefinition.class,
- String[].class, boolean.class).newInstance(config, "policy-id", policyDef, sourceStreams, false);
-
- PolicyEvaluator<AlertDefinitionAPIEntity> e1 = (PolicyEvaluator<AlertDefinitionAPIEntity>) pe;
-
- PolicyEvaluator<T2> e2 = (PolicyEvaluator<T2>) pe;
-
- }
-
-}