You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2015/11/19 11:47:55 UTC
[48/55] [abbrv] [partial] incubator-eagle git commit: [EAGLE-46]
Rename package name as "org.apache.eagle"
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamEntity.java b/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamEntity.java
new file mode 100644
index 0000000..f0f81c5
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamEntity.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.entity;
+
+import org.apache.eagle.alert.common.AlertConstants;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.Column;
+import org.apache.eagle.log.entity.meta.ColumnFamily;
+import org.apache.eagle.log.entity.meta.Prefix;
+import org.apache.eagle.log.entity.meta.Service;
+import org.apache.eagle.log.entity.meta.Table;
+import org.apache.eagle.log.entity.meta.Tags;
+import org.apache.eagle.log.entity.meta.TimeSeries;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@Table("alertStream")
+@ColumnFamily("f")
+@Prefix("alertStream")
+@Service(AlertConstants.ALERT_STREAM_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(false)
+@Tags({"dataSource", "streamName"})
+public class AlertStreamEntity extends TaggedLogAPIEntity{
+ @Column("a")
+ private String desc;
+
+ public String getDesc() {
+ return desc;
+ }
+ public void setDesc(String desc) {
+ this.desc = desc;
+ valueChanged("desc");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamSchemaEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamSchemaEntity.java b/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamSchemaEntity.java
new file mode 100644
index 0000000..76b6097
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamSchemaEntity.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.entity;
+
+import org.apache.eagle.alert.common.AlertConstants;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.Column;
+import org.apache.eagle.log.entity.meta.ColumnFamily;
+import org.apache.eagle.log.entity.meta.Prefix;
+import org.apache.eagle.log.entity.meta.Service;
+import org.apache.eagle.log.entity.meta.Table;
+import org.apache.eagle.log.entity.meta.Tags;
+import org.apache.eagle.log.entity.meta.TimeSeries;
+
+/**
+ * ddl to create streammetadata table
+ *
+ * create 'alertStreamSchema', {NAME => 'f', BLOOMFILTER => 'ROW', VERSIONS => '1', COMPRESSION => 'SNAPPY'}
+ */
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@Table("alertStreamSchema")
+@ColumnFamily("f")
+@Prefix("alertStreamSchema")
+@Service(AlertConstants.ALERT_STREAM_SCHEMA_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(false)
+@Tags({"dataSource", "streamName", "attrName"})
+public class AlertStreamSchemaEntity extends TaggedLogAPIEntity{
+ @Column("a")
+ private String attrType;
+ @Column("b")
+ private String category;
+ @Column("c")
+ private String attrValueResolver;
+ /* all tags form the key for alert de-duplication */
+ @Column("d")
+ private Boolean usedAsTag;
+ @Column("e")
+ private String attrDescription;
+ @Column("f")
+ private String attrDisplayName;
+ @Column("g")
+ private String defaultValue;
+
+ public String getAttrType() {
+ return attrType;
+ }
+ public void setAttrType(String attrType) {
+ this.attrType = attrType;
+ valueChanged("attrType");
+ }
+ public String getCategory() {
+ return category;
+ }
+ public void setCategory(String category) {
+ this.category = category;
+ valueChanged("category");
+ }
+ public String getAttrValueResolver() {
+ return attrValueResolver;
+ }
+ public void setAttrValueResolver(String attrValueResolver) {
+ this.attrValueResolver = attrValueResolver;
+ valueChanged("attrValueResolver");
+ }
+ public Boolean getUsedAsTag() {
+ return usedAsTag;
+ }
+ public void setUsedAsTag(Boolean usedAsTag) {
+ this.usedAsTag = usedAsTag;
+ valueChanged("usedAsTag");
+ }
+ public String getAttrDescription() {
+ return attrDescription;
+ }
+ public void setAttrDescription(String attrDescription) {
+ this.attrDescription = attrDescription;
+ valueChanged("attrDescription");
+ }
+ public String getAttrDisplayName() {
+ return attrDisplayName;
+ }
+ public void setAttrDisplayName(String attrDisplayName) {
+ this.attrDisplayName = attrDisplayName;
+ valueChanged("attrDisplayName");
+ }
+ public String getDefaultValue() {
+ return defaultValue;
+ }
+ public void setDefaultValue(String defaultValue) {
+ this.defaultValue = defaultValue;
+ valueChanged("defaultValue");
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/AbstractPolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/AbstractPolicyDefinition.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/AbstractPolicyDefinition.java
deleted file mode 100644
index c235225..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/AbstractPolicyDefinition.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.config;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-
-/**
- * base fields for all policy definition
- */
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible=true)
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class AbstractPolicyDefinition {
- private String type;
- /**
- * @return type in string
- */
- public String getType() {
- return type;
- }
-
- /**
- * @param type set type value
- */
- public void setType(String type) {
- this.type = type;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/DeduplicatorConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/DeduplicatorConfig.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/DeduplicatorConfig.java
deleted file mode 100644
index 04f20ff..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/DeduplicatorConfig.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.config;
-
-import java.io.Serializable;
-
-public class DeduplicatorConfig implements Serializable{
- private static final long serialVersionUID = 1L;
- public int getAlertDedupIntervalMin() {
- return alertDedupIntervalMin;
- }
- public void setAlertDedupIntervalMin(int alertDedupIntervalMin) {
- this.alertDedupIntervalMin = alertDedupIntervalMin;
- }
- public int getEmailDedupIntervalMin() {
- return emailDedupIntervalMin;
- }
- public void setEmailDedupIntervalMin(int emailDedupIntervalMin) {
- this.emailDedupIntervalMin = emailDedupIntervalMin;
- }
- private int alertDedupIntervalMin;
- private int emailDedupIntervalMin;
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/EmailNotificationConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/EmailNotificationConfig.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/EmailNotificationConfig.java
deleted file mode 100644
index fae24d5..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/EmailNotificationConfig.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.config;
-
-public class EmailNotificationConfig extends NotificationConfig{
- private static final long serialVersionUID = 1L;
- private String sender;
- private String recipients;
- private String tplFileName;
- private String subject;
- public String getSubject() {
- return subject;
- }
- public void setSubject(String subject) {
- this.subject = subject;
- }
- public String getRecipients() {
- return recipients;
- }
- public void setRecipients(String recipients) {
- this.recipients = recipients;
- }
- public String getSender() {
- return sender;
- }
- public void setSender(String sender) {
- this.sender = sender;
- }
- public String getTplFileName() {
- return tplFileName;
- }
- public void setTplFileName(String tplFileName) {
- this.tplFileName = tplFileName;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/NotificationConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/NotificationConfig.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/NotificationConfig.java
deleted file mode 100644
index 6903a57..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/NotificationConfig.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.config;
-
-import java.io.Serializable;
-
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "flavor", visible=true)
-public class NotificationConfig implements Serializable{
- private static final long serialVersionUID = 1L;
- private String id;
- private String flavor;
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
- public String getFlavor() {
- return flavor;
- }
-
- public void setFlavor(String flavor) {
- this.flavor = flavor;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/Remediation.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/Remediation.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/Remediation.java
deleted file mode 100644
index c644a31..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/Remediation.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.config;
-
-import java.io.Serializable;
-
-public class Remediation implements Serializable{
- private static final long serialVersionUID = 1L;
- private String id;
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertDeduplicationExecutorBase.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertDeduplicationExecutorBase.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertDeduplicationExecutorBase.java
deleted file mode 100644
index b459130..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertDeduplicationExecutorBase.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.dedup;
-
-import eagle.alert.common.AlertConstants;
-import eagle.alert.config.DeduplicatorConfig;
-import eagle.alert.dao.AlertDefinitionDAO;
-import eagle.alert.entity.AlertAPIEntity;
-import eagle.alert.entity.AlertDefinitionAPIEntity;
-import eagle.alert.policy.DynamicPolicyLoader;
-import eagle.alert.policy.PolicyLifecycleMethods;
-import eagle.common.config.EagleConfigConstants;
-import eagle.datastream.Collector;
-import eagle.datastream.JavaStormStreamExecutor2;
-import eagle.datastream.Tuple2;
-import com.sun.jersey.client.impl.CopyOnWriteHashMap;
-import com.typesafe.config.Config;
-import eagle.dataproc.core.JsonSerDeserUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public abstract class AlertDeduplicationExecutorBase extends JavaStormStreamExecutor2<String, AlertAPIEntity> implements PolicyLifecycleMethods {
- private static final long serialVersionUID = 1L;
- private static final Logger LOG = LoggerFactory.getLogger(AlertDeduplicationExecutorBase.class);
- protected Config config;
- protected DEDUP_TYPE dedupType;
-
- private List<String> alertExecutorIdList;
- private volatile CopyOnWriteHashMap<String, DefaultDeduplicator<AlertAPIEntity>> alertDedups;
- private AlertDefinitionDAO dao;
-
- public enum DEDUP_TYPE {
- ENTITY,
- EMAIL
- }
-
- public AlertDeduplicationExecutorBase(List<String> alertExecutorIdList, DEDUP_TYPE dedupType, AlertDefinitionDAO dao){
- this.alertExecutorIdList = alertExecutorIdList;
- this.dedupType = dedupType;
- this.dao = dao;
- }
-
- @Override
- public void prepareConfig(Config config) {
- this.config = config;
- }
-
- public DefaultDeduplicator<AlertAPIEntity> createAlertDedup(AlertDefinitionAPIEntity alertDef) {
- DeduplicatorConfig dedupConfig = null;
- try {
- dedupConfig = JsonSerDeserUtils.deserialize(alertDef.getDedupeDef(), DeduplicatorConfig.class);
- }
- catch (Exception ex) {
- LOG.warn("Initial dedupConfig error, " + ex.getMessage());
- }
-
- if (dedupConfig != null) {
- if (dedupType.equals(DEDUP_TYPE.ENTITY)) {
- return new DefaultDeduplicator<>(dedupConfig.getAlertDedupIntervalMin());
- } else if (dedupType.equals(DEDUP_TYPE.EMAIL)) {
- return new DefaultDeduplicator<>(dedupConfig.getEmailDedupIntervalMin());
- }
- }
-
- return null;
- }
-
- @Override
- public void init() {
- String site = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.SITE);
- String dataSource = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.DATA_SOURCE);
- Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs;
- try {
- initialAlertDefs = dao.findActiveAlertDefsGroupbyAlertExecutorId(site, dataSource);
- }
- catch (Exception ex) {
- LOG.error("fail to initialize initialAlertDefs: ", ex);
- throw new IllegalStateException("fail to initialize initialAlertDefs: ", ex);
- }
- Map<String, DefaultDeduplicator<AlertAPIEntity>> tmpDeduplicators = new HashMap<String, DefaultDeduplicator<AlertAPIEntity>>();
- if(initialAlertDefs == null || initialAlertDefs.isEmpty()){
- LOG.warn("No alert definitions was found for site: "+site+", dataSource: "+dataSource);
- } else {
- for (String alertExecutorId: alertExecutorIdList) {
- if(initialAlertDefs.containsKey(alertExecutorId)){
- for(AlertDefinitionAPIEntity alertDef : initialAlertDefs.get(alertExecutorId).values()){
- try {
- DefaultDeduplicator<AlertAPIEntity> deduplicator = createAlertDedup(alertDef);
- if (deduplicator != null)
- tmpDeduplicators.put(alertDef.getTags().get(AlertConstants.POLICY_ID), deduplicator);
- else LOG.warn("The dedup interval is not set, alertDef: " + alertDef);
- }
- catch (Throwable t) {
- LOG.error("Got an exception when initial dedup config, probably dedup config is not set: " + t.getMessage() + "," + alertDef);
- }
- }
- } else {
- LOG.info(String.format("No alert definitions found for site: %s, dataSource: %s, alertExecutorId: %s",site,dataSource,alertExecutorId));
- }
- }
- }
-
- alertDedups = new CopyOnWriteHashMap<>();
- alertDedups.putAll(tmpDeduplicators);
- DynamicPolicyLoader policyLoader = DynamicPolicyLoader.getInstance();
- policyLoader.init(initialAlertDefs, dao, config);
- for (String alertExecutorId : alertExecutorIdList) {
- policyLoader.addPolicyChangeListener(alertExecutorId, this);
- }
- }
-
- @Override
- public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, AlertAPIEntity>> outputCollector){
- String policyId = (String) input.get(0);
- AlertAPIEntity alertEntity = (AlertAPIEntity) input.get(1);
- DefaultDeduplicator<AlertAPIEntity> dedup;
- synchronized(alertDedups) {
- dedup = alertDedups.get(policyId);
- }
-
- List<AlertAPIEntity> ret = Arrays.asList(alertEntity);
- if (dedup == null) {
- LOG.warn("Dedup config for policyId " + policyId + " is not set or is not a valid config");
- } else {
- if (dedup.getDedupIntervalMin() == -1) {
- LOG.warn("the dedup interval is set as -1, which mean all alerts should be deduped(skipped)");
- return;
- }
- ret = dedup.dedup(ret);
- }
- for (AlertAPIEntity entity : ret) {
- outputCollector.collect(new Tuple2(policyId, entity));
- }
- }
-
- public void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> added) {
- if(LOG.isDebugEnabled()) LOG.debug("Alert dedup config to be added : " + added);
- for(AlertDefinitionAPIEntity alertDef : added.values()){
- LOG.info("Alert dedup config really added " + alertDef);
- DefaultDeduplicator<AlertAPIEntity> dedup = createAlertDedup(alertDef);
- if (dedup != null) {
- synchronized(alertDedups) {
- alertDedups.put(alertDef.getTags().get(AlertConstants.POLICY_ID), dedup);
- }
- }
- }
- }
-
- public void onPolicyChanged(Map<String, AlertDefinitionAPIEntity> changed) {
- LOG.info("Alert dedup config changed : " + changed);
- for(AlertDefinitionAPIEntity alertDef : changed.values()){
- LOG.info("Alert dedup config really changed " + alertDef);
- DefaultDeduplicator<AlertAPIEntity> dedup = createAlertDedup(alertDef);
- if (dedup != null) {
- synchronized(alertDedups) {
- alertDedups.put(alertDef.getTags().get(AlertConstants.POLICY_ID), dedup);
- }
- }
- }
- }
-
- public void onPolicyDeleted(Map<String, AlertDefinitionAPIEntity> deleted) {
- LOG.info("alert dedup config deleted : " + deleted);
- for(AlertDefinitionAPIEntity alertDef : deleted.values()){
- LOG.info("alert dedup config deleted " + alertDef);
- // no cleanup to do, just remove it
- synchronized(alertDedups) {
- alertDedups.remove(alertDef.getTags().get(AlertConstants.POLICY_ID));
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java
deleted file mode 100644
index 7dd0ddc..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.dedup;
-
-import eagle.alert.dao.AlertDefinitionDAO;
-
-import java.util.List;
-
-public class AlertEmailDeduplicationExecutor extends AlertDeduplicationExecutorBase {
-
- private static final long serialVersionUID = 1L;
-
- public AlertEmailDeduplicationExecutor(List<String> alertExecutorIdList, AlertDefinitionDAO dao){
- super(alertExecutorIdList, DEDUP_TYPE.EMAIL, dao);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java
deleted file mode 100644
index b6050a0..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.dedup;
-
-import eagle.alert.dao.AlertDefinitionDAO;
-
-import java.util.List;
-
-public class AlertEntityDeduplicationExecutor extends AlertDeduplicationExecutorBase {
-
- private static final long serialVersionUID = 1L;
-
- public AlertEntityDeduplicationExecutor(List<String> alertExecutorIdList, AlertDefinitionDAO dao){
- super(alertExecutorIdList, DEDUP_TYPE.ENTITY, dao);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/DefaultDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/DefaultDeduplicator.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/DefaultDeduplicator.java
deleted file mode 100644
index 97e58ac..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/DefaultDeduplicator.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.dedup;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.commons.lang.time.DateUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import eagle.log.base.taggedlog.TaggedLogAPIEntity;
-
-public class DefaultDeduplicator<T extends TaggedLogAPIEntity> implements EntityDeduplicator<T>{
- protected long dedupIntervalMin;
- protected Map<EntityTagsUniq, Long> entites = new HashMap<EntityTagsUniq, Long>();
- public static Logger LOG = LoggerFactory.getLogger(DefaultDeduplicator.class);
-
- public static enum AlertDeduplicationStatus{
- NEW,
- DUPLICATED,
- IGNORED
- }
-
- public DefaultDeduplicator() {
- this.dedupIntervalMin = 0;
- }
-
- public DefaultDeduplicator(long intervalMin) {
- this.dedupIntervalMin = intervalMin;
- }
-
- public void clearOldCache() {
- List<EntityTagsUniq> removedkeys = new ArrayList<EntityTagsUniq>();
- for (Entry<EntityTagsUniq, Long> entry : entites.entrySet()) {
- EntityTagsUniq entity = entry.getKey();
- if (System.currentTimeMillis() - 7 * DateUtils.MILLIS_PER_DAY > entity.createdTime) {
- removedkeys.add(entry.getKey());
- }
- }
- for (EntityTagsUniq alertKey : removedkeys) {
- entites.remove(alertKey);
- }
- }
-
- public AlertDeduplicationStatus checkDedup(EntityTagsUniq key){
- long current = key.timestamp;
- if(!entites.containsKey(key)){
- entites.put(key, current);
- return AlertDeduplicationStatus.NEW;
- }
-
- long last = entites.get(key);
- if(current - last >= dedupIntervalMin * DateUtils.MILLIS_PER_MINUTE){
- entites.put(key, current);
- return AlertDeduplicationStatus.DUPLICATED;
- }
-
- return AlertDeduplicationStatus.IGNORED;
- }
-
- public List<T> dedup(List<T> list) {
- clearOldCache();
- List<T> dedupList = new ArrayList<T>();
- int totalCount = list.size();
- int dedupedCount = 0;
- for(T entity: list) {
- if (entity.getTags() == null) {
- if(LOG.isDebugEnabled()) LOG.debug("Tags is null, don't know how to deduplicate, do nothing");
- } else {
- AlertDeduplicationStatus status = checkDedup(new EntityTagsUniq(entity.getTags(), entity.getTimestamp()));
- if (!status.equals(AlertDeduplicationStatus.IGNORED)) {
- dedupList.add(entity);
- } else {
- dedupedCount++;
- if (LOG.isDebugEnabled())
- LOG.debug(String.format("Entity is skipped because it's duplicated: " + entity.toString()));
- }
- }
- }
-
- if(dedupedCount>0){
- LOG.info(String.format("Skipped %s of %s alerts because they are duplicated",dedupedCount,totalCount));
- }else if(LOG.isDebugEnabled()){
- LOG.debug(String.format("Skipped %s of %s duplicated alerts",dedupedCount,totalCount));
- }
-
- return dedupList;
- }
-
- public EntityDeduplicator<T> setDedupIntervalMin(long dedupIntervalMin) {
- this.dedupIntervalMin = dedupIntervalMin;
- return this;
- }
-
- public long getDedupIntervalMin() {
- return dedupIntervalMin;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/EntityDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/EntityDeduplicator.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/EntityDeduplicator.java
deleted file mode 100644
index f24d31a..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/EntityDeduplicator.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.dedup;
-
-import java.util.List;
-
-import eagle.log.base.taggedlog.TaggedLogAPIEntity;
-
-/**
- * Dedup Eagle entities.
- *
- * @param <T> Eagle entity
- */
-public interface EntityDeduplicator<T extends TaggedLogAPIEntity> {
-
- EntityDeduplicator<T> setDedupIntervalMin(long intervalMin);
-
- long getDedupIntervalMin();
-
- List<T> dedup(List<T> list);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/EntityTagsUniq.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/EntityTagsUniq.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/EntityTagsUniq.java
deleted file mode 100644
index b0e5dcf..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/EntityTagsUniq.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- *
- */
-package eagle.alert.dedup;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @since Mar 19, 2015
- */
-public class EntityTagsUniq {
- public Map<String, String> tags;
- public Long timestamp; // entity's timestamp
- public long createdTime; // entityTagsUniq's created time, for cache removal;
-
- private static final Logger LOG = LoggerFactory.getLogger(EntityTagsUniq.class);
-
- public EntityTagsUniq(Map<String, String> tags, long timestamp) {
- this.tags = new HashMap<String, String>(tags);
- this.timestamp = timestamp;
- this.createdTime = System.currentTimeMillis();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof EntityTagsUniq) {
- EntityTagsUniq au = (EntityTagsUniq) obj;
- if (tags.size() != au.tags.size()) return false;
- for (Entry<String, String> keyValue : au.tags.entrySet()) {
- boolean keyExist = tags.containsKey(keyValue.getKey());
- // sanity check
- if (tags.get(keyValue.getKey()) == null || keyValue.getValue() == null) {
- return true;
- }
- if ( !keyExist || !tags.get(keyValue.getKey()).equals(keyValue.getValue())) {
- return false;
- }
- }
- return true;
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- int hashCode = 0;
- for (Map.Entry<String,String> entry : tags.entrySet()) {
- if(entry.getValue() == null) {
- LOG.warn("Tag value for key ["+entry.getKey()+"] is null, skipped for hash code");
- }else {
- try {
- hashCode ^= entry.getValue().hashCode();
- } catch (Throwable t) {
- LOG.info("Got exception because of entry: " + entry, t);
- }
- }
- }
- return hashCode;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertEmailGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertEmailGenerator.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertEmailGenerator.java
deleted file mode 100644
index 58ae733..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertEmailGenerator.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- *
- */
-package eagle.alert.notification;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.*;
-
-import eagle.alert.common.AlertConstants;
-import eagle.alert.common.AlertEmailSender;
-import eagle.alert.email.AlertEmailComponent;
-import eagle.alert.email.AlertEmailContext;
-import eagle.alert.entity.AlertAPIEntity;
-import com.typesafe.config.ConfigObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AlertEmailGenerator{
- private String tplFile;
- private String sender;
- private String recipients;
- private String subject;
- private ConfigObject eagleProps;
-
- private ThreadPoolExecutor executorPool;
-
- private final static Logger LOG = LoggerFactory.getLogger(AlertEmailGenerator.class);
-
- private final static long MAX_TIMEOUT_MS =60000;
-
- public void sendAlertEmail(AlertAPIEntity entity) {
- sendAlertEmail(entity, recipients, null);
- }
-
- public void sendAlertEmail(AlertAPIEntity entity, String recipients) {
- sendAlertEmail(entity, recipients, null);
- }
-
- public void sendAlertEmail(AlertAPIEntity entity, String recipients, String cc) {
- AlertEmailContext email = new AlertEmailContext();
-
- AlertEmailComponent component = new AlertEmailComponent();
- component.setAlertContext(entity.getAlertContext());
- List<AlertEmailComponent> components = new ArrayList<AlertEmailComponent>();
- components.add(component);
- email.setComponents(components);
- if (entity.getAlertContext().getProperty(AlertConstants.SUBJECT) != null) {
- email.setSubject(entity.getAlertContext().getProperty(AlertConstants.SUBJECT));
- }
- else email.setSubject(subject);
- email.setVelocityTplFile(tplFile);
- email.setRecipients(recipients);
- email.setCc(cc);
- email.setSender(sender);
-
- /** asynchronized email sending */
- @SuppressWarnings("rawtypes")
- AlertEmailSender thread = new AlertEmailSender(email, eagleProps);
-
- if(this.executorPool == null) throw new IllegalStateException("Invoking thread executor pool but it's is not set yet");
-
- LOG.info("Sending email in asynchronous to: "+recipients+", cc: "+cc);
- Future future = this.executorPool.submit(thread);
- try {
- future.get(MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS);
- LOG.info(String.format("Successfully send email to %s", recipients));
- } catch (InterruptedException | ExecutionException e) {
- LOG.error(String.format("Failed to send email to %s, due to:%s",recipients,e),e);
- } catch (TimeoutException e) {
- LOG.error(String.format("Failed to send email to %s due to timeout exception, max timeout: %s ms ",recipients, MAX_TIMEOUT_MS),e);
- }
- }
-
- public String getTplFile() {
- return tplFile;
- }
-
- public void setTplFile(String tplFile) {
- this.tplFile = tplFile;
- }
-
- public String getSender() {
- return sender;
- }
-
- public void setSender(String sender) {
- this.sender = sender;
- }
-
- public String getRecipients() {
- return recipients;
- }
-
- public void setRecipients(String recipients) {
- this.recipients = recipients;
- }
-
- public String getSubject() {
- return subject;
- }
-
- public void setSubject(String subject) {
- this.subject = subject;
- }
-
- public ConfigObject getEagleProps() {
- return eagleProps;
- }
-
- public void setEagleProps(ConfigObject eagleProps) {
- this.eagleProps = eagleProps;
- }
-
- public void setExecutorPool(ThreadPoolExecutor executorPool) {
- this.executorPool = executorPool;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertEmailGeneratorBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertEmailGeneratorBuilder.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertEmailGeneratorBuilder.java
deleted file mode 100644
index 10be162..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertEmailGeneratorBuilder.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.notification;
-
-import java.util.concurrent.ThreadPoolExecutor;
-
-import com.typesafe.config.ConfigObject;
-
-public class AlertEmailGeneratorBuilder {
- private AlertEmailGenerator generator;
- private AlertEmailGeneratorBuilder(){
- generator = new AlertEmailGenerator();
- }
- public static AlertEmailGeneratorBuilder newBuilder(){
- return new AlertEmailGeneratorBuilder();
- }
- public AlertEmailGeneratorBuilder withSubject(String subject){
- generator.setSubject(subject);
- return this;
- }
- public AlertEmailGeneratorBuilder withSender(String sender){
- generator.setSender(sender);
- return this;
- }
- public AlertEmailGeneratorBuilder withRecipients(String recipients){
- generator.setRecipients(recipients);
- return this;
- }
- public AlertEmailGeneratorBuilder withTplFile(String tplFile){
- generator.setTplFile(tplFile);
- return this;
- }
- public AlertEmailGeneratorBuilder withEagleProps(ConfigObject eagleProps) {
- generator.setEagleProps(eagleProps);
- return this;
- }
- public AlertEmailGeneratorBuilder withExecutorPool(ThreadPoolExecutor threadPoolExecutor) {
- generator.setExecutorPool(threadPoolExecutor);
- return this;
- }
-
- public AlertEmailGenerator build(){
- return this.generator;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertNotificationExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertNotificationExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertNotificationExecutor.java
deleted file mode 100644
index 6e5a3d7..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertNotificationExecutor.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.notification;
-
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.jsontype.NamedType;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.sun.jersey.client.impl.CopyOnWriteHashMap;
-import com.typesafe.config.Config;
-import eagle.alert.config.EmailNotificationConfig;
-import eagle.alert.dao.AlertDefinitionDAO;
-import eagle.alert.entity.AlertAPIEntity;
-import eagle.alert.entity.AlertDefinitionAPIEntity;
-import eagle.alert.policy.DynamicPolicyLoader;
-import eagle.alert.policy.PolicyLifecycleMethods;
-import eagle.dataproc.core.JsonSerDeserUtils;
-import eagle.datastream.Collector;
-import eagle.datastream.JavaStormStreamExecutor1;
-import eagle.datastream.Tuple1;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-/**
- * notify alert by email, sms or other means
- * currently we only implements email notification
- */
-public class AlertNotificationExecutor extends JavaStormStreamExecutor1<String> implements PolicyLifecycleMethods {
-
- private static final long serialVersionUID = 1690354365435407034L;
- private static final Logger LOG = LoggerFactory.getLogger(AlertNotificationExecutor.class);
- private Config config;
-
- private List<String> alertExecutorIdList;
- private volatile CopyOnWriteHashMap<String, List<AlertEmailGenerator>> alertEmailGeneratorsMap;
- private AlertDefinitionDAO dao;
-
- private final static int DEFAULT_THREAD_POOL_CORE_SIZE = 4;
- private final static int DEFAULT_THREAD_POOL_MAX_SIZE = 8;
- private final static long DEFAULT_THREAD_POOL_SHRINK_TIME = 60000L; // 1 minute
-
- private transient ThreadPoolExecutor executorPool;
-
- public AlertNotificationExecutor(List<String> alertExecutorIdList, AlertDefinitionDAO dao){
- this.alertExecutorIdList = alertExecutorIdList;
- this.dao = dao;
- }
-
- public List<AlertEmailGenerator> createAlertEmailGenerator(AlertDefinitionAPIEntity alertDef) {
- Module module = new SimpleModule("notification").registerSubtypes(new NamedType(EmailNotificationConfig.class, "email"));
- EmailNotificationConfig[] emailConfigs = new EmailNotificationConfig[0];
- try {
- emailConfigs = JsonSerDeserUtils.deserialize(alertDef.getNotificationDef(), EmailNotificationConfig[].class, Arrays.asList(module));
- }
- catch (Exception ex) {
- LOG.warn("Initial emailConfig error, wrong format or it's error " + ex.getMessage());
- }
- List<AlertEmailGenerator> gens = new ArrayList<AlertEmailGenerator>();
- if (emailConfigs == null) {
- return gens;
- }
- for(EmailNotificationConfig emailConfig : emailConfigs) {
- String tplFileName = emailConfig.getTplFileName();
- if (tplFileName == null || tplFileName.equals("")) { // empty tplFileName, use default tpl file name
- tplFileName = "ALERT_DEFAULT.vm";
- }
- AlertEmailGenerator gen = AlertEmailGeneratorBuilder.newBuilder().
- withEagleProps(config.getObject("eagleProps")).
- withSubject(emailConfig.getSubject()).
- withSender(emailConfig.getSender()).
- withRecipients(emailConfig.getRecipients()).
- withTplFile(tplFileName).
- withExecutorPool(executorPool).
- build();
- gens.add(gen);
- }
- return gens;
- }
-
- /**
- * 1. register both file and database configuration
- * 2. create email generator from configuration
- */
- @Override
- public void init(){
- executorPool = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_CORE_SIZE, DEFAULT_THREAD_POOL_MAX_SIZE, DEFAULT_THREAD_POOL_SHRINK_TIME, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
-
- Map<String, List<AlertEmailGenerator>> tmpEmailGenerators = new HashMap<String, List<AlertEmailGenerator>> ();
-
- String site = config.getString("eagleProps.site");
- String dataSource = config.getString("eagleProps.dataSource");
- Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs;
- try {
- initialAlertDefs = dao.findActiveAlertDefsGroupbyAlertExecutorId(site, dataSource);
- }
- catch (Exception ex) {
- LOG.error("fail to initialize initialAlertDefs: ", ex);
- throw new IllegalStateException("fail to initialize initialAlertDefs: ", ex);
- }
-
- if(initialAlertDefs == null || initialAlertDefs.isEmpty()){
- LOG.warn("No alert definitions found for site: "+site+", dataSource: "+dataSource);
- }
- else {
- for (String alertExecutorId: alertExecutorIdList) {
- if(initialAlertDefs.containsKey(alertExecutorId)) {
- for (AlertDefinitionAPIEntity alertDef : initialAlertDefs.get(alertExecutorId).values()) {
- List<AlertEmailGenerator> gens = createAlertEmailGenerator(alertDef);
- tmpEmailGenerators.put(alertDef.getTags().get("policyId"), gens);
- }
- }else{
- LOG.info(String.format("No alert definitions found for site: %s, dataSource: %s, alertExecutorId: %s",site,dataSource,alertExecutorId));
- }
- }
- }
-
- alertEmailGeneratorsMap = new CopyOnWriteHashMap<String, List<AlertEmailGenerator>>();
- alertEmailGeneratorsMap.putAll(tmpEmailGenerators);
- DynamicPolicyLoader policyLoader = DynamicPolicyLoader.getInstance();
- policyLoader.init(initialAlertDefs, dao, config);
- for (String alertExecutorId : alertExecutorIdList) {
- policyLoader.addPolicyChangeListener(alertExecutorId, this);
- }
- }
-
- @Override
- public void prepareConfig(Config config) {
- this.config = config;
- }
-
- @Override
- public void flatMap(java.util.List<Object> input, Collector<Tuple1<String>> outputCollector){
- String policyId = (String) input.get(0);
- AlertAPIEntity alertEntity = (AlertAPIEntity) input.get(1);
- processAlerts(policyId, Arrays.asList(alertEntity));
- }
-
- //TODO: add a thread pool for email sender?
- private void processAlerts(String policyId, List<AlertAPIEntity> list) {
- List<AlertEmailGenerator> generators;
- synchronized(alertEmailGeneratorsMap) {
- generators = alertEmailGeneratorsMap.get(policyId);
- }
- if (generators == null) {
- LOG.warn("Notification config of policyId " + policyId + " has been deleted");
- return;
- }
- for (AlertAPIEntity entity : list) {
- for(AlertEmailGenerator generator : generators){
- generator.sendAlertEmail(entity);
- }
- }
- }
-
- @Override
- public void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> added) {
- if(LOG.isDebugEnabled()) LOG.debug(" alert notification config changed : " + added);
- for(AlertDefinitionAPIEntity alertDef : added.values()){
- LOG.info("alert notification config really changed " + alertDef);
- List<AlertEmailGenerator> gens = createAlertEmailGenerator(alertDef);
- synchronized(alertEmailGeneratorsMap) {
- alertEmailGeneratorsMap.put(alertDef.getTags().get("policyId"), gens);
- }
- }
- }
-
- @Override
- public void onPolicyChanged(Map<String, AlertDefinitionAPIEntity> changed) {
- if(LOG.isDebugEnabled()) LOG.debug("alert notification config to be added : " + changed);
- for(AlertDefinitionAPIEntity alertDef : changed.values()){
- LOG.info("alert notification config really added " + alertDef);
- List<AlertEmailGenerator> gens = createAlertEmailGenerator(alertDef);
- synchronized(alertEmailGeneratorsMap) {
- alertEmailGeneratorsMap.put(alertDef.getTags().get("policyId"), gens);
- }
- }
- }
-
- @Override
- public void onPolicyDeleted(Map<String, AlertDefinitionAPIEntity> deleted) {
- if(LOG.isDebugEnabled()) LOG.debug("alert notification config to be deleted : " + deleted);
- for(AlertDefinitionAPIEntity alertDef : deleted.values()){
- LOG.info("alert notification config really deleted " + alertDef);
- synchronized(alertEmailGeneratorsMap) {
- alertEmailGeneratorsMap.remove(alertDef.getTags().get("policyId"));
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/UrlBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/UrlBuilder.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/UrlBuilder.java
deleted file mode 100644
index fa5dc82..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/UrlBuilder.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package eagle.alert.notification;
-
-import eagle.alert.common.AlertConstants;
-import eagle.alert.entity.AlertAPIEntity;
-import eagle.common.EagleBase64Wrapper;
-import eagle.common.config.EagleConfigConstants;
-import eagle.log.entity.HBaseInternalLogHelper;
-import eagle.log.entity.InternalLog;
-import eagle.log.entity.RowkeyBuilder;
-import eagle.log.entity.meta.EntityDefinitionManager;
-import org.mortbay.util.UrlEncoded;
-import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class UrlBuilder {
-
- private static final Logger logger = LoggerFactory.getLogger(UrlBuilder.class);
-
- public static String getEncodedRowkey(AlertAPIEntity entity) throws Exception {
- InternalLog log = HBaseInternalLogHelper.convertToInternalLog(entity, EntityDefinitionManager.getEntityDefinitionByEntityClass(entity.getClass()));
- return EagleBase64Wrapper.encodeByteArray2URLSafeString(RowkeyBuilder.buildRowkey(log));
- }
-
- public static String buildAlertDetailUrl(String host, int port, AlertAPIEntity entity) {
- String baseUrl = "http://" + host + ":" + String.valueOf(port) + "/eagle-service/#/dam/alertDetail/";
- try {
- return baseUrl + UrlEncoded.encodeString(getEncodedRowkey(entity));
- }
- catch (Exception ex) {
- logger.error("Fail to populate encodedRowkey for alert Entity" + entity.toString());
- return "N/A";
- }
- }
-
- public static String buiildPolicyDetailUrl(String host, int port, Map<String, String> tags) {
- String baseUrl = "http://" + host + ":" + String.valueOf(port) + "/eagle-service/#/dam/policyDetail?";
- String format = "policy=%s&site=%s&executor=%s";
- String policy = tags.get(AlertConstants.POLICY_ID);
- String site = tags.get(EagleConfigConstants.SITE);
- String alertExecutorID = tags.get(AlertConstants.ALERT_EXECUTOR_ID);
- if (policy != null && site != null && alertExecutorID != null) {
- return baseUrl + String.format(format, policy, site, alertExecutorID);
- }
- return "N/A";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/persist/AlertPersistExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/persist/AlertPersistExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/persist/AlertPersistExecutor.java
deleted file mode 100644
index 543405d..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/persist/AlertPersistExecutor.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.persist;
-
-import eagle.alert.entity.AlertAPIEntity;
-import com.typesafe.config.Config;
-import eagle.common.config.EagleConfigConstants;
-import eagle.datastream.Collector;
-import eagle.datastream.JavaStormStreamExecutor1;
-import eagle.datastream.Tuple1;
-
-import java.util.Arrays;
-
-public class AlertPersistExecutor extends JavaStormStreamExecutor1<String> {
-
- private static final long serialVersionUID = 1L;
- private Config config;
- private EaglePersist persist;
-
- public AlertPersistExecutor(){
- }
- @Override
- public void prepareConfig(Config config) {
- this.config = config;
- }
-
- @Override
- public void init() {
- String host = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
- int port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
- String username = config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME)
- ? config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME) : null;
- String password = config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD)
- ? config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD) : null;
- this.persist = new EaglePersist(host, port, username, password);
- }
-
- @Override
- public void flatMap(java.util.List<Object> input, Collector<Tuple1<String>> outputCollector){
- persist.doPersist(Arrays.asList((AlertAPIEntity)(input.get(1))));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/persist/EaglePersist.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/persist/EaglePersist.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/persist/EaglePersist.java
deleted file mode 100644
index b5820c4..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/persist/EaglePersist.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- *
- */
-package eagle.alert.persist;
-
-import eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import eagle.log.entity.GenericServiceAPIResponseEntity;
-import eagle.service.client.IEagleServiceClient;
-import eagle.service.client.impl.EagleServiceClientImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-public class EaglePersist {
-
- private static Logger LOG = LoggerFactory.getLogger(EaglePersist.class);
- private String eagleServiceHost;
- private int eagleServicePort;
- private String username;
- private String password;
-
- public EaglePersist(String eagleServiceHost, int eagleServicePort) {
- this(eagleServiceHost, eagleServicePort, null, null);
- }
-
- public EaglePersist(String eagleServiceHost, int eagleServicePort, String username, String password) {
- this.eagleServiceHost = eagleServiceHost;
- this.eagleServicePort = eagleServicePort;
- this.username = username;
- this.password = password;
- }
-
- public boolean doPersist(List<? extends TaggedLogAPIEntity> list) {
- if (list.isEmpty()) return false;
- LOG.info("Going to persist entities, type: " + " " + list.get(0).getClass().getSimpleName() + ", list size: " + list.size());
- try {
- IEagleServiceClient client = new EagleServiceClientImpl(eagleServiceHost, eagleServicePort, username, password);
- GenericServiceAPIResponseEntity<String> response = client.create(list);
- client.close();
- if (response.isSuccess()) {
- LOG.info("Successfully create entities " + list.toString());
- return true;
- }
- else {
- LOG.error("Fail to create entities");
- return false;
- }
- }
- catch (Exception ex) {
- LOG.error("Got an exception in persisting entities" + ex.getMessage(), ex);
- return false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/DefaultPolicyPartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/DefaultPolicyPartitioner.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/DefaultPolicyPartitioner.java
deleted file mode 100644
index ef76c2f..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/DefaultPolicyPartitioner.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.policy;
-
-
-public class DefaultPolicyPartitioner implements PolicyPartitioner{
- @Override
- public int partition(int numTotalPartitions, String policyType,
- String policyId) {
- final int prime = 31;
- int result = 1;
- result = result * prime + policyType.hashCode();
- result = result < 0 ? result*-1 : result;
- result = result * prime + policyId.hashCode();
- result = result < 0 ? result*-1 : result;
- return result % numTotalPartitions;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/DynamicPolicyLoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/DynamicPolicyLoader.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/DynamicPolicyLoader.java
deleted file mode 100644
index ba76a15..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/DynamicPolicyLoader.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.policy;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import eagle.common.config.EagleConfigConstants;
-import org.apache.commons.collections.CollectionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import eagle.alert.dao.AlertDefinitionDAO;
-import eagle.alert.entity.AlertDefinitionAPIEntity;
-import com.netflix.config.AbstractPollingScheduler;
-import com.netflix.config.ConcurrentCompositeConfiguration;
-import com.netflix.config.DynamicConfiguration;
-import com.netflix.config.FixedDelayPollingScheduler;
-import com.netflix.config.PollListener;
-import com.netflix.config.PollResult;
-import com.netflix.config.PolledConfigurationSource;
-import com.sun.jersey.client.impl.CopyOnWriteHashMap;
-import com.typesafe.config.Config;
-
-public class DynamicPolicyLoader {
- private static final Logger LOG = LoggerFactory.getLogger(DynamicPolicyLoader.class);
-
- private final int defaultInitialDelayMillis = 30*1000;
- private final int defaultDelayMillis = 60*1000;
- private final boolean defaultIgnoreDeleteFromSource = true;
- private volatile CopyOnWriteHashMap<String, List<PolicyLifecycleMethods>> policyChangeListeners = new CopyOnWriteHashMap<String, List<PolicyLifecycleMethods>>();
- private static DynamicPolicyLoader instance = new DynamicPolicyLoader();
- private volatile boolean initialized = false;
-
- public void addPolicyChangeListener(String alertExecutorId, PolicyLifecycleMethods alertExecutor){
- synchronized(policyChangeListeners) {
- if (policyChangeListeners.get(alertExecutorId) == null) {
- policyChangeListeners.put(alertExecutorId, new ArrayList<PolicyLifecycleMethods>());
- }
- policyChangeListeners.get(alertExecutorId).add(alertExecutor);
- }
- }
-
- public static DynamicPolicyLoader getInstance(){
- return instance;
- }
-
- /**
- * singleton with init would be good for unit test as well, and it ensures that
- * initialization happens only once before you use it.
- * @param config
- * @param dao
- */
- public void init(Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs,
- AlertDefinitionDAO dao, Config config){
- if(!initialized){
- synchronized(this){
- if(!initialized){
- internalInit(initialAlertDefs, dao, config);
- initialized = true;
- }
- }
- }
- }
-
- /**
- * map from alertExecutorId+partitionId to AlertExecutor which implements PolicyLifecycleMethods
- * @param initialAlertDefs
- * @param dao
- * @param config
- */
- private void internalInit(Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs,
- AlertDefinitionDAO dao, Config config){
- if(!config.getBoolean("dynamicConfigSource.enabled")) {
- return;
- }
- AbstractPollingScheduler scheduler = new FixedDelayPollingScheduler(
- config.getInt("dynamicConfigSource.initDelayMillis"),
- config.getInt("dynamicConfigSource.delayMillis"),
- false
- );
-
- scheduler.addPollListener(new PollListener(){
- @Override
- public void handleEvent(EventType eventType, PollResult lastResult,
- Throwable exception) {
- if (lastResult == null) {
- LOG.error("The lastResult is null, something must be wrong, probably the eagle service is dead!");
- throw new RuntimeException("The lastResult is null, probably the eagle service is dead! ", exception);
- }
- Map<String, Object> added = lastResult.getAdded();
- Map<String, Object> changed = lastResult.getChanged();
- Map<String, Object> deleted = lastResult.getDeleted();
- for(Map.Entry<String, List<PolicyLifecycleMethods>> entry : policyChangeListeners.entrySet()){
- String alertExecutorId = entry.getKey();
- for (PolicyLifecycleMethods policyLifecycleMethod : entry.getValue()) {
- Map<String, AlertDefinitionAPIEntity> addedPolicies = (Map<String, AlertDefinitionAPIEntity>)added.get(trimPartitionNum(alertExecutorId));
- if(addedPolicies != null && addedPolicies.size() > 0){
- policyLifecycleMethod.onPolicyCreated(addedPolicies);
- }
- Map<String, AlertDefinitionAPIEntity> changedPolicies = (Map<String, AlertDefinitionAPIEntity>)changed.get(trimPartitionNum(alertExecutorId));
- if(changedPolicies != null && changedPolicies.size() > 0){
- policyLifecycleMethod.onPolicyChanged(changedPolicies);
- }
- Map<String, AlertDefinitionAPIEntity> deletedPolicies = (Map<String, AlertDefinitionAPIEntity>)deleted.get(trimPartitionNum(alertExecutorId));
- if(deletedPolicies != null && deletedPolicies.size() > 0){
- policyLifecycleMethod.onPolicyDeleted(deletedPolicies);
- }
- }
- }
- }
- private String trimPartitionNum(String alertExecutorId){
- int i = alertExecutorId.lastIndexOf('_');
- if(i != -1){
- return alertExecutorId.substring(0, i);
- }
- return alertExecutorId;
- }
- });
-
- ConcurrentCompositeConfiguration finalConfig = new ConcurrentCompositeConfiguration();
-
- PolledConfigurationSource source = new DynamicPolicySource(initialAlertDefs, dao, config);
-
- try{
- DynamicConfiguration dbSourcedConfiguration = new DynamicConfiguration(source, scheduler);
- finalConfig.addConfiguration(dbSourcedConfiguration);
- }catch(Exception ex){
- LOG.warn("Fail loading from DB, continue without DB sourced configuration", ex);
- }
- }
-
- public static class DynamicPolicySource implements PolledConfigurationSource{
- private static Logger LOG = LoggerFactory.getLogger(DynamicPolicySource.class);
- private Config config;
- private AlertDefinitionDAO dao;
- /**
- * mapping from alertExecutorId to list of policies
- */
- private Map<String, Map<String, AlertDefinitionAPIEntity>> cachedAlertDefs;
-
- public DynamicPolicySource(Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs, AlertDefinitionDAO dao, Config config){
- this.cachedAlertDefs = initialAlertDefs;
- this.dao = dao;
- this.config = config;
- }
-
- public PollResult poll(boolean initial, Object checkPoint) throws Exception {
- LOG.info("Poll policy from eagle service " + config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST) +
- ":" + config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT) );
- Map<String, Map<String, AlertDefinitionAPIEntity>> newAlertDefs =
- dao.findActiveAlertDefsGroupbyAlertExecutorId(config.getString("eagleProps.site"),
- config.getString("eagleProps.dataSource"));
-
- // compare runtime alertDefs with cachedAlertDefs and figure out what are added/deleted/updated
- Map<String, Object> added = new HashMap<String, Object>();
- Map<String, Object> changed = new HashMap<String, Object>();
- Map<String, Object> deleted = new HashMap<String, Object>();
-
- Set<String> newAlertExecutorIds = newAlertDefs.keySet();
- Set<String> cachedAlertExecutorIds = cachedAlertDefs.keySet();
-
- // dynamically adding new alert executor is not supported, because alert executor is pre-built while program starts up
- Collection<String> addedAlertExecutorIds = CollectionUtils.subtract(newAlertExecutorIds, cachedAlertExecutorIds);
- if(addedAlertExecutorIds != null && addedAlertExecutorIds.size() > 0){
- LOG.warn("New alertExecutorIds are found : " + addedAlertExecutorIds);
- }
-
- // if one alert executor is missing, it means all policy under that alert executor should be removed
- Collection<String> deletedAlertExecutorIds = CollectionUtils.subtract(cachedAlertExecutorIds, newAlertExecutorIds);
- if(deletedAlertExecutorIds != null && deletedAlertExecutorIds.size() > 0){
- LOG.warn("Some alertExecutorIds are deleted : " + deletedAlertExecutorIds);
- for(String deletedAlertExecutorId : deletedAlertExecutorIds){
- deleted.put(deletedAlertExecutorId, cachedAlertDefs.get(deletedAlertExecutorId));
- }
- }
-
- // we need calculate added/updated/deleted policy for all executors which are not deleted
-// Collection<String> updatedAlertExecutorIds = CollectionUtils.intersection(newAlertExecutorIds, cachedAlertExecutorIds);
- Collection<String> updatedAlertExecutorIds = newAlertExecutorIds;
- for(String updatedAlertExecutorId : updatedAlertExecutorIds){
- Map<String, AlertDefinitionAPIEntity> newPolicies = newAlertDefs.get(updatedAlertExecutorId);
- Map<String, AlertDefinitionAPIEntity> cachedPolicies = cachedAlertDefs.get(updatedAlertExecutorId);
- PolicyComparator.compare(updatedAlertExecutorId, newPolicies, cachedPolicies, added, changed, deleted);
- }
-
- cachedAlertDefs = newAlertDefs;
- return PollResult.createIncremental(added, changed, deleted, new Date().getTime());
- }
- }
-
- public static class PolicyComparator{
- public static void compare(String alertExecutorId, Map<String, AlertDefinitionAPIEntity> newPolicies, Map<String, AlertDefinitionAPIEntity> cachedPolicies,
- Map<String, Object> added, Map<String, Object> changed, Map<String, Object> deleted){
- Set<String> newPolicyIds = newPolicies.keySet();
- Set<String> cachedPolicyIds = cachedPolicies != null ? cachedPolicies.keySet() : new HashSet<String>();
- Collection<String> addedPolicyIds = CollectionUtils.subtract(newPolicyIds, cachedPolicyIds);
- Collection<String> deletedPolicyIds = CollectionUtils.subtract(cachedPolicyIds, newPolicyIds);
- Collection<String> changedPolicyIds = CollectionUtils.intersection(cachedPolicyIds, newPolicyIds);
- if(addedPolicyIds != null && addedPolicyIds.size() > 0){
- Map<String, AlertDefinitionAPIEntity> tmp = new HashMap<String, AlertDefinitionAPIEntity>();
- for(String addedPolicyId : addedPolicyIds){
- tmp.put(addedPolicyId, newPolicies.get(addedPolicyId));
- }
- added.put(alertExecutorId, tmp);
- }
- if(deletedPolicyIds != null && deletedPolicyIds.size() > 0){
- Map<String, AlertDefinitionAPIEntity> tmp = new HashMap<String, AlertDefinitionAPIEntity>();
- for(String deletedPolicyId : deletedPolicyIds){
- tmp.put(deletedPolicyId, cachedPolicies.get(deletedPolicyId));
- }
- deleted.put(alertExecutorId, tmp);
- }
- if(changedPolicyIds != null && changedPolicyIds.size() > 0){
- Map<String, AlertDefinitionAPIEntity> tmp = new HashMap<String, AlertDefinitionAPIEntity>();
- for(String changedPolicyId : changedPolicyIds){
- // check if policy is really changed
- if(!newPolicies.get(changedPolicyId).equals(cachedPolicies.get(changedPolicyId))){
- tmp.put(changedPolicyId, newPolicies.get(changedPolicyId));
- }
- }
- changed.put(alertExecutorId, tmp);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PartitionUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PartitionUtils.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PartitionUtils.java
deleted file mode 100644
index c3b53c2..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PartitionUtils.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.policy;
-
-import eagle.alert.common.AlertConstants;
-import eagle.alert.entity.AlertDefinitionAPIEntity;
-
-public class PartitionUtils {
-
- public static boolean accept(AlertDefinitionAPIEntity alertDef, PolicyPartitioner partitioner, int numPartitions, int partitionSeq){
- int targetPartitionSeq = partitioner.partition(numPartitions, alertDef.getTags().get(AlertConstants.POLICY_TYPE), alertDef.getTags().get(AlertConstants.POLICY_ID));
- if(targetPartitionSeq == partitionSeq)
- return true;
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyEvaluator.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyEvaluator.java
deleted file mode 100644
index c209b97..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyEvaluator.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.policy;
-
-import java.util.List;
-import java.util.Map;
-
-import eagle.alert.entity.AlertDefinitionAPIEntity;
-import eagle.dataproc.core.ValuesArray;
-
-public interface PolicyEvaluator {
- /**
- * take input and evaluate expression
- * input has 3 fields, first is siddhiAlertContext, second one is streamName, the third is map of attribute name/value
- * @param input
- * @throws Exception
- */
- public void evaluate(ValuesArray input) throws Exception;
-
- /**
- * notify policy evaluator that policy is updated
- */
- public void onPolicyUpdate(AlertDefinitionAPIEntity newAlertDef);
-
- /**
- * notify policy evaluator that policy is deleted, here is cleanup work for this policy evaluator
- */
- public void onPolicyDelete();
-
- /**
- * get additional context
- */
- public Map<String, String> getAdditionalContext();
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyEvaluatorServiceProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyEvaluatorServiceProvider.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyEvaluatorServiceProvider.java
deleted file mode 100644
index a022f98..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyEvaluatorServiceProvider.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.policy;
-
-import java.util.List;
-
-import com.fasterxml.jackson.databind.Module;
-
-/**
- * to provide extensibility, we need a clear differentiation between framework job and provider logic
- * policy evaluator framework:
- * - connect to eagle data source
- * - read all policy definitions
- * - compare with cached policy definitions
- * - figure out if policy is created, deleted or updated
- * - if policy is created, then invoke onPolicyCreated
- * - if policy is deleted, then invoke onPolicyDeleted
- * - if policy is updated, then invoke onPolicyUpdated
- * - for policy update, replace old evaluator engine with new evaluator engine which is created by policy evaluator provider
- * - specify # of executors for this alert executor id
- * - dynamically balance # of policies evaluated by each alert executor
- * - use zookeeper to balance. eaglePolicies/${alertExecutorId}/${alertExecutorInstanceId} => list of policies
- *
- * policy evaluator business features:
- * - register mapping between policy type and PolicyEvaluator
- * - create evaluator engine runtime when configuration is changed
- *
- */
-public interface PolicyEvaluatorServiceProvider {
- String getPolicyType();
- Class<? extends PolicyEvaluator> getPolicyEvaluator();
- List<Module> getBindingModules();
-}