You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2015/11/19 11:47:55 UTC

[48/55] [abbrv] [partial] incubator-eagle git commit: [EAGLE-46] Rename package name as "org.apache.eagle"

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamEntity.java b/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamEntity.java
new file mode 100644
index 0000000..f0f81c5
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamEntity.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.entity;
+
+import org.apache.eagle.alert.common.AlertConstants;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.Column;
+import org.apache.eagle.log.entity.meta.ColumnFamily;
+import org.apache.eagle.log.entity.meta.Prefix;
+import org.apache.eagle.log.entity.meta.Service;
+import org.apache.eagle.log.entity.meta.Table;
+import org.apache.eagle.log.entity.meta.Tags;
+import org.apache.eagle.log.entity.meta.TimeSeries;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@Table("alertStream")
+@ColumnFamily("f")
+@Prefix("alertStream")
+@Service(AlertConstants.ALERT_STREAM_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(false)
+@Tags({"dataSource", "streamName"})
+public class AlertStreamEntity extends TaggedLogAPIEntity{
+	@Column("a")
+	private String desc;	
+
+	public String getDesc() {
+		return desc;
+	}
+	public void setDesc(String desc) {
+		this.desc = desc;
+		valueChanged("desc");
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamSchemaEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamSchemaEntity.java b/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamSchemaEntity.java
new file mode 100644
index 0000000..76b6097
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamSchemaEntity.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.entity;
+
+import org.apache.eagle.alert.common.AlertConstants;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.Column;
+import org.apache.eagle.log.entity.meta.ColumnFamily;
+import org.apache.eagle.log.entity.meta.Prefix;
+import org.apache.eagle.log.entity.meta.Service;
+import org.apache.eagle.log.entity.meta.Table;
+import org.apache.eagle.log.entity.meta.Tags;
+import org.apache.eagle.log.entity.meta.TimeSeries;
+
+/**
+ * ddl to create streammetadata table
+ * 
+ * create 'alertStreamSchema', {NAME => 'f', BLOOMFILTER => 'ROW', VERSIONS => '1', COMPRESSION => 'SNAPPY'}
+ */
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@Table("alertStreamSchema")
+@ColumnFamily("f")
+@Prefix("alertStreamSchema")
+@Service(AlertConstants.ALERT_STREAM_SCHEMA_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(false)
+@Tags({"dataSource", "streamName", "attrName"})
+public class AlertStreamSchemaEntity extends TaggedLogAPIEntity{
+	@Column("a")
+	private String attrType;
+	@Column("b")
+	private String category;
+	@Column("c")
+	private String attrValueResolver;
+	/* all tags form the key for alert de-duplication */
+	@Column("d")
+	private Boolean usedAsTag;
+	@Column("e")
+	private String attrDescription;
+	@Column("f")
+	private String attrDisplayName;	
+	@Column("g")
+	private String defaultValue;
+
+	public String getAttrType() {
+		return attrType;
+	}
+	public void setAttrType(String attrType) {
+		this.attrType = attrType;
+		valueChanged("attrType");
+	}
+	public String getCategory() {
+		return category;
+	}
+	public void setCategory(String category) {
+		this.category = category;
+		valueChanged("category");
+	}
+	public String getAttrValueResolver() {
+		return attrValueResolver;
+	}
+	public void setAttrValueResolver(String attrValueResolver) {
+		this.attrValueResolver = attrValueResolver;
+		valueChanged("attrValueResolver");
+	}
+	public Boolean getUsedAsTag() {
+		return usedAsTag;
+	}
+	public void setUsedAsTag(Boolean usedAsTag) {
+		this.usedAsTag = usedAsTag;
+		valueChanged("usedAsTag");
+	}
+	public String getAttrDescription() {
+		return attrDescription;
+	}
+	public void setAttrDescription(String attrDescription) {
+		this.attrDescription = attrDescription;
+		valueChanged("attrDescription");
+	}
+	public String getAttrDisplayName() {
+		return attrDisplayName;
+	}
+	public void setAttrDisplayName(String attrDisplayName) {
+		this.attrDisplayName = attrDisplayName;
+		valueChanged("attrDisplayName");
+	}
+	public String getDefaultValue() {
+		return defaultValue;
+	}
+	public void setDefaultValue(String defaultValue) {
+		this.defaultValue = defaultValue;
+		valueChanged("defaultValue");
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/AbstractPolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/AbstractPolicyDefinition.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/AbstractPolicyDefinition.java
deleted file mode 100644
index c235225..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/AbstractPolicyDefinition.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.config;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-
-/**
- * base fields for all policy definition
- */
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible=true)
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class AbstractPolicyDefinition {
-	private String type;
-    /**
-     * @return type in string
-     */
-	public String getType() {
-		return type;
-	}
-
-    /**
-     * @param type set type value
-     */
-	public void setType(String type) {
-		this.type = type;
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/DeduplicatorConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/DeduplicatorConfig.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/DeduplicatorConfig.java
deleted file mode 100644
index 04f20ff..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/DeduplicatorConfig.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.config;
-
-import java.io.Serializable;
-
-public class DeduplicatorConfig implements Serializable{
-	private static final long serialVersionUID = 1L;
-	public int getAlertDedupIntervalMin() {
-		return alertDedupIntervalMin;
-	}
-	public void setAlertDedupIntervalMin(int alertDedupIntervalMin) {
-		this.alertDedupIntervalMin = alertDedupIntervalMin;
-	}
-	public int getEmailDedupIntervalMin() {
-		return emailDedupIntervalMin;
-	}
-	public void setEmailDedupIntervalMin(int emailDedupIntervalMin) {
-		this.emailDedupIntervalMin = emailDedupIntervalMin;
-	}
-	private int alertDedupIntervalMin;
-	private int emailDedupIntervalMin;
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/EmailNotificationConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/EmailNotificationConfig.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/EmailNotificationConfig.java
deleted file mode 100644
index fae24d5..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/EmailNotificationConfig.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.config;
-
-public class EmailNotificationConfig extends NotificationConfig{
-	private static final long serialVersionUID = 1L;
-	private String sender;
-	private String recipients;
-	private String tplFileName;
-	private String subject;
-	public String getSubject() {
-		return subject;
-	}
-	public void setSubject(String subject) {
-		this.subject = subject;
-	}
-	public String getRecipients() {
-		return recipients;
-	}
-	public void setRecipients(String recipients) {
-		this.recipients = recipients;
-	}
-	public String getSender() {
-		return sender;
-	}
-	public void setSender(String sender) {
-		this.sender = sender;
-	}
-	public String getTplFileName() {
-		return tplFileName;
-	}
-	public void setTplFileName(String tplFileName) {
-		this.tplFileName = tplFileName;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/NotificationConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/NotificationConfig.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/NotificationConfig.java
deleted file mode 100644
index 6903a57..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/NotificationConfig.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.config;
-
-import java.io.Serializable;
-
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "flavor", visible=true)
-public class NotificationConfig implements Serializable{
-	private static final long serialVersionUID = 1L;
-	private String id;
-	private String flavor;
-
-	public String getId() {
-		return id;
-	}
-
-	public void setId(String id) {
-		this.id = id;
-	}
-
-	public String getFlavor() {
-		return flavor;
-	}
-
-	public void setFlavor(String flavor) {
-		this.flavor = flavor;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/Remediation.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/Remediation.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/Remediation.java
deleted file mode 100644
index c644a31..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/Remediation.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.config;
-
-import java.io.Serializable;
-
-public class Remediation implements Serializable{
-	private static final long serialVersionUID = 1L;
-	private String id;
-
-	public String getId() {
-		return id;
-	}
-
-	public void setId(String id) {
-		this.id = id;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertDeduplicationExecutorBase.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertDeduplicationExecutorBase.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertDeduplicationExecutorBase.java
deleted file mode 100644
index b459130..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertDeduplicationExecutorBase.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.dedup;
-
-import eagle.alert.common.AlertConstants;
-import eagle.alert.config.DeduplicatorConfig;
-import eagle.alert.dao.AlertDefinitionDAO;
-import eagle.alert.entity.AlertAPIEntity;
-import eagle.alert.entity.AlertDefinitionAPIEntity;
-import eagle.alert.policy.DynamicPolicyLoader;
-import eagle.alert.policy.PolicyLifecycleMethods;
-import eagle.common.config.EagleConfigConstants;
-import eagle.datastream.Collector;
-import eagle.datastream.JavaStormStreamExecutor2;
-import eagle.datastream.Tuple2;
-import com.sun.jersey.client.impl.CopyOnWriteHashMap;
-import com.typesafe.config.Config;
-import eagle.dataproc.core.JsonSerDeserUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public abstract class AlertDeduplicationExecutorBase extends JavaStormStreamExecutor2<String, AlertAPIEntity> implements PolicyLifecycleMethods {
-	private static final long serialVersionUID = 1L;
-	private static final Logger LOG = LoggerFactory.getLogger(AlertDeduplicationExecutorBase.class);
-	protected Config config;
-	protected DEDUP_TYPE dedupType;
-
-	private List<String> alertExecutorIdList;
-	private volatile CopyOnWriteHashMap<String, DefaultDeduplicator<AlertAPIEntity>> alertDedups;
-	private AlertDefinitionDAO dao;
-
-	public enum DEDUP_TYPE {
-		ENTITY,
-		EMAIL
-	}
-
-	public AlertDeduplicationExecutorBase(List<String> alertExecutorIdList, DEDUP_TYPE dedupType, AlertDefinitionDAO dao){
-		this.alertExecutorIdList = alertExecutorIdList;
-		this.dedupType = dedupType;
-		this.dao = dao;
-	}
-	
-	@Override
-	public void prepareConfig(Config config) {
-		this.config = config;
-	}
-	
-	public DefaultDeduplicator<AlertAPIEntity> createAlertDedup(AlertDefinitionAPIEntity alertDef) {
-		DeduplicatorConfig dedupConfig = null;
-		try {
-			dedupConfig = JsonSerDeserUtils.deserialize(alertDef.getDedupeDef(), DeduplicatorConfig.class);
-		}
-		catch (Exception ex) {
-			LOG.warn("Initial dedupConfig error, " + ex.getMessage());
-		}
-
-        if (dedupConfig != null) {
-			if (dedupType.equals(DEDUP_TYPE.ENTITY)) {
-				return new DefaultDeduplicator<>(dedupConfig.getAlertDedupIntervalMin());
-			} else if (dedupType.equals(DEDUP_TYPE.EMAIL)) {
-				return new DefaultDeduplicator<>(dedupConfig.getEmailDedupIntervalMin());
-			}
-		}
-
-		return null;
-	}
-	
-	@Override
-	public void init() {		
-        String site = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.SITE);
-        String dataSource = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.DATA_SOURCE);
-	    Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs;	    	    
-	    try {
-	 		initialAlertDefs = dao.findActiveAlertDefsGroupbyAlertExecutorId(site, dataSource);
-	    }
-	    catch (Exception ex) {
- 			LOG.error("fail to initialize initialAlertDefs: ", ex);
-	        throw new IllegalStateException("fail to initialize initialAlertDefs: ", ex);
-        }
-	    Map<String, DefaultDeduplicator<AlertAPIEntity>> tmpDeduplicators = new HashMap<String, DefaultDeduplicator<AlertAPIEntity>>();
-        if(initialAlertDefs == null || initialAlertDefs.isEmpty()){
-            LOG.warn("No alert definitions was found for site: "+site+", dataSource: "+dataSource);
-        } else {
-		    for (String alertExecutorId: alertExecutorIdList) {
-			    if(initialAlertDefs.containsKey(alertExecutorId)){
-                    for(AlertDefinitionAPIEntity alertDef : initialAlertDefs.get(alertExecutorId).values()){
-                       try {
-                          DefaultDeduplicator<AlertAPIEntity> deduplicator = createAlertDedup(alertDef);
-                          if (deduplicator != null)
-                              tmpDeduplicators.put(alertDef.getTags().get(AlertConstants.POLICY_ID), deduplicator);
-                          else LOG.warn("The dedup interval is not set, alertDef: " + alertDef);
-                        }
-                        catch (Throwable t) {
-                            LOG.error("Got an exception when initial dedup config, probably dedup config is not set: " + t.getMessage() + "," + alertDef);
-                        }
-                    }
-                } else {
-                    LOG.info(String.format("No alert definitions found for site: %s, dataSource: %s, alertExecutorId: %s",site,dataSource,alertExecutorId));
-                }
-		    }
-        }
-
-		alertDedups = new CopyOnWriteHashMap<>();
-		alertDedups.putAll(tmpDeduplicators);
-		DynamicPolicyLoader policyLoader = DynamicPolicyLoader.getInstance();
-		policyLoader.init(initialAlertDefs, dao, config);
-		for (String alertExecutorId : alertExecutorIdList) {
-		 	policyLoader.addPolicyChangeListener(alertExecutorId, this);
-		}
-	}
-
-    @Override
-    public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, AlertAPIEntity>> outputCollector){
-        String policyId = (String) input.get(0);
-        AlertAPIEntity alertEntity = (AlertAPIEntity) input.get(1);
-        DefaultDeduplicator<AlertAPIEntity> dedup;
-        synchronized(alertDedups) {
-            dedup = alertDedups.get(policyId);
-        }
-
-        List<AlertAPIEntity> ret = Arrays.asList(alertEntity);
-        if (dedup == null) {
-            LOG.warn("Dedup config for policyId " + policyId + " is not set or is not a valid config");
-        } else {
-            if (dedup.getDedupIntervalMin() == -1) {
-                LOG.warn("the dedup interval is set as -1, which mean all alerts should be deduped(skipped)");
-                return;
-            }
-            ret = dedup.dedup(ret);
-        }
-        for (AlertAPIEntity entity : ret) {
-            outputCollector.collect(new Tuple2(policyId, entity));
-        }
-    }
-
-	public void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> added) {
-		if(LOG.isDebugEnabled()) LOG.debug("Alert dedup config to be added : " + added);
-		for(AlertDefinitionAPIEntity alertDef : added.values()){
-			LOG.info("Alert dedup config really added " + alertDef);
-			DefaultDeduplicator<AlertAPIEntity> dedup = createAlertDedup(alertDef);
-			if (dedup != null) {
-				synchronized(alertDedups) {		
-					alertDedups.put(alertDef.getTags().get(AlertConstants.POLICY_ID), dedup);
-				}
-			}
-		}
-	}
-	
-	public void onPolicyChanged(Map<String, AlertDefinitionAPIEntity> changed) {
-		LOG.info("Alert dedup config changed : " + changed);
-		for(AlertDefinitionAPIEntity alertDef : changed.values()){
-			LOG.info("Alert dedup config really changed " + alertDef);
-			DefaultDeduplicator<AlertAPIEntity> dedup = createAlertDedup(alertDef);
-			if (dedup != null) {
-				synchronized(alertDedups) {
-					alertDedups.put(alertDef.getTags().get(AlertConstants.POLICY_ID), dedup);
-				}
-			}
-		}
-	}
-	
-	public void onPolicyDeleted(Map<String, AlertDefinitionAPIEntity> deleted) {
-		LOG.info("alert dedup config deleted : " + deleted);
-		for(AlertDefinitionAPIEntity alertDef : deleted.values()){
-			LOG.info("alert dedup config deleted " + alertDef);
-			// no cleanup to do, just remove it
-			synchronized(alertDedups) {		
-				alertDedups.remove(alertDef.getTags().get(AlertConstants.POLICY_ID));
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java
deleted file mode 100644
index 7dd0ddc..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.dedup;
-
-import eagle.alert.dao.AlertDefinitionDAO;
-
-import java.util.List;
-
-public class AlertEmailDeduplicationExecutor extends AlertDeduplicationExecutorBase {
-
-	private static final long serialVersionUID = 1L;
-
-	public AlertEmailDeduplicationExecutor(List<String> alertExecutorIdList, AlertDefinitionDAO dao){
-		super(alertExecutorIdList, DEDUP_TYPE.EMAIL, dao);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java
deleted file mode 100644
index b6050a0..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.dedup;
-
-import eagle.alert.dao.AlertDefinitionDAO;
-
-import java.util.List;
-
-public class AlertEntityDeduplicationExecutor extends AlertDeduplicationExecutorBase {
-
-	private static final long serialVersionUID = 1L;
-
-	public AlertEntityDeduplicationExecutor(List<String> alertExecutorIdList, AlertDefinitionDAO dao){
-		super(alertExecutorIdList, DEDUP_TYPE.ENTITY, dao);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/DefaultDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/DefaultDeduplicator.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/DefaultDeduplicator.java
deleted file mode 100644
index 97e58ac..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/DefaultDeduplicator.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.dedup;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.commons.lang.time.DateUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import eagle.log.base.taggedlog.TaggedLogAPIEntity;
-
-public class DefaultDeduplicator<T extends TaggedLogAPIEntity> implements EntityDeduplicator<T>{
-	protected long dedupIntervalMin;
-	protected Map<EntityTagsUniq, Long> entites = new HashMap<EntityTagsUniq, Long>();
-	public static Logger LOG = LoggerFactory.getLogger(DefaultDeduplicator.class);
-	
-	public static enum AlertDeduplicationStatus{
-		NEW,
-		DUPLICATED,
-		IGNORED
-	}
-	
-	public DefaultDeduplicator() {
-		this.dedupIntervalMin = 0;
-	}
-	
-	public DefaultDeduplicator(long intervalMin) {
-		this.dedupIntervalMin = intervalMin;
-	}
-	
-	public void clearOldCache() {
-		List<EntityTagsUniq> removedkeys = new ArrayList<EntityTagsUniq>();
-		for (Entry<EntityTagsUniq, Long> entry : entites.entrySet()) {
-			EntityTagsUniq entity = entry.getKey();
-			if (System.currentTimeMillis() - 7 * DateUtils.MILLIS_PER_DAY > entity.createdTime) {
-				removedkeys.add(entry.getKey());
-			}
-		}
-		for (EntityTagsUniq alertKey : removedkeys) {
-			entites.remove(alertKey);
-		}
-	}
-	
-	public AlertDeduplicationStatus checkDedup(EntityTagsUniq key){
-		long current = key.timestamp;
-		if(!entites.containsKey(key)){
-			entites.put(key, current);
-			return AlertDeduplicationStatus.NEW;
-		}
-		
-		long last = entites.get(key);
-		if(current - last >= dedupIntervalMin * DateUtils.MILLIS_PER_MINUTE){
-			entites.put(key, current);
-			return AlertDeduplicationStatus.DUPLICATED;
-		}
-		
-		return AlertDeduplicationStatus.IGNORED;
-	}
-	
-	public List<T> dedup(List<T> list) {
-		clearOldCache();
-		List<T> dedupList = new ArrayList<T>();
-        int totalCount = list.size();
-        int dedupedCount = 0;
-		for(T entity: list) {
-			if (entity.getTags() == null) {
-				if(LOG.isDebugEnabled()) LOG.debug("Tags is null, don't know how to deduplicate, do nothing");
-			} else {
-                AlertDeduplicationStatus status = checkDedup(new EntityTagsUniq(entity.getTags(), entity.getTimestamp()));
-                if (!status.equals(AlertDeduplicationStatus.IGNORED)) {
-                    dedupList.add(entity);
-                } else {
-                    dedupedCount++;
-                    if (LOG.isDebugEnabled())
-                        LOG.debug(String.format("Entity is skipped because it's duplicated: " + entity.toString()));
-                }
-            }
-		}
-
-        if(dedupedCount>0){
-            LOG.info(String.format("Skipped %s of %s alerts because they are duplicated",dedupedCount,totalCount));
-        }else if(LOG.isDebugEnabled()){
-            LOG.debug(String.format("Skipped %s of %s duplicated alerts",dedupedCount,totalCount));
-        }
-
-		return dedupList;
-	}
-
-	public EntityDeduplicator<T> setDedupIntervalMin(long dedupIntervalMin) {
-		this.dedupIntervalMin = dedupIntervalMin;
-		return this;
-	}
-	
-	public long getDedupIntervalMin() {
-		return dedupIntervalMin;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/EntityDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/EntityDeduplicator.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/EntityDeduplicator.java
deleted file mode 100644
index f24d31a..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/EntityDeduplicator.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.dedup;
-
-import java.util.List;
-
-import eagle.log.base.taggedlog.TaggedLogAPIEntity;
-
-/**
- * Dedup Eagle entities.
- *
- * @param <T> Eagle entity
- */
-public interface EntityDeduplicator<T extends TaggedLogAPIEntity> {
-	
-	EntityDeduplicator<T> setDedupIntervalMin(long intervalMin);
-	
-	long getDedupIntervalMin();
-	
-	List<T> dedup(List<T> list);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/EntityTagsUniq.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/EntityTagsUniq.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/EntityTagsUniq.java
deleted file mode 100644
index b0e5dcf..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/EntityTagsUniq.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * 
- */
-package eagle.alert.dedup;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @since Mar 19, 2015
- */
-public class EntityTagsUniq {
-	public Map<String, String> tags;
-	public Long timestamp;	 // entity's timestamp
-	public long createdTime; // entityTagsUniq's created time, for cache removal;
-	
-	private static final Logger LOG = LoggerFactory.getLogger(EntityTagsUniq.class);
-	
-	public EntityTagsUniq(Map<String, String> tags, long timestamp) {
-		this.tags = new HashMap<String, String>(tags);
-		this.timestamp = timestamp;
-		this.createdTime = System.currentTimeMillis();
-	}
-	
-	@Override	
-	public boolean equals(Object obj) {		
-		if (obj instanceof EntityTagsUniq) {
-			EntityTagsUniq au = (EntityTagsUniq) obj;
-			if (tags.size() != au.tags.size()) return false;
-			for (Entry<String, String> keyValue : au.tags.entrySet()) {
-				boolean keyExist = tags.containsKey(keyValue.getKey());
-				// sanity check
-				if (tags.get(keyValue.getKey()) == null || keyValue.getValue() == null) {
-					return true;
-				}
-				if ( !keyExist || !tags.get(keyValue.getKey()).equals(keyValue.getValue())) {				
-					return false;
-				}
-			}
-			return true; 
-		}
-		return false;
-	}
-	
-	@Override
-	public int hashCode() {	
-		int hashCode = 0;
-		for (Map.Entry<String,String> entry : tags.entrySet()) {
-            if(entry.getValue() == null) {
-                LOG.warn("Tag value for key ["+entry.getKey()+"] is null, skipped for hash code");
-            }else {
-                try {
-                    hashCode ^= entry.getValue().hashCode();
-                } catch (Throwable t) {
-                    LOG.info("Got exception because of entry: " + entry, t);
-                }
-            }
-		}
-		return hashCode;
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertEmailGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertEmailGenerator.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertEmailGenerator.java
deleted file mode 100644
index 58ae733..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertEmailGenerator.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * 
- */
-package eagle.alert.notification;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.*;
-
-import eagle.alert.common.AlertConstants;
-import eagle.alert.common.AlertEmailSender;
-import eagle.alert.email.AlertEmailComponent;
-import eagle.alert.email.AlertEmailContext;
-import eagle.alert.entity.AlertAPIEntity;
-import com.typesafe.config.ConfigObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AlertEmailGenerator{
-	private String tplFile;
-	private String sender;
-	private String recipients;
-	private String subject;
-	private ConfigObject eagleProps;
-
-    private ThreadPoolExecutor executorPool;
-
-    private final static Logger LOG = LoggerFactory.getLogger(AlertEmailGenerator.class);
-
-    private final static long MAX_TIMEOUT_MS =60000;
-
-    public void sendAlertEmail(AlertAPIEntity entity) {
-		sendAlertEmail(entity, recipients, null);
-	}
-	
-	public void sendAlertEmail(AlertAPIEntity entity, String recipients) {
-		sendAlertEmail(entity, recipients, null);	
-	}
-	
-	public void sendAlertEmail(AlertAPIEntity entity, String recipients, String cc) {
-		AlertEmailContext email = new AlertEmailContext();
-		
-		AlertEmailComponent component = new AlertEmailComponent();
-		component.setAlertContext(entity.getAlertContext());
-		List<AlertEmailComponent> components = new ArrayList<AlertEmailComponent>();
-		components.add(component);		
-		email.setComponents(components);
-		if (entity.getAlertContext().getProperty(AlertConstants.SUBJECT) != null) {
-			email.setSubject(entity.getAlertContext().getProperty(AlertConstants.SUBJECT));
-		}
-		else email.setSubject(subject);
-		email.setVelocityTplFile(tplFile);
-		email.setRecipients(recipients);
-		email.setCc(cc);
-		email.setSender(sender);
-		
-		/** asynchronized email sending */
-		@SuppressWarnings("rawtypes")
-		AlertEmailSender thread = new AlertEmailSender(email, eagleProps);
-
-        if(this.executorPool == null) throw new IllegalStateException("Invoking thread executor pool but it's is not set yet");
-
-        LOG.info("Sending email  in asynchronous to: "+recipients+", cc: "+cc);
-        Future future = this.executorPool.submit(thread);
-        try {
-            future.get(MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS);
-            LOG.info(String.format("Successfully send email to %s", recipients));
-        } catch (InterruptedException | ExecutionException  e) {
-            LOG.error(String.format("Failed to send email to %s, due to:%s",recipients,e),e);
-        } catch (TimeoutException e) {
-            LOG.error(String.format("Failed to send email to %s due to timeout exception, max timeout: %s ms ",recipients, MAX_TIMEOUT_MS),e);
-        }
-    }
-	
-	public String getTplFile() {
-		return tplFile;
-	}
-	
-	public void setTplFile(String tplFile) {
-		this.tplFile = tplFile;
-	}
-
-	public String getSender() {
-		return sender;
-	}
-
-	public void setSender(String sender) {
-		this.sender = sender;
-	}
-
-	public String getRecipients() {
-		return recipients;
-	}
-
-	public void setRecipients(String recipients) {
-		this.recipients = recipients;
-	}
-
-	public String getSubject() {
-		return subject;
-	}
-
-	public void setSubject(String subject) {
-		this.subject = subject;
-	}
-
-	public ConfigObject getEagleProps() {
-		return eagleProps;
-	}
-
-	public void setEagleProps(ConfigObject eagleProps) {
-		this.eagleProps = eagleProps;
-	}
-
-    public void setExecutorPool(ThreadPoolExecutor executorPool) {
-        this.executorPool = executorPool;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertEmailGeneratorBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertEmailGeneratorBuilder.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertEmailGeneratorBuilder.java
deleted file mode 100644
index 10be162..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertEmailGeneratorBuilder.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.notification;
-
-import java.util.concurrent.ThreadPoolExecutor;
-
-import com.typesafe.config.ConfigObject;
-
-public class AlertEmailGeneratorBuilder {
-	private AlertEmailGenerator generator;
-	private AlertEmailGeneratorBuilder(){
-		generator = new AlertEmailGenerator();
-	}
-	public static AlertEmailGeneratorBuilder newBuilder(){
-		return new AlertEmailGeneratorBuilder();
-	}
-	public AlertEmailGeneratorBuilder withSubject(String subject){
-		generator.setSubject(subject);
-		return this;
-	}
-	public AlertEmailGeneratorBuilder withSender(String sender){
-		generator.setSender(sender);
-		return this;
-	}
-	public AlertEmailGeneratorBuilder withRecipients(String recipients){
-		generator.setRecipients(recipients);
-		return this;
-	}
-	public AlertEmailGeneratorBuilder withTplFile(String tplFile){
-		generator.setTplFile(tplFile);
-		return this;
-	}
-	public AlertEmailGeneratorBuilder withEagleProps(ConfigObject eagleProps) {
-		generator.setEagleProps(eagleProps);
-		return this;
-	}
-    public AlertEmailGeneratorBuilder withExecutorPool(ThreadPoolExecutor threadPoolExecutor) {
-        generator.setExecutorPool(threadPoolExecutor);
-        return this;
-    }
-
-    public AlertEmailGenerator build(){
-		return this.generator;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertNotificationExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertNotificationExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertNotificationExecutor.java
deleted file mode 100644
index 6e5a3d7..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertNotificationExecutor.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.notification;
-
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.jsontype.NamedType;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.sun.jersey.client.impl.CopyOnWriteHashMap;
-import com.typesafe.config.Config;
-import eagle.alert.config.EmailNotificationConfig;
-import eagle.alert.dao.AlertDefinitionDAO;
-import eagle.alert.entity.AlertAPIEntity;
-import eagle.alert.entity.AlertDefinitionAPIEntity;
-import eagle.alert.policy.DynamicPolicyLoader;
-import eagle.alert.policy.PolicyLifecycleMethods;
-import eagle.dataproc.core.JsonSerDeserUtils;
-import eagle.datastream.Collector;
-import eagle.datastream.JavaStormStreamExecutor1;
-import eagle.datastream.Tuple1;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-/**
- * notify alert by email, sms or other means
- * currently we only implements email notification
- */
-public class AlertNotificationExecutor extends JavaStormStreamExecutor1<String> implements PolicyLifecycleMethods {
-
-	private static final long serialVersionUID = 1690354365435407034L;
-	private static final Logger LOG = LoggerFactory.getLogger(AlertNotificationExecutor.class);
-	private Config config;
-
-	private List<String> alertExecutorIdList;
-	private volatile CopyOnWriteHashMap<String, List<AlertEmailGenerator>> alertEmailGeneratorsMap;
-	private AlertDefinitionDAO dao;
-
-    private final static int DEFAULT_THREAD_POOL_CORE_SIZE = 4;
-    private final static int DEFAULT_THREAD_POOL_MAX_SIZE = 8;
-    private final static long DEFAULT_THREAD_POOL_SHRINK_TIME = 60000L; // 1 minute
-
-    private transient ThreadPoolExecutor executorPool;
-
-    public AlertNotificationExecutor(List<String> alertExecutorIdList, AlertDefinitionDAO dao){
-		this.alertExecutorIdList = alertExecutorIdList;
-		this.dao = dao;
-	}
-	
-	public List<AlertEmailGenerator> createAlertEmailGenerator(AlertDefinitionAPIEntity alertDef) {		
-		Module module = new SimpleModule("notification").registerSubtypes(new NamedType(EmailNotificationConfig.class, "email"));
-		EmailNotificationConfig[] emailConfigs = new EmailNotificationConfig[0];
-		try {			
-			emailConfigs = JsonSerDeserUtils.deserialize(alertDef.getNotificationDef(), EmailNotificationConfig[].class, Arrays.asList(module));
-		}
-		catch (Exception ex) {
-			LOG.warn("Initial emailConfig error, wrong format or it's error " + ex.getMessage());
-		}
-		List<AlertEmailGenerator> gens = new ArrayList<AlertEmailGenerator>();
-		if (emailConfigs == null) {
-			return gens;		
-		}
-		for(EmailNotificationConfig emailConfig : emailConfigs) {
-			String tplFileName = emailConfig.getTplFileName();			
-			if (tplFileName == null || tplFileName.equals("")) { // empty tplFileName, use default tpl file name
-				tplFileName = "ALERT_DEFAULT.vm";
-			}
-			AlertEmailGenerator gen = AlertEmailGeneratorBuilder.newBuilder().
-																withEagleProps(config.getObject("eagleProps")).
-																withSubject(emailConfig.getSubject()).
-																withSender(emailConfig.getSender()).
-																withRecipients(emailConfig.getRecipients()).
-																withTplFile(tplFileName).
-                                                                withExecutorPool(executorPool).
-																build();
-			gens.add(gen);
-		}
-		return gens;
-	}
-	
-	/**
-	 * 1. register both file and database configuration
-	 * 2. create email generator from configuration
-	 */
-    @Override
-	public void init(){
-        executorPool = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_CORE_SIZE, DEFAULT_THREAD_POOL_MAX_SIZE, DEFAULT_THREAD_POOL_SHRINK_TIME, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
-
-		Map<String, List<AlertEmailGenerator>> tmpEmailGenerators = new HashMap<String, List<AlertEmailGenerator>> ();
-		
-        String site = config.getString("eagleProps.site");
-        String dataSource = config.getString("eagleProps.dataSource");
-	    Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs;
-	    try {
-	 		initialAlertDefs = dao.findActiveAlertDefsGroupbyAlertExecutorId(site, dataSource);
-	    }
-	    catch (Exception ex) {
- 			LOG.error("fail to initialize initialAlertDefs: ", ex);
-	        throw new IllegalStateException("fail to initialize initialAlertDefs: ", ex);
-        }
- 	   
-        if(initialAlertDefs == null || initialAlertDefs.isEmpty()){
-            LOG.warn("No alert definitions found for site: "+site+", dataSource: "+dataSource);
-        }
-        else {
-		    for (String alertExecutorId: alertExecutorIdList) {
-                if(initialAlertDefs.containsKey(alertExecutorId)) {
-                    for (AlertDefinitionAPIEntity alertDef : initialAlertDefs.get(alertExecutorId).values()) {
-                        List<AlertEmailGenerator> gens = createAlertEmailGenerator(alertDef);
-                        tmpEmailGenerators.put(alertDef.getTags().get("policyId"), gens);
-                    }
-                }else{
-                    LOG.info(String.format("No alert definitions found for site: %s, dataSource: %s, alertExecutorId: %s",site,dataSource,alertExecutorId));
-                }
-		    }
-        }
-		
-		alertEmailGeneratorsMap = new CopyOnWriteHashMap<String, List<AlertEmailGenerator>>();
-		alertEmailGeneratorsMap.putAll(tmpEmailGenerators);				
-		DynamicPolicyLoader policyLoader = DynamicPolicyLoader.getInstance();
-		policyLoader.init(initialAlertDefs, dao, config);
-		for (String alertExecutorId : alertExecutorIdList) {
-			policyLoader.addPolicyChangeListener(alertExecutorId, this);
-		}
-	}
-
-    @Override
-	public void prepareConfig(Config config) {
-		this.config = config;
-	}
-
-    @Override
-    public void flatMap(java.util.List<Object> input, Collector<Tuple1<String>> outputCollector){
-        String policyId = (String) input.get(0);
-        AlertAPIEntity alertEntity = (AlertAPIEntity) input.get(1);
-        processAlerts(policyId, Arrays.asList(alertEntity));
-    }
-	
-	//TODO: add a thread pool for email sender?
-	private void processAlerts(String policyId, List<AlertAPIEntity> list) {
-		List<AlertEmailGenerator> generators;
-		synchronized(alertEmailGeneratorsMap) {		
-			generators = alertEmailGeneratorsMap.get(policyId);
-		}
-		if (generators == null) {
-			LOG.warn("Notification config of policyId " + policyId + " has been deleted");
-			return;
-		}
-		for (AlertAPIEntity entity : list) {
-			for(AlertEmailGenerator generator : generators){
-				generator.sendAlertEmail(entity);
-			}
-		}
-	}
-
-	@Override
-	public void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> added) {
-		if(LOG.isDebugEnabled()) LOG.debug(" alert notification config changed : " + added);
-		for(AlertDefinitionAPIEntity alertDef : added.values()){
-			LOG.info("alert notification config really changed " + alertDef);
-			List<AlertEmailGenerator> gens = createAlertEmailGenerator(alertDef);
-			synchronized(alertEmailGeneratorsMap) {		
-				alertEmailGeneratorsMap.put(alertDef.getTags().get("policyId"), gens);
-			}
-		}		
-	}
-
-	@Override
-	public void onPolicyChanged(Map<String, AlertDefinitionAPIEntity> changed) {
-		if(LOG.isDebugEnabled()) LOG.debug("alert notification config to be added : " + changed);
-		for(AlertDefinitionAPIEntity alertDef : changed.values()){
-			LOG.info("alert notification config really added " + alertDef);
-			List<AlertEmailGenerator> gens = createAlertEmailGenerator(alertDef);
-			synchronized(alertEmailGeneratorsMap) {					
-				alertEmailGeneratorsMap.put(alertDef.getTags().get("policyId"), gens);
-			}
-		}			
-	}
-
-	@Override
-	public void onPolicyDeleted(Map<String, AlertDefinitionAPIEntity> deleted) {
-		if(LOG.isDebugEnabled()) LOG.debug("alert notification config to be deleted : " + deleted);
-		for(AlertDefinitionAPIEntity alertDef : deleted.values()){
-			LOG.info("alert notification config really deleted " + alertDef);
-			synchronized(alertEmailGeneratorsMap) {		
-				alertEmailGeneratorsMap.remove(alertDef.getTags().get("policyId"));
-			}
-		}		
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/UrlBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/UrlBuilder.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/UrlBuilder.java
deleted file mode 100644
index fa5dc82..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/UrlBuilder.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package eagle.alert.notification;
-
-import eagle.alert.common.AlertConstants;
-import eagle.alert.entity.AlertAPIEntity;
-import eagle.common.EagleBase64Wrapper;
-import eagle.common.config.EagleConfigConstants;
-import eagle.log.entity.HBaseInternalLogHelper;
-import eagle.log.entity.InternalLog;
-import eagle.log.entity.RowkeyBuilder;
-import eagle.log.entity.meta.EntityDefinitionManager;
-import org.mortbay.util.UrlEncoded;
-import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class UrlBuilder {
-
-    private static final Logger logger = LoggerFactory.getLogger(UrlBuilder.class);
-
-    public static String getEncodedRowkey(AlertAPIEntity entity) throws Exception {
-        InternalLog log = HBaseInternalLogHelper.convertToInternalLog(entity, EntityDefinitionManager.getEntityDefinitionByEntityClass(entity.getClass()));
-        return EagleBase64Wrapper.encodeByteArray2URLSafeString(RowkeyBuilder.buildRowkey(log));
-    }
-
-    public static String buildAlertDetailUrl(String host, int port, AlertAPIEntity entity) {
-        String baseUrl = "http://" + host + ":" + String.valueOf(port) + "/eagle-service/#/dam/alertDetail/";
-        try {
-            return baseUrl + UrlEncoded.encodeString(getEncodedRowkey(entity));
-        }
-        catch (Exception ex) {
-            logger.error("Fail to populate encodedRowkey for alert Entity" + entity.toString());
-            return "N/A";
-        }
-    }
-
-    public static String buiildPolicyDetailUrl(String host, int port, Map<String, String> tags) {
-        String baseUrl = "http://" + host + ":" + String.valueOf(port) + "/eagle-service/#/dam/policyDetail?";
-        String format = "policy=%s&site=%s&executor=%s";
-        String policy = tags.get(AlertConstants.POLICY_ID);
-        String site = tags.get(EagleConfigConstants.SITE);
-        String alertExecutorID = tags.get(AlertConstants.ALERT_EXECUTOR_ID);
-        if (policy != null && site != null && alertExecutorID != null) {
-            return baseUrl + String.format(format, policy, site, alertExecutorID);
-        }
-        return "N/A";
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/persist/AlertPersistExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/persist/AlertPersistExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/persist/AlertPersistExecutor.java
deleted file mode 100644
index 543405d..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/persist/AlertPersistExecutor.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.persist;
-
-import eagle.alert.entity.AlertAPIEntity;
-import com.typesafe.config.Config;
-import eagle.common.config.EagleConfigConstants;
-import eagle.datastream.Collector;
-import eagle.datastream.JavaStormStreamExecutor1;
-import eagle.datastream.Tuple1;
-
-import java.util.Arrays;
-
-public class AlertPersistExecutor extends JavaStormStreamExecutor1<String> {
-
-	private static final long serialVersionUID = 1L;
-	private Config config;
-	private EaglePersist persist;
-
-	public AlertPersistExecutor(){
-	}
-    @Override
-	public void prepareConfig(Config config) {
-		this.config = config;		
-	}
-
-    @Override
-	public void init() {
-		String host = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
-		int port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
-		String username = config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME)
-				? config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME) : null;
-		String password = config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD)
-				? config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD) : null;
-		this.persist = new EaglePersist(host, port, username, password);
-	}
-
-    @Override
-    public void flatMap(java.util.List<Object> input, Collector<Tuple1<String>> outputCollector){
-        persist.doPersist(Arrays.asList((AlertAPIEntity)(input.get(1))));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/persist/EaglePersist.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/persist/EaglePersist.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/persist/EaglePersist.java
deleted file mode 100644
index b5820c4..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/persist/EaglePersist.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * 
- */
-package eagle.alert.persist;
-
-import eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import eagle.log.entity.GenericServiceAPIResponseEntity;
-import eagle.service.client.IEagleServiceClient;
-import eagle.service.client.impl.EagleServiceClientImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-public class EaglePersist {
-		
-	private static Logger LOG = LoggerFactory.getLogger(EaglePersist.class);
-	private String eagleServiceHost;
-	private int eagleServicePort;
-	private String username;
-	private String password;
-
-	public EaglePersist(String eagleServiceHost, int eagleServicePort) {
-		this(eagleServiceHost, eagleServicePort, null, null);
-	}
-
-	public EaglePersist(String eagleServiceHost, int eagleServicePort, String username, String password) {
-		this.eagleServiceHost = eagleServiceHost;
-		this.eagleServicePort = eagleServicePort;
-		this.username = username;
-		this.password = password;
-	}
-	
-	public boolean doPersist(List<? extends TaggedLogAPIEntity> list) {
-		if (list.isEmpty()) return false;
-		LOG.info("Going to persist entities, type: " + " " + list.get(0).getClass().getSimpleName() + ", list size: " + list.size());
-		try {
-			IEagleServiceClient client = new EagleServiceClientImpl(eagleServiceHost, eagleServicePort, username, password);
-			GenericServiceAPIResponseEntity<String> response = client.create(list);
-			client.close();
-			if (response.isSuccess()) {
-				LOG.info("Successfully create entities " + list.toString());
-				return true;
-			}
-			else {
-				LOG.error("Fail to create entities");
-				return false;
-			}
-		}
-		catch (Exception ex) {
-			LOG.error("Got an exception in persisting entities" + ex.getMessage(), ex);
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/DefaultPolicyPartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/DefaultPolicyPartitioner.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/DefaultPolicyPartitioner.java
deleted file mode 100644
index ef76c2f..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/DefaultPolicyPartitioner.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.policy;
-
-
-public class DefaultPolicyPartitioner implements PolicyPartitioner{
-	@Override
-	public int partition(int numTotalPartitions, String policyType,
-			String policyId) {
-		final int prime = 31;
-		int result = 1;
-		result = result * prime + policyType.hashCode();
-		result = result < 0 ? result*-1 : result;
-		result = result * prime + policyId.hashCode();
-		result = result < 0 ? result*-1 : result;
-		return result % numTotalPartitions;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/DynamicPolicyLoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/DynamicPolicyLoader.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/DynamicPolicyLoader.java
deleted file mode 100644
index ba76a15..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/DynamicPolicyLoader.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.policy;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import eagle.common.config.EagleConfigConstants;
-import org.apache.commons.collections.CollectionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import eagle.alert.dao.AlertDefinitionDAO;
-import eagle.alert.entity.AlertDefinitionAPIEntity;
-import com.netflix.config.AbstractPollingScheduler;
-import com.netflix.config.ConcurrentCompositeConfiguration;
-import com.netflix.config.DynamicConfiguration;
-import com.netflix.config.FixedDelayPollingScheduler;
-import com.netflix.config.PollListener;
-import com.netflix.config.PollResult;
-import com.netflix.config.PolledConfigurationSource;
-import com.sun.jersey.client.impl.CopyOnWriteHashMap;
-import com.typesafe.config.Config;
-
-public class DynamicPolicyLoader {
-	private static final Logger LOG = LoggerFactory.getLogger(DynamicPolicyLoader.class);
-	
-	private final int defaultInitialDelayMillis = 30*1000;
-	private final int defaultDelayMillis = 60*1000;
-	private final boolean defaultIgnoreDeleteFromSource = true;
-	private volatile CopyOnWriteHashMap<String, List<PolicyLifecycleMethods>> policyChangeListeners = new CopyOnWriteHashMap<String, List<PolicyLifecycleMethods>>();
-	private static DynamicPolicyLoader instance = new DynamicPolicyLoader();
-	private volatile boolean initialized = false;
-	
-	public void addPolicyChangeListener(String alertExecutorId, PolicyLifecycleMethods alertExecutor){
-		synchronized(policyChangeListeners) {
-			if (policyChangeListeners.get(alertExecutorId) == null) {
-				policyChangeListeners.put(alertExecutorId, new ArrayList<PolicyLifecycleMethods>());
-			}
-			policyChangeListeners.get(alertExecutorId).add(alertExecutor);
-		}
-	}
-	
-	public static DynamicPolicyLoader getInstance(){
-		return instance;
-	}
-	
-	/**
-	 * singleton with init would be good for unit test as well, and it ensures that
-	 * initialization happens only once before you use it.  
-	 * @param config
-	 * @param dao
-	 */
-	public void init(Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs, 
-			AlertDefinitionDAO dao, Config config){
-		if(!initialized){
-			synchronized(this){
-				if(!initialized){
-					internalInit(initialAlertDefs, dao, config);
-					initialized = true;
-				}
-			}
-		}
-	}
-	
-	/**
-	 * map from alertExecutorId+partitionId to AlertExecutor which implements PolicyLifecycleMethods
-	 * @param initialAlertDefs
-	 * @param dao
-	 * @param config
-	 */
-	private void internalInit(Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs,
-			AlertDefinitionDAO dao, Config config){
-		if(!config.getBoolean("dynamicConfigSource.enabled")) {
-            return;
-        }
-		AbstractPollingScheduler scheduler = new FixedDelayPollingScheduler(
-                config.getInt("dynamicConfigSource.initDelayMillis"),
-                config.getInt("dynamicConfigSource.delayMillis"),
-                false
-        );
-
-		scheduler.addPollListener(new PollListener(){
-			@Override
-			public void handleEvent(EventType eventType, PollResult lastResult,
-					Throwable exception) {
-				if (lastResult == null) {
-					LOG.error("The lastResult is null, something must be wrong, probably the eagle service is dead!");
-					throw new RuntimeException("The lastResult is null, probably the eagle service is dead! ", exception);
-				}
-				Map<String, Object> added = lastResult.getAdded();
-				Map<String, Object> changed = lastResult.getChanged();
-				Map<String, Object> deleted = lastResult.getDeleted();
-				for(Map.Entry<String, List<PolicyLifecycleMethods>> entry : policyChangeListeners.entrySet()){
-					String alertExecutorId = entry.getKey();
-					for (PolicyLifecycleMethods policyLifecycleMethod : entry.getValue()) {
-						Map<String, AlertDefinitionAPIEntity> addedPolicies = (Map<String, AlertDefinitionAPIEntity>)added.get(trimPartitionNum(alertExecutorId));
-						if(addedPolicies != null && addedPolicies.size() > 0){
-							policyLifecycleMethod.onPolicyCreated(addedPolicies);
-						}
-						Map<String, AlertDefinitionAPIEntity> changedPolicies = (Map<String, AlertDefinitionAPIEntity>)changed.get(trimPartitionNum(alertExecutorId));
-						if(changedPolicies != null && changedPolicies.size() > 0){
-							policyLifecycleMethod.onPolicyChanged(changedPolicies);
-						}
-						Map<String, AlertDefinitionAPIEntity> deletedPolicies = (Map<String, AlertDefinitionAPIEntity>)deleted.get(trimPartitionNum(alertExecutorId));
-						if(deletedPolicies != null && deletedPolicies.size() > 0){
-							policyLifecycleMethod.onPolicyDeleted(deletedPolicies);
-						}
-					}
-				}
-			}
-			private String trimPartitionNum(String alertExecutorId){
-				int i = alertExecutorId.lastIndexOf('_');
-				if(i != -1){
-					return alertExecutorId.substring(0, i);
-				}
-				return alertExecutorId;
-			}
-		});
-		
-		ConcurrentCompositeConfiguration finalConfig = new ConcurrentCompositeConfiguration();
-		      
-		PolledConfigurationSource source = new DynamicPolicySource(initialAlertDefs, dao, config);
-
-		try{
-			DynamicConfiguration dbSourcedConfiguration = new DynamicConfiguration(source, scheduler);
-			finalConfig.addConfiguration(dbSourcedConfiguration);
-		}catch(Exception ex){
-			LOG.warn("Fail loading from DB, continue without DB sourced configuration", ex);
-		}
-	}
-	
-	public static class DynamicPolicySource implements PolledConfigurationSource{
-		private static Logger LOG = LoggerFactory.getLogger(DynamicPolicySource.class);
-		private Config config;
-		private AlertDefinitionDAO dao;
-		/**
-		 * mapping from alertExecutorId to list of policies 
-		 */
-		private Map<String, Map<String, AlertDefinitionAPIEntity>> cachedAlertDefs;
-		
-		public DynamicPolicySource(Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs, AlertDefinitionDAO dao, Config config){
-			this.cachedAlertDefs = initialAlertDefs;
-			this.dao = dao;
-			this.config = config;
-		}
-
-		public PollResult poll(boolean initial, Object checkPoint) throws Exception {
-			LOG.info("Poll policy from eagle service " +  config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST) +
-					":" + config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT) );
-			Map<String, Map<String, AlertDefinitionAPIEntity>> newAlertDefs = 
-					dao.findActiveAlertDefsGroupbyAlertExecutorId(config.getString("eagleProps.site"),
-                            config.getString("eagleProps.dataSource"));
-			
-			// compare runtime alertDefs with cachedAlertDefs and figure out what are added/deleted/updated
-			Map<String, Object> added = new HashMap<String, Object>();
-			Map<String, Object> changed = new HashMap<String, Object>();
-			Map<String, Object> deleted = new HashMap<String, Object>();
-			
-			Set<String> newAlertExecutorIds = newAlertDefs.keySet();
-			Set<String> cachedAlertExecutorIds = cachedAlertDefs.keySet();
-			
-			// dynamically adding new alert executor is not supported, because alert executor is pre-built while program starts up
-			Collection<String> addedAlertExecutorIds = CollectionUtils.subtract(newAlertExecutorIds, cachedAlertExecutorIds);
-			if(addedAlertExecutorIds != null && addedAlertExecutorIds.size() > 0){
-				LOG.warn("New alertExecutorIds are found : " + addedAlertExecutorIds);
-			}
-			
-			// if one alert executor is missing, it means all policy under that alert executor should be removed
-			Collection<String> deletedAlertExecutorIds = CollectionUtils.subtract(cachedAlertExecutorIds, newAlertExecutorIds);
-			if(deletedAlertExecutorIds != null && deletedAlertExecutorIds.size() > 0){
-				LOG.warn("Some alertExecutorIds are deleted : " + deletedAlertExecutorIds);
-				for(String deletedAlertExecutorId : deletedAlertExecutorIds){
-					deleted.put(deletedAlertExecutorId, cachedAlertDefs.get(deletedAlertExecutorId));
-				}
-			}
-			
-			// we need calculate added/updated/deleted policy for all executors which are not deleted
-//			Collection<String> updatedAlertExecutorIds = CollectionUtils.intersection(newAlertExecutorIds, cachedAlertExecutorIds);
-            Collection<String> updatedAlertExecutorIds = newAlertExecutorIds;
-			for(String updatedAlertExecutorId : updatedAlertExecutorIds){
-				Map<String, AlertDefinitionAPIEntity> newPolicies = newAlertDefs.get(updatedAlertExecutorId);
-				Map<String, AlertDefinitionAPIEntity> cachedPolicies = cachedAlertDefs.get(updatedAlertExecutorId);
-				PolicyComparator.compare(updatedAlertExecutorId, newPolicies, cachedPolicies, added, changed, deleted);
-			}
-			
-			cachedAlertDefs = newAlertDefs;
-			return PollResult.createIncremental(added, changed, deleted, new Date().getTime());
-		}
-	}
-	
-	public static class PolicyComparator{
-		public static void compare(String alertExecutorId, Map<String, AlertDefinitionAPIEntity> newPolicies, Map<String, AlertDefinitionAPIEntity> cachedPolicies, 
-				Map<String, Object> added, Map<String, Object> changed, Map<String, Object> deleted){
-			Set<String> newPolicyIds = newPolicies.keySet();
-            Set<String> cachedPolicyIds = cachedPolicies != null ? cachedPolicies.keySet() : new HashSet<String>();
-			Collection<String> addedPolicyIds = CollectionUtils.subtract(newPolicyIds, cachedPolicyIds);
-			Collection<String> deletedPolicyIds = CollectionUtils.subtract(cachedPolicyIds, newPolicyIds);
-			Collection<String> changedPolicyIds = CollectionUtils.intersection(cachedPolicyIds, newPolicyIds);
-			if(addedPolicyIds != null && addedPolicyIds.size() > 0){
-				Map<String, AlertDefinitionAPIEntity> tmp = new HashMap<String, AlertDefinitionAPIEntity>();
-				for(String addedPolicyId : addedPolicyIds){
-					tmp.put(addedPolicyId, newPolicies.get(addedPolicyId));
-				}
-				added.put(alertExecutorId, tmp);
-			}
-			if(deletedPolicyIds != null && deletedPolicyIds.size() > 0){
-				Map<String, AlertDefinitionAPIEntity> tmp = new HashMap<String, AlertDefinitionAPIEntity>();
-				for(String deletedPolicyId : deletedPolicyIds){
-					tmp.put(deletedPolicyId, cachedPolicies.get(deletedPolicyId));
-				}
-				deleted.put(alertExecutorId, tmp);
-			}
-			if(changedPolicyIds != null && changedPolicyIds.size() > 0){
-				Map<String, AlertDefinitionAPIEntity> tmp = new HashMap<String, AlertDefinitionAPIEntity>();
-				for(String changedPolicyId : changedPolicyIds){
-					// check if policy is really changed
-					if(!newPolicies.get(changedPolicyId).equals(cachedPolicies.get(changedPolicyId))){
-						tmp.put(changedPolicyId, newPolicies.get(changedPolicyId));
-					}
-				}
-				changed.put(alertExecutorId, tmp);
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PartitionUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PartitionUtils.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PartitionUtils.java
deleted file mode 100644
index c3b53c2..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PartitionUtils.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.policy;
-
-import eagle.alert.common.AlertConstants;
-import eagle.alert.entity.AlertDefinitionAPIEntity;
-
-public class PartitionUtils {
-	
-	public static boolean accept(AlertDefinitionAPIEntity alertDef, PolicyPartitioner partitioner, int numPartitions, int partitionSeq){
-		int targetPartitionSeq = partitioner.partition(numPartitions, alertDef.getTags().get(AlertConstants.POLICY_TYPE), alertDef.getTags().get(AlertConstants.POLICY_ID));
-		if(targetPartitionSeq == partitionSeq)
-			return true;
-		return false;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyEvaluator.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyEvaluator.java
deleted file mode 100644
index c209b97..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyEvaluator.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.policy;
-
-import java.util.List;
-import java.util.Map;
-
-import eagle.alert.entity.AlertDefinitionAPIEntity;
-import eagle.dataproc.core.ValuesArray;
-
-public interface PolicyEvaluator {
-	/**
-	 * take input and evaluate expression
-	 * input has 3 fields, first is siddhiAlertContext, second one is streamName, the third is map of attribute name/value
-	 * @param input
-	 * @throws Exception
-	 */
-	public void evaluate(ValuesArray input) throws Exception;
-	
-	/**
-	 * notify policy evaluator that policy is updated
-	 */
-	public void onPolicyUpdate(AlertDefinitionAPIEntity newAlertDef);
-	
-	/**
-	 * notify policy evaluator that policy is deleted, here is cleanup work for this policy evaluator
-	 */
-	public void onPolicyDelete();
-	
-	/**
-	 * get additional context
-	 */	
-	public Map<String, String> getAdditionalContext();
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyEvaluatorServiceProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyEvaluatorServiceProvider.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyEvaluatorServiceProvider.java
deleted file mode 100644
index a022f98..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyEvaluatorServiceProvider.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.policy;
-
-import java.util.List;
-
-import com.fasterxml.jackson.databind.Module;
-
-/**
- * to provide extensibility, we need a clear differentiation between framework job and provider logic
- * policy evaluator framework:
- * - connect to eagle data source
- * - read all policy definitions
- * - compare with cached policy definitions
- * - figure out if policy is created, deleted or updated
- *   - if policy is created, then invoke onPolicyCreated
- *   - if policy is deleted, then invoke onPolicyDeleted
- *   - if policy is updated, then invoke onPolicyUpdated
- * - for policy update, replace old evaluator engine with new evaluator engine which is created by policy evaluator provider
- * - specify # of executors for this alert executor id
- * - dynamically balance # of policies evaluated by each alert executor
- *   - use zookeeper to balance. eaglePolicies/${alertExecutorId}/${alertExecutorInstanceId} => list of policies
- * 
- * policy evaluator business features:
- * - register mapping between policy type and PolicyEvaluator
- * - create evaluator engine runtime when configuration is changed
- *
- */
-public interface PolicyEvaluatorServiceProvider {
-	String getPolicyType();
-	Class<? extends PolicyEvaluator> getPolicyEvaluator();
-	List<Module> getBindingModules();
-}