Posted to commits@eagle.apache.org by ji...@apache.org on 2016/10/27 07:45:46 UTC

incubator-eagle git commit: [EAGLE-659] Integrate MR history feeder with alert engine

Repository: incubator-eagle
Updated Branches:
  refs/heads/master ee65453db -> 8d26a4f12


[EAGLE-659] Integrate MR history feeder with alert engine

Author: wujinhu <wu...@126.com>

Closes #571 from wujinhu/EAGLE-659.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/8d26a4f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/8d26a4f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/8d26a4f1

Branch: refs/heads/master
Commit: 8d26a4f129f0d3a8729f8bac356bb748090d834b
Parents: ee65453
Author: wujinhu <wu...@126.com>
Authored: Thu Oct 27 15:45:41 2016 +0800
Committer: wujinhu <wu...@126.com>
Committed: Thu Oct 27 15:45:41 2016 +0800

----------------------------------------------------------------------
 .../metadata/resource/MetadataResource.java     |  16 +++
 .../eagle/app/resource/ApplicationResource.java |   1 +
 .../jpm/mr/history/MRHistoryJobApplication.java |   8 ++
 .../crawler/DefaultJHFInputStreamCallback.java  |   4 +-
 .../jpm/mr/history/parser/JHFParserFactory.java |   8 +-
 .../JobEntityCreationEagleServiceListener.java  |  30 +++--
 .../jpm/mr/history/storm/JobHistorySpout.java   |   2 +
 ....history.MRHistoryJobApplicationProvider.xml | 112 +++++++++++++++++++
 .../src/main/resources/application.conf         |  19 ++++
 9 files changed, 190 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d26a4f1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
index 6b88a4e..f0a89ce 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
@@ -276,6 +276,22 @@ public class MetadataResource {
                     throw new IllegalArgumentException("Publishsment (name: " + publishmentId + ") not found");
                 }
             }
+
+            // For the other publishments, remove this policyId from them (workaround; should be refactored)
+            for (String publishmentId : publishmentMap.keySet()) {
+                if (publishmentIds.contains(publishmentId)) {
+                    continue;
+                }
+                Publishment publishment = publishmentMap.get(publishmentId);
+                if (publishment.getPolicyIds() != null && publishment.getPolicyIds().contains(policyId)) {
+                    publishment.getPolicyIds().remove(policyId);
+                    OpResult opResult = addPublishment(publishment);
+                    if (opResult.code == OpResult.FAILURE) {
+                        LOG.error("Failed to delete policy {}, from publisher {}, {} ", policyId, publishmentId, opResult.message);
+                        return opResult;
+                    }
+                }
+            }
             result.code = OpResult.SUCCESS;
             result.message = "Successfully add " + publishmentIds.size() + " publishments: [" + StringUtils.join(publishmentIds,",") + "] to policy: " + policyId;
             LOG.info(result.message);

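The loop added above keeps publishment-to-policy bindings consistent in both directions: after attaching the requested publishments to the policy, any other publishment that still references the policy has the policy id removed and is persisted again. A minimal sketch of the effect, reusing only the accessors visible in the hunk (the publishment and policy names are made up):

    // Hypothetical example: after binding policy "p1" to ["emailPublisher"], any other
    // publishment that still lists "p1" is updated to drop it.
    Publishment other = publishmentMap.get("kafkaPublisher");
    if (other.getPolicyIds() != null && other.getPolicyIds().contains("p1")) {
        other.getPolicyIds().remove("p1");
        addPublishment(other);   // persisted via the same addPublishment() used above
    }
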
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d26a4f1/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/resource/ApplicationResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/resource/ApplicationResource.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/resource/ApplicationResource.java
index b493b2b..e634598 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/resource/ApplicationResource.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/resource/ApplicationResource.java
@@ -96,6 +96,7 @@ public class ApplicationResource {
     public RESTResponse<ApplicationEntity> updateApplicationEntity(@PathParam("appUuid") String appUuid, ApplicationOperations.UpdateOperation updateOperation) {
         return RESTResponse.async(() -> {
             ApplicationEntity applicationEntity = new ApplicationEntity();
+            applicationEntity.setStatus(entityService.getByUUID(appUuid).getStatus());
             applicationEntity.setUuid(appUuid);
             applicationEntity.setJarPath(updateOperation.getJarPath());
             applicationEntity.setMode(updateOperation.getMode());

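The added line preserves the application's stored status across an update, since the update operation otherwise rebuilds the entity from the request alone. Sketch of the intent (assuming getByUUID returns the persisted ApplicationEntity, as the hunk implies):

    // Copy the persisted status first so the update does not reset it.
    ApplicationEntity current = entityService.getByUUID(appUuid);
    applicationEntity.setStatus(current.getStatus());
    applicationEntity.setUuid(appUuid);
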
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d26a4f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
index e33fc02..aaf65ac 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
@@ -16,8 +16,10 @@
  */
 package org.apache.eagle.jpm.mr.history;
 
+import backtype.storm.topology.BoltDeclarer;
 import org.apache.eagle.app.StormApplication;
 import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.sink.StormStreamSink;
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilterBuilder;
 import org.apache.eagle.jpm.mr.history.storm.JobHistorySpout;
@@ -66,6 +68,12 @@ public class MRHistoryJobApplication extends StormApplication {
                 new JobHistorySpout(filter, config),
                 tasks
         ).setNumTasks(tasks);
+
+        StormStreamSink sinkBolt = environment.getStreamSink("mr_failed_job_stream", config);
+        BoltDeclarer kafkaBoltDeclarer = topologyBuilder.setBolt("HistoryKafkaSink", sinkBolt, jhfAppConf.getInt("stormConfig.historyKafkaSinkTasks"))
+                .setNumTasks(jhfAppConf.getInt("stormConfig.historyKafkaSinkTasks"));
+        kafkaBoltDeclarer.shuffleGrouping(spoutName);
+
         return topologyBuilder.createTopology();
     }
 }

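The new HistoryKafkaSink bolt subscribes to the job-history spout with a shuffle grouping, so the job tuples emitted by the emitFailedJob() path added later in this patch reach the alert engine's Kafka sink. Its parallelism comes from stormConfig.historyKafkaSinkTasks, which this patch defaults to 1 in both the provider descriptor and application.conf. Condensed wiring (identifiers as in the hunk above):

    // spout --shuffleGrouping--> HistoryKafkaSink (StormStreamSink for "mr_failed_job_stream")
    int sinkTasks = jhfAppConf.getInt("stormConfig.historyKafkaSinkTasks");   // defaults to 1
    topologyBuilder.setBolt("HistoryKafkaSink", environment.getStreamSink("mr_failed_job_stream", config), sinkTasks)
            .setNumTasks(sinkTasks)
            .shuffleGrouping(spoutName);
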
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d26a4f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
index b491857..af83985 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
@@ -33,9 +33,11 @@ public class DefaultJHFInputStreamCallback implements JHFInputStreamCallback {
 
 
     private JobHistoryContentFilter filter;
+    private EagleOutputCollector collector;
 
     public DefaultJHFInputStreamCallback(JobHistoryContentFilter filter, EagleOutputCollector eagleCollector) {
         this.filter = filter;
+        this.collector = eagleCollector;
     }
 
     @Override
@@ -52,7 +54,7 @@ public class DefaultJHFInputStreamCallback implements JHFInputStreamCallback {
             jobFileInputStream.close();
         } else {
             //get parser and parse, do not need to emit data now
-            JHFParserBase parser = JHFParserFactory.getParser(baseTags, conf, filter);
+            JHFParserBase parser = JHFParserFactory.getParser(baseTags, conf, filter, this.collector);
             parser.parse(jobFileInputStream);
             jobFileInputStream.close();
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d26a4f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
index fee9394..fd5483b 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.eagle.jpm.mr.history.parser;
 
+import org.apache.eagle.jpm.mr.history.crawler.EagleOutputCollector;
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
 import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
@@ -29,9 +30,12 @@ public class JHFParserFactory {
 
     private static final Logger LOG = LoggerFactory.getLogger(JHFParserFactory.class);
 
-    public static JHFParserBase getParser(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) {
+    public static JHFParserBase getParser(Map<String, String> baseTags,
+                                          Configuration configuration,
+                                          JobHistoryContentFilter filter,
+                                          EagleOutputCollector outputCollector) {
         JHFMRVer2EventReader reader2 = new JHFMRVer2EventReader(baseTags, configuration, filter);
-        reader2.addListener(new JobEntityCreationEagleServiceListener());
+        reader2.addListener(new JobEntityCreationEagleServiceListener(outputCollector));
         reader2.addListener(new TaskFailureListener());
         reader2.addListener(new TaskAttemptCounterListener());
         reader2.addListener(new JobConfigurationCreationServiceListener());

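The two hunks above are plumbing for the same goal: the spout's EagleOutputCollector is stored by DefaultJHFInputStreamCallback, passed into JHFParserFactory.getParser(...), and handed from there to JobEntityCreationEagleServiceListener, which later in this patch uses it to emit job data back into the topology. The call chain, condensed (identifiers as in the hunks):

    // callback -> factory -> listener: the same collector instance flows through
    JHFParserBase parser = JHFParserFactory.getParser(baseTags, configuration, filter, collector);
    // inside getParser(): reader2.addListener(new JobEntityCreationEagleServiceListener(collector));
    parser.parse(jobFileInputStream);
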
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d26a4f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
index 67cfb71..2c7a2b1 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
@@ -18,10 +18,15 @@
 
 package org.apache.eagle.jpm.mr.history.parser;
 
+import org.apache.eagle.dataproc.impl.storm.ValuesArray;
 import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
+import org.apache.eagle.jpm.mr.history.crawler.EagleOutputCollector;
 import org.apache.eagle.jpm.mr.history.metrics.JobExecutionMetricsCreationListener;
 import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateManager;
 import org.apache.eagle.jpm.mr.historyentity.*;
+import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity;
+import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity;
+import org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity;
 import org.apache.eagle.jpm.util.MRJobTagName;
 import org.apache.eagle.log.entity.GenericMetricEntity;
 import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
@@ -30,10 +35,7 @@ import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.GregorianCalendar;
-import java.util.List;
-import java.util.TimeZone;
+import java.util.*;
 
 public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCreationListener {
     private static final Logger logger = LoggerFactory.getLogger(JobEntityCreationEagleServiceListener.class);
@@ -46,16 +48,18 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
     List<TaskAttemptExecutionAPIEntity> taskAttemptExecs = new ArrayList<>();
     private JobExecutionMetricsCreationListener jobExecutionMetricsCreationListener = new JobExecutionMetricsCreationListener();
     private TimeZone timeZone;
+    private EagleOutputCollector collector;
 
-    public JobEntityCreationEagleServiceListener() {
-        this(BATCH_SIZE);
+    public JobEntityCreationEagleServiceListener(EagleOutputCollector collector) {
+        this(BATCH_SIZE, collector);
     }
 
-    public JobEntityCreationEagleServiceListener(int batchSize) {
+    public JobEntityCreationEagleServiceListener(int batchSize, EagleOutputCollector collector) {
         if (batchSize <= 0) {
             throw new IllegalArgumentException("batchSize must be greater than 0 when it is provided");
         }
         this.batchSize = batchSize;
+        this.collector = collector;
         timeZone = TimeZone.getTimeZone(MRHistoryJobConfig.get().getJobHistoryEndpointConfig().timeZone);
     }
 
@@ -100,6 +104,7 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
                     ((JobExecutionAPIEntity) entity).getCurrentState());
 
                 metricEntities.addAll(jobExecutionMetricsCreationListener.generateMetrics((JobExecutionAPIEntity)entity));
+                emitFailedJob((JobExecutionAPIEntity)entity);
             } else if (entity instanceof JobEventAPIEntity) {
                 jobEvents.add((JobEventAPIEntity) entity);
             } else if (entity instanceof TaskExecutionAPIEntity) {
@@ -152,4 +157,15 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
             throw new Exception("Entity creation fails going to EagleService");
         }
     }
+
+    private void emitFailedJob(JobExecutionAPIEntity entity) {
+        Map<String, Object> fields = new HashMap<>(entity.getTags());
+        fields.put("submissionTime", entity.getSubmissionTime());
+        fields.put("startTime", entity.getStartTime());
+        fields.put("endTime", entity.getEndTime());
+        fields.put("currentState", entity.getCurrentState());
+        fields.put("trackingUrl", entity.getTrackingUrl());
+
+        collector.collect(new ValuesArray(fields.get(MRJobTagName.JOB_ID.toString()), fields));
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d26a4f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
index 88cd100..8dc951f 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
@@ -18,6 +18,7 @@
 
 package org.apache.eagle.jpm.mr.history.storm;
 
+import backtype.storm.tuple.Fields;
 import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
 import org.apache.eagle.jpm.mr.history.crawler.*;
 import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateManager;
@@ -180,6 +181,7 @@ public class JobHistorySpout extends BaseRichSpout {
      */
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("jobId", "message"));
     }
 
     /**

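With the Fields declaration above, the spout's output matches the two-element ValuesArray built in emitFailedJob(): the job id as the first field and a map of the job's attributes as the second. A hypothetical downstream bolt (not part of this patch) would read the tuple like this:

    String jobId = tuple.getStringByField("jobId");
    @SuppressWarnings("unchecked")
    Map<String, Object> message = (Map<String, Object>) tuple.getValueByField("message");
    // "message" holds the job tags plus submissionTime, startTime, endTime,
    // currentState and trackingUrl, as populated in emitFailedJob() above.
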
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d26a4f1/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
index 7df445f..545cf56 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
@@ -35,6 +35,12 @@
             <value>4</value>
         </property>
         <property>
+            <name>stormConfig.historyKafkaSinkTasks</name>
+            <displayName>Sink Task Number</displayName>
+            <description>the number of tasks assigned to the sink bolt</description>
+            <value>1</value>
+        </property>
+        <property>
             <name>endpointConfig.hdfs.fs.defaultFS</name>
             <displayName>HDFS Address</displayName>
             <description>The name of the default file system.  Either the literal string "local" or a host:port for NDFS</description>
@@ -62,8 +68,57 @@
             <value>Etc/GMT+7</value>
             <required>true</required>
         </property>
+        <property>
+            <name>dataSinkConfig.topic</name>
+            <displayName>dataSinkConfig.topic</displayName>
+            <value>map_reduce_failed_job</value>
+            <description>topic for kafka data sink</description>
+        </property>
+        <property>
+            <name>dataSinkConfig.brokerList</name>
+            <displayName>dataSinkConfig.brokerList</displayName>
+            <value>localhost:6667</value>
+            <description>kafka broker list</description>
+            <required>true</required>
+        </property>
+        <property>
+            <name>dataSinkConfig.serializerClass</name>
+            <displayName>dataSinkConfig.serializerClass</displayName>
+            <value>kafka.serializer.StringEncoder</value>
+            <description>serializer class for the Kafka message value</description>
+        </property>
+        <property>
+            <name>dataSinkConfig.keySerializerClass</name>
+            <displayName>dataSinkConfig.keySerializerClass</displayName>
+            <value>kafka.serializer.StringEncoder</value>
+            <description>serializer class for the Kafka message key</description>
+        </property>
 
         <property>
+            <name>dataSinkConfig.producerType</name>
+            <displayName>dataSinkConfig.producerType</displayName>
+            <value>async</value>
+            <description>whether the messages are sent asynchronously in a background thread</description>
+        </property>
+        <property>
+            <name>dataSinkConfig.numBatchMessages</name>
+            <displayName>dataSinkConfig.numBatchMessages</displayName>
+            <value>4096</value>
+            <description>number of messages to send in one batch when using async mode</description>
+        </property>
+        <property>
+            <name>dataSinkConfig.maxQueueBufferMs</name>
+            <displayName>dataSinkConfig.maxQueueBufferMs</displayName>
+            <value>5000</value>
+            <description>maximum time to buffer data when using async mode</description>
+        </property>
+        <property>
+            <name>dataSinkConfig.requestRequiredAcks</name>
+            <displayName>dataSinkConfig.requestRequiredAcks</displayName>
+            <value>0</value>
+            <description>controls when a produce request is considered completed</description>
+        </property>
+        <property>
             <name>MRConfigureKeys.jobConfigKey</name>
             <displayName>Map Reduce Extracted Configuration Keys</displayName>
             <description>which configures will be extracted from map reduce job configurations</description>
@@ -86,6 +141,63 @@
             <value>eagle.job.name</value>
         </property>
     </configuration>
+    <streams>
+        <stream>
+            <streamId>map_reduce_failed_job_stream</streamId>
+            <description>Map Reduce Failed Job Stream</description>
+            <validate>true</validate>
+            <columns>
+                <column>
+                    <name>site</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>queue</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>user</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>jobType</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>jobDefId</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>jobName</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>jobId</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>submissionTime</name>
+                    <type>long</type>
+                </column>
+                <column>
+                    <name>trackingUrl</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>startTime</name>
+                    <type>long</type>
+                </column>
+                <column>
+                    <name>endTime</name>
+                    <type>long</type>
+                </column>
+                <column>
+                    <name>currentState</name>
+                    <type>string</type>
+                </column>
+            </columns>
+        </stream>
+    </streams>
     <docs>
         <install>
         </install>

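The map_reduce_failed_job_stream declaration above is intended to line up with the map built in emitFailedJob(): the string columns are expected to come from the job's tags, and the remaining columns from the JobExecutionAPIEntity fields copied there. A single tuple's payload would look roughly like this (all values hypothetical):

    Map<String, Object> message = new HashMap<>();
    message.put("site", "sandbox");                      // tag
    message.put("queue", "default");                     // tag
    message.put("user", "hadoop");                       // tag
    message.put("jobId", "job_1477100000000_0001");      // tag; also used as the tuple key
    message.put("submissionTime", 1477555200000L);
    message.put("startTime", 1477555210000L);
    message.put("endTime", 1477555900000L);
    message.put("currentState", "FAILED");
    message.put("trackingUrl", "http://rm-host:8088/proxy/application_1477100000000_0001/");
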
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d26a4f1/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
index 0eb287f..4d4dc4b 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
@@ -22,6 +22,7 @@
 
   "stormConfig" : {
     "mrHistoryJobSpoutTasks" : 6,
+    "historyKafkaSinkTasks" : 1
   },
 
   "zookeeper" : {
@@ -53,6 +54,24 @@
     "readTimeOutSeconds" : 10,
   },
 
+  "eagleService": {
+    "host": "localhost",
+    "port": 9090,
+    "username": "admin",
+    "password": "secret"
+  },
+
+  "dataSinkConfig": {
+    "topic" : "map_reduce_failed_job",
+    "brokerList" : "sandbox.hortonworks.com:6667",
+    "serializerClass" : "kafka.serializer.StringEncoder",
+    "keySerializerClass" : "kafka.serializer.StringEncoder"
+    "producerType" : "async",
+    "numBatchMessages" : "4096",
+    "maxQueueBufferMs" : "5000",
+    "requestRequiredAcks" : "0"
+  },
+
   "MRConfigureKeys" : {
     "jobNameKey" : "eagle.job.name",
     "jobConfigKey" : "mapreduce.map.output.compress,mapreduce.map.output.compress.codec,mapreduce.output.fileoutputformat.compress,mapreduce.output.fileoutputformat.compress.type,mapreduce.output.fileoutputformat.compress.codec,mapred.output.format.class, dataplatform.etl.info,mapreduce.map.memory.mb,mapreduce.reduce.memory.mb,mapreduce.map.java.opts,mapreduce.reduce.java.opts"