You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2021/10/19 19:33:32 UTC
[gobblin] branch master updated: [GOBBLIN-1563] Collect more
information to analyze the root cause (RC) of some jobs being unable to emit Kafka events to
update job status (#3416)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4a25c88 [GOBBLIN-1563]Collect more information to analyze the RC for some job cannot emit kafka events to update job status (#3416)
4a25c88 is described below
commit 4a25c886db03dbd267cb3f01b7df1a35096f6064
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Tue Oct 19 12:33:27 2021 -0700
[GOBBLIN-1563] Collect more information to analyze the root cause (RC) of some jobs being unable to emit Kafka events to update job status (#3416)
* [GOBBLIN-1563] Collect more information to analyze the root cause (RC) of some jobs being unable to emit Kafka events to update job status
* address comments for GOBBLIN-1561
* address comments
* avoid type cast
---
.../apache/gobblin/metrics/reporter/EventReporter.java | 14 ++++++++------
.../apache/gobblin/metrics/kafka/KafkaEventReporter.java | 3 +++
.../service/FlowConfigV2ResourceLocalHandler.java | 16 ++++------------
.../java/org/apache/gobblin/runtime/api/FlowSpec.java | 14 +++++++-------
.../service/modules/flow/MultiHopFlowCompiler.java | 14 ++++++--------
.../modules/flowgraph/pathfinder/AbstractPathFinder.java | 2 +-
6 files changed, 29 insertions(+), 34 deletions(-)
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java
index 83c6a33..45061ba 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java
@@ -33,9 +33,6 @@ import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
@@ -78,12 +75,12 @@ public abstract class EventReporter extends ScheduledReporter implements Closeab
protected static final Joiner JOINER = Joiner.on('.').skipNulls();
protected static final String METRIC_KEY_PREFIX = "gobblin.metrics";
protected static final String EVENTS_QUALIFIER = "events";
- private static final Logger LOGGER = LoggerFactory.getLogger(EventReporter.class);
public static final int DEFAULT_QUEUE_CAPACITY = 100;
public static final String QUEUE_CAPACITY_KEY = ConfigurationKeys.METRICS_REPORTING_EVENTS_CONFIGURATIONS_PREFIX + ".queue.capacity";
public static final int DEFAULT_QUEUE_OFFER_TIMEOUT_SECS = 10;
public static final String QUEUE_OFFER_TIMOUT_SECS_KEY = ConfigurationKeys.METRICS_REPORTING_EVENTS_CONFIGURATIONS_PREFIX + ".queue.offer.timeout.secs";
private static final String NULL_STRING = "null";
+ public static final int REPORT_TIMEOUT_SECS = 60;
private final MetricContext metricContext;
private final BlockingQueue<GobblinTrackingEvent> reportingQueue;
@@ -108,7 +105,7 @@ public abstract class EventReporter extends ScheduledReporter implements Closeab
this.closer = Closer.create();
this.immediateReportExecutor = MoreExecutors.getExitingExecutorService(
(ThreadPoolExecutor) Executors.newFixedThreadPool(1,
- ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), Optional.of("EventReporter-" + builder.name + "-%d"))),
+ ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("EventReporter-" + builder.name + "-%d"))),
5, TimeUnit.MINUTES);
this.metricContext = builder.context;
@@ -144,9 +141,11 @@ public abstract class EventReporter extends ScheduledReporter implements Closeab
*/
public void addEventToReportingQueue(GobblinTrackingEvent event) {
if (this.reportingQueue.size() > this.queueCapacity * 2 / 3) {
+ log.info("Trigger immediate run to report the event since queue is almost full");
immediatelyScheduleReport();
}
try {
+ log.debug(String.format("Offering one event to the metrics queue with event name: %s", event.getName()));
if (!this.reportingQueue.offer(sanitizeEvent(event), this.queueOfferTimeoutSecs, TimeUnit.SECONDS)) {
log.error("Enqueuing of event {} at reporter with class {} timed out. Sending of events is probably stuck.",
event, this.getClass().getCanonicalName());
@@ -235,11 +234,14 @@ public abstract class EventReporter extends ScheduledReporter implements Closeab
@Override
public void close() {
try {
+ log.info(String.format("Closing event reporter %s", this.getClass().getCanonicalName()));
this.metricContext.removeNotificationTarget(this.notificationTargetKey);
+ this.immediateReportExecutor.awaitTermination(REPORT_TIMEOUT_SECS, TimeUnit.SECONDS);
+ log.info(String.format("Flush out %s events before closing the reporter", this.reportingQueue.size()));
report();
this.closer.close();
} catch (Exception e) {
- LOGGER.warn("Exception when closing EventReporter", e);
+ log.warn("Exception when closing EventReporter", e);
} finally {
super.close();
}
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java
index 061c1da..732f1ef 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java
@@ -25,6 +25,7 @@ import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
+import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.reporter.EventReporter;
@@ -36,6 +37,7 @@ import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
/**
* Reports {@link GobblinTrackingEvent} to a Kafka topic serialized as JSON.
*/
+@Slf4j
public class KafkaEventReporter extends EventReporter {
protected final AvroSerializer<GobblinTrackingEvent> serializer;
@@ -66,6 +68,7 @@ public class KafkaEventReporter extends EventReporter {
}
if (!events.isEmpty()) {
+ log.info("Pushing events to Kafka");
this.kafkaPusher.pushMessages(events);
}
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
index 60dfe7b..b7249c1 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -18,10 +18,8 @@ package org.apache.gobblin.service;
import java.util.Arrays;
import java.util.Comparator;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
import org.apache.commons.lang3.StringEscapeUtils;
import com.linkedin.data.template.StringMap;
@@ -103,18 +101,12 @@ public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHan
StringBuilder message = new StringBuilder("Flow was not compiled successfully.");
if (!flowSpec.getCompilationErrors().isEmpty()) {
message.append(" Compilation errors encountered (Sorted by relevance): ");
- Object[] errors = flowSpec.getCompilationErrors().toArray();
+ FlowSpec.CompilationError[] errors = flowSpec.getCompilationErrors().stream().distinct().toArray(FlowSpec.CompilationError[]::new);
Arrays.sort(errors, Comparator.comparingInt(c -> ((FlowSpec.CompilationError)c).errorPriority));
- // This is to avoid we print same error multi times.
- Set<String> errorSet = new HashSet<>();
int errorId = 0;
- for (Object er: errors) {
- FlowSpec.CompilationError error = (FlowSpec.CompilationError)er;
- if (!errorSet.contains(error.errorMessage)) {
- message.append("\n").append(String.format("ERROR[%s]", errorId)).append(error.errorMessage);
- errorSet.add(error.errorMessage);
- errorId++;
- }
+ for (FlowSpec.CompilationError error: errors) {
+ message.append("\n").append(String.format("ERROR[%s]", errorId)).append(error.errorMessage);
+ errorId++;
}
}
return message.toString();
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
index 37eb584..5675da1 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
@@ -125,20 +125,20 @@ public class FlowSpec implements Configurable, Spec {
throw new RuntimeException("Unable to create a FlowSpec URI: " + e, e);
}
}
- public CompilationError getCompilationError(String src, String dst, String errorMessage) {
- return new CompilationError(src, dst, errorMessage);
+ public void addCompilationError(String src, String dst, String errorMessage) {
+ this.compilationErrors.add(new CompilationError(getConfig(), src, dst, errorMessage));
}
-
- public class CompilationError {
+ @EqualsAndHashCode
+ public static class CompilationError {
public int errorPriority;
public String errorMessage;
- CompilationError(String src, String dst, String errorMessage) {
+ CompilationError(Config config, String src, String dst, String errorMessage) {
errorPriority = 0;
- if (!src.equals(ConfigUtils.getString(getConfig(), ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, ""))){
+ if (!src.equals(ConfigUtils.getString(config, ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, ""))){
errorPriority++;
}
- if (!ConfigUtils.getStringList(getConfig(), ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY)
+ if (!ConfigUtils.getStringList(config, ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY)
.containsAll(Arrays.asList(StringUtils.split(dst, ",")))){
errorPriority++;
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index a55d656..99580e9 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -191,15 +191,13 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
DataNode sourceNode = this.flowGraph.getNode(source);
if (sourceNode == null) {
- flowSpec.getCompilationErrors()
- .add(flowSpec.getCompilationError(source, destination, String.format("Flowgraph does not have a node with id %s", source)));
+ flowSpec.addCompilationError(source, destination, String.format("Flowgraph does not have a node with id %s", source));
return null;
}
List<String> destNodeIds = ConfigUtils.getStringList(flowSpec.getConfig(), ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY);
List<DataNode> destNodes = destNodeIds.stream().map(this.flowGraph::getNode).collect(Collectors.toList());
if (destNodes.contains(null)) {
- flowSpec.getCompilationErrors()
- .add(flowSpec.getCompilationError(source, destination, String.format("Flowgraph does not have a node with id %s", destNodeIds.get(destNodes.indexOf(null)))));
+ flowSpec.addCompilationError(source, destination, String.format("Flowgraph does not have a node with id %s", destNodeIds.get(destNodes.indexOf(null))));
return null;
}
log.info(String.format("Compiling flow for source: %s and destination: %s", source, destination));
@@ -218,12 +216,12 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
String message = String.format("Data movement is not authorized for flow: %s, source: %s, destination: %s",
flowSpec.getUri().toString(), source, destination);
log.error(message);
- datasetFlowSpec.getCompilationErrors().add(datasetFlowSpec.getCompilationError(source, destination, message));
+ datasetFlowSpec.addCompilationError(source, destination, message);
return null;
}
} catch (Exception e) {
Instrumented.markMeter(flowCompilationFailedMeter);
- datasetFlowSpec.getCompilationErrors().add(datasetFlowSpec.getCompilationError(source, destination, Throwables.getStackTraceAsString(e)));
+ datasetFlowSpec.addCompilationError(source, destination, Throwables.getStackTraceAsString(e));
return null;
}
}
@@ -242,7 +240,7 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
log.info(message);
if (!flowSpec.getCompilationErrors().stream().anyMatch(compilationError -> compilationError.errorPriority == 0)) {
- flowSpec.getCompilationErrors().add(flowSpec.getCompilationError(source, destination, message));
+ flowSpec.addCompilationError(source, destination, message);
}
return null;
}
@@ -250,7 +248,7 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
Instrumented.markMeter(flowCompilationFailedMeter);
String message = String.format("Exception encountered while compiling flow for source: %s and destination: %s, %s", source, destination, Throwables.getStackTraceAsString(e));
log.error(message, e);
- flowSpec.getCompilationErrors().add(flowSpec.getCompilationError(source, destination, message));
+ flowSpec.addCompilationError(source, destination, message);
return null;
} finally {
this.rwLock.readLock().unlock();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
index 8c9fa8f..4dc71f4 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
@@ -210,7 +210,7 @@ public abstract class AbstractPathFinder implements PathFinder {
try {
flowEdge.getFlowTemplate().tryResolving(mergedConfig, datasetDescriptorPair.getLeft(), datasetDescriptorPair.getRight());
} catch (JobTemplate.TemplateException | ConfigException | SpecNotFoundException e) {
- this.flowSpec.getCompilationErrors().add(flowSpec.getCompilationError(flowEdge.getSrc(), flowEdge.getDest(), "Error compiling edge " + flowEdge.toString() + ": " + e.toString()));
+ flowSpec.addCompilationError(flowEdge.getSrc(), flowEdge.getDest(), "Error compiling edge " + flowEdge.toString() + ": " + e.toString());
continue;
}