You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ap...@apache.org on 2021/07/15 18:38:10 UTC

[gobblin] branch master updated: [GOBBLIN-1457] Add automatic troubleshooter to Gobblin service (#3299)

This is an automated email from the ASF dual-hosted git repository.

aplex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new b5e32b1  [GOBBLIN-1457] Add automatic troubleshooter to Gobblin service (#3299)
b5e32b1 is described below

commit b5e32b158bd713c45caa5b4b8866f8a4c4fc3a29
Author: Alex Prokofiev <ap...@linkedin.com>
AuthorDate: Thu Jul 15 11:37:59 2021 -0700

    [GOBBLIN-1457] Add automatic troubleshooter to Gobblin service (#3299)
    
    Previous commits added automatic troubleshooting to Gobblin Azkaban
    jobs; this commit collects the discovered issues and exposes them
    in Gobblin service.
    
    The initial implementation stores issues in memory for a limited number
    of jobs; future commits will add persistence.
---
 .../runtime/KafkaAvroJobStatusMonitorTest.java     |   8 +-
 .../pegasus/org/apache/gobblin/service/Issue.pdl   |  46 +++++++
 .../org/apache/gobblin/service/IssueSeverity.pdl   |   9 ++
 .../org/apache/gobblin/service/JobStatus.pdl       |   7 ++
 .../org/apache/gobblin/service/Timestamp.pdl       |   8 ++
 ...he.gobblin.service.flowexecutions.snapshot.json |  50 +++++++-
 ...ache.gobblin.service.flowstatuses.snapshot.json |  50 +++++++-
 .../org/apache/gobblin/service/FlowStatusTest.java | 140 +++++++++++++--------
 .../service/FlowExecutionResourceLocalHandler.java |  26 +++-
 gobblin-runtime/build.gradle                       |   1 +
 .../InMemoryMultiContextIssueRepository.java       |  98 +++++++++++++++
 .../troubleshooter/JobIssueEventHandler.java       | 116 +++++++++++++++++
 .../MultiContextIssueRepository.java               |  38 ++++++
 .../troubleshooter/TroubleshooterUtils.java        |  40 ++++++
 .../gobblin/service/monitoring/JobStatus.java      |   5 +
 .../service/monitoring/JobStatusRetriever.java     |  25 +++-
 .../InMemoryMultiContextIssueRepositoryTest.java   |  58 +++++++++
 .../troubleshooter/JobIssueEventHandlerTest.java   |  52 ++++++++
 .../MultiContextIssueRepositoryTest.java           | 116 +++++++++++++++++
 .../modules/core/GobblinServiceGuiceModule.java    |   7 ++
 .../service/monitoring/FsJobStatusRetriever.java   |   6 +-
 .../monitoring/KafkaAvroJobStatusMonitor.java      |  19 ++-
 .../service/monitoring/KafkaJobStatusMonitor.java  |  30 ++++-
 .../monitoring/KafkaJobStatusMonitorFactory.java   |  14 ++-
 .../monitoring/LocalFsJobStatusRetriever.java      |  20 +--
 .../monitoring/MysqlJobStatusRetriever.java        |   8 +-
 .../monitoring/FsJobStatusRetrieverTest.java       |  16 ++-
 .../monitoring/MysqlJobStatusRetrieverTest.java    |  10 +-
 gradle/scripts/dependencyDefinitions.gradle        |   1 +
 29 files changed, 935 insertions(+), 89 deletions(-)

diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index c443b42..f0b5d23 100644
--- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -55,11 +55,14 @@ import org.apache.gobblin.metrics.kafka.KafkaAvroEventKeyValueReporter;
 import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
 import org.apache.gobblin.metrics.kafka.KafkaKeyValueProducerPusher;
 import org.apache.gobblin.metrics.kafka.Pusher;
+import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.monitoring.JobStatusRetriever;
 import org.apache.gobblin.service.monitoring.KafkaAvroJobStatusMonitor;
 import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
 
+import static org.mockito.Mockito.mock;
+
 
 public class KafkaAvroJobStatusMonitorTest {
   public static final String TOPIC = KafkaAvroJobStatusMonitorTest.class.getSimpleName();
@@ -176,7 +179,8 @@ public class KafkaAvroJobStatusMonitorTest {
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPLETE.name());
 
     messageAndMetadata = iterator.next();
-    Assert.assertNull(jobStatusMonitor.parseJobStatus(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata)));
+    Assert.assertNull(jobStatusMonitor.parseJobStatus(
+        jobStatusMonitor.deserializeEvent(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata))));
 
     // Check that state didn't get set to running since it was already complete
     messageAndMetadata = iterator.next();
@@ -465,7 +469,7 @@ public class KafkaAvroJobStatusMonitorTest {
 
     public MockKafkaAvroJobStatusMonitor(String topic, Config config, int numThreads)
         throws IOException, ReflectiveOperationException {
-      super(topic, config, numThreads);
+      super(topic, config, numThreads, mock(JobIssueEventHandler.class));
     }
 
     @Override
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/Issue.pdl b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/Issue.pdl
new file mode 100644
index 0000000..47cc512
--- /dev/null
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/Issue.pdl
@@ -0,0 +1,46 @@
+namespace org.apache.gobblin.service
+
+/**
+ * Issue describes a specific unique problem in the job or application.
+ *
+ * Issue can be generated from log entries, health checks, and other places.
+ */
+record Issue {
+
+  /**
+   * Time when the issue have occured
+   */
+  time: Timestamp
+
+  /**
+   * Severity from DEBUG to FATAL
+   */
+  severity: IssueSeverity
+
+  /**
+   * Unique machine-readable code that identifies a specific problem.
+   *
+   * It can be used for making programmatic decisions on how to handle and recover from this issue.
+   *
+   * Issues representing the same kind of problem will have the same code.
+   * */
+  code: string
+
+  /**
+   * Short, human-readable description of the issue.
+   *
+   * It should focus on what is the root cause of the problem, and what steps the user should do to resolve it.
+   */
+  summary: string
+
+  /**
+   * Human-readable issue details that can include exception stack trace and additional information about the problem.
+   */
+  details: string
+
+  /**
+   * Additional machine-readable properties of the issue.
+   *
+   */
+  properties: map[string,string]
+}
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/IssueSeverity.pdl b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/IssueSeverity.pdl
new file mode 100644
index 0000000..b2b2ed4
--- /dev/null
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/IssueSeverity.pdl
@@ -0,0 +1,9 @@
+namespace org.apache.gobblin.service
+
+enum IssueSeverity {
+  DEBUG,
+  INFO,
+  WARN,
+  ERROR,
+  FATAL
+}
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/JobStatus.pdl b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/JobStatus.pdl
index 615f6cf..1c64b8d 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/JobStatus.pdl
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/JobStatus.pdl
@@ -44,4 +44,11 @@ record JobStatus {
    * Job state that is updated only at the start and end of a job execution.
    */
   jobState: JobState
+
+  /**
+   * Collection of issues for the job.
+   *
+   * Can include various errors and warnings that will help the user to troubleshoot problems.
+   * */
+  issues: array[Issue]
 }
\ No newline at end of file
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/Timestamp.pdl b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/Timestamp.pdl
new file mode 100644
index 0000000..2cb068c
--- /dev/null
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/Timestamp.pdl
@@ -0,0 +1,8 @@
+namespace org.apache.gobblin.service
+
+/**
+ * Epoch/UNIX time in milliseconds
+ *
+ * Represents the number of milliseconds since the epoch of 1970-01-01T00:00:00Z
+ */
+typeref Timestamp = long
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
index 734c2ac..44eb865 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
@@ -198,12 +198,60 @@
               } ]
             },
             "doc" : "Job state that is updated only at the start and end of a job execution."
