You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ji...@apache.org on 2016/12/15 12:03:09 UTC

incubator-eagle git commit: [EAGLE-840] add task attempt stream for bad node detection

Repository: incubator-eagle
Updated Branches:
  refs/heads/master bb71d37df -> 8fe968cb7


[EAGLE-840] add task attempt stream for bad node detection

Author: wujinhu <wu...@126.com>

Closes #744 from wujinhu/EAGLE-840.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/8fe968cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/8fe968cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/8fe968cb

Branch: refs/heads/master
Commit: 8fe968cb7e340e2f21711b5a0f701e760cb91d59
Parents: bb71d37
Author: wujinhu <wu...@126.com>
Authored: Thu Dec 15 20:03:02 2016 +0800
Committer: wujinhu <wu...@126.com>
Committed: Thu Dec 15 20:03:02 2016 +0800

----------------------------------------------------------------------
 .../siddhiext/StringEmptyFunctionExtension.java | 86 ++++++++++++++++++++
 .../src/main/resources/string.siddhiext         | 18 ++++
 .../jpm/mr/history/MRHistoryJobApplication.java | 33 ++++++--
 .../history/crawler/EagleOutputCollector.java   |  2 +
 .../JobHistorySpoutCollectorInterceptor.java    |  5 ++
 .../mr/history/parser/JHFEventReaderBase.java   | 19 ++---
 .../JobEntityCreationEagleServiceListener.java  | 22 ++---
 .../history/publisher/JobStreamPublisher.java   | 54 ++++++++++++
 .../mr/history/publisher/StreamPublisher.java   | 39 +++++++++
 .../publisher/StreamPublisherManager.java       | 54 ++++++++++++
 .../publisher/TaskAttemptStreamPublisher.java   | 54 ++++++++++++
 .../jpm/mr/history/storm/JobHistorySpout.java   | 15 ++--
 ....history.MRHistoryJobApplicationProvider.xml | 81 ++++++++++++++++--
 .../src/main/resources/application.conf         |  3 +-
 14 files changed, 443 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8fe968cb/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/siddhiext/StringEmptyFunctionExtension.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/siddhiext/StringEmptyFunctionExtension.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/siddhiext/StringEmptyFunctionExtension.java
new file mode 100644
index 0000000..e047662
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/siddhiext/StringEmptyFunctionExtension.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.siddhiext;
+
+import org.wso2.siddhi.core.config.ExecutionPlanContext;
+import org.wso2.siddhi.core.executor.ExpressionExecutor;
+import org.wso2.siddhi.core.executor.function.FunctionExecutor;
+import org.wso2.siddhi.query.api.definition.Attribute;
+import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
+
+public class StringEmptyFunctionExtension extends FunctionExecutor {
+    /**
+     * Initialization hook, called before the other methods; rejects any use of string:empty() that does not
+     * pass exactly one argument of type STRING.
+     * @param attributeExpressionExecutors the executors of each function parameter
+     * @param executionPlanContext         the context of the execution plan (not used here)
+     */
+    @Override
+    protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
+        if (attributeExpressionExecutors.length != 1) {
+            throw new ExecutionPlanValidationException("Invalid no of arguments passed to string:empty() function, "
+                    + "required 1, but found " + attributeExpressionExecutors.length);
+        }
+
+        Attribute.Type attributeType = attributeExpressionExecutors[0].getReturnType();
+        if (attributeType != Attribute.Type.STRING) {
+            throw new ExecutionPlanValidationException("Invalid parameter type found for the argument of string:empty() function, "
+                    + "required " + Attribute.Type.STRING + ", but found " + attributeType);
+        }
+    }
+
+    /**
+     * Multi-argument variant; init() rejects any argument count other than one, and this always returns null.
+     */
+    @Override
+    protected Object execute(Object[] data) {
+        return null;
+    }
+
+    /**
+     * Evaluates the single string argument.
+     * NOTE(review): despite the name "empty", the result is true for a NON-empty string and false for
+     * null or "" -- confirm that policies using string:empty() expect this before changing it.
+     */
+    @Override
+    protected Object execute(Object data) {
+        return !(data == null || ((String) data).isEmpty());
+    }
+
+    @Override
+    public void start() {
+    }
+
+    @Override
+    public void stop() {
+    }
+
+    @Override
+    public Attribute.Type getReturnType() {
+        return Attribute.Type.BOOL;
+    }
+
+    @Override
+    public Object[] currentState() {
+        return null;
+    }
+
+    @Override
+    public void restoreState(Object[] state) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8fe968cb/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/resources/string.siddhiext
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/resources/string.siddhiext b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/resources/string.siddhiext
new file mode 100644
index 0000000..e16be99
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/resources/string.siddhiext
@@ -0,0 +1,18 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+empty=org.apache.eagle.alert.siddhiext.StringEmptyFunctionExtension
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8fe968cb/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
index 66906f0..e2695a3 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
@@ -22,6 +22,10 @@ import org.apache.eagle.app.environment.impl.StormEnvironment;
 import org.apache.eagle.app.messaging.StormStreamSink;
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilterBuilder;
+import org.apache.eagle.jpm.mr.history.crawler.JobHistorySpoutCollectorInterceptor;
+import org.apache.eagle.jpm.mr.history.publisher.JobStreamPublisher;
+import org.apache.eagle.jpm.mr.history.publisher.StreamPublisherManager;
+import org.apache.eagle.jpm.mr.history.publisher.TaskAttemptStreamPublisher;
 import org.apache.eagle.jpm.mr.history.storm.JobHistorySpout;
 import org.apache.eagle.jpm.util.Constants;
 
