You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2017/05/05 05:02:27 UTC

eagle git commit: [EAGLE-993] add duplicate removal settings in policy definition

Repository: eagle
Updated Branches:
  refs/heads/master 47f00f159 -> 8da06636b


[EAGLE-993] add duplicate removal settings in policy definition

https://issues.apache.org/jira/browse/EAGLE-993

Author: Zhao, Qingwen <qi...@apache.org>

Closes #926 from qingwen220/EAGLE-993.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/8da06636
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/8da06636
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/8da06636

Branch: refs/heads/master
Commit: 8da06636b8e3834f7bebf6b1af923ad24b7d6d8b
Parents: 47f00f1
Author: Zhao, Qingwen <qi...@apache.org>
Authored: Fri May 5 13:02:20 2017 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Fri May 5 13:02:20 2017 +0800

----------------------------------------------------------------------
 eagle-assembly/src/main/doc/metadata-ddl.sql    |   7 -
 .../src/assembly/alert-assembly.xml             |   3 +-
 .../engine/coordinator/AlertDeduplication.java  |  71 +++++++
 .../engine/coordinator/PolicyDefinition.java    |  13 +-
 .../engine/coordinator/PublishmentType.java     |  21 +-
 .../publisher/email/AlertEmailGenerator.java    |   4 +-
 .../publisher/impl/AbstractPublishPlugin.java   |  17 +-
 .../publisher/impl/AlertEmailPublisher.java     |   2 -
 .../publisher/impl/AlertPublisherImpl.java      |  43 ++---
 .../publisher/impl/DefaultDeduplicator.java     |  10 +-
 .../template/VelocityAlertTemplateEngine.java   |   8 +-
 .../alert/engine/runner/AlertPublisherBolt.java |  18 +-
 .../main/resources/ALERT_INLINED_TEMPLATE.vm    |   2 +-
 .../VelocityAlertTemplateEngineTest.java        |   2 +-
 .../metadata/impl/JdbcMetadataDaoImpl.java      |   2 +-
 .../metadata/impl/JdbcMetadataHandler.java      |  30 +++
 .../jpm/mr/history/JHFEventReaderBaseTest.java  |  74 +++++++
 .../src/test/resources/application.conf         |  74 +++++++
 .../resources/job_1479206441898_508949_conf.xml | 191 +++++++++++++++++--
 19 files changed, 515 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-assembly/src/main/doc/metadata-ddl.sql
----------------------------------------------------------------------
diff --git a/eagle-assembly/src/main/doc/metadata-ddl.sql b/eagle-assembly/src/main/doc/metadata-ddl.sql
index 0334623..3312576 100644
--- a/eagle-assembly/src/main/doc/metadata-ddl.sql
+++ b/eagle-assembly/src/main/doc/metadata-ddl.sql
@@ -164,10 +164,3 @@ CREATE TABLE IF NOT EXISTS analysis_email (
   modifiedtime  bigint(20) DEFAULT NULL,
   UNIQUE (siteId, userId)
 );
-
-INSERT INTO publishment_type(id, content) VALUES
-('Kafka', '{"name":"Kafka","type":"org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher","description":null,"fields":[{"name":"kafka_broker","value":"sandbox.hortonworks.com:6667"},{"name":"topic"}]}'),
-('Email', '{"name":"Email","type":"org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher","description":null,"fields":[{"name":"subject"},{"name":"sender"}, {"name":"recipients"}]}'),
-('Slack', '{"name":"Slack","type":"org.apache.eagle.alert.engine.publisher.impl.AlertSlackPublisher","description":null,"fields":[{"name":"token"},{"name":"channels"}, {"name":"severitys"}, {"name":"urltemplate"}]}'),
-('HBaseStorage', '{"name":"HBaseStorage","type":"org.apache.eagle.alert.app.AlertEagleStorePlugin","description":null,"fields":[]}'),
-('JDBCStorage', '{"name":"JDBCStorage","type":"org.apache.eagle.alert.engine.publisher.impl.AlertEagleStorePlugin","description":null,"fields":[]}');

http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-core/eagle-alert-parent/eagle-alert/alert-assembly/src/assembly/alert-assembly.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-assembly/src/assembly/alert-assembly.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-assembly/src/assembly/alert-assembly.xml
index 9f25ec0..b361f99 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-assembly/src/assembly/alert-assembly.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-assembly/src/assembly/alert-assembly.xml
@@ -9,7 +9,8 @@
 	distributed under the License is distributed on an "AS IS" BASIS, ~ WITHOUT
 	WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the
 	License for the specific language governing permissions and ~ limitations
-	under the License. -->
+	under the License.
+-->
 <assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">