+          }, {
+            "name" : "issues",
+            "type" : {
+              "type" : "array",
+              "items" : {
+                "type" : "record",
+                "name" : "Issue",
+                "doc" : "Issue describes a specific unique problem in the job or application.\n\nIssue can be generated from log entries, health checks, and other places.",
+                "fields" : [ {
+                  "name" : "time",
+                  "type" : {
+                    "type" : "typeref",
+                    "name" : "Timestamp",
+                    "doc" : "Epoch/UNIX time in milliseconds\n\nRepresents the number of milliseconds since the epoch of 1970-01-01T00:00:00Z",
+                    "ref" : "long"
+                  },
+                  "doc" : "Time when the issue have occured"
+                }, {
+                  "name" : "severity",
+                  "type" : {
+                    "type" : "enum",
+                    "name" : "IssueSeverity",
+                    "symbols" : [ "DEBUG", "INFO", "WARN", "ERROR", "FATAL" ]
+                  },
+                  "doc" : "Severity from DEBUG to FATAL"
+                }, {
+                  "name" : "code",
+                  "type" : "string",
+                  "doc" : "Unique machine-readable code that identifies a specific problem.\n\nIt can be used for making programmatic decisions on how to handle and recover from this issue.\n\nIssues representing the same kind of problem will have the same code.\n"
+                }, {
+                  "name" : "summary",
+                  "type" : "string",
+                  "doc" : "Short, human-readable description of the issue.\n\nIt should focus on what is the root cause of the problem, and what steps the user should do to resolve it."
+                }, {
+                  "name" : "details",
+                  "type" : "string",
+                  "doc" : "Human-readable issue details that can include exception stack trace and additional information about the problem."
+                }, {
+                  "name" : "properties",
+                  "type" : {
+                    "type" : "map",
+                    "values" : "string"
+                  },
+                  "doc" : "Additional machine-readable properties of the issue.\n"
+                } ]
+              }
+            },
+            "doc" : "Collection of issues for the job.\n\nCan include various errors and warnings that will help the user to troubleshoot problems.\n"
           } ]
         }
       },
       "doc" : "Status of jobs belonging to the flow"
     } ]
-  }, "org.apache.gobblin.service.FlowId", "org.apache.gobblin.service.FlowStatistics", "org.apache.gobblin.service.FlowStatusId", "org.apache.gobblin.service.JobId", "org.apache.gobblin.service.JobState", "org.apache.gobblin.service.JobStatistics", "org.apache.gobblin.service.JobStatus" ],
+  }, "org.apache.gobblin.service.FlowId", "org.apache.gobblin.service.FlowStatistics", "org.apache.gobblin.service.FlowStatusId", "org.apache.gobblin.service.Issue", "org.apache.gobblin.service.IssueSeverity", "org.apache.gobblin.service.JobId", "org.apache.gobblin.service.JobState", "org.apache.gobblin.service.JobStatistics", "org.apache.gobblin.service.JobStatus", "org.apache.gobblin.service.Timestamp" ],
   "schema" : {
     "name" : "flowexecutions",
     "namespace" : "org.apache.gobblin.service",
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
index d3d4dbc..69b7ad4 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
@@ -200,13 +200,61 @@
               } ]
             },
             "doc" : "Job state that is updated only at the start and end of a job execution."
+          }, {
+            "name" : "issues",
+            "type" : {
+              "type" : "array",
+              "items" : {
+                "type" : "record",
+                "name" : "Issue",
+                "doc" : "Issue describes a specific unique problem in the job or application.\n\nIssue can be generated from log entries, health checks, and other places.",
+                "fields" : [ {
+                  "name" : "time",
+                  "type" : {
+                    "type" : "typeref",
+                    "name" : "Timestamp",
+                    "doc" : "Epoch/UNIX time in milliseconds\n\nRepresents the number of milliseconds since the epoch of 1970-01-01T00:00:00Z",
+                    "ref" : "long"
+                  },
+                  "doc" : "Time when the issue have occured"
+                }, {
+                  "name" : "severity",
+                  "type" : {
+                    "type" : "enum",
+                    "name" : "IssueSeverity",
+                    "symbols" : [ "DEBUG", "INFO", "WARN", "ERROR", "FATAL" ]
+                  },
+                  "doc" : "Severity from DEBUG to FATAL"
+                }, {
+                  "name" : "code",
+                  "type" : "string",
+                  "doc" : "Unique machine-readable code that identifies a specific problem.\n\nIt can be used for making programmatic decisions on how to handle and recover from this issue.\n\nIssues representing the same kind of problem will have the same code.\n"
+                }, {
+                  "name" : "summary",
+                  "type" : "string",
+                  "doc" : "Short, human-readable description of the issue.\n\nIt should focus on what is the root cause of the problem, and what steps the user should do to resolve it."
+                }, {
+                  "name" : "details",
+                  "type" : "string",
+                  "doc" : "Human-readable issue details that can include exception stack trace and additional information about the problem."
+                }, {
+                  "name" : "properties",
+                  "type" : {
+                    "type" : "map",
+                    "values" : "string"
+                  },
+                  "doc" : "Additional machine-readable properties of the issue.\n"
+                } ]
+              }
+            },
+            "doc" : "Collection of issues for the job.\n\nCan include various errors and warnings that will help the user to troubleshoot problems.\n"
           } ]
         }
       },
       "doc" : "Status of jobs belonging to the flow"
     } ],
     "deprecated" : "Use FlowExecution instead"