@@ -63,17 +67,36 @@ public class MRHistoryJobApplication extends StormApplication {
         TopologyBuilder topologyBuilder = new TopologyBuilder();
         String spoutName = "mrHistoryJobSpout";
         int tasks = jhfAppConf.getInt("stormConfig.mrHistoryJobSpoutTasks");
+        JobHistorySpoutCollectorInterceptor collectorInterceptor = new JobHistorySpoutCollectorInterceptor();
+        JobHistorySpout jobHistorySpout = new JobHistorySpout(filter, appConfig, collectorInterceptor);
         topologyBuilder.setSpout(
                 spoutName,
-                new JobHistorySpout(filter, appConfig),
+                jobHistorySpout,
                 tasks
         ).setNumTasks(tasks);
 
-        StormStreamSink sinkBolt = environment.getStreamSink("mr_failed_job_stream", config);
-        BoltDeclarer kafkaBoltDeclarer = topologyBuilder.setBolt("HistoryKafkaSink", sinkBolt, jhfAppConf.getInt("stormConfig.historyKafkaSinkTasks"))
-                .setNumTasks(jhfAppConf.getInt("stormConfig.historyKafkaSinkTasks"));
-        kafkaBoltDeclarer.shuffleGrouping(spoutName);
+        StormStreamSink jobSinkBolt = environment.getStreamSink("MAP_REDUCE_JOB_STREAM", config);
+        String jobSinkBoltName = "JobKafkaSink";
+        BoltDeclarer jobKafkaBoltDeclarer = topologyBuilder.setBolt(jobSinkBoltName, jobSinkBolt, jhfAppConf.getInt("stormConfig.jobKafkaSinkTasks"))
+                .setNumTasks(jhfAppConf.getInt("stormConfig.jobKafkaSinkTasks"));
+        String spoutToJobSinkName = spoutName + "_to_" + jobSinkBoltName;
+        jobKafkaBoltDeclarer.shuffleGrouping(spoutName, spoutToJobSinkName);
 
+        StormStreamSink taskAttemptSinkBolt = environment.getStreamSink("MAP_REDUCE_TASK_ATTEMPT_STREAM", config);
+        String taskAttemptSinkBoltName = "TaskAttemptKafkaSink";
+        BoltDeclarer taskAttemptKafkaBoltDeclarer = topologyBuilder.setBolt(taskAttemptSinkBoltName, taskAttemptSinkBolt, jhfAppConf.getInt("stormConfig.taskAttemptKafkaSinkTasks"))
+                .setNumTasks(jhfAppConf.getInt("stormConfig.taskAttemptKafkaSinkTasks"));
+        String spoutToTaskAttemptSinkName = spoutName + "_to_" + taskAttemptSinkBoltName;
+        taskAttemptKafkaBoltDeclarer.shuffleGrouping(spoutName, spoutToTaskAttemptSinkName);
+
+        List<String> streams = new ArrayList<>();
+        streams.add(spoutToJobSinkName);
+        streams.add(spoutToTaskAttemptSinkName);
+        jobHistorySpout.setStreams(streams);
+
+        //4, add stream publisher
+        StreamPublisherManager.getInstance().addStreamPublisher(new JobStreamPublisher(spoutToJobSinkName, collectorInterceptor));
+        StreamPublisherManager.getInstance().addStreamPublisher(new TaskAttemptStreamPublisher(spoutToTaskAttemptSinkName, collectorInterceptor));
         return topologyBuilder.createTopology();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8fe968cb/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/EagleOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/EagleOutputCollector.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/EagleOutputCollector.java
index 70eab38..e4f1de8 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/EagleOutputCollector.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/EagleOutputCollector.java
@@ -26,4 +26,6 @@ import java.io.Serializable;
  */
 public interface EagleOutputCollector extends Serializable {
     void collect(ValuesArray t);
+
+    void collect(String streamId, ValuesArray t);
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8fe968cb/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistorySpoutCollectorInterceptor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistorySpoutCollectorInterceptor.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistorySpoutCollectorInterceptor.java
index a7dc9a8..fac281f 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistorySpoutCollectorInterceptor.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistorySpoutCollectorInterceptor.java
@@ -32,4 +32,9 @@ public class JobHistorySpoutCollectorInterceptor implements EagleOutputCollector
     public void collect(ValuesArray t) {
         collector.emit(t);
     }
+
+    @Override
+    public void collect(String streamId, ValuesArray t) {
+        collector.emit(streamId, t); // emit t on the given stream id instead of the default stream
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8fe968cb/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
index fb05bae..d89937e 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
@@ -463,18 +463,17 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
             entityCreated(entity);
             attempt2ErrorMsg.put(taskAttemptID, Pair.of(taskID, entity.getError()));
             //generate TaskAttemptErrorCategoryEntity
-            TaskAttemptErrorCategoryEntity taskAttemptErrorCategoryEntity = new TaskAttemptErrorCategoryEntity();
-            Map<String, String> taskAttemptErrorCategoryEntityTags = new HashMap<>(entity.getTags());
-            taskAttemptErrorCategoryEntity.setTags(taskAttemptErrorCategoryEntityTags);
-            if (!taskAttemptErrorCategoryEntityTags.containsKey(MRJobTagName.ERROR_CATEGORY.toString())) {
-                taskAttemptErrorCategoryEntityTags.put(MRJobTagName.ERROR_CATEGORY.toString(), "");
+            if (entity.getTags().containsKey(MRJobTagName.ERROR_CATEGORY.toString())) {
+                TaskAttemptErrorCategoryEntity taskAttemptErrorCategoryEntity = new TaskAttemptErrorCategoryEntity();
+                Map<String, String> taskAttemptErrorCategoryEntityTags = new HashMap<>(entity.getTags());
+                taskAttemptErrorCategoryEntity.setTags(taskAttemptErrorCategoryEntityTags);
+
+                taskAttemptErrorCategoryEntity.setStartTime(entity.getStartTime());
+                taskAttemptErrorCategoryEntity.setEndTime(entity.getEndTime());
+                taskAttemptErrorCategoryEntity.setTimestamp(entity.getTimestamp());
+                entityCreated(taskAttemptErrorCategoryEntity);
             }
 
-            taskAttemptErrorCategoryEntity.setStartTime(entity.getStartTime());
-            taskAttemptErrorCategoryEntity.setEndTime(entity.getEndTime());
-            taskAttemptErrorCategoryEntity.setTimestamp(entity.getTimestamp());
-            entityCreated(taskAttemptErrorCategoryEntity);
-
             taskAttemptStartTime.remove(taskAttemptID);
         } else {
             // silently ignore

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8fe968cb/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
index 4eb58a7..0c7f8c8 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
@@ -22,6 +22,8 @@ import org.apache.eagle.dataproc.impl.storm.ValuesArray;
 import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
 import org.apache.eagle.jpm.mr.history.crawler.EagleOutputCollector;
 import org.apache.eagle.jpm.mr.history.metrics.JobExecutionMetricsCreationListener;
+import org.apache.eagle.jpm.mr.history.publisher.StreamPublisher;
+import org.apache.eagle.jpm.mr.history.publisher.StreamPublisherManager;
 import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateManager;
 import org.apache.eagle.jpm.mr.historyentity.*;
 import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity;
@@ -109,8 +111,9 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
                     ((JobExecutionAPIEntity) entity).getCurrentState());
 
                 metricEntities.addAll(jobExecutionMetricsCreationListener.generateMetrics((JobExecutionAPIEntity)entity));
-                if (((JobExecutionAPIEntity)entity).getCurrentState().equals(Constants.JobState.FAILED.toString())) {
-                    emitFailedJob((JobExecutionAPIEntity) entity);
+                StreamPublisher streamPublisher = StreamPublisherManager.getInstance().getStreamPublisher(JobExecutionAPIEntity.class);
+                if (streamPublisher != null) {
+                    streamPublisher.flush((JobExecutionAPIEntity) entity);
                 }
             } else if (entity instanceof JobEventAPIEntity) {
                 jobEvents.add((JobEventAPIEntity) entity);
@@ -118,6 +121,10 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
                 taskExecs.add((TaskExecutionAPIEntity) entity);
             } else if (entity instanceof TaskAttemptExecutionAPIEntity) {
                 taskAttemptExecs.add((TaskAttemptExecutionAPIEntity) entity);
+                StreamPublisher streamPublisher = StreamPublisherManager.getInstance().getStreamPublisher(TaskAttemptExecutionAPIEntity.class);
+                if (streamPublisher != null) {
+                    streamPublisher.flush((TaskAttemptExecutionAPIEntity) entity);
+                }
             } else if (entity instanceof TaskAttemptErrorCategoryEntity) {
                 taskAttemptErrors.add((TaskAttemptErrorCategoryEntity) entity);
             }
@@ -172,15 +179,4 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
             throw new Exception("Entity creation fails going to EagleService");
         }
     }
-
-    private void emitFailedJob(JobExecutionAPIEntity entity) {
-        Map<String, Object> fields = new HashMap<>(entity.getTags());
-        fields.put("submissionTime", entity.getSubmissionTime());
-        fields.put("startTime", entity.getStartTime());
-        fields.put("endTime", entity.getEndTime());
-        fields.put("currentState", entity.getCurrentState());
-        fields.put("trackingUrl", entity.getTrackingUrl());
-
-        collector.collect(new ValuesArray(fields.get(MRJobTagName.JOB_ID.toString()), fields));
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8fe968cb/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobStreamPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobStreamPublisher.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobStreamPublisher.java
new file mode 100644
index 0000000..06f1ff2
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobStreamPublisher.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.publisher;
+
+import org.apache.eagle.dataproc.impl.storm.ValuesArray;
+import org.apache.eagle.jpm.mr.history.crawler.EagleOutputCollector;
+import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity;
+import org.apache.eagle.jpm.util.MRJobTagName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Emits a JobExecutionAPIEntity as a flat field map (tags plus job timings/state) on the configured storm stream id. */
+public class JobStreamPublisher extends StreamPublisher<JobExecutionAPIEntity> {
+    private static final Logger LOG = LoggerFactory.getLogger(JobStreamPublisher.class);
+
+    public JobStreamPublisher(String stormStreamId, EagleOutputCollector collector) {
+        super(stormStreamId, collector);
+    }
+
+    @Override
+    public Class<?> type() {
+        return JobExecutionAPIEntity.class;
+    }
+
+    @Override
+    public void flush(JobExecutionAPIEntity entity) {
+        Map<String, Object> message = new HashMap<>(entity.getTags());
+        message.put("submissionTime", entity.getSubmissionTime());
+        message.put("startTime", entity.getStartTime());
+        message.put("endTime", entity.getEndTime());
+        message.put("currentState", entity.getCurrentState());
+        message.put("trackingUrl", entity.getTrackingUrl());
+        Object jobId = message.get(MRJobTagName.JOB_ID.toString());
+        collector.collect(stormStreamId, new ValuesArray(jobId, message));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8fe968cb/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/StreamPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/StreamPublisher.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/StreamPublisher.java
new file mode 100644
index 0000000..17eb922
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/StreamPublisher.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.publisher;
+
+import org.apache.eagle.jpm.mr.history.crawler.EagleOutputCollector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Base class for publishers; holds the storm stream id and collector that concrete publishers emit entities of type T through. */
+public abstract class StreamPublisher<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(StreamPublisher.class);
+    protected String stormStreamId; // stream id handed to the collector by the concrete publishers
+    protected EagleOutputCollector collector;
+
+    public StreamPublisher(String stormStreamId, EagleOutputCollector collector) {
+        this.stormStreamId = stormStreamId;
+        this.collector = collector;
+    }
+
+    public abstract Class<?> type(); // entity class; StreamPublisherManager uses it as the registry key
+    // Publishes one entity; the concrete publishers emit it via collector on stormStreamId.
+    public abstract void flush(T entity);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8fe968cb/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/StreamPublisherManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/StreamPublisherManager.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/StreamPublisherManager.java
new file mode 100644
index 0000000..b9b68c3
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/StreamPublisherManager.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.publisher;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Process-wide registry that maps an entity class to the single StreamPublisher registered for it. */
+public class StreamPublisherManager implements Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(StreamPublisherManager.class);
+
+    // NOTE(review): static state is per JVM; confirm publishers added at topology build time are visible where flush runs.
+    private static StreamPublisherManager instance = new StreamPublisherManager();
+
+    // entity class -> publisher; a later registration for the same class replaces the earlier one
+    private Map<Class<?>, StreamPublisher> publishers = new HashMap<>();
+
+    public static StreamPublisherManager getInstance() {
+        return instance;
+    }
+
+    public void addStreamPublisher(StreamPublisher streamPublisher) {
+        Class<?> entityType = streamPublisher.type();
+        if (publishers.containsKey(entityType)) {
+            LOG.warn("stream {} publisher already exists", entityType.toString());
+        }
+        publishers.put(entityType, streamPublisher);
+    }
+
+    /** Returns the publisher registered for {@code type}, or null when none has been added. */
+    public StreamPublisher getStreamPublisher(Class<?> type) {
+        return publishers.get(type);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8fe968cb/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/TaskAttemptStreamPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/TaskAttemptStreamPublisher.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/TaskAttemptStreamPublisher.java
new file mode 100644
index 0000000..e196604
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/TaskAttemptStreamPublisher.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.publisher;
+
+import org.apache.eagle.dataproc.impl.storm.ValuesArray;
+import org.apache.eagle.jpm.mr.history.crawler.EagleOutputCollector;
+import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity;
+import org.apache.eagle.jpm.util.MRJobTagName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Emits a TaskAttemptExecutionAPIEntity as a flat field map (tags plus timings/status) on the configured storm stream id. */
+public class TaskAttemptStreamPublisher extends StreamPublisher<TaskAttemptExecutionAPIEntity> {
+    private static final Logger LOG = LoggerFactory.getLogger(TaskAttemptStreamPublisher.class);
+
+    public TaskAttemptStreamPublisher(String stormStreamId, EagleOutputCollector collector) {
+        super(stormStreamId, collector);
+    }
+
+    @Override
+    public Class<?> type() {
+        return TaskAttemptExecutionAPIEntity.class;
+    }
+
+    @Override
+    public void flush(TaskAttemptExecutionAPIEntity entity) {
+        Map<String, Object> message = new HashMap<>(entity.getTags());
+        message.put("startTime", entity.getStartTime());
+        message.put("endTime", entity.getEndTime());
+        message.put("taskStatus", entity.getTaskStatus());
+        if (!message.containsKey(MRJobTagName.ERROR_CATEGORY.toString())) { // NOTE(review): assumes this key equals "errorCategory" - confirm
+            message.put("errorCategory", "");
+        }
+        collector.collect(stormStreamId, new ValuesArray(message.get(MRJobTagName.TASK_ATTEMPT_ID.toString()), message));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8fe968cb/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
index 9743ea7..2157da9 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
@@ -97,10 +97,7 @@ public class JobHistorySpout extends BaseRichSpout {
     private static final int MAX_RETRY_TIMES = 3;
     private MRHistoryJobConfig appConfig;
     private JobHistoryEndpointConfig jobHistoryEndpointConfig;
-
-    public JobHistorySpout(JobHistoryContentFilter filter, MRHistoryJobConfig appConfig) {
-        this(filter, appConfig, new JobHistorySpoutCollectorInterceptor());
-    }
+    private List<String> streams;
 
     /**
      * mostly this constructor signature is for unit test purpose as you can put customized interceptor here.
@@ -178,12 +175,20 @@ public class JobHistorySpout extends BaseRichSpout {
         }
     }
 
+    /**
+     * Sets the stream ids that {@code declareOutputFields} declares on this spout.
+     *
+     * @param streams stream ids to declare; when left {@code null}, no stream is declared
+     */
+    public void setStreams(List<String> streams) {
+        this.streams = streams;
+    }
+
     /**
      * empty because framework will take care of output fields declaration.
      */
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("jobId", "message"));
+        if (streams != null) {
+            for (String streamId : streams) {
+                declarer.declareStream(streamId, new Fields("f1", "message"));
+            }
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8fe968cb/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
index 835dbda..1c3b5cb 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
@@ -35,8 +35,14 @@
             <value>4</value>
         </property>
         <property>
-            <name>stormConfig.historyKafkaSinkTasks</name>
-            <displayName>Sink Task Number</displayName>
+            <name>stormConfig.jobKafkaSinkTasks</name>
+            <displayName>Sink Task Number For Job Stream</displayName>
+            <description>the number tasks of the sink bolt will be assigned</description>
+            <value>1</value>
+        </property>
+        <property>
+            <name>stormConfig.taskAttemptKafkaSinkTasks</name>
+            <displayName>Sink Task Number For Task Attempt Stream</displayName>
             <description>the number tasks of the sink bolt will be assigned</description>
             <value>1</value>
         </property>
@@ -144,8 +150,8 @@
     </configuration>
     <streams>
         <stream>
-            <streamId>map_reduce_failed_job_stream</streamId>
-            <description>Map Reduce Failed Job Stream</description>
+            <streamId>MAP_REDUCE_JOB_STREAM</streamId>
+            <description>Map Reduce Job Stream</description>
             <validate>true</validate>
             <columns>
                 <column>
@@ -169,10 +175,6 @@
                     <type>string</type>
                 </column>
                 <column>
-                    <name>jobName</name>
-                    <type>string</type>
-                </column>
-                <column>
                     <name>jobId</name>
                     <type>string</type>
                 </column>
@@ -198,6 +200,69 @@
                 </column>
             </columns>
         </stream>
+        <stream>
+            <streamId>MAP_REDUCE_TASK_ATTEMPT_STREAM</streamId>
+            <description>Map Reduce Task Attempt Stream</description>
+            <validate>true</validate>
+            <columns>
+                <column>
+                    <name>site</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>rack</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>queue</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>user</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>jobType</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>jobDefId</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>jobId</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>taskId</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>taskType</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>taskAttemptId</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>errorCategory</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>startTime</name>
+                    <type>long</type>
+                </column>
+                <column>
+                    <name>endTime</name>
+                    <type>long</type>
+                </column>
+                <column>
+                    <name>taskStatus</name>
+                    <type>string</type>
+                </column>
+            </columns>
+        </stream>
     </streams>
     <docs>
         <install>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8fe968cb/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
index 4d4dc4b..e8d5311 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
@@ -22,7 +22,8 @@
 
   "stormConfig" : {
     "mrHistoryJobSpoutTasks" : 6,
-    "historyKafkaSinkTasks" : 1
+    "jobKafkaSinkTasks" : 1,
+    "taskAttemptKafkaSinkTasks" : 1
   },
 
   "zookeeper" : {