You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2017/05/05 05:02:27 UTC
eagle git commit: [EAGLE-993] add duplicate removal settings in policy definition
Repository: eagle
Updated Branches:
refs/heads/master 47f00f159 -> 8da06636b
[EAGLE-993] add duplicate removal settings in policy definition
https://issues.apache.org/jira/browse/EAGLE-993
Author: Zhao, Qingwen <qi...@apache.org>
Closes #926 from qingwen220/EAGLE-993.
Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/8da06636
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/8da06636
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/8da06636
Branch: refs/heads/master
Commit: 8da06636b8e3834f7bebf6b1af923ad24b7d6d8b
Parents: 47f00f1
Author: Zhao, Qingwen <qi...@apache.org>
Authored: Fri May 5 13:02:20 2017 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Fri May 5 13:02:20 2017 +0800
----------------------------------------------------------------------
eagle-assembly/src/main/doc/metadata-ddl.sql | 7 -
.../src/assembly/alert-assembly.xml | 3 +-
.../engine/coordinator/AlertDeduplication.java | 71 +++++++
.../engine/coordinator/PolicyDefinition.java | 13 +-
.../engine/coordinator/PublishmentType.java | 21 +-
.../publisher/email/AlertEmailGenerator.java | 4 +-
.../publisher/impl/AbstractPublishPlugin.java | 17 +-
.../publisher/impl/AlertEmailPublisher.java | 2 -
.../publisher/impl/AlertPublisherImpl.java | 43 ++---
.../publisher/impl/DefaultDeduplicator.java | 10 +-
.../template/VelocityAlertTemplateEngine.java | 8 +-
.../alert/engine/runner/AlertPublisherBolt.java | 18 +-
.../main/resources/ALERT_INLINED_TEMPLATE.vm | 2 +-
.../VelocityAlertTemplateEngineTest.java | 2 +-
.../metadata/impl/JdbcMetadataDaoImpl.java | 2 +-
.../metadata/impl/JdbcMetadataHandler.java | 30 +++
.../jpm/mr/history/JHFEventReaderBaseTest.java | 74 +++++++
.../src/test/resources/application.conf | 74 +++++++
.../resources/job_1479206441898_508949_conf.xml | 191 +++++++++++++++++--
19 files changed, 515 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-assembly/src/main/doc/metadata-ddl.sql
----------------------------------------------------------------------
diff --git a/eagle-assembly/src/main/doc/metadata-ddl.sql b/eagle-assembly/src/main/doc/metadata-ddl.sql
index 0334623..3312576 100644
--- a/eagle-assembly/src/main/doc/metadata-ddl.sql
+++ b/eagle-assembly/src/main/doc/metadata-ddl.sql
@@ -164,10 +164,3 @@ CREATE TABLE IF NOT EXISTS analysis_email (
modifiedtime bigint(20) DEFAULT NULL,
UNIQUE (siteId, userId)
);
-
-INSERT INTO publishment_type(id, content) VALUES
-('Kafka', '{"name":"Kafka","type":"org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher","description":null,"fields":[{"name":"kafka_broker","value":"sandbox.hortonworks.com:6667"},{"name":"topic"}]}'),
-('Email', '{"name":"Email","type":"org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher","description":null,"fields":[{"name":"subject"},{"name":"sender"}, {"name":"recipients"}]}'),
-('Slack', '{"name":"Slack","type":"org.apache.eagle.alert.engine.publisher.impl.AlertSlackPublisher","description":null,"fields":[{"name":"token"},{"name":"channels"}, {"name":"severitys"}, {"name":"urltemplate"}]}'),
-('HBaseStorage', '{"name":"HBaseStorage","type":"org.apache.eagle.alert.app.AlertEagleStorePlugin","description":null,"fields":[]}'),
-('JDBCStorage', '{"name":"JDBCStorage","type":"org.apache.eagle.alert.engine.publisher.impl.AlertEagleStorePlugin","description":null,"fields":[]}');
http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-core/eagle-alert-parent/eagle-alert/alert-assembly/src/assembly/alert-assembly.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-assembly/src/assembly/alert-assembly.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-assembly/src/assembly/alert-assembly.xml
index 9f25ec0..b361f99 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-assembly/src/assembly/alert-assembly.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-assembly/src/assembly/alert-assembly.xml
@@ -9,7 +9,8 @@
distributed under the License is distributed on an "AS IS" BASIS, ~ WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the
License for the specific language governing permissions and ~ limitations
- under the License. -->
+ under the License.
+-->
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java
new file mode 100644
index 0000000..78fef7a
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.coordinator;
+
+import org.apache.commons.collections.ListUtils;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import java.util.List;
+import java.util.Objects;
+
+public class AlertDeduplication {
+ private String dedupIntervalMin;
+ private List<String> dedupFields;
+
+ public String getDedupIntervalMin() {
+ return dedupIntervalMin;
+ }
+
+ public void setDedupIntervalMin(String dedupIntervalMin) {
+ this.dedupIntervalMin = dedupIntervalMin;
+ }
+
+ public List<String> getDedupFields() {
+ return dedupFields;
+ }
+
+ public void setDedupFields(List<String> dedupFields) {
+ this.dedupFields = dedupFields;
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder()
+ .append(dedupFields)
+ .append(dedupIntervalMin)
+ .build();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == this) {
+ return true;
+ }
+ if (!(that instanceof AlertDeduplication)) {
+ return false;
+ }
+ AlertDeduplication another = (AlertDeduplication) that;
+ if (ListUtils.isEqualList(another.dedupFields, this.dedupFields)
+ && Objects.equals(another.dedupIntervalMin, this.dedupIntervalMin)) {
+ return true;
+ }
+ return false;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
index c377e41..5004513 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
@@ -43,6 +43,7 @@ public class PolicyDefinition implements Serializable {
private Definition stateDefinition;
private PolicyStatus policyStatus = PolicyStatus.ENABLED;
private AlertDefinition alertDefinition;
+ private AlertDeduplication deduplication;
// one stream only have one partition in one policy, since we don't support stream alias
private List<StreamPartition> partitionSpec = new ArrayList<StreamPartition>();
@@ -147,6 +148,7 @@ public class PolicyDefinition implements Serializable {
.append(policyStatus)
.append(parallelismHint)
.append(alertDefinition)
+ .append(deduplication)
.build();
}
@@ -172,7 +174,8 @@ public class PolicyDefinition implements Serializable {
&& CollectionUtils.isEqualCollection(another.partitionSpec, this.partitionSpec)
&& another.policyStatus.equals(this.policyStatus)
&& another.parallelismHint == this.parallelismHint
- && Objects.equals(another.alertDefinition, alertDefinition)) {
+ && Objects.equals(another.alertDefinition, alertDefinition)
+ && Objects.equals(another.deduplication, deduplication)) {
return true;
}
return false;
@@ -202,6 +205,14 @@ public class PolicyDefinition implements Serializable {
this.siteId = siteId;
}
+ public AlertDeduplication getDeduplication() {
+ return deduplication;
+ }
+
+ public void setDeduplication(AlertDeduplication deduplication) {
+ this.deduplication = deduplication;
+ }
+
@JsonIgnoreProperties(ignoreUnknown = true)
public static class Definition implements Serializable {
private static final long serialVersionUID = -622366527887848346L;
http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
index f7025f2..3119ee6 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
@@ -26,17 +26,6 @@ import java.util.*;
@JsonIgnoreProperties(ignoreUnknown = true)
public class PublishmentType {
private String name;
-
- @Override
- public String toString() {
- return "PublishmentType{"
- + "name='" + name + '\''
- + ", type='" + type + '\''
- + ", description='" + description + '\''
- + ", fields=" + fields
- + '}';
- }
-
private String type;
private String description;
private List<Map<String, String>> fields = new LinkedList<>();
@@ -73,7 +62,15 @@ public class PublishmentType {
this.fields = fields;
}
-
+ @Override
+ public String toString() {
+ return "PublishmentType{"
+ + "name='" + name + '\''
+ + ", type='" + type + '\''
+ + ", description='" + description + '\''
+ + ", fields=" + fields
+ + '}';
+ }
@Override
public boolean equals(Object obj) {
http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
index 1bcac17..a57941e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
@@ -139,7 +139,9 @@ public class AlertEmailGenerator {
alertContext.put(PublishConstants.ALERT_EMAIL_ALERT_DATA_DESC, generateAlertDataDesc(event));
alertContext.put(PublishConstants.ALERT_EMAIL_ALERT_CATEGORY, event.getCategory());
alertContext.put(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY, event.getSeverity().toString());
- alertContext.put(PublishConstants.ALERT_EMAIL_TIME, DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getCreatedTime()));
+ alertContext.put(PublishConstants.ALERT_EMAIL_TIME, String.format("%s %s",
+ DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getCreatedTime()),
+ DateTimeUtil.CURRENT_TIME_ZONE.getID()));
alertContext.put(PublishConstants.ALERT_EMAIL_STREAM_ID, event.getStreamId());
alertContext.put(PublishConstants.ALERT_EMAIL_CREATOR, event.getCreatedBy());
alertContext.put(PublishConstants.ALERT_EMAIL_VERSION, Version.version);
http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
index b155bb8..c5c9e04 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
@@ -29,6 +29,7 @@ import org.apache.eagle.alert.engine.publisher.dedup.DedupCache;
import org.apache.eagle.alert.engine.publisher.dedup.ExtendedDeduplicator;
import org.slf4j.Logger;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -71,8 +72,14 @@ public abstract class AbstractPublishPlugin implements AlertPublishPlugin {
getLogger().error(String.format("initialize extended deduplicator %s failed", spec.getClassName()), t);
}
} else {
- this.deduplicator = new DefaultDeduplicator(publishment.getDedupIntervalMin(),
- publishment.getDedupFields(), publishment.getDedupStateField(), publishment.getDedupStateCloseValue(), dedupCache);
+ if (publishment.getDedupIntervalMin() != null && !publishment.getDedupIntervalMin().isEmpty()) {
+ this.deduplicator = new DefaultDeduplicator(
+ publishment.getDedupIntervalMin(),
+ publishment.getDedupFields(),
+ publishment.getDedupStateField(),
+ publishment.getDedupStateCloseValue(),
+ dedupCache);
+ }
this.pubName = publishment.getName();
}
String serializerClz = publishment.getSerializer();
@@ -98,7 +105,11 @@ public abstract class AbstractPublishPlugin implements AlertPublishPlugin {
@Override
public List<AlertStreamEvent> dedup(AlertStreamEvent event) {
- return deduplicator.dedup(event);
+ if (null != deduplicator) {
+ return deduplicator.dedup(event);
+ } else {
+ return Collections.singletonList(event);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
index 152a9f1..f40680c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
@@ -40,7 +40,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import static org.apache.eagle.alert.service.MetadataServiceClientImpl.*;
import static org.apache.eagle.common.mail.AlertEmailConstants.*;
public class AlertEmailPublisher extends AbstractPublishPlugin implements AlertPublishPluginProvider {
@@ -215,7 +214,6 @@ public class AlertEmailPublisher extends AbstractPublishPlugin implements AlertP
.name("Email")
.type(AlertEmailPublisher.class)
.description("Email alert publisher")
- .field("subject")
.field("sender")
.field("recipients")
.build();
http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
index 5b902f9..e38799f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
@@ -43,7 +43,10 @@ public class AlertPublisherImpl implements AlertPublisher {
private final String name;
- private volatile Map<PublishPartition, AlertPublishPlugin> publishPluginMapping = new ConcurrentHashMap<>(1);
+ // <publishId, PublishPlugin>
+ private volatile Map<String, AlertPublishPlugin> publishPluginMapping = new ConcurrentHashMap<>(1);
+ //private volatile Map<PublishPartition, AlertPublishPlugin> publishPluginMapping = new ConcurrentHashMap<>(1);
+
private Config config;
private Map conf;
@@ -73,11 +76,11 @@ public class AlertPublisherImpl implements AlertPublisher {
private void notifyAlert(PublishPartition partition, AlertStreamEvent event) {
// remove the column values for publish plugin match
partition.getColumnValues().clear();
- if (!publishPluginMapping.containsKey(partition)) {
+ if (!publishPluginMapping.containsKey(partition.getPublishId())) {
LOG.warn("PublishPartition {} is not found in publish plugin map", partition);
return;
}
- AlertPublishPlugin plugin = publishPluginMapping.get(partition);
+ AlertPublishPlugin plugin = publishPluginMapping.get(partition.getPublishId());
if (plugin == null) {
LOG.warn("PublishPartition {} has problems while initializing publish plugin", partition);
return;
@@ -120,7 +123,7 @@ public class AlertPublisherImpl implements AlertPublisher {
}
// copy and swap to avoid concurrency issue
- Map<PublishPartition, AlertPublishPlugin> newPublishMap = new HashMap<>(publishPluginMapping);
+ Map<String, AlertPublishPlugin> newPublishMap = new HashMap<>(publishPluginMapping);
// added
for (Publishment publishment : added) {
@@ -128,9 +131,7 @@ public class AlertPublisherImpl implements AlertPublisher {
AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(publishment, config, conf);
if (plugin != null) {
- for (PublishPartition p : getPublishPartitions(publishment)) {
- newPublishMap.put(p, plugin);
- }
+ newPublishMap.put(publishment.getName(), plugin);
} else {
LOG.error("OnPublishChange alertPublisher {} failed due to invalid format", publishment);
}
@@ -138,16 +139,9 @@ public class AlertPublisherImpl implements AlertPublisher {
//removed
List<AlertPublishPlugin> toBeClosed = new ArrayList<>();
for (Publishment publishment : removed) {
- AlertPublishPlugin plugin = null;
- for (PublishPartition p : getPublishPartitions(publishment)) {
- if (plugin == null) {
- plugin = newPublishMap.remove(p);
- } else {
- newPublishMap.remove(p);
- }
- }
- if (plugin != null) {
- toBeClosed.add(plugin);
+ AlertPublishPlugin publishPlugin = newPublishMap.remove(publishment.getName());
+ if (publishPlugin != null) {
+ toBeClosed.add(publishPlugin);
}
}
// updated
@@ -155,16 +149,11 @@ public class AlertPublisherImpl implements AlertPublisher {
// for updated publishment, need to init them too
AlertPublishPlugin newPlugin = AlertPublishPluginsFactory.createNotificationPlugin(publishment, config, conf);
if (newPlugin != null) {
- AlertPublishPlugin plugin = null;
- for (PublishPartition p : getPublishPartitions(publishment)) {
- if (plugin == null) {
- plugin = newPublishMap.get(p);
- }
- newPublishMap.put(p, newPlugin);
- }
- if (plugin != null) {
- toBeClosed.add(plugin);
+ AlertPublishPlugin oldPlugin = newPublishMap.get(publishment.getName());
+ if (oldPlugin != null) {
+ toBeClosed.add(oldPlugin);
}
+ newPublishMap.put(publishment.getName(), newPlugin);
} else {
LOG.error("OnPublishChange alertPublisher {} failed due to invalid format", publishment);
}
@@ -199,7 +188,7 @@ public class AlertPublisherImpl implements AlertPublisher {
try {
p.close();
} catch (Exception e) {
- LOG.error(String.format("Error when close publish plugin {}!", p.getClass().getCanonicalName()), e);
+ LOG.error("Error when close publish plugin {}!", p.getClass().getCanonicalName(), e);
}
}
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
index ac99db3..54d551e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
@@ -20,6 +20,7 @@ package org.apache.eagle.alert.engine.publisher.impl;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.commons.lang3.StringUtils;
+import org.apache.eagle.alert.engine.coordinator.AlertDeduplication;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
@@ -59,6 +60,13 @@ public class DefaultDeduplicator implements AlertDeduplicator {
this.dedupIntervalSec = intervalMin;
}
+ public DefaultDeduplicator(AlertDeduplication alertDeduplication) {
+ this.customDedupFields = alertDeduplication.getDedupFields();
+ this.dedupIntervalSec = Integer.parseInt(alertDeduplication.getDedupIntervalMin()) * 60;
+ this.withoutStatesCache = CacheBuilder.newBuilder().expireAfterWrite(
+ this.dedupIntervalSec, TimeUnit.SECONDS).build();
+ }
+
public DefaultDeduplicator(String intervalMin, List<String> customDedupFields,
String dedupStateField, String dedupStateCloseValue, DedupCache dedupCache) {
setDedupIntervalMin(intervalMin);
@@ -81,7 +89,7 @@ public class DefaultDeduplicator implements AlertDeduplicator {
* @param key
* @return
*/
- public List<AlertStreamEvent> checkDedup(AlertStreamEvent event, EventUniq key, String stateFiledValue) {
+ private List<AlertStreamEvent> checkDedup(AlertStreamEvent event, EventUniq key, String stateFiledValue) {
if (StringUtils.isBlank(stateFiledValue)) {
// without state field, we cannot determine whether it is duplicated
// without custom filed values, we cannot determine whether it is duplicated
http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngine.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngine.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngine.java
index 87a067f..c0b765a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngine.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngine.java
@@ -151,9 +151,13 @@ public class VelocityAlertTemplateEngine implements AlertTemplateEngine {
context.put(AlertContextFields.ALERT_ID, event.getAlertId());
context.put(AlertContextFields.CREATED_BY, event.getCreatedBy());
context.put(AlertContextFields.CREATED_TIMESTAMP, event.getCreatedTime());
- context.put(AlertContextFields.CREATED_TIME, DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getCreatedTime()));
+ context.put(AlertContextFields.CREATED_TIME, String.format("%s %s",
+ DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getCreatedTime()),
+ DateTimeUtil.CURRENT_TIME_ZONE.getID()));
context.put(AlertContextFields.ALERT_TIMESTAMP, event.getTimestamp());
- context.put(AlertContextFields.ALERT_TIME, DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getTimestamp()));
+ context.put(AlertContextFields.ALERT_TIME, String.format("%s %s",
+ DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getTimestamp()),
+ DateTimeUtil.CURRENT_TIME_ZONE.getID()));
context.put(AlertContextFields.ALERT_SCHEMA, event.getSchema());
context.put(AlertContextFields.ALERT_EVENT, event);
http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
index 44a5fe9..d6829d6 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
@@ -33,6 +33,7 @@ import org.apache.eagle.alert.engine.publisher.AlertPublisher;
import org.apache.eagle.alert.engine.publisher.AlertStreamFilter;
import org.apache.eagle.alert.engine.publisher.PipeStreamFilter;
import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl;
+import org.apache.eagle.alert.engine.publisher.impl.DefaultDeduplicator;
import org.apache.eagle.alert.engine.publisher.template.AlertTemplateEngine;
import org.apache.eagle.alert.engine.publisher.template.AlertTemplateProvider;
import org.apache.eagle.alert.utils.AlertConstants;
@@ -43,6 +44,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPublishSpecListener {
@@ -51,6 +53,7 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli
private volatile Map<String, Publishment> cachedPublishments = new HashMap<>();
private volatile Map<String, PolicyDefinition> policyDefinitionMap;
private volatile Map<String, StreamDefinition> streamDefinitionMap;
+ private volatile Map<String, DefaultDeduplicator> deduplicatorMap = new ConcurrentHashMap<>();
private AlertTemplateEngine alertTemplateEngine;
private boolean logEventEnabled;
@@ -87,6 +90,13 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli
if (logEventEnabled) {
LOG.info("Alert publish bolt {}/{} with partition {} received event: {}", this.getBoltId(), this.context.getThisTaskId(), partition, event);
}
+ if (deduplicatorMap != null && deduplicatorMap.containsKey(event.getPolicyId())) {
+ List<AlertStreamEvent> eventList = deduplicatorMap.get(event.getPolicyId()).dedup(event);
+ if (eventList == null || eventList.isEmpty()) {
+ collector.ack(input);
+ return;
+ }
+ }
AlertStreamEvent filteredEvent = alertFilter.filter(event);
if (filteredEvent != null) {
alertPublisher.nextEvent(partition, filteredEvent);
@@ -139,7 +149,7 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli
}
@Override
- public void onAlertPolicyChange(Map<String, PolicyDefinition> pds, Map<String, StreamDefinition> sds) {
+ public synchronized void onAlertPolicyChange(Map<String, PolicyDefinition> pds, Map<String, StreamDefinition> sds) {
List<String> policyToRemove = new ArrayList<>();
if (this.policyDefinitionMap != null) {
policyToRemove.addAll(this.policyDefinitionMap.keySet().stream().filter(policyId -> !pds.containsKey(policyId)).collect(Collectors.toList()));
@@ -151,6 +161,9 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli
for (Map.Entry<String, PolicyDefinition> entry : pds.entrySet()) {
try {
this.alertTemplateEngine.register(entry.getValue());
+ if (entry.getValue().getDeduplication() != null) {
+ this.deduplicatorMap.put(entry.getKey(), new DefaultDeduplicator(entry.getValue().getDeduplication()));
+ }
} catch (Throwable throwable) {
LOG.error("Failed to register policy {} in template engine", entry.getKey(), throwable);
}
@@ -159,6 +172,9 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli
for (String policyId : policyToRemove) {
try {
this.alertTemplateEngine.unregister(policyId);
+ if (deduplicatorMap != null && deduplicatorMap.containsKey(policyId)) {
+ deduplicatorMap.remove(policyId);
+ }
} catch (Throwable throwable) {
LOG.error("Failed to unregister policy {} from template engine", policyId, throwable);
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_INLINED_TEMPLATE.vm
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_INLINED_TEMPLATE.vm b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_INLINED_TEMPLATE.vm
index 0e3d5fe..70013c3 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_INLINED_TEMPLATE.vm
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_INLINED_TEMPLATE.vm
@@ -145,7 +145,7 @@
<td class="content-block"
style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; vertical-align: top; margin: 0; padding: 0 0 20px;"
valign="top">
- <small>CATEGORY:</small> <strong style="color: $alertColor">#if($alert["alertCategory"]) $alert["alertCategory"] #else N/A #end</strong> <small>TIME:</small> <strong>$alert["alertTime"]</strong>
+ <small>CATEGORY:</small> <strong style="color: $alertColor">#if($alert["alertCategory"]) $alert["alertCategory"] #else N/A #end</strong> <small>CREATE TIME:</small> <strong>$alert["alertTime"]</strong>
</td>
</tr>
<tr style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;">
http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngineTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngineTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngineTest.java
index 7b1d494..e5ec474 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngineTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngineTest.java
@@ -36,7 +36,7 @@ public class VelocityAlertTemplateEngineTest {
templateEngine.init(ConfigFactory.load());
templateEngine.register(mockPolicy("testPolicy"));
AlertStreamEvent event = templateEngine.filter(mockAlertEvent("testPolicy"));
- Assert.assertEquals("Alert (2016-11-30 07:31:15): cpu usage on hadoop of cluster test_cluster at localhost is 0.98, " +
+ Assert.assertEquals("Alert (2016-11-30 07:31:15 UTC): cpu usage on hadoop of cluster test_cluster at localhost is 0.98, " +
"exceeding thread hold: 90%. (policy: testPolicy, description: Policy for monitoring cpu usage > 90%), " +
"definition: from HADOOP_JMX_METRIC_STREAM[site == \"test_cluster\" and metric == \"cpu.usage\" and value > 0.9] " +
"select site, metric, host, role, value insert into capacityUsageAlert", event.getBody());
http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java
index e0b5c9d..6427d8c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java
@@ -227,7 +227,7 @@ public class JdbcMetadataDaoImpl implements IMetadataDao {
@Override
public OpResult removePolicy(String policyId) {
- return handler.removeById(PolicyDefinition.class.getSimpleName(), policyId);
+ return handler.removePolicyById(PolicyDefinition.class.getSimpleName(), policyId);
}
@Override
http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataHandler.java
index a9e3c5e..7fffa55 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataHandler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataHandler.java
@@ -455,6 +455,36 @@ public class JdbcMetadataHandler {
return result;
}
+ public OpResult removePolicyById(String clzName, String policyId) {
+ Connection connection = null;
+ PreparedStatement statement = null;
+ OpResult result = new OpResult();
+ try {
+ String tb = getTableName(clzName);
+ connection = dataSource.getConnection();
+ connection.setAutoCommit(false);
+ statement = connection.prepareStatement(String.format(DELETE_STATEMENT, tb));
+ statement.setString(1, policyId);
+ int status = statement.executeUpdate();
+ LOG.info("delete {} policy {} from {}", status, policyId, tb);
+ closeResource(null, statement, null);
+
+ statement = connection.prepareStatement(DELETE_PUBLISHMENT_STATEMENT);
+ statement.setString(1, policyId);
+ status = statement.executeUpdate();
+ LOG.info("delete {} records from policy_publishment", status);
+
+ connection.commit();
+ connection.setAutoCommit(true);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ } finally {
+ closeResource(null, statement, connection);
+ }
+ LOG.info(result.message);
+ return result;
+ }
+
public OpResult removeById(String clzName, String key) {
Connection connection = null;
PreparedStatement statement = null;
http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/JHFEventReaderBaseTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/JHFEventReaderBaseTest.java b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/JHFEventReaderBaseTest.java
new file mode 100644
index 0000000..9c8146d
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/JHFEventReaderBaseTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.mr.history;
+
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
+import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilterBuilder;
+import org.apache.eagle.jpm.mr.history.parser.JHFMRVer2EventReader;
+import org.apache.eagle.jpm.mr.history.parser.JobConfigurationCreationServiceListener;
+import org.apache.eagle.jpm.mr.historyentity.JobBaseAPIEntity;
+import org.apache.eagle.jpm.mr.historyentity.JobConfigurationAPIEntity;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * Tests configuration parsing in JHFMRVer2EventReader against the bundled
+ * job_1479206441898_508949_conf.xml resource.
+ */
+public class JHFEventReaderBaseTest {
+
+    /**
+     * Loads the job configuration resource, restricts the content filter to the
+     * Cascading/Hive/Pig/Scoobi job key patterns, and checks that the entity handed to the
+     * listener is a JobConfigurationAPIEntity holding exactly one configuration entry.
+     */
+    @Test
+    public void testParseConfiguration() throws Exception {
+        Configuration conf = new Configuration();
+        conf.addResource("job_1479206441898_508949_conf.xml");
+
+        final JobHistoryContentFilterBuilder builder = JobHistoryContentFilterBuilder.newBuilder().acceptJobFile().acceptJobConfFile();
+        List<String> confKeyPatterns = new ArrayList<>();
+        confKeyPatterns.add(Constants.JobConfiguration.CASCADING_JOB);
+        confKeyPatterns.add(Constants.JobConfiguration.HIVE_JOB);
+        confKeyPatterns.add(Constants.JobConfiguration.PIG_JOB);
+        confKeyPatterns.add(Constants.JobConfiguration.SCOOBI_JOB);
+        for (String key : confKeyPatterns) {
+            builder.includeJobKeyPatterns(Pattern.compile(key));
+        }
+        JobHistoryContentFilter filter = builder.build();
+
+        MRHistoryJobConfig appConfig = MRHistoryJobConfig.newInstance(ConfigFactory.load());
+        Map<String, String> tags = new HashMap<>();
+        tags.put("site", "sandbox");
+        tags.put("jobId", "job_1490593856016_152289");
+        tags.put("jobType", "HIVE");
+        tags.put("jobDefId", "INSERT OVERWRITE TABLE kyl...'2017-04-06')))(Stage-1)");
+        JHFMRVer2EventReader reader = new JHFMRVer2EventReader(tags, conf, filter, appConfig);
+        // NOTE(review): the assertions below run only if the reader invokes this listener;
+        // if it never does, the test passes vacuously - consider asserting the callback fired.
+        reader.addListener(new JobConfigurationCreationServiceListener(appConfig.getEagleServiceConfig()) {
+            @Override
+            public void jobEntityCreated(JobBaseAPIEntity entity) throws Exception {
+                Assert.assertNotNull(entity);
+                Assert.assertTrue(entity instanceof JobConfigurationAPIEntity);
+                JobConfigurationAPIEntity configurationAPIEntity = (JobConfigurationAPIEntity) entity;
+                // assertEquals reports the actual size on failure, unlike assertTrue(size == 1).
+                Assert.assertEquals(1, configurationAPIEntity.getJobConfig().getConfig().size());
+            }
+        });
+        reader.parseConfiguration();
+    }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-jpm/eagle-jpm-mr-history/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/test/resources/application.conf b/eagle-jpm/eagle-jpm-mr-history/src/test/resources/application.conf
new file mode 100644
index 0000000..00b14a8
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/test/resources/application.conf
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+ "appId":"mrHistoryJob",
+ "mode":"LOCAL",
+ "workers" : 3,
+ "siteId" : "sandbox",
+ application.storm.nimbusHost=localhost
+
+ "stormConfig" : {
+ "mrHistoryJobSpoutTasks" : 6,
+ "jobKafkaSinkTasks" : 1,
+ "taskAttemptKafkaSinkTasks" : 1
+ },
+
+ "zookeeper" : {
+ "zkQuorum" : "sandbox.hortonworks.com:2181",
+ "zkRoot" : "/test_mrjobhistory",
+ "zkSessionTimeoutMs" : 15000,
+ "zkRetryTimes" : 3,
+ "zkRetryInterval" : 20000
+ },
+
+ "endpointConfig" : {
+ "timeZone" : "UTC",
+ "mrHistoryServerUrl" : "http://sandbox.hortonworks.com:19888",
+ "basePath" : "/mr-history/done",
+ "hdfs" : {
+ fs.defaultFS : "hdfs://sandbox.hortonworks.com:8020",
+ # If not needed, do not set these:
+ # hdfs.kerberos.principal = ,
+ # hdfs.keytab.file =
+ # ....
+ }
+ },
+
+ "service": {
+ "host": "localhost",
+ "port": 9090,
+ "username": "admin",
+ "password": "secret",
+ "readTimeOutSeconds" : 10,
+ context = "/rest"
+ },
+
+ "dataSinkConfig": {
+ "topic" : "map_reduce_failed_job",
+ "brokerList" : "sandbox.hortonworks.com:6667",
+ "serializerClass" : "kafka.serializer.StringEncoder",
+ "keySerializerClass" : "kafka.serializer.StringEncoder"
+ "producerType" : "async",
+ "numBatchMessages" : "4096",
+ "maxQueueBufferMs" : "5000",
+ "requestRequiredAcks" : "0"
+ },
+
+ "MRConfigureKeys" : {
+ "jobNameKey" : "eagle.job.name",
+ "jobConfigKey" : "mapreduce.map.output.compress,mapreduce.map.output.compress.codec,mapreduce.output.fileoutputformat.compress,mapreduce.output.fileoutputformat.compress.type,mapreduce.output.fileoutputformat.compress.codec,mapred.output.format.class, dataplatform.etl.info,mapreduce.map.memory.mb,mapreduce.reduce.memory.mb,mapreduce.map.java.opts,mapreduce.reduce.java.opts"
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/8da06636/eagle-jpm/eagle-jpm-mr-history/src/test/resources/job_1479206441898_508949_conf.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/test/resources/job_1479206441898_508949_conf.xml b/eagle-jpm/eagle-jpm-mr-history/src/test/resources/job_1479206441898_508949_conf.xml
index 6d22996..b670a3f 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/test/resources/job_1479206441898_508949_conf.xml
+++ b/eagle-jpm/eagle-jpm-mr-history/src/test/resources/job_1479206441898_508949_conf.xml
@@ -1,18 +1,177 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!-- ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with ~
+ this work for additional information regarding copyright ownership. ~ The
+ ASF licenses this file to You under the Apache License, Version 2.0 ~ (the
+ "License"); you may not use this file except in compliance with ~ the License.
+ You may obtain a copy of the License at ~ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~ ~ Unless required by applicable law or agreed to in writing, software ~
+ distributed under the License is distributed on an "AS IS" BASIS, ~ WITHOUT
+ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the
+ License for the specific language governing permissions and ~ limitations
+ under the License.
-->
-<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
+<configuration>
+ <property>
+ <name>hive.optimize.skewjoin.compiletime</name>
+ <value>false</value>
+ <source>programatically</source>
+ <source>
+ org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@67236f24
+ </source>
+ <source>job.xml</source>
+ <source>
+ hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml
+ </source>
+ </property>
+ <property>
+ <name>hive.query.string</name>
+ <value>
+ select a.phone_number from customer_details a, call_detail_records b where a.phone_number=b.phone_number
+ </value>
+ <source>programatically</source>
+ <source>job.xml</source>
+ <source>
+ hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml
+ </source>
+ </property>
+ <property>
+ <name>dfs.blockreport.initialDelay</name>
+ <value>120</value>
+ <source>hdfs-site.xml</source>
+ <source>job.xml</source>
+ <source>
+ hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml
+ </source>
+ </property>
+ <property>
+ <name>mapreduce.reduce.markreset.buffer.percent</name>
+ <value>0.0</value>
+ <source>mapred-default.xml</source>
+ <source>job.xml</source>
+ <source>
+ hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml
+ </source>
+ </property>
+ <property>
+ <name>file.client-write-packet-size</name>
+ <value>65536</value>
+ <source>core-default.xml</source>
+ <source>job.xml</source>
+ <source>
+ hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml
+ </source>
+ </property>
+ <property>
+ <name>
+ hadoop.http.authentication.simple.anonymous.allowed
+ </name>
+ <value>true</value>
+ <source>core-site.xml</source>
+ <source>job.xml</source>
+ <source>
+ hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml
+ </source>
+ </property>
+ <property>
+ <name>hive.querylog.location</name>
+ <value>/tmp/hive</value>
+ <source>programatically</source>
+ <source>
+ org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@67236f24
+ </source>
+ <source>job.xml</source>
+ <source>
+ hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml
+ </source>
+ </property>
+ <property>
+ <name>yarn.timeline-service.leveldb-timeline-store.path</name>
+ <value>/hadoop/yarn/timeline</value>
+ <source>yarn-site.xml</source>
+ <source>job.xml</source>
+ <source>
+ hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml
+ </source>
+ </property>
+ <property>
+ <name>yarn.resourcemanager.proxy-user-privileges.enabled</name>
+ <value>false</value>
+ <source>yarn-default.xml</source>
+ <source>job.xml</source>
+ <source>
+ hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml
+ </source>
+ </property>
+ <property>
+ <name>hive.exec.script.allow.partial.consumption</name>
+ <value>false</value>
+ <source>programatically</source>
+ <source>
+ org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@67236f24
+ </source>
+ <source>job.xml</source>
+ <source>
+ hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml
+ </source>
+ </property>
+ <property>
+ <name>hive.server2.global.init.file.location</name>
+ <value>/etc/hive/conf</value>
+ <source>programatically</source>
+ <source>
+ org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@67236f24
+ </source>
+ <source>job.xml</source>
+ <source>
+ hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml
+ </source>
+ </property>
+ <property>
+ <name>
+ yarn.resourcemanager.work-preserving-recovery.scheduling-wait-ms
+ </name>
+ <value>10000</value>
+ <source>yarn-site.xml</source>
+ <source>job.xml</source>
+ <source>
+ hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml
+ </source>
+ </property>
+ <property>
+ <name>dfs.datanode.slow.io.warning.threshold.ms</name>
+ <value>300</value>
+ <source>hdfs-default.xml</source>
+ <source>job.xml</source>
+ <source>
+ hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml
+ </source>
+ </property>
+ <property>
+ <name>hive.support.concurrency</name>
+ <value>true</value>
+ <source>file:/etc/hive/conf/hive-site.xml</source>
+ <source>job.xml</source>
+ <source>
+ hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml
+ </source>
+ </property>
+ <property>
+ <name>mapreduce.reduce.shuffle.merge.percent</name>
+ <value>0.66</value>
+ <source>mapred-site.xml</source>
+ <source>job.xml</source>
+ <source>
+ hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml
+ </source>
+ </property>
+ <property>
+ <name>mapreduce.task.skip.start.attempts</name>
+ <value>2</value>
+ <source>mapred-default.xml</source>
+ <source>job.xml</source>
+ <source>
+ hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml
+ </source>
+ </property>
</configuration>
\ No newline at end of file