-  }, "org.apache.gobblin.service.FlowStatusId", "org.apache.gobblin.service.JobId", "org.apache.gobblin.service.JobState", "org.apache.gobblin.service.JobStatistics", "org.apache.gobblin.service.JobStatus" ],
+  }, "org.apache.gobblin.service.FlowStatusId", "org.apache.gobblin.service.Issue", "org.apache.gobblin.service.IssueSeverity", "org.apache.gobblin.service.JobId", "org.apache.gobblin.service.JobState", "org.apache.gobblin.service.JobStatistics", "org.apache.gobblin.service.JobStatus", "org.apache.gobblin.service.Timestamp" ],
   "schema" : {
     "name" : "flowstatuses",
     "namespace" : "org.apache.gobblin.service",
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
index 16e08fc..da81dd9 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
@@ -39,9 +39,13 @@ import com.linkedin.restli.server.resources.BaseResource;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.metastore.StateStore;
 import org.apache.gobblin.restli.EmbeddedRestliServer;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.apache.gobblin.service.monitoring.JobStatusRetriever;
 
+import static org.mockito.Mockito.mock;
+
+
 @Test(groups = { "gobblin.service" }, singleThreaded = true)
 public class FlowStatusTest {
   private FlowStatusClient _client;
@@ -49,6 +53,11 @@ public class FlowStatusTest {
   private List<List<org.apache.gobblin.service.monitoring.JobStatus>> _listOfJobStatusLists;
 
   class TestJobStatusRetriever extends JobStatusRetriever {
+
+    protected TestJobStatusRetriever(MultiContextIssueRepository issueRepository) {
+      super(issueRepository);
+    }
+
     @Override
     public Iterator<org.apache.gobblin.service.monitoring.JobStatus> getJobStatusesForFlowExecution(String flowName,
         String flowGroup, long flowExecutionId) {
@@ -81,7 +90,7 @@ public class FlowStatusTest {
 
   @BeforeClass
   public void setUp() throws Exception {
-    JobStatusRetriever jobStatusRetriever = new TestJobStatusRetriever();
+    JobStatusRetriever jobStatusRetriever = new TestJobStatusRetriever(mock(MultiContextIssueRepository.class));
     final FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);
 
     Injector injector = Guice.createInjector(new Module() {
@@ -108,24 +117,33 @@ public class FlowStatusTest {
    */
   @Test
   public void testFindLatest() throws Exception {
-    org.apache.gobblin.service.monitoring.JobStatus js1 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
-        .flowName("flow1").jobGroup("jgroup1").jobName("job1").startTime(1000L).endTime(5000L)
-        .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test message 1")
-        .processedCount(100).jobExecutionId(1).lowWatermark("watermark:1").highWatermark("watermark:2").build();
-    org.apache.gobblin.service.monitoring.JobStatus fs1 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
-        .flowName("flow1").jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(5000L)
-        .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).build();
-    org.apache.gobblin.service.monitoring.JobStatus js2 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
-        .flowName("flow1").jobGroup("jgroup1").jobName("job1").jobTag("dataset1").startTime(2000L).endTime(6000L)
-        .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(1).message("Test message 2")
-        .processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build();
-    org.apache.gobblin.service.monitoring.JobStatus js3 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
-        .flowName("flow1").jobGroup("jgroup1").jobName("job2").jobTag("dataset2").startTime(2000L).endTime(6000L)
-        .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(1).message("Test message 3")
-        .processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build();
-    org.apache.gobblin.service.monitoring.JobStatus fs2 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
-        .flowName("flow1").jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(7000L)
-        .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(1).message("Flow message").build();
+    org.apache.gobblin.service.monitoring.JobStatus js1 =
+        org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
+            .jobGroup("jgroup1").jobName("job1").startTime(1000L).endTime(5000L)
+            .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test message 1").processedCount(100)
+            .jobExecutionId(1).lowWatermark("watermark:1").highWatermark("watermark:2").issues(Collections.emptyList())
+            .build();
+    org.apache.gobblin.service.monitoring.JobStatus fs1 =
+        org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
+            .jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(5000L)
+            .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).issues(Collections.emptyList()).build();
+    org.apache.gobblin.service.monitoring.JobStatus js2 =
+        org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
+            .jobGroup("jgroup1").jobName("job1").jobTag("dataset1").startTime(2000L).endTime(6000L)
+            .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(1).message("Test message 2").processedCount(200)
+            .jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").issues(Collections.emptyList())
+            .build();
+    org.apache.gobblin.service.monitoring.JobStatus js3 =
+        org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
+            .jobGroup("jgroup1").jobName("job2").jobTag("dataset2").startTime(2000L).endTime(6000L)
+            .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(1).message("Test message 3").processedCount(200)
+            .jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").issues(Collections.emptyList())
+            .build();
+    org.apache.gobblin.service.monitoring.JobStatus fs2 =
+        org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
+            .jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(7000L)
+            .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(1).message("Flow message")
+            .issues(Collections.emptyList()).build();
     List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList1 = Lists.newArrayList(js1, fs1);
     List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList2 = Lists.newArrayList(js2, js3, fs2);
     _listOfJobStatusLists = Lists.newArrayList();
@@ -170,17 +188,23 @@ public class FlowStatusTest {
    */
   @Test
   public void testGetCompleted() throws Exception {
-    org.apache.gobblin.service.monitoring.JobStatus js1 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
-        .flowName("flow1").jobGroup("jgroup1").jobName("job1").startTime(1000L).endTime(5000L)
-        .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test message 1")
-        .processedCount(100).jobExecutionId(1).lowWatermark("watermark:1").highWatermark("watermark:2").build();
-    org.apache.gobblin.service.monitoring.JobStatus js2 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
-        .flowName("flow1").jobGroup("jgroup1").jobName("job2").startTime(2000L).endTime(6000L)
-        .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test message 2")
-        .processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build();
-    org.apache.gobblin.service.monitoring.JobStatus fs1 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
-        .flowName("flow1").jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(7000L)
-        .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Flow message").build();
+    org.apache.gobblin.service.monitoring.JobStatus js1 =
+        org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
+            .jobGroup("jgroup1").jobName("job1").startTime(1000L).endTime(5000L)
+            .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test message 1").processedCount(100)
+            .jobExecutionId(1).lowWatermark("watermark:1").highWatermark("watermark:2").issues(Collections.emptyList())
+            .build();
+    org.apache.gobblin.service.monitoring.JobStatus js2 =
+        org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
+            .jobGroup("jgroup1").jobName("job2").startTime(2000L).endTime(6000L)
+            .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test message 2").processedCount(200)
+            .jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").issues(Collections.emptyList())
+            .build();
+    org.apache.gobblin.service.monitoring.JobStatus fs1 =
+        org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
+            .jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(7000L)
+            .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Flow message")
+            .issues(Collections.emptyList()).build();
     List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList = Lists.newArrayList(js1, js2, fs1);
     _listOfJobStatusLists = Lists.newArrayList();
     _listOfJobStatusLists.add(jobStatusList);
@@ -213,17 +237,23 @@ public class FlowStatusTest {
    */
   @Test
   public void testGetRunning() throws Exception {
-    org.apache.gobblin.service.monitoring.JobStatus js1 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
-        .flowName("flow1").jobGroup("jgroup1").jobName("job1").startTime(1000L).endTime(5000L)
-        .eventName(ExecutionStatus.RUNNING.name()).flowExecutionId(0).message("Test message 1").processedCount(100)
-        .jobExecutionId(1).lowWatermark("watermark:1").highWatermark("watermark:2").build();
-    org.apache.gobblin.service.monitoring.JobStatus js2 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
-        .flowName("flow1").jobGroup("jgroup1").jobName("job2").startTime(2000L).endTime(6000L)
-        .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test message 2")
-        .processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build();
-    org.apache.gobblin.service.monitoring.JobStatus fs1 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
-        .flowName("flow1").jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY)
-        .eventName(ExecutionStatus.RUNNING.name()).flowExecutionId(0).message("Flow message").build();
+    org.apache.gobblin.service.monitoring.JobStatus js1 =
+        org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
+            .jobGroup("jgroup1").jobName("job1").startTime(1000L).endTime(5000L)
+            .eventName(ExecutionStatus.RUNNING.name()).flowExecutionId(0).message("Test message 1").processedCount(100)
+            .jobExecutionId(1).lowWatermark("watermark:1").highWatermark("watermark:2").issues(Collections.emptyList())
+            .build();
+    org.apache.gobblin.service.monitoring.JobStatus js2 =
+        org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
+            .jobGroup("jgroup1").jobName("job2").startTime(2000L).endTime(6000L)
+            .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test message 2").processedCount(200)
+            .jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").issues(Collections.emptyList())
+            .build();
+    org.apache.gobblin.service.monitoring.JobStatus fs1 =
+        org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
+            .jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY)
+            .eventName(ExecutionStatus.RUNNING.name()).flowExecutionId(0).message("Flow message")
+            .issues(Collections.emptyList()).build();
     List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList = Lists.newArrayList(js1, js2, fs1);
     _listOfJobStatusLists = Lists.newArrayList();
     _listOfJobStatusLists.add(jobStatusList);
@@ -256,17 +286,23 @@ public class FlowStatusTest {
    */
   @Test
   public void testGetFailed() throws Exception {
-    org.apache.gobblin.service.monitoring.JobStatus js1 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
-        .flowName("flow1").jobGroup("jgroup1").jobName("job1").startTime(1000L).endTime(5000L)
-        .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test message 1")
-        .processedCount(100).jobExecutionId(1).lowWatermark("watermark:1").highWatermark("watermark:2").build();
-    org.apache.gobblin.service.monitoring.JobStatus js2 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
-        .flowName("flow1").jobGroup("jgroup1").jobName("job2").startTime(2000L).endTime(6000L)
-        .eventName(ExecutionStatus.FAILED.name()).flowExecutionId(0).message("Test message 2")
-        .processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build();
-    org.apache.gobblin.service.monitoring.JobStatus fs1 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
-        .flowName("flow1").jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(7000L)
-        .eventName(ExecutionStatus.FAILED.name()).flowExecutionId(0).message("Flow message").build();
+    org.apache.gobblin.service.monitoring.JobStatus js1 =
+        org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
+            .jobGroup("jgroup1").jobName("job1").startTime(1000L).endTime(5000L)
+            .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test message 1").processedCount(100)
+            .jobExecutionId(1).lowWatermark("watermark:1").highWatermark("watermark:2").issues(Collections.emptyList())
+            .build();
+    org.apache.gobblin.service.monitoring.JobStatus js2 =
+        org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
+            .jobGroup("jgroup1").jobName("job2").startTime(2000L).endTime(6000L)
+            .eventName(ExecutionStatus.FAILED.name()).flowExecutionId(0).message("Test message 2").processedCount(200)
+            .jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").issues(Collections.emptyList())
+            .build();
+    org.apache.gobblin.service.monitoring.JobStatus fs1 =
+        org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
+            .jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(7000L)
+            .eventName(ExecutionStatus.FAILED.name()).flowExecutionId(0).message("Flow message")
+            .issues(Collections.emptyList()).build();
     List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList = Lists.newArrayList(js1, js2, fs1);
     _listOfJobStatusLists = Lists.newArrayList();
     _listOfJobStatusLists.add(jobStatusList);
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
index f61fc96..83b86c2 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
@@ -21,8 +21,11 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import org.apache.commons.lang3.ObjectUtils;
+
 import com.google.common.base.Strings;
 import com.linkedin.data.template.SetMode;
+import com.linkedin.data.template.StringMap;
 import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.EmptyRecord;
 import com.linkedin.restli.common.HttpStatus;
@@ -33,6 +36,7 @@ import com.linkedin.restli.server.UpdateResponse;
 import javax.inject.Inject;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.runtime.troubleshooter.Issue;
 import org.apache.gobblin.service.monitoring.FlowStatus;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.apache.gobblin.service.monitoring.JobStatusRetriever;
@@ -147,7 +151,9 @@ public class FlowExecutionResourceLocalHandler implements FlowExecutionResourceH
           .setExecutionStatus(ExecutionStatus.valueOf(queriedJobStatus.getEventName()))
           .setMessage(queriedJobStatus.getMessage())
           .setJobState(new JobState().setLowWatermark(queriedJobStatus.getLowWatermark()).
-              setHighWatermark(queriedJobStatus.getHighWatermark()));
+              setHighWatermark(queriedJobStatus.getHighWatermark()))
+          .setIssues(new IssueArray(queriedJobStatus.getIssues().stream()
+              .map(FlowExecutionResourceLocalHandler::convertIssueToRestApiObject).collect(Collectors.toList())));
 
       if (!Strings.isNullOrEmpty(queriedJobStatus.getMetrics())) {
         jobStatus.setMetrics(queriedJobStatus.getMetrics());
@@ -168,6 +174,24 @@ public class FlowExecutionResourceLocalHandler implements FlowExecutionResourceH
         .setJobStatuses(jobStatusArray);
   }
 
+  /** Converts a troubleshooter {@link Issue} into its Rest.li counterpart; a null summary or details becomes "". */
+  private static org.apache.gobblin.service.Issue convertIssueToRestApiObject(Issue issue) {
+    org.apache.gobblin.service.Issue converted = new org.apache.gobblin.service.Issue();
+
+    // NOTE(review): severity and time are dereferenced without null checks; confirm they are always populated.
+    converted.setCode(issue.getCode())
+        .setSummary(ObjectUtils.firstNonNull(issue.getSummary(), ""))
+        .setDetails(ObjectUtils.firstNonNull(issue.getDetails(), ""))
+        .setSeverity(IssueSeverity.valueOf(issue.getSeverity().name()))
+        .setTime(issue.getTime().toInstant().toEpochMilli());
+
+    // Always set properties, so that clients receive an empty map instead of a missing field.
+    converted.setProperties(
+        issue.getProperties() != null ? new StringMap(issue.getProperties()) : new StringMap());
+
+    return converted;
+  }
+
   /**
    * Return the flow start time given a {@link org.apache.gobblin.service.monitoring.FlowStatus}. Flow execution ID is
    * assumed to be the flow start time.
diff --git a/gobblin-runtime/build.gradle b/gobblin-runtime/build.gradle
index 3fcaeda..46c1053 100644
--- a/gobblin-runtime/build.gradle
+++ b/gobblin-runtime/build.gradle
@@ -56,6 +56,7 @@ dependencies {
   compile externalDependency.calciteCore
   //compile externalDependency.calciteAvatica
   compile externalDependency.commonsCli
+  compile externalDependency.commonsCollections4
   compile externalDependency.commonsConfiguration
   compile externalDependency.commonsEmail
   compile externalDependency.commonsLang
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/InMemoryMultiContextIssueRepository.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/InMemoryMultiContextIssueRepository.java
new file mode 100644
index 0000000..6727f3c
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/InMemoryMultiContextIssueRepository.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.troubleshooter;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.collections4.map.LRUMap;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
+import org.apache.gobblin.util.ConfigUtils;
+
+/**
+ * Stores issues from multiple jobs, flows or other contexts in memory. All repository operations are synchronized.
+ *
+ * To limit memory consumption, only the most recently used contexts are kept: their maximum number is configured
+ * with {@link #MAX_CONTEXT_COUNT} (default {@link #DEFAULT_MAX_CONTEXT_COUNT}), and the least recently used is evicted.
+ * */
+@Singleton
+public class InMemoryMultiContextIssueRepository implements MultiContextIssueRepository {
+  public static final int DEFAULT_MAX_CONTEXT_COUNT = 100;
+
+  public static final String CONFIG_PREFIX = "gobblin.troubleshooter.inMemoryIssueRepository.";
+  public static final String MAX_CONTEXT_COUNT = CONFIG_PREFIX + "maxContextCount";
+  public static final String MAX_ISSUE_PER_CONTEXT = CONFIG_PREFIX + "maxIssuesPerContext";
+
+  private final LRUMap<String, InMemoryIssueRepository> contextIssues;
+  private final int maxIssuesPerContext;
+
+  public InMemoryMultiContextIssueRepository() {
+    this(ConfigFactory.empty());
+  }
+
+  @Inject
+  public InMemoryMultiContextIssueRepository(Config config) {
+    this(ConfigUtils.getInt(config, MAX_CONTEXT_COUNT, DEFAULT_MAX_CONTEXT_COUNT),
+         ConfigUtils.getInt(config, MAX_ISSUE_PER_CONTEXT, InMemoryIssueRepository.DEFAULT_MAX_SIZE));
+  }
+
+  public InMemoryMultiContextIssueRepository(int maxContextCount, int maxIssuesPerContext) {
+    this.contextIssues = new LRUMap<>(maxContextCount);
+    this.maxIssuesPerContext = maxIssuesPerContext;
+  }
+
+  @Override
+  public synchronized List<Issue> getAll(String contextId)
+      throws TroubleshooterException {
+    // LRUMap.get also moves an existing context to the most recently used position.
+    InMemoryIssueRepository issueRepository = contextIssues.get(contextId);
+
+    if (issueRepository != null) {
+      return issueRepository.getAll();
+    }
+
+    return Collections.emptyList();
+  }
+
+  @Override
+  public synchronized void put(String contextId, Issue issue)
+      throws TroubleshooterException {
+    // Adding a new context to a full LRUMap evicts the least recently used context.
+    InMemoryIssueRepository issueRepository =
+        contextIssues.computeIfAbsent(contextId, s -> new InMemoryIssueRepository(maxIssuesPerContext));
+
+    issueRepository.put(issue);
+  }
+
+  @Override
+  public synchronized void remove(String contextId, String issueCode)
+      throws TroubleshooterException {
+
+    InMemoryIssueRepository issueRepository = contextIssues.get(contextId);
+
+    if (issueRepository != null) {
+      issueRepository.remove(issueCode);
+    }
+  }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandler.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandler.java
new file mode 100644
index 0000000..bccd01b
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandler.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.troubleshooter;
+
+import java.util.Objects;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.util.GsonUtils;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Forwards issues received in job tracking events to the shared {@link MultiContextIssueRepository}.
+ *
+ * When {@link #LOG_RECEIVED_EVENTS} is enabled (the default), received issues are also logged as JSON to a dedicated
+ * logger, so that analytical systems can process them to determine the overall platform health.
+ * */
+@Slf4j
+public class JobIssueEventHandler {
+
+  public static final String CONFIG_PREFIX = "gobblin.troubleshooter.jobIssueEventHandler.";
+  public static final String LOG_RECEIVED_EVENTS = CONFIG_PREFIX + "logReceiveEvents";
+
+  private static final Logger issueLogger =
+      LoggerFactory.getLogger("org.apache.gobblin.runtime.troubleshooter.JobIssueLogger");
+
+  private final MultiContextIssueRepository issueRepository;
+  private final boolean logReceiveEvents;
+
+  @Inject
+  public JobIssueEventHandler(MultiContextIssueRepository issueRepository, Config config) {
+    this(issueRepository, ConfigUtils.getBoolean(config, LOG_RECEIVED_EVENTS, true));
+  }
+
+  public JobIssueEventHandler(MultiContextIssueRepository issueRepository, boolean logReceiveEvents) {
+    this.issueRepository = Objects.requireNonNull(issueRepository);
+    this.logReceiveEvents = logReceiveEvents;
+  }
+
+  public void processEvent(GobblinTrackingEvent event) {
+    if (!IssueEventBuilder.isIssueEvent(event)) {
+      return;
+    }
+    // Same job context id scheme that JobStatusRetriever uses to look the issues up again.
+    String contextId;
+    try {
+      Properties metadataProperties = new Properties();
+      metadataProperties.putAll(event.getMetadata());
+      contextId = TroubleshooterUtils.getContextIdForJob(metadataProperties);
+    } catch (Exception ex) {
+      log.warn("Failed to extract context id from Gobblin tracking event with timestamp {}", event.getTimestamp(), ex);
+      return;
+    }
+
+    IssueEventBuilder issueEvent = IssueEventBuilder.fromEvent(event);
+
+    try {
+      issueRepository.put(contextId, issueEvent.getIssue());
+    } catch (TroubleshooterException e) {
+      log.warn("Failed to save issue to repository. Issue code: {}", issueEvent.getIssue().getCode(), e);
+    }
+
+    if (logReceiveEvents) {
+      logEvent(issueEvent);
+    }
+  }
+
+  private void logEvent(IssueEventBuilder issueEvent) {
+    ImmutableMap<String, String> metadata = issueEvent.getMetadata();
+
+    JobIssueLogEntry logEntry = new JobIssueLogEntry();
+    logEntry.issue = issueEvent.getIssue();
+
+    logEntry.flowGroup = metadata.get(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
+    logEntry.flowName = metadata.get(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
+    logEntry.flowExecutionId = metadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
+    logEntry.jobName = metadata.get(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
+
+    String serializedIssueEvent = GsonUtils.GSON_WITH_DATE_HANDLING.toJson(logEntry);
+    issueLogger.info(serializedIssueEvent);
+  }
+
+  private static class JobIssueLogEntry {
+    String flowName;
+    String flowGroup;
+    String flowExecutionId;
+    String jobName;
+    Issue issue;
+  }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java
new file mode 100644
index 0000000..e3eafbd
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.troubleshooter;
+
+import java.util.List;
+
+/**
+ * Stores issues from multiple jobs, flows and other contexts, each identified by a context id.
+ *
+ * @see AutomaticTroubleshooter
+ * */
+public interface MultiContextIssueRepository {
+  /** Returns the issues stored for {@code contextId}; an empty list if there are none. */
+  List<Issue> getAll(String contextId)
+      throws TroubleshooterException;
+  /** Adds {@code issue} to the issues stored for {@code contextId}. */
+  void put(String contextId, Issue issue)
+      throws TroubleshooterException;
+  /** Removes the issue with code {@code issueCode} from {@code contextId}, if it is stored there. */
+  void remove(String contextId, String issueCode)
+      throws TroubleshooterException;
+
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/TroubleshooterUtils.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/TroubleshooterUtils.java
new file mode 100644
index 0000000..5c7c980
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/TroubleshooterUtils.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.troubleshooter;
+
+import java.util.Properties;
+
+import org.apache.gobblin.metrics.event.TimingEvent;
+
+/** Static helpers shared by the troubleshooter components, such as building issue context ids. */
+public final class TroubleshooterUtils {
+  private TroubleshooterUtils() { /* no instances: static helpers only */ }
+
+  /** Builds a job's issue context id as "flowGroup:flowName:flowExecutionId:jobName"; null parts become "null". */
+  public static String getContextIdForJob(String flowGroup, String flowName, String flowExecutionId, String jobName) {
+    return String.join(":", flowGroup, flowName, flowExecutionId, jobName);
+  }
+
+  /** Builds the job context id from the flow/job fields in {@code props}; missing fields become "null". */
+  public static String getContextIdForJob(Properties props) {
+    return getContextIdForJob(props.getProperty(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD),
+        props.getProperty(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD),
+        props.getProperty(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD),
+        props.getProperty(TimingEvent.FlowEventConstants.JOB_NAME_FIELD));
+  }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
index e1ebb64..9a3e58e 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
@@ -17,10 +17,14 @@
 
 package org.apache.gobblin.service.monitoring;
 
+import java.util.List;
+
 import lombok.Builder;
 import lombok.Getter;
 import lombok.Setter;
 
+import org.apache.gobblin.runtime.troubleshooter.Issue;
+
 
 /**
  * Contains attributes that describe job status.
@@ -48,4 +52,5 @@ public class JobStatus {
   private final int maxAttempts;
   private final int currentAttempts;
   private final boolean shouldRetry;
+  private final List<Issue> issues;
 }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
index 668824b..47d4834 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
@@ -17,25 +17,33 @@
 
 package org.apache.gobblin.service.monitoring;
 
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Objects;
 
 import com.google.common.collect.Iterators;
 import com.typesafe.config.ConfigFactory;
 
 import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metastore.StateStore;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.troubleshooter.Issue;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterException;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
 import org.apache.gobblin.util.ConfigUtils;
 
 
 /**
  * Retriever for {@link JobStatus}.
  */
+@Slf4j
 public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker {
   public static final String EVENT_NAME_FIELD = "eventName";
   public static final String NA_KEY = "NA";
@@ -43,8 +51,11 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
   @Getter
   protected final MetricContext metricContext;
 
-  protected JobStatusRetriever() {
+  private final MultiContextIssueRepository issueRepository;
+
+  protected JobStatusRetriever(MultiContextIssueRepository issueRepository) {
     this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
+    this.issueRepository = Objects.requireNonNull(issueRepository);
   }
 
   public abstract Iterator<JobStatus> getJobStatusesForFlowExecution(String flowName, String flowGroup,
@@ -95,11 +106,21 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
     int currentAttempts = Integer.parseInt(jobState.getProp(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, "1"));
     boolean shouldRetry = Boolean.parseBoolean(jobState.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, "false"));
 
+
+    List<Issue> issues;
+    try {
+      String contextId = TroubleshooterUtils.getContextIdForJob(jobState.getProperties());
+      issues = issueRepository.getAll(contextId);
+    } catch (TroubleshooterException e) {
+      log.warn("Cannot retrieve job issues", e);
+      issues = Collections.emptyList();
+    }
+
     return JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).
         jobName(jobName).jobGroup(jobGroup).jobTag(jobTag).jobExecutionId(jobExecutionId).eventName(eventName).
         lowWatermark(lowWatermark).highWatermark(highWatermark).orchestratedTime(orchestratedTime).startTime(startTime).endTime(endTime).
         message(message).processedCount(processedCount).maxAttempts(maxAttempts).currentAttempts(currentAttempts).
-        shouldRetry(shouldRetry).build();
+        shouldRetry(shouldRetry).issues(issues).build();
   }
 
   public abstract StateStore<State> getStateStore();
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/InMemoryMultiContextIssueRepositoryTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/InMemoryMultiContextIssueRepositoryTest.java
new file mode 100644
index 0000000..f4f3e97
--- /dev/null
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/InMemoryMultiContextIssueRepositoryTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.troubleshooter;
+
+import java.util.List;
+
+import org.testng.annotations.Test;
+
+import static org.testng.AssertJUnit.assertEquals;
+
+
+@Test
+public class InMemoryMultiContextIssueRepositoryTest extends MultiContextIssueRepositoryTest {
+
+  @Test
+  public void willHaveCapacityLimit()
+      throws Exception {
+    // Insert more contexts than the repository can hold: the oldest ones must be evicted.
+    int jobCount = 100;
+    int jobCapacity = 50;
+
+    MultiContextIssueRepository repository = new InMemoryMultiContextIssueRepository(jobCapacity, 10);
+
+    for (int j = 0; j < jobCount; j++) {
+      repository.put("job" + j, getTestIssue("issue 1", "code1"));
+    }
+
+    for (int j = 0; j < jobCount; j++) {
+      List<Issue> retrievedIssues = repository.getAll("job" + j);
+
+      if (j < jobCount - jobCapacity) {
+        assertEquals(0, retrievedIssues.size()); // evicted jobs return an empty list
+      } else {
+        assertEquals(1, retrievedIssues.size());
+      }
+    }
+  }
+
+  @Override
+  protected MultiContextIssueRepository getRepository() {
+    return new InMemoryMultiContextIssueRepository();
+  }
+}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandlerTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandlerTest.java
new file mode 100644
index 0000000..b479935
--- /dev/null
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandlerTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.troubleshooter;
+
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.metrics.event.TimingEvent;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class JobIssueEventHandlerTest {
+
+  @Test
+  public void canHandleIssue()
+      throws Exception {
+    MultiContextIssueRepository issueRepository = mock(MultiContextIssueRepository.class);
+    JobIssueEventHandler eventHandler = new JobIssueEventHandler(issueRepository, true);
+
+    IssueEventBuilder eventBuilder = new IssueEventBuilder("TestJob");
+    eventBuilder.setIssue(getTestIssue("test issue", "code1"));
+    eventBuilder.addMetadata(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, "test-group");
+    eventBuilder.addMetadata(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, "test-flow");
+    eventBuilder.addMetadata(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, "1234");
+    eventBuilder.addMetadata(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, "test-job");
+
+    eventHandler.processEvent(eventBuilder.build());
+    String expectedContextId = TroubleshooterUtils.getContextIdForJob("test-group", "test-flow", "1234", "test-job");
+    verify(issueRepository).put(eq(expectedContextId), any());
+  }
+
+  private Issue getTestIssue(String summary, String code) {
+    return Issue.builder().summary(summary).code(code).build();
+  }
+}
\ No newline at end of file
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepositoryTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepositoryTest.java
new file mode 100644
index 0000000..a524408
--- /dev/null
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepositoryTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.troubleshooter;
+
+import java.util.List;
+
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public abstract class MultiContextIssueRepositoryTest {
+  // org.testng.Assert.assertEquals takes (actual, expected), so actual values come first below.
+  @Test
+  public void canPutIssue()
+      throws Exception {
+    MultiContextIssueRepository repository = getRepository();
+
+    Issue testIssue = getTestIssue("first", "code1");
+    repository.put("job1", testIssue);
+
+    List<Issue> issues = repository.getAll("job1");
+    assertEquals(issues.size(), 1);
+    assertEquals(issues.get(0), testIssue);
+  }
+
+  @Test
+  public void canPutMultipleJobIssue()
+      throws Exception {
+    MultiContextIssueRepository repository = getRepository();
+
+    repository.put("job1", getTestIssue("first", "code1"));
+    repository.put("job1", getTestIssue("second", "code2"));
+
+    List<Issue> issues = repository.getAll("job1");
+    assertEquals(issues.size(), 2);
+  }
+
+  @Test
+  public void canWorkWithMultipleJobs()
+      throws Exception {
+    MultiContextIssueRepository repository = getRepository();
+
+    Issue job1Issue1 = getTestIssue("first", "code1");
+    Issue job2Issue1 = getTestIssue("first", "code1");
+    Issue job2Issue2 = getTestIssue("second", "code2");
+
+    repository.put("job1", job1Issue1);
+    repository.put("job2", job2Issue1);
+    repository.put("job2", job2Issue2);
+
+    assertEquals(repository.getAll("job1").size(), 1);
+    assertEquals(repository.getAll("job2").size(), 2);
+    assertEquals(repository.getAll("job2").get(1), job2Issue2);
+
+    assertEquals(repository.getAll("job1").get(0), job1Issue1);
+  }
+
+  @Test
+  public void canRemoveIssue()
+      throws Exception {
+    MultiContextIssueRepository repository = getRepository();
+
+    repository.put("job1", getTestIssue("first", "code1"));
+    repository.put("job1", getTestIssue("second", "code2"));
+
+    repository.remove("job1", "code1");
+    List<Issue> issues = repository.getAll("job1");
+    assertEquals(issues.size(), 1);
+    assertEquals(issues.get(0).getCode(), "code2");
+  }
+
+  @Test
+  public void willPreserveIssueInsertionOrder()
+      throws Exception {
+
+    int jobCount = 10;
+    int issueCount = 50;
+
+    MultiContextIssueRepository repository = getRepository();
+
+    for (int j = 0; j < jobCount; j++) {
+      for (int i = 0; i < issueCount; i++) {
+        repository.put("job" + j, getTestIssue("issue " + i, String.valueOf(i)));
+      }
+    }
+
+    for (int j = 0; j < jobCount; j++) {
+      List<Issue> retrievedIssues = repository.getAll("job" + j);
+      for (int i = 0; i < issueCount; i++) {
+        assertEquals(retrievedIssues.get(i).getCode(), String.valueOf(i));
+      }
+    }
+  }
+
+  protected abstract MultiContextIssueRepository getRepository();
+
+  protected Issue getTestIssue(String summary, String code) {
+    return Issue.builder().summary(summary).code(code).build();
+  }
+}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index a4d1458..91aeb64 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -41,6 +41,9 @@ import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
 import org.apache.gobblin.runtime.instance.StandardGobblinInstanceLauncher;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
+import org.apache.gobblin.runtime.troubleshooter.InMemoryMultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
 import org.apache.gobblin.scheduler.SchedulerService;
 import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
 import org.apache.gobblin.service.FlowConfigV2ResourceLocalHandler;
@@ -207,6 +210,10 @@ public class GobblinServiceGuiceModule implements Module {
 
     binder.bind(GobblinServiceManager.class);
 
+    binder.bind(MultiContextIssueRepository.class).to(InMemoryMultiContextIssueRepository.class);
+
+    binder.bind(JobIssueEventHandler.class);
+
     LOGGER.info("Bindings configured");
   }
 
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
index 85ee51a..21abc4e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
@@ -33,6 +33,7 @@ import com.google.common.collect.Iterators;
 import com.typesafe.config.Config;
 
 import javax.inject.Inject;
+import javax.inject.Singleton;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
@@ -40,6 +41,7 @@ import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.metastore.FileContextBasedFsStateStore;
 import org.apache.gobblin.metastore.FileContextBasedFsStateStoreFactory;
 import org.apache.gobblin.metastore.FsStateStore;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
 
 
 /**
@@ -48,6 +50,7 @@ import org.apache.gobblin.metastore.FsStateStore;
  * The store name is set to flowGroup.flowName, while the table name is set to flowExecutionId.jobGroup.jobName.
  */
 @Slf4j
+@Singleton
 public class FsJobStatusRetriever extends JobStatusRetriever {
   public static final String CONF_PREFIX = "fsJobStatusRetriever";
 
@@ -55,7 +58,8 @@ public class FsJobStatusRetriever extends JobStatusRetriever {
   private final FileContextBasedFsStateStore<State> stateStore;
 
   @Inject
-  public FsJobStatusRetriever(Config config) {
+  public FsJobStatusRetriever(Config config, MultiContextIssueRepository issueRepository) {
+    super(issueRepository);
     this.stateStore = (FileContextBasedFsStateStore<State>) new FileContextBasedFsStateStoreFactory().
         createStateStore(config.getConfig(CONF_PREFIX), State.class);
   }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index def4943..6e47ca0 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -29,6 +29,7 @@ import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificDatumReader;
 
 import com.codahale.metrics.Meter;
+import com.google.common.annotations.VisibleForTesting;
 import com.typesafe.config.Config;
 
 import lombok.Getter;
@@ -43,6 +44,7 @@ import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistryFactory;
 import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
 import org.apache.gobblin.metrics.reporter.util.SchemaRegistryVersionWriter;
 import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
+import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.util.ConfigUtils;
 
@@ -62,9 +64,11 @@ public class KafkaAvroJobStatusMonitor extends KafkaJobStatusMonitor {
   @Getter
   private Meter messageParseFailures;
 
-  public KafkaAvroJobStatusMonitor(String topic, Config config, int numThreads)
+  public KafkaAvroJobStatusMonitor(String topic, Config config, int numThreads,
+      JobIssueEventHandler jobIssueEventHandler)
       throws IOException, ReflectiveOperationException {
-    super(topic, config, numThreads);
+    super(topic, config, numThreads,  jobIssueEventHandler);
+
     if (ConfigUtils.getBoolean(config, ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY, false)) {
       KafkaAvroSchemaRegistry schemaRegistry = (KafkaAvroSchemaRegistry) new KafkaAvroSchemaRegistryFactory().
           create(ConfigUtils.configToProperties(config));
@@ -86,13 +90,14 @@ public class KafkaAvroJobStatusMonitor extends KafkaJobStatusMonitor {
   }
 
   @Override
-  public org.apache.gobblin.configuration.State parseJobStatus(DecodeableKafkaRecord<byte[],byte[]> message) {
+  @VisibleForTesting
+  public GobblinTrackingEvent deserializeEvent(DecodeableKafkaRecord<byte[],byte[]> message) {
     try {
       InputStream is = new ByteArrayInputStream(message.getValue());
       schemaVersionWriter.readSchemaVersioningInformation(new DataInputStream(is));
       Decoder decoder = DecoderFactory.get().binaryDecoder(is, this.decoder.get());
-      GobblinTrackingEvent decodedMessage = this.reader.get().read(null, decoder);
-      return parseJobStatus(decodedMessage);
+
+      return this.reader.get().read(null, decoder);
     } catch (Exception exc) {
       this.messageParseFailures.mark();
       if (this.messageParseFailures.getFiveMinuteRate() < 1) {
@@ -109,7 +114,9 @@ public class KafkaAvroJobStatusMonitor extends KafkaJobStatusMonitor {
    * @param event an instance of {@link GobblinTrackingEvent}
    * @return job status as an instance of {@link org.apache.gobblin.configuration.State}
    */
-  private org.apache.gobblin.configuration.State parseJobStatus(GobblinTrackingEvent event) {
+  @Override
+  @VisibleForTesting
+  public org.apache.gobblin.configuration.State parseJobStatus(GobblinTrackingEvent event) {
     if (!acceptEvent(event)) {
       return null;
     }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 38a7f6b..9792b83 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -43,11 +43,14 @@ import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
 import org.apache.gobblin.metastore.FileContextBasedFsStateStore;
 import org.apache.gobblin.metastore.FileContextBasedFsStateStoreFactory;
 import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
 import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.TaskContext;
 import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
 import org.apache.gobblin.runtime.retention.DatasetCleanerTask;
+import org.apache.gobblin.runtime.troubleshooter.IssueEventBuilder;
+import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.source.workunit.WorkUnit;
@@ -67,6 +70,9 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
   public static final String GET_AND_SET_JOB_STATUS = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
       JOB_STATUS_MONITOR_PREFIX,  "getAndSetJobStatus");
 
+  private static final String PROCESS_JOB_ISSUE = MetricRegistry
+      .name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, JOB_STATUS_MONITOR_PREFIX, "jobIssueProcessingTime");
+
   static final String JOB_STATUS_MONITOR_TOPIC_KEY = "topic";
   static final String JOB_STATUS_MONITOR_NUM_THREADS_KEY = "numThreads";
   static final String JOB_STATUS_MONITOR_CLASS_KEY = "class";
@@ -86,7 +92,9 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
           ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING, ExecutionStatus.COMPLETE,
           ExecutionStatus.FAILED, ExecutionStatus.CANCELLED);
 
-  public KafkaJobStatusMonitor(String topic, Config config, int numThreads)
+  private final JobIssueEventHandler jobIssueEventHandler;
+
+  public KafkaJobStatusMonitor(String topic, Config config, int numThreads, JobIssueEventHandler jobIssueEventHandler)
       throws ReflectiveOperationException {
     super(topic, config.withFallback(DEFAULTS), numThreads);
     String stateStoreFactoryClass = ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_FACTORY_CLASS_KEY, FileContextBasedFsStateStoreFactory.class.getName());
@@ -94,6 +102,8 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
     this.stateStore =
         ((StateStore.Factory) Class.forName(stateStoreFactoryClass).newInstance()).createStateStore(config, org.apache.gobblin.configuration.State.class);
     this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
+
+    this.jobIssueEventHandler = jobIssueEventHandler;
   }
 
   @Override
@@ -128,7 +138,19 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
   @Override
   protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
     try {
-      org.apache.gobblin.configuration.State jobStatus = parseJobStatus(message);
+      GobblinTrackingEvent gobblinTrackingEvent = deserializeEvent(message);
+
+      if (gobblinTrackingEvent == null) {
+        return;
+      }
+
+      if (IssueEventBuilder.isIssueEvent(gobblinTrackingEvent)) {
+        try (Timer.Context context = getMetricContext().timer(PROCESS_JOB_ISSUE).time()) {
+          jobIssueEventHandler.processEvent(gobblinTrackingEvent);
+        }
+      }
+
+      org.apache.gobblin.configuration.State jobStatus = parseJobStatus(gobblinTrackingEvent);
       if (jobStatus != null) {
         try(Timer.Context context = getMetricContext().timer(GET_AND_SET_JOB_STATUS).time()) {
           addJobStatusToStateStore(jobStatus, this.stateStore);
@@ -231,6 +253,8 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
     return Long.parseLong(Splitter.on(ServiceConfigKeys.STATE_STORE_KEY_SEPARATION_CHARACTER).splitToList(tableName).get(0));
   }
 
-  public abstract org.apache.gobblin.configuration.State parseJobStatus(DecodeableKafkaRecord<byte[],byte[]> message);
+  protected abstract GobblinTrackingEvent deserializeEvent(DecodeableKafkaRecord<byte[],byte[]> message);
+
+  protected abstract org.apache.gobblin.configuration.State parseJobStatus(GobblinTrackingEvent event);
 
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
index 741a246..7f6fc51 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
@@ -17,6 +17,8 @@
 
 package org.apache.gobblin.service.monitoring;
 
+import java.util.Objects;
+
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
@@ -28,6 +30,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryConfigurationKeys;
 import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
+import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
@@ -40,11 +43,13 @@ public class KafkaJobStatusMonitorFactory implements Provider<KafkaJobStatusMoni
   private static final String KAFKA_SSL_CONFIG_PREFIX_KEY = "jobStatusMonitor.kafka.config";
   private static final String DEFAULT_KAFKA_SSL_CONFIG_PREFIX = "metrics.reporting.kafka.config";
 
-  Config config;
+  private final Config config;
+  private final JobIssueEventHandler jobIssueEventHandler;
 
   @Inject
-  public KafkaJobStatusMonitorFactory(Config config) {
-    this.config = config;
+  public KafkaJobStatusMonitorFactory(Config config, JobIssueEventHandler jobIssueEventHandler) {
+    this.config = Objects.requireNonNull(config);
+    this.jobIssueEventHandler = Objects.requireNonNull(jobIssueEventHandler);
   }
 
   private KafkaJobStatusMonitor createJobStatusMonitor()
@@ -71,7 +76,8 @@ public class KafkaJobStatusMonitorFactory implements Provider<KafkaJobStatusMoni
           config.getValue(KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_OVERRIDE_NAMESPACE));
     }
     jobStatusConfig = jobStatusConfig.withFallback(kafkaSslConfig).withFallback(schemaRegistryConfig);
-    return (KafkaJobStatusMonitor) GobblinConstructorUtils.invokeLongestConstructor(jobStatusMonitorClass, topic, jobStatusConfig, numThreads);
+    return (KafkaJobStatusMonitor) GobblinConstructorUtils
+        .invokeLongestConstructor(jobStatusMonitorClass, topic, jobStatusConfig, numThreads, jobIssueEventHandler);
   }
 
   @Override
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
index c9bb3e1..3d1cfc0 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
@@ -17,10 +17,6 @@
 
 package org.apache.gobblin.service.monitoring;
 
-import com.google.common.base.Preconditions;
-
-import com.google.common.collect.Iterators;
-import com.typesafe.config.Config;
 import java.io.File;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -28,18 +24,26 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.metastore.StateStore;
 import org.apache.gobblin.runtime.spec_executorInstance.LocalFsSpecProducer;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
 import org.apache.gobblin.service.ExecutionStatus;
 
-import lombok.extern.slf4j.Slf4j;
-
 /**
  * A job status monitor for jobs completed by a Gobblin Standalone instance running on the same machine. Mainly used for sandboxing/testing
  * Considers a job done when Gobblin standalone appends ".done" to the job. Otherwise it will assume the job is in progress
  */
 @Slf4j
+@Singleton
 public class LocalFsJobStatusRetriever extends JobStatusRetriever {
 
   public static final String CONF_PREFIX = "localFsJobStatusRetriever.";
@@ -47,7 +51,9 @@ public class LocalFsJobStatusRetriever extends JobStatusRetriever {
   private String specProducerPath;
 
   // Do not use a state store for this implementation, just look at the job folder that @LocalFsSpecProducer writes to
-  public LocalFsJobStatusRetriever(Config config) {
+  @Inject
+  public LocalFsJobStatusRetriever(Config config, MultiContextIssueRepository issueRepository) {
+    super(issueRepository);
     this.specProducerPath = config.getString(CONF_PREFIX + LocalFsSpecProducer.LOCAL_FS_PRODUCER_PATH_KEY);
   }
 
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
index a7a760c..6fc02ff 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
@@ -29,6 +29,8 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.typesafe.config.Config;
 
+import javax.inject.Inject;
+import javax.inject.Singleton;
 import lombok.Getter;
 
 import org.apache.gobblin.configuration.State;
@@ -36,11 +38,13 @@ import org.apache.gobblin.metastore.MysqlJobStatusStateStore;
 import org.apache.gobblin.metastore.MysqlJobStatusStateStoreFactory;
 import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
 
 
 /**
  * Mysql based Retriever for {@link JobStatus}.
  */
+@Singleton
 public class MysqlJobStatusRetriever extends JobStatusRetriever {
   public static final String MYSQL_JOB_STATUS_RETRIEVER_PREFIX = "mysqlJobStatusRetriever";
   public static final String GET_LATEST_JOB_STATUS_METRIC = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
@@ -53,7 +57,9 @@ public class MysqlJobStatusRetriever extends JobStatusRetriever {
   @Getter
   private MysqlJobStatusStateStore<State> stateStore;
 
-  public MysqlJobStatusRetriever(Config config) throws ReflectiveOperationException {
+  @Inject
+  public MysqlJobStatusRetriever(Config config, MultiContextIssueRepository issueRepository) throws ReflectiveOperationException {
+    super(issueRepository);
     config = config.getConfig(MYSQL_JOB_STATUS_RETRIEVER_PREFIX).withFallback(config);
     this.stateStore = (MysqlJobStatusStateStoreFactory.class.newInstance()).createStateStore(config, State.class);
   }
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
index f9dffc0..d0595a4 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
@@ -17,16 +17,22 @@
 
 package org.apache.gobblin.service.monitoring;
 
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigValueFactory;
 import java.io.File;
 import java.io.IOException;
+
 import org.apache.commons.io.FileUtils;
-import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+
+import static org.mockito.Mockito.mock;
+
 
 public class FsJobStatusRetrieverTest extends JobStatusRetrieverTest {
 
@@ -37,7 +43,7 @@ public class FsJobStatusRetrieverTest extends JobStatusRetrieverTest {
     cleanUpDir();
     Config config = ConfigFactory.empty().withValue(FsJobStatusRetriever.CONF_PREFIX + "." + ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY,
         ConfigValueFactory.fromAnyRef(stateStoreDir));
-    this.jobStatusRetriever = new FsJobStatusRetriever(config);
+    this.jobStatusRetriever = new FsJobStatusRetriever(config, mock(MultiContextIssueRepository.class));
   }
 
   @Test
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
index ac924fb..4646ef2 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
@@ -21,12 +21,12 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.Properties;
 
-import com.google.common.base.Strings;
-
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import com.google.common.base.Strings;
+
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
@@ -34,9 +34,12 @@ import org.apache.gobblin.metastore.MysqlJobStatusStateStore;
 import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
 import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.ServiceConfigKeys;
 
+import static org.mockito.Mockito.mock;
+
 
 public class MysqlJobStatusRetrieverTest extends JobStatusRetrieverTest {
   private MysqlJobStatusStateStore dbJobStateStore;
@@ -54,7 +57,8 @@ public class MysqlJobStatusRetrieverTest extends JobStatusRetrieverTest {
     configBuilder.addPrimitive(MysqlJobStatusRetriever.MYSQL_JOB_STATUS_RETRIEVER_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_USER_KEY, TEST_USER);
     configBuilder.addPrimitive(MysqlJobStatusRetriever.MYSQL_JOB_STATUS_RETRIEVER_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, TEST_PASSWORD);
 
-    this.jobStatusRetriever = new MysqlJobStatusRetriever(configBuilder.build());
+    this.jobStatusRetriever =
+        new MysqlJobStatusRetriever(configBuilder.build(), mock(MultiContextIssueRepository.class));
     this.dbJobStateStore = ((MysqlJobStatusRetriever) this.jobStatusRetriever).getStateStore();
     cleanUpDir();
   }
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index 9e54cc3..4a7c480 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -38,6 +38,7 @@ ext.externalDependency = [
     "commonsEmail": "org.apache.commons:commons-email:1.4",
     "commonsLang": "commons-lang:commons-lang:2.6",
     "commonsLang3": "org.apache.commons:commons-lang3:3.4",
+    "commonsCollections4": "org.apache.commons:commons-collections4:4.4",
     "commonsConfiguration": "commons-configuration:commons-configuration:1.10",
     "commonsIo": "commons-io:commons-io:2.5",
     "commonsMath": "org.apache.commons:commons-math3:3.5",