You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ap...@apache.org on 2021/07/15 18:38:10 UTC
[gobblin] branch master updated: [GOBBLIN-1457] Add automatic troubleshooter to Gobblin service (#3299)
This is an automated email from the ASF dual-hosted git repository.
aplex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new b5e32b1 [GOBBLIN-1457] Add automatic troubleshooter to Gobblin service (#3299)
b5e32b1 is described below
commit b5e32b158bd713c45caa5b4b8866f8a4c4fc3a29
Author: Alex Prokofiev <ap...@linkedin.com>
AuthorDate: Thu Jul 15 11:37:59 2021 -0700
[GOBBLIN-1457] Add automatic troubleshooter to Gobblin service (#3299)
Previous commits added automatic troubleshooting to Gobblin Azkaban
jobs; this commit collects the issues discovered there and exposes
them through the Gobblin service.
The initial implementation keeps issues in memory for a limited number
of jobs; future commits will add persistence.
---
.../runtime/KafkaAvroJobStatusMonitorTest.java | 8 +-
.../pegasus/org/apache/gobblin/service/Issue.pdl | 46 +++++++
.../org/apache/gobblin/service/IssueSeverity.pdl | 9 ++
.../org/apache/gobblin/service/JobStatus.pdl | 7 ++
.../org/apache/gobblin/service/Timestamp.pdl | 8 ++
...he.gobblin.service.flowexecutions.snapshot.json | 50 +++++++-
...ache.gobblin.service.flowstatuses.snapshot.json | 50 +++++++-
.../org/apache/gobblin/service/FlowStatusTest.java | 140 +++++++++++++--------
.../service/FlowExecutionResourceLocalHandler.java | 26 +++-
gobblin-runtime/build.gradle | 1 +
.../InMemoryMultiContextIssueRepository.java | 98 +++++++++++++++
.../troubleshooter/JobIssueEventHandler.java | 116 +++++++++++++++++
.../MultiContextIssueRepository.java | 38 ++++++
.../troubleshooter/TroubleshooterUtils.java | 40 ++++++
.../gobblin/service/monitoring/JobStatus.java | 5 +
.../service/monitoring/JobStatusRetriever.java | 25 +++-
.../InMemoryMultiContextIssueRepositoryTest.java | 58 +++++++++
.../troubleshooter/JobIssueEventHandlerTest.java | 52 ++++++++
.../MultiContextIssueRepositoryTest.java | 116 +++++++++++++++++
.../modules/core/GobblinServiceGuiceModule.java | 7 ++
.../service/monitoring/FsJobStatusRetriever.java | 6 +-
.../monitoring/KafkaAvroJobStatusMonitor.java | 19 ++-
.../service/monitoring/KafkaJobStatusMonitor.java | 30 ++++-
.../monitoring/KafkaJobStatusMonitorFactory.java | 14 ++-
.../monitoring/LocalFsJobStatusRetriever.java | 20 +--
.../monitoring/MysqlJobStatusRetriever.java | 8 +-
.../monitoring/FsJobStatusRetrieverTest.java | 16 ++-
.../monitoring/MysqlJobStatusRetrieverTest.java | 10 +-
gradle/scripts/dependencyDefinitions.gradle | 1 +
29 files changed, 935 insertions(+), 89 deletions(-)
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index c443b42..f0b5d23 100644
--- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -55,11 +55,14 @@ import org.apache.gobblin.metrics.kafka.KafkaAvroEventKeyValueReporter;
import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
import org.apache.gobblin.metrics.kafka.KafkaKeyValueProducerPusher;
import org.apache.gobblin.metrics.kafka.Pusher;
+import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.service.monitoring.KafkaAvroJobStatusMonitor;
import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
+import static org.mockito.Mockito.mock;
+
public class KafkaAvroJobStatusMonitorTest {
public static final String TOPIC = KafkaAvroJobStatusMonitorTest.class.getSimpleName();
@@ -176,7 +179,8 @@ public class KafkaAvroJobStatusMonitorTest {
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPLETE.name());
messageAndMetadata = iterator.next();
- Assert.assertNull(jobStatusMonitor.parseJobStatus(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata)));
+ Assert.assertNull(jobStatusMonitor.parseJobStatus(
+ jobStatusMonitor.deserializeEvent(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata))));
// Check that state didn't get set to running since it was already complete
messageAndMetadata = iterator.next();
@@ -465,7 +469,7 @@ public class KafkaAvroJobStatusMonitorTest {
public MockKafkaAvroJobStatusMonitor(String topic, Config config, int numThreads)
throws IOException, ReflectiveOperationException {
- super(topic, config, numThreads);
+ super(topic, config, numThreads, mock(JobIssueEventHandler.class));
}
@Override
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/Issue.pdl b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/Issue.pdl
new file mode 100644
index 0000000..47cc512
--- /dev/null
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/Issue.pdl
@@ -0,0 +1,46 @@
+namespace org.apache.gobblin.service
+
+/**
+ * Issue describes a specific unique problem in the job or application.
+ *
+ * Issue can be generated from log entries, health checks, and other places.
+ */
+record Issue {
+
+ /**
+ * Time when the issue have occured
+ */
+ time: Timestamp
+
+ /**
+ * Severity from DEBUG to FATAL
+ */
+ severity: IssueSeverity
+
+ /**
+ * Unique machine-readable code that identifies a specific problem.
+ *
+ * It can be used for making programmatic decisions on how to handle and recover from this issue.
+ *
+ * Issues representing the same kind of problem will have the same code.
+ * */
+ code: string
+
+ /**
+ * Short, human-readable description of the issue.
+ *
+ * It should focus on what is the root cause of the problem, and what steps the user should do to resolve it.
+ */
+ summary: string
+
+ /**
+ * Human-readable issue details that can include exception stack trace and additional information about the problem.
+ */
+ details: string
+
+ /**
+ * Additional machine-readable properties of the issue.
+ *
+ */
+ properties: map[string,string]
+}
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/IssueSeverity.pdl b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/IssueSeverity.pdl
new file mode 100644
index 0000000..b2b2ed4
--- /dev/null
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/IssueSeverity.pdl
@@ -0,0 +1,9 @@
+namespace org.apache.gobblin.service
+
+enum IssueSeverity {
+ DEBUG,
+ INFO,
+ WARN,
+ ERROR,
+ FATAL
+}
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/JobStatus.pdl b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/JobStatus.pdl
index 615f6cf..1c64b8d 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/JobStatus.pdl
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/JobStatus.pdl
@@ -44,4 +44,11 @@ record JobStatus {
* Job state that is updated only at the start and end of a job execution.
*/
jobState: JobState
+
+ /**
+ * Collection of issues for the job.
+ *
+ * Can include various errors and warnings that will help the user to troubleshoot problems.
+ * */
+ issues: array[Issue]
}
\ No newline at end of file
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/Timestamp.pdl b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/Timestamp.pdl
new file mode 100644
index 0000000..2cb068c
--- /dev/null
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/Timestamp.pdl
@@ -0,0 +1,8 @@
+namespace org.apache.gobblin.service
+
+/**
+ * Epoch/UNIX time in milliseconds
+ *
+ * Represents the number of milliseconds since the epoch of 1970-01-01T00:00:00Z
+ */
+typeref Timestamp = long
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
index 734c2ac..44eb865 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
@@ -198,12 +198,60 @@
} ]
},
"doc" : "Job state that is updated only at the start and end of a job execution."
+ }, {
+ "name" : "issues",
+ "type" : {
+ "type" : "array",
+ "items" : {
+ "type" : "record",
+ "name" : "Issue",
+ "doc" : "Issue describes a specific unique problem in the job or application.\n\nIssue can be generated from log entries, health checks, and other places.",
+ "fields" : [ {
+ "name" : "time",
+ "type" : {
+ "type" : "typeref",
+ "name" : "Timestamp",
+ "doc" : "Epoch/UNIX time in milliseconds\n\nRepresents the number of milliseconds since the epoch of 1970-01-01T00:00:00Z",
+ "ref" : "long"
+ },
+ "doc" : "Time when the issue have occured"
+ }, {
+ "name" : "severity",
+ "type" : {
+ "type" : "enum",
+ "name" : "IssueSeverity",
+ "symbols" : [ "DEBUG", "INFO", "WARN", "ERROR", "FATAL" ]
+ },
+ "doc" : "Severity from DEBUG to FATAL"
+ }, {
+ "name" : "code",
+ "type" : "string",
+ "doc" : "Unique machine-readable code that identifies a specific problem.\n\nIt can be used for making programmatic decisions on how to handle and recover from this issue.\n\nIssues representing the same kind of problem will have the same code.\n"
+ }, {
+ "name" : "summary",
+ "type" : "string",
+ "doc" : "Short, human-readable description of the issue.\n\nIt should focus on what is the root cause of the problem, and what steps the user should do to resolve it."
+ }, {
+ "name" : "details",
+ "type" : "string",
+ "doc" : "Human-readable issue details that can include exception stack trace and additional information about the problem."
+ }, {
+ "name" : "properties",
+ "type" : {
+ "type" : "map",
+ "values" : "string"
+ },
+ "doc" : "Additional machine-readable properties of the issue.\n"
+ } ]
+ }
+ },
+ "doc" : "Collection of issues for the job.\n\nCan include various errors and warnings that will help the user to troubleshoot problems.\n"
} ]
}
},
"doc" : "Status of jobs belonging to the flow"
} ]
- }, "org.apache.gobblin.service.FlowId", "org.apache.gobblin.service.FlowStatistics", "org.apache.gobblin.service.FlowStatusId", "org.apache.gobblin.service.JobId", "org.apache.gobblin.service.JobState", "org.apache.gobblin.service.JobStatistics", "org.apache.gobblin.service.JobStatus" ],
+ }, "org.apache.gobblin.service.FlowId", "org.apache.gobblin.service.FlowStatistics", "org.apache.gobblin.service.FlowStatusId", "org.apache.gobblin.service.Issue", "org.apache.gobblin.service.IssueSeverity", "org.apache.gobblin.service.JobId", "org.apache.gobblin.service.JobState", "org.apache.gobblin.service.JobStatistics", "org.apache.gobblin.service.JobStatus", "org.apache.gobblin.service.Timestamp" ],
"schema" : {
"name" : "flowexecutions",
"namespace" : "org.apache.gobblin.service",
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
index d3d4dbc..69b7ad4 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
@@ -200,13 +200,61 @@
} ]
},
"doc" : "Job state that is updated only at the start and end of a job execution."
+ }, {
+ "name" : "issues",
+ "type" : {
+ "type" : "array",
+ "items" : {
+ "type" : "record",
+ "name" : "Issue",
+ "doc" : "Issue describes a specific unique problem in the job or application.\n\nIssue can be generated from log entries, health checks, and other places.",
+ "fields" : [ {
+ "name" : "time",
+ "type" : {
+ "type" : "typeref",
+ "name" : "Timestamp",
+ "doc" : "Epoch/UNIX time in milliseconds\n\nRepresents the number of milliseconds since the epoch of 1970-01-01T00:00:00Z",
+ "ref" : "long"
+ },
+ "doc" : "Time when the issue have occured"
+ }, {
+ "name" : "severity",
+ "type" : {
+ "type" : "enum",
+ "name" : "IssueSeverity",
+ "symbols" : [ "DEBUG", "INFO", "WARN", "ERROR", "FATAL" ]
+ },
+ "doc" : "Severity from DEBUG to FATAL"
+ }, {
+ "name" : "code",
+ "type" : "string",
+ "doc" : "Unique machine-readable code that identifies a specific problem.\n\nIt can be used for making programmatic decisions on how to handle and recover from this issue.\n\nIssues representing the same kind of problem will have the same code.\n"
+ }, {
+ "name" : "summary",
+ "type" : "string",
+ "doc" : "Short, human-readable description of the issue.\n\nIt should focus on what is the root cause of the problem, and what steps the user should do to resolve it."
+ }, {
+ "name" : "details",
+ "type" : "string",
+ "doc" : "Human-readable issue details that can include exception stack trace and additional information about the problem."
+ }, {
+ "name" : "properties",
+ "type" : {
+ "type" : "map",
+ "values" : "string"
+ },
+ "doc" : "Additional machine-readable properties of the issue.\n"
+ } ]
+ }
+ },
+ "doc" : "Collection of issues for the job.\n\nCan include various errors and warnings that will help the user to troubleshoot problems.\n"
} ]
}
},
"doc" : "Status of jobs belonging to the flow"
} ],
"deprecated" : "Use FlowExecution instead"
- }, "org.apache.gobblin.service.FlowStatusId", "org.apache.gobblin.service.JobId", "org.apache.gobblin.service.JobState", "org.apache.gobblin.service.JobStatistics", "org.apache.gobblin.service.JobStatus" ],
+ }, "org.apache.gobblin.service.FlowStatusId", "org.apache.gobblin.service.Issue", "org.apache.gobblin.service.IssueSeverity", "org.apache.gobblin.service.JobId", "org.apache.gobblin.service.JobState", "org.apache.gobblin.service.JobStatistics", "org.apache.gobblin.service.JobStatus", "org.apache.gobblin.service.Timestamp" ],
"schema" : {
"name" : "flowstatuses",
"namespace" : "org.apache.gobblin.service",
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
index 16e08fc..da81dd9 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
@@ -39,9 +39,13 @@ import com.linkedin.restli.server.resources.BaseResource;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.restli.EmbeddedRestliServer;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+import static org.mockito.Mockito.mock;
+
+
@Test(groups = { "gobblin.service" }, singleThreaded = true)
public class FlowStatusTest {
private FlowStatusClient _client;
@@ -49,6 +53,11 @@ public class FlowStatusTest {
private List<List<org.apache.gobblin.service.monitoring.JobStatus>> _listOfJobStatusLists;
class TestJobStatusRetriever extends JobStatusRetriever {
+
+ protected TestJobStatusRetriever(MultiContextIssueRepository issueRepository) {
+ super(issueRepository);
+ }
+
@Override
public Iterator<org.apache.gobblin.service.monitoring.JobStatus> getJobStatusesForFlowExecution(String flowName,
String flowGroup, long flowExecutionId) {
@@ -81,7 +90,7 @@ public class FlowStatusTest {
@BeforeClass
public void setUp() throws Exception {
- JobStatusRetriever jobStatusRetriever = new TestJobStatusRetriever();
+ JobStatusRetriever jobStatusRetriever = new TestJobStatusRetriever(mock(MultiContextIssueRepository.class));
final FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);
Injector injector = Guice.createInjector(new Module() {
@@ -108,24 +117,33 @@ public class FlowStatusTest {
*/
@Test
public void testFindLatest() throws Exception {
- org.apache.gobblin.service.monitoring.JobStatus js1 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
- .flowName("flow1").jobGroup("jgroup1").jobName("job1").startTime(1000L).endTime(5000L)
- .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test message 1")
- .processedCount(100).jobExecutionId(1).lowWatermark("watermark:1").highWatermark("watermark:2").build();
- org.apache.gobblin.service.monitoring.JobStatus fs1 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
- .flowName("flow1").jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(5000L)
- .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).build();
- org.apache.gobblin.service.monitoring.JobStatus js2 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
- .flowName("flow1").jobGroup("jgroup1").jobName("job1").jobTag("dataset1").startTime(2000L).endTime(6000L)
- .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(1).message("Test message 2")
- .processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build();
- org.apache.gobblin.service.monitoring.JobStatus js3 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
- .flowName("flow1").jobGroup("jgroup1").jobName("job2").jobTag("dataset2").startTime(2000L).endTime(6000L)
- .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(1).message("Test message 3")
- .processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build();
- org.apache.gobblin.service.monitoring.JobStatus fs2 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
- .flowName("flow1").jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(7000L)
- .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(1).message("Flow message").build();
+ org.apache.gobblin.service.monitoring.JobStatus js1 =
+ org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
+ .jobGroup("jgroup1").jobName("job1").startTime(1000L).endTime(5000L)
+ .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test message 1").processedCount(100)
+ .jobExecutionId(1).lowWatermark("watermark:1").highWatermark("watermark:2").issues(Collections.emptyList())
+ .build();
+ org.apache.gobblin.service.monitoring.JobStatus fs1 =
+ org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
+ .jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(5000L)
+ .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).issues(Collections.emptyList()).build();
+ org.apache.gobblin.service.monitoring.JobStatus js2 =
+ org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
+ .jobGroup("jgroup1").jobName("job1").jobTag("dataset1").startTime(2000L).endTime(6000L)
+ .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(1).message("Test message 2").processedCount(200)
+ .jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").issues(Collections.emptyList())
+ .build();
+ org.apache.gobblin.service.monitoring.JobStatus js3 =
+ org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
+ .jobGroup("jgroup1").jobName("job2").jobTag("dataset2").startTime(2000L).endTime(6000L)
+ .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(1).message("Test message 3").processedCount(200)
+ .jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").issues(Collections.emptyList())
+ .build();
+ org.apache.gobblin.service.monitoring.JobStatus fs2 =
+ org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
+ .jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(7000L)
+ .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(1).message("Flow message")
+ .issues(Collections.emptyList()).build();
List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList1 = Lists.newArrayList(js1, fs1);
List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList2 = Lists.newArrayList(js2, js3, fs2);
_listOfJobStatusLists = Lists.newArrayList();
@@ -170,17 +188,23 @@ public class FlowStatusTest {
*/
@Test
public void testGetCompleted() throws Exception {
- org.apache.gobblin.service.monitoring.JobStatus js1 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
- .flowName("flow1").jobGroup("jgroup1").jobName("job1").startTime(1000L).endTime(5000L)
- .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test message 1")
- .processedCount(100).jobExecutionId(1).lowWatermark("watermark:1").highWatermark("watermark:2").build();
- org.apache.gobblin.service.monitoring.JobStatus js2 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
- .flowName("flow1").jobGroup("jgroup1").jobName("job2").startTime(2000L).endTime(6000L)
- .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test message 2")
- .processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build();
- org.apache.gobblin.service.monitoring.JobStatus fs1 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
- .flowName("flow1").jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(7000L)
- .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Flow message").build();
+ org.apache.gobblin.service.monitoring.JobStatus js1 =
+ org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
+ .jobGroup("jgroup1").jobName("job1").startTime(1000L).endTime(5000L)
+ .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test message 1").processedCount(100)
+ .jobExecutionId(1).lowWatermark("watermark:1").highWatermark("watermark:2").issues(Collections.emptyList())
+ .build();
+ org.apache.gobblin.service.monitoring.JobStatus js2 =
+ org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
+ .jobGroup("jgroup1").jobName("job2").startTime(2000L).endTime(6000L)
+ .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test message 2").processedCount(200)
+ .jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").issues(Collections.emptyList())
+ .build();
+ org.apache.gobblin.service.monitoring.JobStatus fs1 =
+ org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
+ .jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(7000L)
+ .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Flow message")
+ .issues(Collections.emptyList()).build();
List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList = Lists.newArrayList(js1, js2, fs1);
_listOfJobStatusLists = Lists.newArrayList();
_listOfJobStatusLists.add(jobStatusList);
@@ -213,17 +237,23 @@ public class FlowStatusTest {
*/
@Test
public void testGetRunning() throws Exception {
- org.apache.gobblin.service.monitoring.JobStatus js1 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
- .flowName("flow1").jobGroup("jgroup1").jobName("job1").startTime(1000L).endTime(5000L)
- .eventName(ExecutionStatus.RUNNING.name()).flowExecutionId(0).message("Test message 1").processedCount(100)
- .jobExecutionId(1).lowWatermark("watermark:1").highWatermark("watermark:2").build();
- org.apache.gobblin.service.monitoring.JobStatus js2 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
- .flowName("flow1").jobGroup("jgroup1").jobName("job2").startTime(2000L).endTime(6000L)
- .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test message 2")
- .processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build();
- org.apache.gobblin.service.monitoring.JobStatus fs1 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
- .flowName("flow1").jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY)
- .eventName(ExecutionStatus.RUNNING.name()).flowExecutionId(0).message("Flow message").build();
+ org.apache.gobblin.service.monitoring.JobStatus js1 =
+ org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
+ .jobGroup("jgroup1").jobName("job1").startTime(1000L).endTime(5000L)
+ .eventName(ExecutionStatus.RUNNING.name()).flowExecutionId(0).message("Test message 1").processedCount(100)
+ .jobExecutionId(1).lowWatermark("watermark:1").highWatermark("watermark:2").issues(Collections.emptyList())
+ .build();
+ org.apache.gobblin.service.monitoring.JobStatus js2 =
+ org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
+ .jobGroup("jgroup1").jobName("job2").startTime(2000L).endTime(6000L)
+ .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test message 2").processedCount(200)
+ .jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").issues(Collections.emptyList())
+ .build();
+ org.apache.gobblin.service.monitoring.JobStatus fs1 =
+ org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
+ .jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY)
+ .eventName(ExecutionStatus.RUNNING.name()).flowExecutionId(0).message("Flow message")
+ .issues(Collections.emptyList()).build();
List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList = Lists.newArrayList(js1, js2, fs1);
_listOfJobStatusLists = Lists.newArrayList();
_listOfJobStatusLists.add(jobStatusList);
@@ -256,17 +286,23 @@ public class FlowStatusTest {
*/
@Test
public void testGetFailed() throws Exception {
- org.apache.gobblin.service.monitoring.JobStatus js1 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
- .flowName("flow1").jobGroup("jgroup1").jobName("job1").startTime(1000L).endTime(5000L)
- .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test message 1")
- .processedCount(100).jobExecutionId(1).lowWatermark("watermark:1").highWatermark("watermark:2").build();
- org.apache.gobblin.service.monitoring.JobStatus js2 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
- .flowName("flow1").jobGroup("jgroup1").jobName("job2").startTime(2000L).endTime(6000L)
- .eventName(ExecutionStatus.FAILED.name()).flowExecutionId(0).message("Test message 2")
- .processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build();
- org.apache.gobblin.service.monitoring.JobStatus fs1 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
- .flowName("flow1").jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(7000L)
- .eventName(ExecutionStatus.FAILED.name()).flowExecutionId(0).message("Flow message").build();
+ org.apache.gobblin.service.monitoring.JobStatus js1 =
+ org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
+ .jobGroup("jgroup1").jobName("job1").startTime(1000L).endTime(5000L)
+ .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test message 1").processedCount(100)
+ .jobExecutionId(1).lowWatermark("watermark:1").highWatermark("watermark:2").issues(Collections.emptyList())
+ .build();
+ org.apache.gobblin.service.monitoring.JobStatus js2 =
+ org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
+ .jobGroup("jgroup1").jobName("job2").startTime(2000L).endTime(6000L)
+ .eventName(ExecutionStatus.FAILED.name()).flowExecutionId(0).message("Test message 2").processedCount(200)
+ .jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").issues(Collections.emptyList())
+ .build();
+ org.apache.gobblin.service.monitoring.JobStatus fs1 =
+ org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
+ .jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(7000L)
+ .eventName(ExecutionStatus.FAILED.name()).flowExecutionId(0).message("Flow message")
+ .issues(Collections.emptyList()).build();
List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList = Lists.newArrayList(js1, js2, fs1);
_listOfJobStatusLists = Lists.newArrayList();
_listOfJobStatusLists.add(jobStatusList);
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
index f61fc96..83b86c2 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
@@ -21,8 +21,11 @@ import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
+import org.apache.commons.lang3.ObjectUtils;
+
import com.google.common.base.Strings;
import com.linkedin.data.template.SetMode;
+import com.linkedin.data.template.StringMap;
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
import com.linkedin.restli.common.HttpStatus;
@@ -33,6 +36,7 @@ import com.linkedin.restli.server.UpdateResponse;
import javax.inject.Inject;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.runtime.troubleshooter.Issue;
import org.apache.gobblin.service.monitoring.FlowStatus;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
@@ -147,7 +151,9 @@ public class FlowExecutionResourceLocalHandler implements FlowExecutionResourceH
.setExecutionStatus(ExecutionStatus.valueOf(queriedJobStatus.getEventName()))
.setMessage(queriedJobStatus.getMessage())
.setJobState(new JobState().setLowWatermark(queriedJobStatus.getLowWatermark()).
- setHighWatermark(queriedJobStatus.getHighWatermark()));
+ setHighWatermark(queriedJobStatus.getHighWatermark()))
+ .setIssues(new IssueArray(queriedJobStatus.getIssues().stream()
+ .map(FlowExecutionResourceLocalHandler::convertIssueToRestApiObject).collect(Collectors.toList())));
if (!Strings.isNullOrEmpty(queriedJobStatus.getMetrics())) {
jobStatus.setMetrics(queriedJobStatus.getMetrics());
@@ -168,6 +174,24 @@ public class FlowExecutionResourceLocalHandler implements FlowExecutionResourceH
.setJobStatuses(jobStatusArray);
}
+  /**
+   * Converts a troubleshooter {@link Issue} into its Rest.li representation.
+   *
+   * Missing summary/details become empty strings and missing properties become an empty map. A missing severity or
+   * timestamp is left unset instead of dereferenced, so one partially populated issue cannot fail the whole response.
+   * NOTE(review): if severity/time are required fields in Issue.pdl, consider a default value instead — confirm.
+   *
+   * @param issue issue collected by the troubleshooter
+   * @return the equivalent {@code org.apache.gobblin.service.Issue} record
+   */
+  private static org.apache.gobblin.service.Issue convertIssueToRestApiObject(Issue issue) {
+    org.apache.gobblin.service.Issue converted = new org.apache.gobblin.service.Issue();
+
+    converted.setCode(issue.getCode())
+        .setSummary(ObjectUtils.firstNonNull(issue.getSummary(), ""))
+        .setDetails(ObjectUtils.firstNonNull(issue.getDetails(), ""));
+
+    // Issues built with only summary and code (e.g. Issue.builder().summary(..).code(..).build()) have no
+    // severity or time; dereferencing them would throw NullPointerException.
+    if (issue.getSeverity() != null) {
+      converted.setSeverity(IssueSeverity.valueOf(issue.getSeverity().name()));
+    }
+    if (issue.getTime() != null) {
+      converted.setTime(issue.getTime().toInstant().toEpochMilli());
+    }
+
+    converted.setProperties(issue.getProperties() != null ? new StringMap(issue.getProperties()) : new StringMap());
+
+    return converted;
+  }
+
/**
* Return the flow start time given a {@link org.apache.gobblin.service.monitoring.FlowStatus}. Flow execution ID is
* assumed to be the flow start time.
diff --git a/gobblin-runtime/build.gradle b/gobblin-runtime/build.gradle
index 3fcaeda..46c1053 100644
--- a/gobblin-runtime/build.gradle
+++ b/gobblin-runtime/build.gradle
@@ -56,6 +56,7 @@ dependencies {
compile externalDependency.calciteCore
//compile externalDependency.calciteAvatica
compile externalDependency.commonsCli
+ compile externalDependency.commonsCollections4
compile externalDependency.commonsConfiguration
compile externalDependency.commonsEmail
compile externalDependency.commonsLang
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/InMemoryMultiContextIssueRepository.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/InMemoryMultiContextIssueRepository.java
new file mode 100644
index 0000000..6727f3c
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/InMemoryMultiContextIssueRepository.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.troubleshooter;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.collections4.map.LRUMap;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
+import org.apache.gobblin.util.ConfigUtils;
+
+/**
+ * Stores issues from multiple jobs, flows or other contexts in memory.
+ *
+ * To limit memory consumption, only the most recently used contexts are kept. The number of contexts is read from
+ * the {@link #MAX_CONTEXT_COUNT} config key and defaults to {@link #DEFAULT_MAX_CONTEXT_COUNT}. When the limit is
+ * reached, the least recently used context and all of its issues are discarded. The per-context limit is read from
+ * {@link #MAX_ISSUE_PER_CONTEXT} and passed to each per-context {@link InMemoryIssueRepository}.
+ *
+ * All public methods are synchronized because the backing {@link LRUMap} is not thread-safe.
+ */
+@Singleton
+public class InMemoryMultiContextIssueRepository implements MultiContextIssueRepository {
+  public static final int DEFAULT_MAX_CONTEXT_COUNT = 100;
+
+  public static final String CONFIG_PREFIX = "gobblin.troubleshooter.inMemoryIssueRepository.";
+  public static final String MAX_CONTEXT_COUNT = CONFIG_PREFIX + "maxContextCount";
+  public static final String MAX_ISSUE_PER_CONTEXT = CONFIG_PREFIX + "maxIssuesPerContext";
+
+  /** Issues grouped by context id; every access is guarded by {@code this}. */
+  private final LRUMap<String, InMemoryIssueRepository> contextIssues;
+  private final int maxIssuesPerContext;
+
+  /** Creates a repository with the default limits. */
+  public InMemoryMultiContextIssueRepository() {
+    this(ConfigFactory.empty());
+  }
+
+  /** Creates a repository with limits read from {@code config}, falling back to the defaults. */
+  @Inject
+  public InMemoryMultiContextIssueRepository(Config config) {
+    this(ConfigUtils.getInt(config, MAX_CONTEXT_COUNT, DEFAULT_MAX_CONTEXT_COUNT),
+        ConfigUtils.getInt(config, MAX_ISSUE_PER_CONTEXT, InMemoryIssueRepository.DEFAULT_MAX_SIZE));
+  }
+
+  /**
+   * @param maxContextCount maximum number of contexts kept; passed to {@link LRUMap}, which rejects values below 1
+   * @param maxIssuesPerContext size limit given to each per-context {@link InMemoryIssueRepository}
+   */
+  public InMemoryMultiContextIssueRepository(int maxContextCount, int maxIssuesPerContext) {
+    this.contextIssues = new LRUMap<>(maxContextCount);
+    this.maxIssuesPerContext = maxIssuesPerContext;
+  }
+
+  /** Returns the issues of the context, or an empty list when the context is unknown or was evicted. */
+  @Override
+  public synchronized List<Issue> getAll(String contextId)
+      throws TroubleshooterException {
+    InMemoryIssueRepository issueRepository = contextIssues.get(contextId);
+    if (issueRepository == null) {
+      return Collections.emptyList();
+    }
+    return issueRepository.getAll();
+  }
+
+  /** Adds the issue to the context, creating the context (and possibly evicting the oldest one) if needed. */
+  @Override
+  public synchronized void put(String contextId, Issue issue)
+      throws TroubleshooterException {
+    InMemoryIssueRepository issueRepository =
+        contextIssues.computeIfAbsent(contextId, id -> new InMemoryIssueRepository(maxIssuesPerContext));
+
+    issueRepository.put(issue);
+  }
+
+  /** Removes the issue with the given code from the context; unknown contexts are ignored. */
+  @Override
+  public synchronized void remove(String contextId, String issueCode)
+      throws TroubleshooterException {
+    InMemoryIssueRepository issueRepository = contextIssues.get(contextId);
+    if (issueRepository != null) {
+      issueRepository.remove(issueCode);
+    }
+  }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandler.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandler.java
new file mode 100644
index 0000000..bccd01b
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandler.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.troubleshooter;
+
+import java.util.Objects;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.util.GsonUtils;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Forwards issues received from job events to the shared {@link MultiContextIssueRepository}.
+ *
+ * It additionally logs received issues, so that they can be processed by analytical systems to determine
+ * the overall platform health. This logging is controlled by {@link #LOG_RECEIVED_EVENTS} (enabled by default).
+ *
+ * Issues are stored under the context id built by {@link TroubleshooterUtils#getContextIdForJob(Properties)}.
+ * That method does not validate its input, so events missing the flow group, flow name, flow execution id or job name
+ * are not stored. Otherwise they would all share a "null:null:null:null"-style context with unrelated jobs. Such
+ * events are still logged.
+ */
+@Slf4j
+public class JobIssueEventHandler {
+
+  public static final String CONFIG_PREFIX = "gobblin.troubleshooter.jobIssueEventHandler.";
+  public static final String LOG_RECEIVED_EVENTS = CONFIG_PREFIX + "logReceiveEvents";
+
+  private static final Logger issueLogger =
+      LoggerFactory.getLogger("org.apache.gobblin.runtime.troubleshooter.JobIssueLogger");
+
+  /** Event metadata fields that together identify the job an issue belongs to. */
+  private static final String[] CONTEXT_ID_FIELDS = {
+      TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
+      TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
+      TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+      TimingEvent.FlowEventConstants.JOB_NAME_FIELD};
+
+  private final MultiContextIssueRepository issueRepository;
+  private final boolean logReceiveEvents;
+
+  @Inject
+  public JobIssueEventHandler(MultiContextIssueRepository issueRepository, Config config) {
+    this(issueRepository, ConfigUtils.getBoolean(config, LOG_RECEIVED_EVENTS, true));
+  }
+
+  public JobIssueEventHandler(MultiContextIssueRepository issueRepository, boolean logReceiveEvents) {
+    this.issueRepository = Objects.requireNonNull(issueRepository);
+    this.logReceiveEvents = logReceiveEvents;
+  }
+
+  /**
+   * Stores and optionally logs the issue carried by {@code event}. Events that are not issue events are ignored.
+   * Failures to read the event metadata or to save to the repository are logged as warnings rather than thrown.
+   */
+  public void processEvent(GobblinTrackingEvent event) {
+    if (!IssueEventBuilder.isIssueEvent(event)) {
+      return;
+    }
+
+    String contextId;
+    try {
+      Properties metadataProperties = new Properties();
+      metadataProperties.putAll(event.getMetadata());
+      // null means the event does not identify a job, so the issue cannot be attributed to a context.
+      contextId = hasContextIdFields(metadataProperties)
+          ? TroubleshooterUtils.getContextIdForJob(metadataProperties) : null;
+    } catch (Exception ex) {
+      log.warn("Failed to extract context id from Gobblin tracking event with timestamp " + event.getTimestamp(), ex);
+      return;
+    }
+
+    IssueEventBuilder issueEvent = IssueEventBuilder.fromEvent(event);
+
+    if (contextId == null) {
+      log.warn("Gobblin tracking event with timestamp {} lacks flow/job metadata; issue {} will not be saved to repository",
+          event.getTimestamp(), issueEvent.getIssue().getCode());
+    } else {
+      try {
+        issueRepository.put(contextId, issueEvent.getIssue());
+      } catch (TroubleshooterException e) {
+        log.warn("Failed to save issue to repository. Issue code: " + issueEvent.getIssue().getCode(), e);
+      }
+    }
+
+    if (logReceiveEvents) {
+      logEvent(issueEvent);
+    }
+  }
+
+  /** Returns true when every metadata field needed to build a job context id is present. */
+  private static boolean hasContextIdFields(Properties props) {
+    for (String field : CONTEXT_ID_FIELDS) {
+      if (props.getProperty(field) == null) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /** Serializes the issue, with the flow/job identifiers from the event metadata, to the dedicated issue logger. */
+  private void logEvent(IssueEventBuilder issueEvent) {
+    ImmutableMap<String, String> metadata = issueEvent.getMetadata();
+
+    JobIssueLogEntry logEntry = new JobIssueLogEntry();
+    logEntry.issue = issueEvent.getIssue();
+
+    logEntry.flowGroup = metadata.get(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
+    logEntry.flowName = metadata.get(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
+    logEntry.flowExecutionId = metadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
+    logEntry.jobName = metadata.get(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
+
+    String serializedIssueEvent = GsonUtils.GSON_WITH_DATE_HANDLING.toJson(logEntry);
+    issueLogger.info(serializedIssueEvent);
+  }
+
+  /** Structure of the JSON entry written by {@link #logEvent(IssueEventBuilder)}. */
+  private static class JobIssueLogEntry {
+    String flowName;
+    String flowGroup;
+    String flowExecutionId;
+    String jobName;
+    Issue issue;
+  }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java
new file mode 100644
index 0000000..e3eafbd
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.troubleshooter;
+
+import java.util.List;
+
+/**
+ * Repository of troubleshooter issues grouped by context. A context is a job, a flow or any other unit
+ * identified by a string id; for jobs, the id is built by {@link TroubleshooterUtils}.
+ *
+ * @see AutomaticTroubleshooter
+ */
+public interface MultiContextIssueRepository {
+
+  /**
+   * Returns the issues recorded for {@code contextId}.
+   * NOTE(review): the in-memory implementation returns an empty list for unknown contexts; presumably every
+   * implementation should behave the same — confirm.
+   */
+  List<Issue> getAll(String contextId) throws TroubleshooterException;
+
+  /** Records {@code issue} under {@code contextId}. */
+  void put(String contextId, Issue issue) throws TroubleshooterException;
+
+  /** Removes the issue with code {@code issueCode} from {@code contextId}. */
+  void remove(String contextId, String issueCode) throws TroubleshooterException;
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/TroubleshooterUtils.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/TroubleshooterUtils.java
new file mode 100644
index 0000000..5c7c980
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/TroubleshooterUtils.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.troubleshooter;
+
+import java.util.Properties;
+
+import org.apache.gobblin.metrics.event.TimingEvent;
+
+
+public final class TroubleshooterUtils {
+
+ private TroubleshooterUtils() {
+ }
+
+ public static String getContextIdForJob(String flowGroup, String flowName, String flowExecutionId, String jobName) {
+ return flowGroup + ":" + flowName + ":" + flowExecutionId + ":" + jobName;
+ }
+
+ public static String getContextIdForJob(Properties props) {
+ return getContextIdForJob(props.getProperty(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD),
+ props.getProperty(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD),
+ props.getProperty(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD),
+ props.getProperty(TimingEvent.FlowEventConstants.JOB_NAME_FIELD));
+ }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
index e1ebb64..9a3e58e 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
@@ -17,10 +17,14 @@
package org.apache.gobblin.service.monitoring;
+import java.util.List;
+
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
+import org.apache.gobblin.runtime.troubleshooter.Issue;
+
/**
* Contains attributes that describe job status.
@@ -48,4 +52,5 @@ public class JobStatus {
private final int maxAttempts;
private final int currentAttempts;
private final boolean shouldRetry;
+ private final List<Issue> issues;
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
index 668824b..47d4834 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
@@ -17,25 +17,33 @@
package org.apache.gobblin.service.monitoring;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.Objects;
import com.google.common.collect.Iterators;
import com.typesafe.config.ConfigFactory;
import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.troubleshooter.Issue;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterException;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
import org.apache.gobblin.util.ConfigUtils;
/**
* Retriever for {@link JobStatus}.
*/
+@Slf4j
public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker {
public static final String EVENT_NAME_FIELD = "eventName";
public static final String NA_KEY = "NA";
@@ -43,8 +51,11 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
@Getter
protected final MetricContext metricContext;
- protected JobStatusRetriever() {
+ private final MultiContextIssueRepository issueRepository;
+
+ protected JobStatusRetriever(MultiContextIssueRepository issueRepository) {
this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
+ this.issueRepository = Objects.requireNonNull(issueRepository);
}
public abstract Iterator<JobStatus> getJobStatusesForFlowExecution(String flowName, String flowGroup,
@@ -95,11 +106,21 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
int currentAttempts = Integer.parseInt(jobState.getProp(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, "1"));
boolean shouldRetry = Boolean.parseBoolean(jobState.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, "false"));
+
+ List<Issue> issues;
+ try {
+ String contextId = TroubleshooterUtils.getContextIdForJob(jobState.getProperties());
+ issues = issueRepository.getAll(contextId);
+ } catch (TroubleshooterException e) {
+ log.warn("Cannot retrieve job issues", e);
+ issues = Collections.emptyList();
+ }
+
return JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).
jobName(jobName).jobGroup(jobGroup).jobTag(jobTag).jobExecutionId(jobExecutionId).eventName(eventName).
lowWatermark(lowWatermark).highWatermark(highWatermark).orchestratedTime(orchestratedTime).startTime(startTime).endTime(endTime).
message(message).processedCount(processedCount).maxAttempts(maxAttempts).currentAttempts(currentAttempts).
- shouldRetry(shouldRetry).build();
+ shouldRetry(shouldRetry).issues(issues).build();
}
public abstract StateStore<State> getStateStore();
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/InMemoryMultiContextIssueRepositoryTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/InMemoryMultiContextIssueRepositoryTest.java
new file mode 100644
index 0000000..f4f3e97
--- /dev/null
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/InMemoryMultiContextIssueRepositoryTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.troubleshooter;
+
+import java.util.List;
+
+import org.testng.annotations.Test;
+
+import static org.testng.AssertJUnit.assertEquals;
+
+
+/**
+ * Runs the shared {@link MultiContextIssueRepositoryTest} suite against the in-memory implementation,
+ * plus checks specific to its bounded number of contexts.
+ */
+@Test
+public class InMemoryMultiContextIssueRepositoryTest extends MultiContextIssueRepositoryTest {
+
+  @Test
+  public void willHaveCapacityLimit()
+      throws Exception {
+
+    int jobCount = 100;
+    int jobCapacity = 50;
+
+    MultiContextIssueRepository repository = new InMemoryMultiContextIssueRepository(jobCapacity, 10);
+
+    for (int j = 0; j < jobCount; j++) {
+      repository.put("job" + j, getTestIssue("issue 1", "code1"));
+    }
+
+    // Only the last jobCapacity contexts survive; the (jobCount - jobCapacity) oldest ones are evicted.
+    int evictedJobCount = jobCount - jobCapacity;
+    for (int j = 0; j < jobCount; j++) {
+      List<Issue> retrievedIssues = repository.getAll("job" + j);
+
+      if (j < evictedJobCount) {
+        assertEquals("job" + j + " should have been evicted", 0, retrievedIssues.size());
+      } else {
+        assertEquals("job" + j + " should have been kept", 1, retrievedIssues.size());
+      }
+    }
+  }
+
+  @Override
+  protected MultiContextIssueRepository getRepository() {
+    return new InMemoryMultiContextIssueRepository();
+  }
+}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandlerTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandlerTest.java
new file mode 100644
index 0000000..b479935
--- /dev/null
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandlerTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.troubleshooter;
+
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.metrics.event.TimingEvent;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+
+/**
+ * Tests for {@link JobIssueEventHandler}.
+ */
+public class JobIssueEventHandlerTest {
+
+  @Test
+  public void canHandleIssue()
+      throws Exception {
+    MultiContextIssueRepository issueRepository = mock(MultiContextIssueRepository.class);
+    JobIssueEventHandler eventHandler = new JobIssueEventHandler(issueRepository, true);
+
+    IssueEventBuilder eventBuilder = new IssueEventBuilder("TestJob");
+    eventBuilder.setIssue(getTestIssue("test issue", "code1"));
+    eventBuilder.addMetadata(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, "test-group");
+    eventBuilder.addMetadata(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, "test-flow");
+    eventBuilder.addMetadata(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, "1234");
+    eventBuilder.addMetadata(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, "test-job");
+
+    eventHandler.processEvent(eventBuilder.build());
+
+    // The issue must be filed under the context id of the job described by the event metadata,
+    // so that JobStatusRetriever can find it again.
+    String expectedContextId = TroubleshooterUtils.getContextIdForJob("test-group", "test-flow", "1234", "test-job");
+    verify(issueRepository).put(org.mockito.Matchers.eq(expectedContextId), any());
+  }
+
+  private Issue getTestIssue(String summary, String code) {
+    return Issue.builder().summary(summary).code(code).build();
+  }
+}
\ No newline at end of file
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepositoryTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepositoryTest.java
new file mode 100644
index 0000000..a524408
--- /dev/null
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepositoryTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.troubleshooter;
+
+import java.util.List;
+
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Contract tests shared by all {@link MultiContextIssueRepository} implementations.
+ * Subclasses provide the implementation under test through {@link #getRepository()}.
+ *
+ * {@link org.testng.Assert#assertEquals} takes the actual value first and the expected value second.
+ */
+public abstract class MultiContextIssueRepositoryTest {
+
+  @Test
+  public void canPutIssue()
+      throws Exception {
+    MultiContextIssueRepository repository = getRepository();
+
+    Issue testIssue = getTestIssue("first", "code1");
+    repository.put("job1", testIssue);
+
+    List<Issue> issues = repository.getAll("job1");
+    assertEquals(issues.size(), 1);
+    assertEquals(issues.get(0), testIssue);
+  }
+
+  @Test
+  public void canPutMultipleJobIssue()
+      throws Exception {
+    MultiContextIssueRepository repository = getRepository();
+
+    repository.put("job1", getTestIssue("first", "code1"));
+    repository.put("job1", getTestIssue("second", "code2"));
+
+    List<Issue> issues = repository.getAll("job1");
+    assertEquals(issues.size(), 2);
+  }
+
+  @Test
+  public void canWorkWithMultipleJobs()
+      throws Exception {
+    MultiContextIssueRepository repository = getRepository();
+
+    Issue job1Issue1 = getTestIssue("first", "code1");
+    Issue job2Issue1 = getTestIssue("first", "code1");
+    Issue job2Issue2 = getTestIssue("second", "code2");
+
+    repository.put("job1", job1Issue1);
+    repository.put("job2", job2Issue1);
+    repository.put("job2", job2Issue2);
+
+    assertEquals(repository.getAll("job1").size(), 1);
+    assertEquals(repository.getAll("job1").get(0), job1Issue1);
+
+    // job2 keeps its own issues, separate from job1, in insertion order.
+    List<Issue> job2Issues = repository.getAll("job2");
+    assertEquals(job2Issues.size(), 2);
+    assertEquals(job2Issues.get(0).getCode(), "code1");
+    assertEquals(job2Issues.get(1).getCode(), "code2");
+  }
+
+  @Test
+  public void canRemoveIssue()
+      throws Exception {
+    MultiContextIssueRepository repository = getRepository();
+
+    repository.put("job1", getTestIssue("first", "code1"));
+    repository.put("job1", getTestIssue("second", "code2"));
+
+    repository.remove("job1", "code1");
+    List<Issue> issues = repository.getAll("job1");
+    assertEquals(issues.size(), 1);
+    assertEquals(issues.get(0).getCode(), "code2");
+  }
+
+  @Test
+  public void willPreserveIssueInsertionOrder()
+      throws Exception {
+
+    int jobCount = 10;
+    int issueCount = 50;
+
+    MultiContextIssueRepository repository = getRepository();
+
+    for (int j = 0; j < jobCount; j++) {
+      for (int i = 0; i < issueCount; i++) {
+        repository.put("job" + j, getTestIssue("issue " + i, String.valueOf(i)));
+      }
+    }
+
+    for (int j = 0; j < jobCount; j++) {
+      List<Issue> retrievedIssues = repository.getAll("job" + j);
+      // Check the size first so a truncated list fails with a clear message instead of IndexOutOfBoundsException.
+      assertEquals(retrievedIssues.size(), issueCount);
+      for (int i = 0; i < issueCount; i++) {
+        assertEquals(retrievedIssues.get(i).getCode(), String.valueOf(i));
+      }
+    }
+  }
+
+  /** Returns a fresh, empty repository instance for each test. */
+  protected abstract MultiContextIssueRepository getRepository();
+
+  protected Issue getTestIssue(String summary, String code) {
+    return Issue.builder().summary(summary).code(code).build();
+  }
+}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index a4d1458..91aeb64 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -41,6 +41,9 @@ import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
import org.apache.gobblin.runtime.instance.StandardGobblinInstanceLauncher;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
+import org.apache.gobblin.runtime.troubleshooter.InMemoryMultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
import org.apache.gobblin.scheduler.SchedulerService;
import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
import org.apache.gobblin.service.FlowConfigV2ResourceLocalHandler;
@@ -207,6 +210,10 @@ public class GobblinServiceGuiceModule implements Module {
binder.bind(GobblinServiceManager.class);
+ binder.bind(MultiContextIssueRepository.class).to(InMemoryMultiContextIssueRepository.class);
+
+ binder.bind(JobIssueEventHandler.class);
+
LOGGER.info("Bindings configured");
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
index 85ee51a..21abc4e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
@@ -33,6 +33,7 @@ import com.google.common.collect.Iterators;
import com.typesafe.config.Config;
import javax.inject.Inject;
+import javax.inject.Singleton;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -40,6 +41,7 @@ import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metastore.FileContextBasedFsStateStore;
import org.apache.gobblin.metastore.FileContextBasedFsStateStoreFactory;
import org.apache.gobblin.metastore.FsStateStore;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
/**
@@ -48,6 +50,7 @@ import org.apache.gobblin.metastore.FsStateStore;
* The store name is set to flowGroup.flowName, while the table name is set to flowExecutionId.jobGroup.jobName.
*/
@Slf4j
+@Singleton
public class FsJobStatusRetriever extends JobStatusRetriever {
public static final String CONF_PREFIX = "fsJobStatusRetriever";
@@ -55,7 +58,8 @@ public class FsJobStatusRetriever extends JobStatusRetriever {
private final FileContextBasedFsStateStore<State> stateStore;
@Inject
- public FsJobStatusRetriever(Config config) {
+ public FsJobStatusRetriever(Config config, MultiContextIssueRepository issueRepository) {
+ super(issueRepository);
this.stateStore = (FileContextBasedFsStateStore<State>) new FileContextBasedFsStateStoreFactory().
createStateStore(config.getConfig(CONF_PREFIX), State.class);
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index def4943..6e47ca0 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -29,6 +29,7 @@ import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import com.codahale.metrics.Meter;
+import com.google.common.annotations.VisibleForTesting;
import com.typesafe.config.Config;
import lombok.Getter;
@@ -43,6 +44,7 @@ import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistryFactory;
import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
import org.apache.gobblin.metrics.reporter.util.SchemaRegistryVersionWriter;
import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
+import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.util.ConfigUtils;
@@ -62,9 +64,11 @@ public class KafkaAvroJobStatusMonitor extends KafkaJobStatusMonitor {
@Getter
private Meter messageParseFailures;
- public KafkaAvroJobStatusMonitor(String topic, Config config, int numThreads)
+ public KafkaAvroJobStatusMonitor(String topic, Config config, int numThreads,
+ JobIssueEventHandler jobIssueEventHandler)
throws IOException, ReflectiveOperationException {
- super(topic, config, numThreads);
+ super(topic, config, numThreads, jobIssueEventHandler);
+
if (ConfigUtils.getBoolean(config, ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY, false)) {
KafkaAvroSchemaRegistry schemaRegistry = (KafkaAvroSchemaRegistry) new KafkaAvroSchemaRegistryFactory().
create(ConfigUtils.configToProperties(config));
@@ -86,13 +90,14 @@ public class KafkaAvroJobStatusMonitor extends KafkaJobStatusMonitor {
}
@Override
- public org.apache.gobblin.configuration.State parseJobStatus(DecodeableKafkaRecord<byte[],byte[]> message) {
+ @VisibleForTesting
+ public GobblinTrackingEvent deserializeEvent(DecodeableKafkaRecord<byte[],byte[]> message) {
try {
InputStream is = new ByteArrayInputStream(message.getValue());
schemaVersionWriter.readSchemaVersioningInformation(new DataInputStream(is));
Decoder decoder = DecoderFactory.get().binaryDecoder(is, this.decoder.get());
- GobblinTrackingEvent decodedMessage = this.reader.get().read(null, decoder);
- return parseJobStatus(decodedMessage);
+
+ return this.reader.get().read(null, decoder);
} catch (Exception exc) {
this.messageParseFailures.mark();
if (this.messageParseFailures.getFiveMinuteRate() < 1) {
@@ -109,7 +114,9 @@ public class KafkaAvroJobStatusMonitor extends KafkaJobStatusMonitor {
* @param event an instance of {@link GobblinTrackingEvent}
* @return job status as an instance of {@link org.apache.gobblin.configuration.State}
*/
- private org.apache.gobblin.configuration.State parseJobStatus(GobblinTrackingEvent event) {
+ @Override
+ @VisibleForTesting
+ public org.apache.gobblin.configuration.State parseJobStatus(GobblinTrackingEvent event) {
if (!acceptEvent(event)) {
return null;
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 38a7f6b..9792b83 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -43,11 +43,14 @@ import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.metastore.FileContextBasedFsStateStore;
import org.apache.gobblin.metastore.FileContextBasedFsStateStoreFactory;
import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.TaskContext;
import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
import org.apache.gobblin.runtime.retention.DatasetCleanerTask;
+import org.apache.gobblin.runtime.troubleshooter.IssueEventBuilder;
+import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.source.workunit.WorkUnit;
@@ -67,6 +70,9 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
public static final String GET_AND_SET_JOB_STATUS = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
JOB_STATUS_MONITOR_PREFIX, "getAndSetJobStatus");
+ private static final String PROCESS_JOB_ISSUE = MetricRegistry
+ .name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, JOB_STATUS_MONITOR_PREFIX, "jobIssueProcessingTime");
+
static final String JOB_STATUS_MONITOR_TOPIC_KEY = "topic";
static final String JOB_STATUS_MONITOR_NUM_THREADS_KEY = "numThreads";
static final String JOB_STATUS_MONITOR_CLASS_KEY = "class";
@@ -86,7 +92,9 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING, ExecutionStatus.COMPLETE,
ExecutionStatus.FAILED, ExecutionStatus.CANCELLED);
- public KafkaJobStatusMonitor(String topic, Config config, int numThreads)
+ private final JobIssueEventHandler jobIssueEventHandler;
+
+ public KafkaJobStatusMonitor(String topic, Config config, int numThreads, JobIssueEventHandler jobIssueEventHandler)
throws ReflectiveOperationException {
super(topic, config.withFallback(DEFAULTS), numThreads);
String stateStoreFactoryClass = ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_FACTORY_CLASS_KEY, FileContextBasedFsStateStoreFactory.class.getName());
@@ -94,6 +102,8 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
this.stateStore =
((StateStore.Factory) Class.forName(stateStoreFactoryClass).newInstance()).createStateStore(config, org.apache.gobblin.configuration.State.class);
this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
+
+ this.jobIssueEventHandler = jobIssueEventHandler;
}
@Override
@@ -128,7 +138,19 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
@Override
protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
try {
- org.apache.gobblin.configuration.State jobStatus = parseJobStatus(message);
+ GobblinTrackingEvent gobblinTrackingEvent = deserializeEvent(message);
+
+ if (gobblinTrackingEvent == null) {
+ return;
+ }
+
+ if (IssueEventBuilder.isIssueEvent(gobblinTrackingEvent)) {
+ try (Timer.Context context = getMetricContext().timer(PROCESS_JOB_ISSUE).time()) {
+ jobIssueEventHandler.processEvent(gobblinTrackingEvent);
+ }
+ }
+
+ org.apache.gobblin.configuration.State jobStatus = parseJobStatus(gobblinTrackingEvent);
if (jobStatus != null) {
try(Timer.Context context = getMetricContext().timer(GET_AND_SET_JOB_STATUS).time()) {
addJobStatusToStateStore(jobStatus, this.stateStore);
@@ -231,6 +253,8 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
return Long.parseLong(Splitter.on(ServiceConfigKeys.STATE_STORE_KEY_SEPARATION_CHARACTER).splitToList(tableName).get(0));
}
- public abstract org.apache.gobblin.configuration.State parseJobStatus(DecodeableKafkaRecord<byte[],byte[]> message);
+ protected abstract GobblinTrackingEvent deserializeEvent(DecodeableKafkaRecord<byte[],byte[]> message);
+
+ protected abstract org.apache.gobblin.configuration.State parseJobStatus(GobblinTrackingEvent event);
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
index 741a246..7f6fc51 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
@@ -17,6 +17,8 @@
package org.apache.gobblin.service.monitoring;
+import java.util.Objects;
+
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
@@ -28,6 +30,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryConfigurationKeys;
import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
+import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
@@ -40,11 +43,13 @@ public class KafkaJobStatusMonitorFactory implements Provider<KafkaJobStatusMoni
private static final String KAFKA_SSL_CONFIG_PREFIX_KEY = "jobStatusMonitor.kafka.config";
private static final String DEFAULT_KAFKA_SSL_CONFIG_PREFIX = "metrics.reporting.kafka.config";
- Config config;
+ private final Config config;
+ private final JobIssueEventHandler jobIssueEventHandler;
@Inject
- public KafkaJobStatusMonitorFactory(Config config) {
- this.config = config;
+ public KafkaJobStatusMonitorFactory(Config config, JobIssueEventHandler jobIssueEventHandler) {
+ this.config = Objects.requireNonNull(config);
+ this.jobIssueEventHandler = Objects.requireNonNull(jobIssueEventHandler);
}
private KafkaJobStatusMonitor createJobStatusMonitor()
@@ -71,7 +76,8 @@ public class KafkaJobStatusMonitorFactory implements Provider<KafkaJobStatusMoni
config.getValue(KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_OVERRIDE_NAMESPACE));
}
jobStatusConfig = jobStatusConfig.withFallback(kafkaSslConfig).withFallback(schemaRegistryConfig);
- return (KafkaJobStatusMonitor) GobblinConstructorUtils.invokeLongestConstructor(jobStatusMonitorClass, topic, jobStatusConfig, numThreads);
+ return (KafkaJobStatusMonitor) GobblinConstructorUtils
+ .invokeLongestConstructor(jobStatusMonitorClass, topic, jobStatusConfig, numThreads, jobIssueEventHandler);
}
@Override
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
index c9bb3e1..3d1cfc0 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
@@ -17,10 +17,6 @@
package org.apache.gobblin.service.monitoring;
-import com.google.common.base.Preconditions;
-
-import com.google.common.collect.Iterators;
-import com.typesafe.config.Config;
import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
@@ -28,18 +24,26 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.runtime.spec_executorInstance.LocalFsSpecProducer;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
import org.apache.gobblin.service.ExecutionStatus;
-import lombok.extern.slf4j.Slf4j;
-
/**
* A job status monitor for jobs completed by a Gobblin Standalone instance running on the same machine. Mainly used for sandboxing/testing
* Considers a job done when Gobblin standalone appends ".done" to the job. Otherwise it will assume the job is in progress
*/
@Slf4j
+@Singleton
public class LocalFsJobStatusRetriever extends JobStatusRetriever {
public static final String CONF_PREFIX = "localFsJobStatusRetriever.";
@@ -47,7 +51,9 @@ public class LocalFsJobStatusRetriever extends JobStatusRetriever {
private String specProducerPath;
// Do not use a state store for this implementation, just look at the job folder that @LocalFsSpecProducer writes to
- public LocalFsJobStatusRetriever(Config config) {
+ @Inject
+ public LocalFsJobStatusRetriever(Config config, MultiContextIssueRepository issueRepository) {
+ super(issueRepository);
this.specProducerPath = config.getString(CONF_PREFIX + LocalFsSpecProducer.LOCAL_FS_PRODUCER_PATH_KEY);
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
index a7a760c..6fc02ff 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
@@ -29,6 +29,8 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.typesafe.config.Config;
+import javax.inject.Inject;
+import javax.inject.Singleton;
import lombok.Getter;
import org.apache.gobblin.configuration.State;
@@ -36,11 +38,13 @@ import org.apache.gobblin.metastore.MysqlJobStatusStateStore;
import org.apache.gobblin.metastore.MysqlJobStatusStateStoreFactory;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
/**
* Mysql based Retriever for {@link JobStatus}.
*/
+@Singleton
public class MysqlJobStatusRetriever extends JobStatusRetriever {
public static final String MYSQL_JOB_STATUS_RETRIEVER_PREFIX = "mysqlJobStatusRetriever";
public static final String GET_LATEST_JOB_STATUS_METRIC = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
@@ -53,7 +57,9 @@ public class MysqlJobStatusRetriever extends JobStatusRetriever {
@Getter
private MysqlJobStatusStateStore<State> stateStore;
- public MysqlJobStatusRetriever(Config config) throws ReflectiveOperationException {
+ @Inject
+ public MysqlJobStatusRetriever(Config config, MultiContextIssueRepository issueRepository) throws ReflectiveOperationException {
+ super(issueRepository);
config = config.getConfig(MYSQL_JOB_STATUS_RETRIEVER_PREFIX).withFallback(config);
this.stateStore = (MysqlJobStatusStateStoreFactory.class.newInstance()).createStateStore(config, State.class);
}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
index f9dffc0..d0595a4 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
@@ -17,16 +17,22 @@
package org.apache.gobblin.service.monitoring;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigValueFactory;
import java.io.File;
import java.io.IOException;
+
import org.apache.commons.io.FileUtils;
-import org.apache.gobblin.configuration.ConfigurationKeys;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+
+import static org.mockito.Mockito.mock;
+
public class FsJobStatusRetrieverTest extends JobStatusRetrieverTest {
@@ -37,7 +43,7 @@ public class FsJobStatusRetrieverTest extends JobStatusRetrieverTest {
cleanUpDir();
Config config = ConfigFactory.empty().withValue(FsJobStatusRetriever.CONF_PREFIX + "." + ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY,
ConfigValueFactory.fromAnyRef(stateStoreDir));
- this.jobStatusRetriever = new FsJobStatusRetriever(config);
+ this.jobStatusRetriever = new FsJobStatusRetriever(config, mock(MultiContextIssueRepository.class));
}
@Test
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
index ac924fb..4646ef2 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
@@ -21,12 +21,12 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.Properties;
-import com.google.common.base.Strings;
-
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import com.google.common.base.Strings;
+
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
@@ -34,9 +34,12 @@ import org.apache.gobblin.metastore.MysqlJobStatusStateStore;
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.ServiceConfigKeys;
+import static org.mockito.Mockito.mock;
+
public class MysqlJobStatusRetrieverTest extends JobStatusRetrieverTest {
private MysqlJobStatusStateStore dbJobStateStore;
@@ -54,7 +57,8 @@ public class MysqlJobStatusRetrieverTest extends JobStatusRetrieverTest {
configBuilder.addPrimitive(MysqlJobStatusRetriever.MYSQL_JOB_STATUS_RETRIEVER_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_USER_KEY, TEST_USER);
configBuilder.addPrimitive(MysqlJobStatusRetriever.MYSQL_JOB_STATUS_RETRIEVER_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, TEST_PASSWORD);
- this.jobStatusRetriever = new MysqlJobStatusRetriever(configBuilder.build());
+ this.jobStatusRetriever =
+ new MysqlJobStatusRetriever(configBuilder.build(), mock(MultiContextIssueRepository.class));
this.dbJobStateStore = ((MysqlJobStatusRetriever) this.jobStatusRetriever).getStateStore();
cleanUpDir();
}
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index 9e54cc3..4a7c480 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -38,6 +38,7 @@ ext.externalDependency = [
"commonsEmail": "org.apache.commons:commons-email:1.4",
"commonsLang": "commons-lang:commons-lang:2.6",
"commonsLang3": "org.apache.commons:commons-lang3:3.4",
+ "commonsCollections4": "org.apache.commons:commons-collections4:4.4",
"commonsConfiguration": "commons-configuration:commons-configuration:1.10",
"commonsIo": "commons-io:commons-io:2.5",
"commonsMath": "org.apache.commons:commons-math3:3.5",