You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ji...@apache.org on 2016/10/27 07:45:46 UTC
incubator-eagle git commit: [EAGLE-659] mr history feeder integrate
with alert engine
Repository: incubator-eagle
Updated Branches:
refs/heads/master ee65453db -> 8d26a4f12
[EAGLE-659] mr history feeder integrate with alert engine
Author: wujinhu <wu...@126.com>
Closes #571 from wujinhu/EAGLE-659.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/8d26a4f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/8d26a4f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/8d26a4f1
Branch: refs/heads/master
Commit: 8d26a4f129f0d3a8729f8bac356bb748090d834b
Parents: ee65453
Author: wujinhu <wu...@126.com>
Authored: Thu Oct 27 15:45:41 2016 +0800
Committer: wujinhu <wu...@126.com>
Committed: Thu Oct 27 15:45:41 2016 +0800
----------------------------------------------------------------------
.../metadata/resource/MetadataResource.java | 16 +++
.../eagle/app/resource/ApplicationResource.java | 1 +
.../jpm/mr/history/MRHistoryJobApplication.java | 8 ++
.../crawler/DefaultJHFInputStreamCallback.java | 4 +-
.../jpm/mr/history/parser/JHFParserFactory.java | 8 +-
.../JobEntityCreationEagleServiceListener.java | 30 +++--
.../jpm/mr/history/storm/JobHistorySpout.java | 2 +
....history.MRHistoryJobApplicationProvider.xml | 112 +++++++++++++++++++
.../src/main/resources/application.conf | 19 ++++
9 files changed, 190 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d26a4f1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
index 6b88a4e..f0a89ce 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
@@ -276,6 +276,22 @@ public class MetadataResource {
throw new IllegalArgumentException("Publishsment (name: " + publishmentId + ") not found");
}
}
+
+ //for other publishments, remove policyId from them, work around, we should refactor
+ for (String publishmentId : publishmentMap.keySet()) {
+ if (publishmentIds.contains(publishmentId)) {
+ continue;
+ }
+ Publishment publishment = publishmentMap.get(publishmentId);
+ if (publishment.getPolicyIds() != null && publishment.getPolicyIds().contains(policyId)) {
+ publishment.getPolicyIds().remove(policyId);
+ OpResult opResult = addPublishment(publishment);
+ if (opResult.code == OpResult.FAILURE) {
+ LOG.error("Failed to delete policy {}, from publisher {}, {} ", policyId, publishmentId, opResult.message);
+ return opResult;
+ }
+ }
+ }
result.code = OpResult.SUCCESS;
result.message = "Successfully add " + publishmentIds.size() + " publishments: [" + StringUtils.join(publishmentIds,",") + "] to policy: " + policyId;
LOG.info(result.message);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d26a4f1/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/resource/ApplicationResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/resource/ApplicationResource.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/resource/ApplicationResource.java
index b493b2b..e634598 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/resource/ApplicationResource.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/resource/ApplicationResource.java
@@ -96,6 +96,7 @@ public class ApplicationResource {
public RESTResponse<ApplicationEntity> updateApplicationEntity(@PathParam("appUuid") String appUuid, ApplicationOperations.UpdateOperation updateOperation) {
return RESTResponse.async(() -> {
ApplicationEntity applicationEntity = new ApplicationEntity();
+ applicationEntity.setStatus(entityService.getByUUID(appUuid).getStatus());
applicationEntity.setUuid(appUuid);
applicationEntity.setJarPath(updateOperation.getJarPath());
applicationEntity.setMode(updateOperation.getMode());
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d26a4f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
index e33fc02..aaf65ac 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
@@ -16,8 +16,10 @@
*/
package org.apache.eagle.jpm.mr.history;
+import backtype.storm.topology.BoltDeclarer;
import org.apache.eagle.app.StormApplication;
import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.sink.StormStreamSink;
import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilterBuilder;
import org.apache.eagle.jpm.mr.history.storm.JobHistorySpout;
@@ -66,6 +68,12 @@ public class MRHistoryJobApplication extends StormApplication {
new JobHistorySpout(filter, config),
tasks
).setNumTasks(tasks);
+
+ StormStreamSink sinkBolt = environment.getStreamSink("mr_failed_job_stream", config);
+ BoltDeclarer kafkaBoltDeclarer = topologyBuilder.setBolt("HistoryKafkaSink", sinkBolt, jhfAppConf.getInt("stormConfig.historyKafkaSinkTasks"))
+ .setNumTasks(jhfAppConf.getInt("stormConfig.historyKafkaSinkTasks"));
+ kafkaBoltDeclarer.shuffleGrouping(spoutName);
+
return topologyBuilder.createTopology();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d26a4f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
index b491857..af83985 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
@@ -33,9 +33,11 @@ public class DefaultJHFInputStreamCallback implements JHFInputStreamCallback {
private JobHistoryContentFilter filter;
+ private EagleOutputCollector collector;
public DefaultJHFInputStreamCallback(JobHistoryContentFilter filter, EagleOutputCollector eagleCollector) {
this.filter = filter;
+ this.collector = eagleCollector;
}
@Override
@@ -52,7 +54,7 @@ public class DefaultJHFInputStreamCallback implements JHFInputStreamCallback {
jobFileInputStream.close();
} else {
//get parser and parse, do not need to emit data now
- JHFParserBase parser = JHFParserFactory.getParser(baseTags, conf, filter);
+ JHFParserBase parser = JHFParserFactory.getParser(baseTags, conf, filter, this.collector);
parser.parse(jobFileInputStream);
jobFileInputStream.close();
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d26a4f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
index fee9394..fd5483b 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
@@ -18,6 +18,7 @@
package org.apache.eagle.jpm.mr.history.parser;
+import org.apache.eagle.jpm.mr.history.crawler.EagleOutputCollector;
import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
@@ -29,9 +30,12 @@ public class JHFParserFactory {
private static final Logger LOG = LoggerFactory.getLogger(JHFParserFactory.class);
- public static JHFParserBase getParser(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) {
+ public static JHFParserBase getParser(Map<String, String> baseTags,
+ Configuration configuration,
+ JobHistoryContentFilter filter,
+ EagleOutputCollector outputCollector) {
JHFMRVer2EventReader reader2 = new JHFMRVer2EventReader(baseTags, configuration, filter);
- reader2.addListener(new JobEntityCreationEagleServiceListener());
+ reader2.addListener(new JobEntityCreationEagleServiceListener(outputCollector));
reader2.addListener(new TaskFailureListener());
reader2.addListener(new TaskAttemptCounterListener());
reader2.addListener(new JobConfigurationCreationServiceListener());
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d26a4f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
index 67cfb71..2c7a2b1 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
@@ -18,10 +18,15 @@
package org.apache.eagle.jpm.mr.history.parser;
+import org.apache.eagle.dataproc.impl.storm.ValuesArray;
import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
+import org.apache.eagle.jpm.mr.history.crawler.EagleOutputCollector;
import org.apache.eagle.jpm.mr.history.metrics.JobExecutionMetricsCreationListener;
import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateManager;
import org.apache.eagle.jpm.mr.historyentity.*;
+import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity;
+import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity;
+import org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity;
import org.apache.eagle.jpm.util.MRJobTagName;
import org.apache.eagle.log.entity.GenericMetricEntity;
import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
@@ -30,10 +35,7 @@ import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.GregorianCalendar;
-import java.util.List;
-import java.util.TimeZone;
+import java.util.*;
public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCreationListener {
private static final Logger logger = LoggerFactory.getLogger(JobEntityCreationEagleServiceListener.class);
@@ -46,16 +48,18 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
List<TaskAttemptExecutionAPIEntity> taskAttemptExecs = new ArrayList<>();
private JobExecutionMetricsCreationListener jobExecutionMetricsCreationListener = new JobExecutionMetricsCreationListener();
private TimeZone timeZone;
+ private EagleOutputCollector collector;
- public JobEntityCreationEagleServiceListener() {
- this(BATCH_SIZE);
+ public JobEntityCreationEagleServiceListener(EagleOutputCollector collector) {
+ this(BATCH_SIZE, collector);
}
- public JobEntityCreationEagleServiceListener(int batchSize) {
+ public JobEntityCreationEagleServiceListener(int batchSize, EagleOutputCollector collector) {
if (batchSize <= 0) {
throw new IllegalArgumentException("batchSize must be greater than 0 when it is provided");
}
this.batchSize = batchSize;
+ this.collector = collector;
timeZone = TimeZone.getTimeZone(MRHistoryJobConfig.get().getJobHistoryEndpointConfig().timeZone);
}
@@ -100,6 +104,7 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
((JobExecutionAPIEntity) entity).getCurrentState());
metricEntities.addAll(jobExecutionMetricsCreationListener.generateMetrics((JobExecutionAPIEntity)entity));
+ emitFailedJob((JobExecutionAPIEntity)entity);
} else if (entity instanceof JobEventAPIEntity) {
jobEvents.add((JobEventAPIEntity) entity);
} else if (entity instanceof TaskExecutionAPIEntity) {
@@ -152,4 +157,15 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
throw new Exception("Entity creation fails going to EagleService");
}
}
+
+ private void emitFailedJob(JobExecutionAPIEntity entity) {
+ Map<String, Object> fields = new HashMap<>(entity.getTags());
+ fields.put("submissionTime", entity.getSubmissionTime());
+ fields.put("startTime", entity.getStartTime());
+ fields.put("endTime", entity.getEndTime());
+ fields.put("currentState", entity.getCurrentState());
+ fields.put("trackingUrl", entity.getTrackingUrl());
+
+ collector.collect(new ValuesArray(fields.get(MRJobTagName.JOB_ID.toString()), fields));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d26a4f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
index 88cd100..8dc951f 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
@@ -18,6 +18,7 @@
package org.apache.eagle.jpm.mr.history.storm;
+import backtype.storm.tuple.Fields;
import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
import org.apache.eagle.jpm.mr.history.crawler.*;
import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateManager;
@@ -180,6 +181,7 @@ public class JobHistorySpout extends BaseRichSpout {
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("jobId", "message"));
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d26a4f1/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
index 7df445f..545cf56 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
@@ -35,6 +35,12 @@
<value>4</value>
</property>
<property>
+ <name>stormConfig.historyKafkaSinkTasks</name>
+ <displayName>Sink Task Number</displayName>
+ <description>the number tasks of the sink bolt will be assigned</description>
+ <value>1</value>
+ </property>
+ <property>
<name>endpointConfig.hdfs.fs.defaultFS</name>
<displayName>HDFS Address</displayName>
<description>The name of the default file system. Either the literal string "local" or a host:port for NDFS</description>
@@ -62,8 +68,57 @@
<value>Etc/GMT+7</value>
<required>true</required>
</property>
+ <property>
+ <name>dataSinkConfig.topic</name>
+ <displayName>dataSinkConfig.topic</displayName>
+ <value>map_reduce_failed_job</value>
+ <description>topic for kafka data sink</description>
+ </property>
+ <property>
+ <name>dataSinkConfig.brokerList</name>
+ <displayName>dataSinkConfig.brokerList</displayName>
+ <value>localhost:6667</value>
+ <description>kafka broker list</description>
+ <required>true</required>
+ </property>
+ <property>
+ <name>dataSinkConfig.serializerClass</name>
+ <displayName>dataSinkConfig.serializerClass</displayName>
+ <value>kafka.serializer.StringEncoder</value>
+ <description>serializer class Kafka message value</description>
+ </property>
+ <property>
+ <name>dataSinkConfig.keySerializerClass</name>
+ <displayName>dataSinkConfig.keySerializerClass</displayName>
+ <value>kafka.serializer.StringEncoder</value>
+ <description>serializer class Kafka message key</description>
+ </property>
<property>
+ <name>dataSinkConfig.producerType</name>
+ <displayName>dataSinkConfig.producerType</displayName>
+ <value>async</value>
+ <description>whether the messages are sent asynchronously in a background thread</description>
+ </property>
+ <property>
+ <name>dataSinkConfig.numBatchMessages</name>
+ <displayName>dataSinkConfig.numBatchMessages</displayName>
+ <value>4096</value>
+ <description>number of messages to send in one batch when using async mode</description>
+ </property>
+ <property>
+ <name>dataSinkConfig.maxQueueBufferMs</name>
+ <displayName>dataSinkConfig.maxQueueBufferMs</displayName>
+ <value>5000</value>
+ <description>maximum time to buffer data when using async mode</description>
+ </property>
+ <property>
+ <name>dataSinkConfig.requestRequiredAcks</name>
+ <displayName>dataSinkConfig.requestRequiredAcks</displayName>
+ <value>0</value>
+ <description>value controls when a produce request is considered completed</description>
+ </property>
+ <property>
<name>MRConfigureKeys.jobConfigKey</name>
<displayName>Map Reduce Extracted Configuration Keys</displayName>
<description>which configures will be extracted from map reduce job configurations</description>
@@ -86,6 +141,63 @@
<value>eagle.job.name</value>
</property>
</configuration>
+ <streams>
+ <stream>
+ <streamId>map_reduce_failed_job_stream</streamId>
+ <description>Map Reduce Failed Job Stream</description>
+ <validate>true</validate>
+ <columns>
+ <column>
+ <name>site</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>queue</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>user</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>jobType</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>jobDefId</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>jobName</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>jobId</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>submissionTime</name>
+ <type>long</type>
+ </column>
+ <column>
+ <name>trackingUrl</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>startTime</name>
+ <type>long</type>
+ </column>
+ <column>
+ <name>endTime</name>
+ <type>long</type>
+ </column>
+ <column>
+ <name>currentState</name>
+ <type>string</type>
+ </column>
+ </columns>
+ </stream>
+ </streams>
<docs>
<install>
</install>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d26a4f1/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
index 0eb287f..4d4dc4b 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
@@ -22,6 +22,7 @@
"stormConfig" : {
"mrHistoryJobSpoutTasks" : 6,
+ "historyKafkaSinkTasks" : 1
},
"zookeeper" : {
@@ -53,6 +54,24 @@
"readTimeOutSeconds" : 10,
},
+ "eagleService": {
+ "host": "localhost",
+ "port": 9090,
+ "username": "admin",
+ "password": "secret"
+ },
+
+ "dataSinkConfig": {
+ "topic" : "map_reduce_failed_job",
+ "brokerList" : "sandbox.hortonworks.com:6667",
+ "serializerClass" : "kafka.serializer.StringEncoder",
+ "keySerializerClass" : "kafka.serializer.StringEncoder"
+ "producerType" : "async",
+ "numBatchMessages" : "4096",
+ "maxQueueBufferMs" : "5000",
+ "requestRequiredAcks" : "0"
+ },
+
"MRConfigureKeys" : {
"jobNameKey" : "eagle.job.name",
"jobConfigKey" : "mapreduce.map.output.compress,mapreduce.map.output.compress.codec,mapreduce.output.fileoutputformat.compress,mapreduce.output.fileoutputformat.compress.type,mapreduce.output.fileoutputformat.compress.codec,mapred.output.format.class, dataplatform.etl.info,mapreduce.map.memory.mb,mapreduce.reduce.memory.mb,mapreduce.map.java.opts,mapreduce.reduce.java.opts"