http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java
new file mode 100644
index 0000000..78fef7a
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java
@@ -0,0 +1,71 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.coordinator;
+
+import org.apache.commons.collections.ListUtils;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import java.util.List;
+import java.util.Objects;
+
+/** Duplicate-removal settings of a policy; Serializable because the Serializable PolicyDefinition holds one. */
+public class AlertDeduplication implements java.io.Serializable {
+    private static final long serialVersionUID = 1L;
+
+    // De-duplication interval in minutes, kept as the string given in the policy definition.
+    private String dedupIntervalMin;
+    // Names of the event fields that duplicate detection is keyed on.
+    private List<String> dedupFields;
+
+    public String getDedupIntervalMin() {
+        return dedupIntervalMin;
+    }
+
+    public void setDedupIntervalMin(String dedupIntervalMin) {
+        this.dedupIntervalMin = dedupIntervalMin;
+    }
+
+    public List<String> getDedupFields() {
+        return dedupFields;
+    }
+
+    public void setDedupFields(List<String> dedupFields) {
+        this.dedupFields = dedupFields;
+    }
+
+    @Override
+    public int hashCode() {
+        return new HashCodeBuilder()
+                .append(dedupFields)
+                .append(dedupIntervalMin)
+                .build();
+    }
+
+    @Override
+    public boolean equals(Object that) {
+        if (that == this) {
+            return true;
+        }
+        if (!(that instanceof AlertDeduplication)) {
+            return false;
+        }
+        AlertDeduplication another = (AlertDeduplication) that;
+        return ListUtils.isEqualList(another.dedupFields, this.dedupFields)
+                && Objects.equals(another.dedupIntervalMin, this.dedupIntervalMin);
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
index c377e41..5004513 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
@@ -43,6 +43,7 @@ public class PolicyDefinition implements Serializable {
     private Definition stateDefinition;
     private PolicyStatus policyStatus = PolicyStatus.ENABLED;
     private AlertDefinition alertDefinition;
+    private AlertDeduplication deduplication;
 
     // one stream only have one partition in one policy, since we don't support stream alias
     private List<StreamPartition> partitionSpec = new ArrayList<StreamPartition>();
@@ -147,6 +148,7 @@ public class PolicyDefinition implements Serializable {
                 .append(policyStatus)
                 .append(parallelismHint)
                 .append(alertDefinition)
+                .append(deduplication)
                 .build();
     }
 
@@ -172,7 +174,8 @@ public class PolicyDefinition implements Serializable {
                 && CollectionUtils.isEqualCollection(another.partitionSpec, this.partitionSpec)
                 && another.policyStatus.equals(this.policyStatus)
                 && another.parallelismHint == this.parallelismHint
-                && Objects.equals(another.alertDefinition, alertDefinition)) {
+                && Objects.equals(another.alertDefinition, alertDefinition)
+                && Objects.equals(another.deduplication, deduplication)) {
             return true;
         }
         return false;
@@ -202,6 +205,14 @@ public class PolicyDefinition implements Serializable {
         this.siteId = siteId;
     }
 
+    public AlertDeduplication getDeduplication() {
+        return deduplication;
+    }
+
+    public void setDeduplication(AlertDeduplication deduplication) {
+        this.deduplication = deduplication;
+    }
+
     @JsonIgnoreProperties(ignoreUnknown = true)
     public static class Definition implements Serializable {
         private static final long serialVersionUID = -622366527887848346L;

http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
index f7025f2..3119ee6 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
@@ -26,17 +26,6 @@ import java.util.*;
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class PublishmentType {
     private String name;
-
-    @Override
-    public String toString() {
-        return "PublishmentType{"
-                + "name='" + name + '\''
-                + ", type='" + type + '\''
-                + ", description='" + description + '\''
-                + ", fields=" + fields
-                + '}';
-    }
-
     private String type;
     private String description;
     private List<Map<String, String>> fields = new LinkedList<>();
@@ -73,7 +62,15 @@ public class PublishmentType {
         this.fields = fields;
     }
 
-
+    @Override
+    public String toString() {
+        return "PublishmentType{"
+                + "name='" + name + '\''
+                + ", type='" + type + '\''
+                + ", description='" + description + '\''
+                + ", fields=" + fields
+                + '}';
+    }
 
     @Override
     public boolean equals(Object obj) {

http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
index 1bcac17..a57941e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
@@ -139,7 +139,9 @@ public class AlertEmailGenerator {
         alertContext.put(PublishConstants.ALERT_EMAIL_ALERT_DATA_DESC, generateAlertDataDesc(event));
         alertContext.put(PublishConstants.ALERT_EMAIL_ALERT_CATEGORY, event.getCategory());
         alertContext.put(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY, event.getSeverity().toString());
-        alertContext.put(PublishConstants.ALERT_EMAIL_TIME, DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getCreatedTime()));
+        alertContext.put(PublishConstants.ALERT_EMAIL_TIME, String.format("%s %s",
+                DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getCreatedTime()),
+                DateTimeUtil.CURRENT_TIME_ZONE.getID()));
         alertContext.put(PublishConstants.ALERT_EMAIL_STREAM_ID, event.getStreamId());
         alertContext.put(PublishConstants.ALERT_EMAIL_CREATOR, event.getCreatedBy());
         alertContext.put(PublishConstants.ALERT_EMAIL_VERSION, Version.version);

http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
index b155bb8..c5c9e04 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
@@ -29,6 +29,7 @@ import org.apache.eagle.alert.engine.publisher.dedup.DedupCache;
 import org.apache.eagle.alert.engine.publisher.dedup.ExtendedDeduplicator;
 import org.slf4j.Logger;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -71,8 +72,14 @@ public abstract class AbstractPublishPlugin implements AlertPublishPlugin {
                 getLogger().error(String.format("initialize extended deduplicator %s failed", spec.getClassName()), t);
             }
         } else {
-            this.deduplicator = new DefaultDeduplicator(publishment.getDedupIntervalMin(),
-                publishment.getDedupFields(), publishment.getDedupStateField(), publishment.getDedupStateCloseValue(), dedupCache);
+            if (publishment.getDedupIntervalMin() != null && !publishment.getDedupIntervalMin().isEmpty()) {
+                this.deduplicator = new DefaultDeduplicator(
+                        publishment.getDedupIntervalMin(),
+                        publishment.getDedupFields(),
+                        publishment.getDedupStateField(),
+                        publishment.getDedupStateCloseValue(),
+                        dedupCache);
+            }
             this.pubName = publishment.getName();
         }
         String serializerClz = publishment.getSerializer();
@@ -98,7 +105,11 @@ public abstract class AbstractPublishPlugin implements AlertPublishPlugin {
 
     @Override
     public List<AlertStreamEvent> dedup(AlertStreamEvent event) {
-        return deduplicator.dedup(event);
+        if (null != deduplicator) {
+            return deduplicator.dedup(event);
+        } else {
+            return Collections.singletonList(event);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
index 152a9f1..f40680c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
@@ -40,7 +40,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.eagle.alert.service.MetadataServiceClientImpl.*;
 import static org.apache.eagle.common.mail.AlertEmailConstants.*;
 
 public class AlertEmailPublisher extends AbstractPublishPlugin implements AlertPublishPluginProvider {
@@ -215,7 +214,6 @@ public class AlertEmailPublisher extends AbstractPublishPlugin implements AlertP
                 .name("Email")
                 .type(AlertEmailPublisher.class)
                 .description("Email alert publisher")
-                .field("subject")
                 .field("sender")
                 .field("recipients")
                 .build();

http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
index 5b902f9..e38799f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
@@ -43,7 +43,10 @@ public class AlertPublisherImpl implements AlertPublisher {
 
     private final String name;
 
-    private volatile Map<PublishPartition, AlertPublishPlugin> publishPluginMapping = new ConcurrentHashMap<>(1);
+    // <publishId, PublishPlugin>
+    private volatile Map<String, AlertPublishPlugin> publishPluginMapping = new ConcurrentHashMap<>(1);
+    //private volatile Map<PublishPartition, AlertPublishPlugin> publishPluginMapping = new ConcurrentHashMap<>(1);
+
     private Config config;
     private Map conf;
 
@@ -73,11 +76,11 @@ public class AlertPublisherImpl implements AlertPublisher {
     private void notifyAlert(PublishPartition partition, AlertStreamEvent event) {
         // remove the column values for publish plugin match
         partition.getColumnValues().clear();
-        if (!publishPluginMapping.containsKey(partition)) {
+        if (!publishPluginMapping.containsKey(partition.getPublishId())) {
             LOG.warn("PublishPartition {} is not found in publish plugin map", partition);
             return;
         }
-        AlertPublishPlugin plugin = publishPluginMapping.get(partition);
+        AlertPublishPlugin plugin = publishPluginMapping.get(partition.getPublishId());
         if (plugin == null) {
             LOG.warn("PublishPartition {} has problems while initializing publish plugin", partition);
             return;
@@ -120,7 +123,7 @@ public class AlertPublisherImpl implements AlertPublisher {
         }
 
         // copy and swap to avoid concurrency issue
-        Map<PublishPartition, AlertPublishPlugin> newPublishMap = new HashMap<>(publishPluginMapping);
+        Map<String, AlertPublishPlugin> newPublishMap = new HashMap<>(publishPluginMapping);
 
         // added
         for (Publishment publishment : added) {
@@ -128,9 +131,7 @@ public class AlertPublisherImpl implements AlertPublisher {
 
             AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(publishment, config, conf);
             if (plugin != null) {
-                for (PublishPartition p : getPublishPartitions(publishment)) {
-                    newPublishMap.put(p, plugin);
-                }
+                newPublishMap.put(publishment.getName(), plugin);
             } else {
                 LOG.error("OnPublishChange alertPublisher {} failed due to invalid format", publishment);
             }
@@ -138,16 +139,9 @@ public class AlertPublisherImpl implements AlertPublisher {
         //removed
         List<AlertPublishPlugin> toBeClosed = new ArrayList<>();
         for (Publishment publishment : removed) {
-            AlertPublishPlugin plugin = null;
-            for (PublishPartition p : getPublishPartitions(publishment)) {
-                if (plugin == null) {
-                    plugin = newPublishMap.remove(p);
-                } else {
-                    newPublishMap.remove(p);
-                }
-            }
-            if (plugin != null) {
-                toBeClosed.add(plugin);
+            AlertPublishPlugin publishPlugin = newPublishMap.remove(publishment.getName());
+            if (publishPlugin != null) {
+                toBeClosed.add(publishPlugin);
             }
         }
         // updated
@@ -155,16 +149,11 @@ public class AlertPublisherImpl implements AlertPublisher {
             // for updated publishment, need to init them too
             AlertPublishPlugin newPlugin = AlertPublishPluginsFactory.createNotificationPlugin(publishment, config, conf);
             if (newPlugin != null) {
-                AlertPublishPlugin plugin = null;
-                for (PublishPartition p : getPublishPartitions(publishment)) {
-                    if (plugin == null) {
-                        plugin = newPublishMap.get(p);
-                    }
-                    newPublishMap.put(p, newPlugin);
-                }
-                if (plugin != null) {
-                    toBeClosed.add(plugin);
+                AlertPublishPlugin oldPlugin = newPublishMap.get(publishment.getName());
+                if (oldPlugin != null) {
+                    toBeClosed.add(oldPlugin);
                 }
+                newPublishMap.put(publishment.getName(), newPlugin);
             } else {
                 LOG.error("OnPublishChange alertPublisher {} failed due to invalid format", publishment);
             }
@@ -199,7 +188,7 @@ public class AlertPublisherImpl implements AlertPublisher {
             try {
                 p.close();
             } catch (Exception e) {
-                LOG.error(String.format("Error when close publish plugin {}!", p.getClass().getCanonicalName()), e);
+                LOG.error("Error when close publish plugin {}!", p.getClass().getCanonicalName(), e);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
index ac99db3..54d551e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
@@ -20,6 +20,7 @@ package org.apache.eagle.alert.engine.publisher.impl;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.eagle.alert.engine.coordinator.AlertDeduplication;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
@@ -59,6 +60,13 @@ public class DefaultDeduplicator implements AlertDeduplicator {
         this.dedupIntervalSec = intervalMin;
     }
 
+    public DefaultDeduplicator(AlertDeduplication alertDeduplication) {
+        this.customDedupFields = alertDeduplication.getDedupFields();
+        this.dedupIntervalSec = Integer.parseInt(alertDeduplication.getDedupIntervalMin()) * 60;
+        this.withoutStatesCache = CacheBuilder.newBuilder().expireAfterWrite(
+                this.dedupIntervalSec, TimeUnit.SECONDS).build();
+    }
+
     public DefaultDeduplicator(String intervalMin, List<String> customDedupFields,
                                String dedupStateField, String dedupStateCloseValue, DedupCache dedupCache) {
         setDedupIntervalMin(intervalMin);
@@ -81,7 +89,7 @@ public class DefaultDeduplicator implements AlertDeduplicator {
      * @param key
      * @return
      */
-    public List<AlertStreamEvent> checkDedup(AlertStreamEvent event, EventUniq key, String stateFiledValue) {
+    private List<AlertStreamEvent> checkDedup(AlertStreamEvent event, EventUniq key, String stateFiledValue) {
         if (StringUtils.isBlank(stateFiledValue)) {
             // without state field, we cannot determine whether it is duplicated
             // without custom filed values, we cannot determine whether it is duplicated

http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngine.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngine.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngine.java
index 87a067f..c0b765a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngine.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngine.java
@@ -151,9 +151,13 @@ public class VelocityAlertTemplateEngine implements AlertTemplateEngine {
         context.put(AlertContextFields.ALERT_ID, event.getAlertId());
         context.put(AlertContextFields.CREATED_BY, event.getCreatedBy());
         context.put(AlertContextFields.CREATED_TIMESTAMP, event.getCreatedTime());
-        context.put(AlertContextFields.CREATED_TIME, DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getCreatedTime()));
+        context.put(AlertContextFields.CREATED_TIME,  String.format("%s %s",
+                DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getCreatedTime()),
+                DateTimeUtil.CURRENT_TIME_ZONE.getID()));
         context.put(AlertContextFields.ALERT_TIMESTAMP, event.getTimestamp());
-        context.put(AlertContextFields.ALERT_TIME, DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getTimestamp()));
+        context.put(AlertContextFields.ALERT_TIME,  String.format("%s %s",
+                DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getTimestamp()),
+                DateTimeUtil.CURRENT_TIME_ZONE.getID()));
         context.put(AlertContextFields.ALERT_SCHEMA, event.getSchema());
         context.put(AlertContextFields.ALERT_EVENT, event);
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
index 44a5fe9..d6829d6 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
@@ -33,6 +33,7 @@ import org.apache.eagle.alert.engine.publisher.AlertPublisher;
 import org.apache.eagle.alert.engine.publisher.AlertStreamFilter;
 import org.apache.eagle.alert.engine.publisher.PipeStreamFilter;
 import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl;
+import org.apache.eagle.alert.engine.publisher.impl.DefaultDeduplicator;
 import org.apache.eagle.alert.engine.publisher.template.AlertTemplateEngine;
 import org.apache.eagle.alert.engine.publisher.template.AlertTemplateProvider;
 import org.apache.eagle.alert.utils.AlertConstants;
@@ -43,6 +44,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPublishSpecListener {
@@ -51,6 +53,7 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli
     private volatile Map<String, Publishment> cachedPublishments = new HashMap<>();
     private volatile Map<String, PolicyDefinition> policyDefinitionMap;
     private volatile Map<String, StreamDefinition> streamDefinitionMap;
+    private volatile Map<String, DefaultDeduplicator> deduplicatorMap = new ConcurrentHashMap<>();
     private AlertTemplateEngine alertTemplateEngine;
 
     private boolean logEventEnabled;
@@ -87,6 +90,13 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli
             if (logEventEnabled) {
                 LOG.info("Alert publish bolt {}/{} with partition {} received event: {}", this.getBoltId(), this.context.getThisTaskId(), partition, event);
             }
+            if (deduplicatorMap != null && deduplicatorMap.containsKey(event.getPolicyId())) {
+                List<AlertStreamEvent> eventList = deduplicatorMap.get(event.getPolicyId()).dedup(event);
+                if (eventList == null || eventList.isEmpty()) {
+                    collector.ack(input);
+                    return;
+                }
+            }
             AlertStreamEvent filteredEvent = alertFilter.filter(event);
             if (filteredEvent != null) {
                 alertPublisher.nextEvent(partition, filteredEvent);
@@ -139,7 +149,7 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli
     }
 
     @Override
-    public void onAlertPolicyChange(Map<String, PolicyDefinition> pds, Map<String, StreamDefinition> sds) {
+    public synchronized void onAlertPolicyChange(Map<String, PolicyDefinition> pds, Map<String, StreamDefinition> sds) {
         List<String> policyToRemove = new ArrayList<>();
         if (this.policyDefinitionMap != null) {
             policyToRemove.addAll(this.policyDefinitionMap.keySet().stream().filter(policyId -> !pds.containsKey(policyId)).collect(Collectors.toList()));
@@ -151,6 +161,9 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli
         for (Map.Entry<String, PolicyDefinition> entry : pds.entrySet()) {
             try {
                 this.alertTemplateEngine.register(entry.getValue());
+                if (entry.getValue().getDeduplication() != null) {
+                    this.deduplicatorMap.put(entry.getKey(), new DefaultDeduplicator(entry.getValue().getDeduplication()));
+                }
             } catch (Throwable throwable) {
                 LOG.error("Failed to register policy {} in template engine", entry.getKey(), throwable);
             }
@@ -159,6 +172,9 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli
         for (String policyId : policyToRemove) {
             try {
                 this.alertTemplateEngine.unregister(policyId);
+                if (deduplicatorMap != null && deduplicatorMap.containsKey(policyId)) {
+                    deduplicatorMap.remove(policyId);
+                }
             } catch (Throwable throwable) {
                 LOG.error("Failed to unregister policy {} from template engine", policyId, throwable);
             }

http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_INLINED_TEMPLATE.vm
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_INLINED_TEMPLATE.vm b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_INLINED_TEMPLATE.vm
index 0e3d5fe..70013c3 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_INLINED_TEMPLATE.vm
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_INLINED_TEMPLATE.vm
@@ -145,7 +145,7 @@
                                 <td class="content-block"
                                     style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; vertical-align: top; margin: 0; padding: 0 0 20px;"
                                     valign="top">
-                                    <small>CATEGORY:</small> <strong style="color: $alertColor">#if($alert["alertCategory"]) $alert["alertCategory"] #else N/A #end</strong> <small>TIME:</small> <strong>$alert["alertTime"]</strong>
+                                    <small>CATEGORY:</small> <strong style="color: $alertColor">#if($alert["alertCategory"]) $alert["alertCategory"] #else N/A #end</strong> <small>CREATE TIME:</small> <strong>$alert["alertTime"]</strong>
                                 </td>
                             </tr>
                                 <tr style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;">

http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngineTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngineTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngineTest.java
index 7b1d494..e5ec474 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngineTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngineTest.java
@@ -36,7 +36,7 @@ public class VelocityAlertTemplateEngineTest {
         templateEngine.init(ConfigFactory.load());
         templateEngine.register(mockPolicy("testPolicy"));
         AlertStreamEvent event = templateEngine.filter(mockAlertEvent("testPolicy"));
-        Assert.assertEquals("Alert (2016-11-30 07:31:15): cpu usage on hadoop of cluster test_cluster at localhost is 0.98, " +
+        Assert.assertEquals("Alert (2016-11-30 07:31:15 UTC): cpu usage on hadoop of cluster test_cluster at localhost is 0.98, " +
             "exceeding thread hold: 90%. (policy: testPolicy, description: Policy for monitoring cpu usage > 90%), " +
             "definition: from HADOOP_JMX_METRIC_STREAM[site == \"test_cluster\" and metric == \"cpu.usage\" and value > 0.9] " +
             "select site, metric, host, role, value insert into capacityUsageAlert", event.getBody());

http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java
index e0b5c9d..6427d8c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java
@@ -227,7 +227,7 @@ public class JdbcMetadataDaoImpl implements IMetadataDao {
 
     @Override
     public OpResult removePolicy(String policyId) {
-        return handler.removeById(PolicyDefinition.class.getSimpleName(), policyId);
+        return handler.removePolicyById(PolicyDefinition.class.getSimpleName(), policyId);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataHandler.java
index a9e3c5e..7fffa55 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataHandler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataHandler.java
@@ -455,6 +455,36 @@ public class JdbcMetadataHandler {
         return result;
     }
 
+    public OpResult removePolicyById(String clzName, String policyId) {
+        Connection connection = null;
+        PreparedStatement statement = null;
+        OpResult result = new OpResult();
+        try {
+            String tb = getTableName(clzName);
+            connection = dataSource.getConnection();
+            connection.setAutoCommit(false);
+            statement = connection.prepareStatement(String.format(DELETE_STATEMENT, tb));
+            statement.setString(1, policyId);
+            int status = statement.executeUpdate();
+            LOG.info("delete {} policy {} from {}", status, policyId, tb);
+            closeResource(null, statement, null);
+
+            statement = connection.prepareStatement(DELETE_PUBLISHMENT_STATEMENT);
+            statement.setString(1, policyId);
+            status = statement.executeUpdate();
+            LOG.info("delete {} records from policy_publishment", status);
+
+            connection.commit();
+            connection.setAutoCommit(true);
+        } catch (SQLException e) {
+            e.printStackTrace();
+        } finally {
+            closeResource(null, statement, connection);
+        }
+        LOG.info(result.message);
+        return result;
+    }
+
     public OpResult removeById(String clzName, String key) {
         Connection connection = null;
         PreparedStatement statement = null;

http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/JHFEventReaderBaseTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/JHFEventReaderBaseTest.java b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/JHFEventReaderBaseTest.java
new file mode 100644
index 0000000..9c8146d
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/JHFEventReaderBaseTest.java
@@ -0,0 +1,74 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.mr.history;
+
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
+import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilterBuilder;
+import org.apache.eagle.jpm.mr.history.parser.JHFMRVer2EventReader;
+import org.apache.eagle.jpm.mr.history.parser.JobConfigurationCreationServiceListener;
+import org.apache.eagle.jpm.mr.historyentity.JobBaseAPIEntity;
+import org.apache.eagle.jpm.mr.historyentity.JobConfigurationAPIEntity;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * Tests job configuration parsing in {@link JHFMRVer2EventReader} against the
+ * {@code job_1479206441898_508949_conf.xml} test resource.
+ */
+public class JHFEventReaderBaseTest {
+
+    /**
+     * Parses the sample job configuration through a reader whose filter accepts job and job
+     * configuration files and includes the cascading, hive, pig and scoobi key patterns, then
+     * checks that the entity handed to the listener is a {@link JobConfigurationAPIEntity}
+     * holding exactly one configuration entry.
+     *
+     * <p>The assertions live inside the listener callback, so they only run if
+     * {@code parseConfiguration()} actually notifies the listener.
+     */
+    @Test
+    public void testParseConfiguration() throws Exception {
+        Configuration conf = new Configuration();
+        conf.addResource("job_1479206441898_508949_conf.xml");
+
+        final JobHistoryContentFilterBuilder builder = JobHistoryContentFilterBuilder.newBuilder().acceptJobFile().acceptJobConfFile();
+        List<String> confKeyPatterns = new ArrayList<>();
+        confKeyPatterns.add(Constants.JobConfiguration.CASCADING_JOB);
+        confKeyPatterns.add(Constants.JobConfiguration.HIVE_JOB);
+        confKeyPatterns.add(Constants.JobConfiguration.PIG_JOB);
+        confKeyPatterns.add(Constants.JobConfiguration.SCOOBI_JOB);
+        for (String key : confKeyPatterns) {
+            builder.includeJobKeyPatterns(Pattern.compile(key));
+        }
+        JobHistoryContentFilter filter = builder.build();
+
+        MRHistoryJobConfig appConfig = MRHistoryJobConfig.newInstance(ConfigFactory.load());
+        Map<String, String> tags = new HashMap<>();
+        tags.put("site", "sandbox");
+        tags.put("jobId", "job_1490593856016_152289");
+        tags.put("jobType", "HIVE");
+        tags.put("jobDefId", "INSERT OVERWRITE TABLE kyl...'2017-04-06')))(Stage-1)");
+        JHFMRVer2EventReader reader = new JHFMRVer2EventReader(tags, conf, filter, appConfig);
+        reader.addListener(new JobConfigurationCreationServiceListener(appConfig.getEagleServiceConfig()) {
+            @Override
+            public void jobEntityCreated(JobBaseAPIEntity entity) throws Exception {
+                // Dedicated assertions report the offending value when they fail.
+                Assert.assertNotNull(entity);
+                Assert.assertTrue(entity instanceof JobConfigurationAPIEntity);
+                JobConfigurationAPIEntity configurationAPIEntity = (JobConfigurationAPIEntity) entity;
+                // Presumably only one property of the fixture matches the included key patterns - confirm.
+                Assert.assertEquals(1, configurationAPIEntity.getJobConfig().getConfig().size());
+            }
+        });
+        reader.parseConfiguration();
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-jpm/eagle-jpm-mr-history/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/test/resources/application.conf b/eagle-jpm/eagle-jpm-mr-history/src/test/resources/application.conf
new file mode 100644
index 0000000..00b14a8
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/test/resources/application.conf
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+  "appId":"mrHistoryJob",
+  "mode":"LOCAL",
+  "workers" : 3,
+  "siteId" : "sandbox",
+  application.storm.nimbusHost=localhost
+
+  "stormConfig" : {
+    "mrHistoryJobSpoutTasks" : 6,
+    "jobKafkaSinkTasks" : 1,
+    "taskAttemptKafkaSinkTasks" : 1
+  },
+
+  "zookeeper" : {
+    "zkQuorum" : "sandbox.hortonworks.com:2181",
+    "zkRoot" : "/test_mrjobhistory",
+    "zkSessionTimeoutMs" : 15000,
+    "zkRetryTimes" : 3,
+    "zkRetryInterval" : 20000
+  },
+
+  "endpointConfig" : {
+    "timeZone" : "UTC",
+    "mrHistoryServerUrl" : "http://sandbox.hortonworks.com:19888",
+    "basePath" : "/mr-history/done",
+    "hdfs" : {
+      fs.defaultFS : "hdfs://sandbox.hortonworks.com:8020",
+      # if not needed, do not set
+      # hdfs.kerberos.principal = ,
+      # hdfs.keytab.file =
+      # ....
+    }
+  },
+
+  "service": {
+    "host": "localhost",
+    "port": 9090,
+    "username": "admin",
+    "password": "secret",
+    "readTimeOutSeconds" : 10,
+    context = "/rest"
+  },
+
+  "dataSinkConfig": {
+    "topic" : "map_reduce_failed_job",
+    "brokerList" : "sandbox.hortonworks.com:6667",
+    "serializerClass" : "kafka.serializer.StringEncoder",
+    "keySerializerClass" : "kafka.serializer.StringEncoder"
+    "producerType" : "async",
+    "numBatchMessages" : "4096",
+    "maxQueueBufferMs" : "5000",
+    "requestRequiredAcks" : "0"
+  },
+
+  "MRConfigureKeys" : {
+    "jobNameKey" : "eagle.job.name",
+    "jobConfigKey" : "mapreduce.map.output.compress,mapreduce.map.output.compress.codec,mapreduce.output.fileoutputformat.compress,mapreduce.output.fileoutputformat.compress.type,mapreduce.output.fileoutputformat.compress.codec,mapred.output.format.class, dataplatform.etl.info,mapreduce.map.memory.mb,mapreduce.reduce.memory.mb,mapreduce.map.java.opts,mapreduce.reduce.java.opts"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-jpm/eagle-jpm-mr-history/src/test/resources/job_1479206441898_508949_conf.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/test/resources/job_1479206441898_508949_conf.xml b/eagle-jpm/eagle-jpm-mr-history/src/test/resources/job_1479206441898_508949_conf.xml
index 6d22996..b670a3f 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/test/resources/job_1479206441898_508949_conf.xml
+++ b/eagle-jpm/eagle-jpm-mr-history/src/test/resources/job_1479206441898_508949_conf.xml
@@ -1,18 +1,177 @@
-<!--
-   Licensed to the Apache Software Foundation (ASF) under one or more
-   contributor license agreements.  See the NOTICE file distributed with
-   this work for additional information regarding copyright ownership.
-   The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with
-   the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!-- ~ Licensed to the Apache Software Foundation (ASF) under one or more
+	~ contributor license agreements. See the NOTICE file distributed with ~
+	this work for additional information regarding copyright ownership. ~ The
+	ASF licenses this file to You under the Apache License, Version 2.0 ~ (the
+	"License"); you may not use this file except in compliance with ~ the License.
+	You may obtain a copy of the License at ~ ~ http://www.apache.org/licenses/LICENSE-2.0
+	~ ~ Unless required by applicable law or agreed to in writing, software ~
+	distributed under the License is distributed on an "AS IS" BASIS, ~ WITHOUT
+	WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the
+	License for the specific language governing permissions and ~ limitations
+	under the License.
 -->
-<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
+<configuration>
+    <property>
+        <name>hive.optimize.skewjoin.compiletime</name>
+        <value>false</value>
+        <source>programatically</source>
+        <source>
+            org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@67236f24
+        </source>
+        <source>job.xml</source>
+        <source>
+            hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml
+        </source>
+    </property>
+    <property>
+        <name>hive.query.string</name>
+        <value>
+            select a.phone_number from customer_details a, call_detail_records b where a.phone_number=b.phone_number
+        </value>
+        <source>programatically</source>
+        <source>job.xml</source>
+        <source>
+            hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml
+        </source>
+    </property>
+    <property>
+        <name>dfs.blockreport.initialDelay</name>
+        <value>120</value>
+        <source>hdfs-site.xml</source>
+        <source>job.xml</source>
+        <source>
+            hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml
+        </source>
+    </property>
+    <property>
+        <name>mapreduce.reduce.markreset.buffer.percent</name>
+        <value>0.0</value>
+        <source>mapred-default.xml</source>
+        <source>job.xml</source>
+        <source>
+            hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml
+        </source>
+    </property>
+    <property>
+        <name>file.client-write-packet-size</name>
+        <value>65536</value>
+        <source>core-default.xml</source>
+        <source>job.xml</source>
+        <source>
+            hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml
+        </source>
+    </property>
+    <property>
+        <name>
+            hadoop.http.authentication.simple.anonymous.allowed
+        </name>
+        <value>true</value>
+        <source>core-site.xml</source>
+        <source>job.xml</source>
+        <source>
+            hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml
+        </source>
+    </property>
+    <property>
+        <name>hive.querylog.location</name>
+        <value>/tmp/hive</value>
+        <source>programatically</source>
+        <source>
+            org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@67236f24
+        </source>
+        <source>job.xml</source>
+        <source>
+            hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml
+        </source>
+    </property>
+    <property>
+        <name>yarn.timeline-service.leveldb-timeline-store.path</name>
+        <value>/hadoop/yarn/timeline</value>
+        <source>yarn-site.xml</source>
+        <source>job.xml</source>
+        <source>
+            hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml
+        </source>
+    </property>
+    <property>
+        <name>yarn.resourcemanager.proxy-user-privileges.enabled</name>
+        <value>false</value>
+        <source>yarn-default.xml</source>
+        <source>job.xml</source>
+        <source>
+            hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml
+        </source>
+    </property>
+    <property>
+        <name>hive.exec.script.allow.partial.consumption</name>
+        <value>false</value>
+        <source>programatically</source>
+        <source>
+            org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@67236f24
+        </source>
+        <source>job.xml</source>
+        <source>
+            hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml
+        </source>
+    </property>
+    <property>
+        <name>hive.server2.global.init.file.location</name>
+        <value>/etc/hive/conf</value>
+        <source>programatically</source>
+        <source>
+            org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@67236f24
+        </source>
+        <source>job.xml</source>
+        <source>
+            hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml
+        </source>
+    </property>
+    <property>
+        <name>
+            yarn.resourcemanager.work-preserving-recovery.scheduling-wait-ms
+        </name>
+        <value>10000</value>
+        <source>yarn-site.xml</source>
+        <source>job.xml</source>
+        <source>
+            hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml
+        </source>
+    </property>
+    <property>
+        <name>dfs.datanode.slow.io.warning.threshold.ms</name>
+        <value>300</value>
+        <source>hdfs-default.xml</source>
+        <source>job.xml</source>
+        <source>
+            hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml
+        </source>
+    </property>
+    <property>
+        <name>hive.support.concurrency</name>
+        <value>true</value>
+        <source>file:/etc/hive/conf/hive-site.xml</source>
+        <source>job.xml</source>
+        <source>
+            hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml
+        </source>
+    </property>
+    <property>
+        <name>mapreduce.reduce.shuffle.merge.percent</name>
+        <value>0.66</value>
+        <source>mapred-site.xml</source>
+        <source>job.xml</source>
+        <source>
+            hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml
+        </source>
+    </property>
+    <property>
+        <name>mapreduce.task.skip.start.attempts</name>
+        <value>2</value>
+        <source>mapred-default.xml</source>
+        <source>job.xml</source>
+        <source>
+            hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml
+        </source>
+    </property>
 </configuration>
\ No newline at end of file