You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2021/10/19 19:33:32 UTC

[gobblin] branch master updated: [GOBBLIN-1563]Collect more information to analyze the RC for some job cannot emit kafka events to update job status (#3416)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a25c88  [GOBBLIN-1563]Collect more information to analyze the RC for some job cannot emit kafka events to update job status (#3416)
4a25c88 is described below

commit 4a25c886db03dbd267cb3f01b7df1a35096f6064
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Tue Oct 19 12:33:27 2021 -0700

    [GOBBLIN-1563]Collect more information to analyze the RC for some job cannot emit kafka events to update job status (#3416)
    
    * [GOBBLIN-1563]Collect more information to analyze the RC for some job cannot emit kafka events to update job status
    
    * address comments for GOBBLIN-1561
    
    * address comments
    
    * avoid type cast
---
 .../apache/gobblin/metrics/reporter/EventReporter.java   | 14 ++++++++------
 .../apache/gobblin/metrics/kafka/KafkaEventReporter.java |  3 +++
 .../service/FlowConfigV2ResourceLocalHandler.java        | 16 ++++------------
 .../java/org/apache/gobblin/runtime/api/FlowSpec.java    | 14 +++++++-------
 .../service/modules/flow/MultiHopFlowCompiler.java       | 14 ++++++--------
 .../modules/flowgraph/pathfinder/AbstractPathFinder.java |  2 +-
 6 files changed, 29 insertions(+), 34 deletions(-)

diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java
index 83c6a33..45061ba 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java
@@ -33,9 +33,6 @@ import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nullable;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.codahale.metrics.Counter;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Histogram;
@@ -78,12 +75,12 @@ public abstract class EventReporter extends ScheduledReporter implements Closeab
   protected static final Joiner JOINER = Joiner.on('.').skipNulls();
   protected static final String METRIC_KEY_PREFIX = "gobblin.metrics";
   protected static final String EVENTS_QUALIFIER = "events";
-  private static final Logger LOGGER = LoggerFactory.getLogger(EventReporter.class);
   public static final int DEFAULT_QUEUE_CAPACITY = 100;
   public static final String QUEUE_CAPACITY_KEY = ConfigurationKeys.METRICS_REPORTING_EVENTS_CONFIGURATIONS_PREFIX + ".queue.capacity";
   public static final int DEFAULT_QUEUE_OFFER_TIMEOUT_SECS = 10;
   public static final String QUEUE_OFFER_TIMOUT_SECS_KEY = ConfigurationKeys.METRICS_REPORTING_EVENTS_CONFIGURATIONS_PREFIX + ".queue.offer.timeout.secs";
   private static final String NULL_STRING = "null";
+  public static final int REPORT_TIMEOUT_SECS = 60;
 
   private final MetricContext metricContext;
   private final BlockingQueue<GobblinTrackingEvent> reportingQueue;
@@ -108,7 +105,7 @@ public abstract class EventReporter extends ScheduledReporter implements Closeab
     this.closer = Closer.create();
     this.immediateReportExecutor = MoreExecutors.getExitingExecutorService(
         (ThreadPoolExecutor) Executors.newFixedThreadPool(1,
-            ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), Optional.of("EventReporter-" + builder.name + "-%d"))),
+            ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("EventReporter-" + builder.name + "-%d"))),
         5, TimeUnit.MINUTES);
 
     this.metricContext = builder.context;
@@ -144,9 +141,11 @@ public abstract class EventReporter extends ScheduledReporter implements Closeab
    */
   public void addEventToReportingQueue(GobblinTrackingEvent event) {
     if (this.reportingQueue.size() > this.queueCapacity * 2 / 3) {
+      log.info("Trigger immediate run to report the event since queue is almost full");
       immediatelyScheduleReport();
     }
     try {
+      log.debug(String.format("Offering one event to the metrics queue with event name: %s", event.getName()));
       if (!this.reportingQueue.offer(sanitizeEvent(event), this.queueOfferTimeoutSecs, TimeUnit.SECONDS)) {
         log.error("Enqueuing of event {} at reporter with class {} timed out. Sending of events is probably stuck.",
             event, this.getClass().getCanonicalName());
@@ -235,11 +234,14 @@ public abstract class EventReporter extends ScheduledReporter implements Closeab
   @Override
   public void close() {
     try {
+      log.info(String.format("Closing event reporter %s", this.getClass().getCanonicalName()));
       this.metricContext.removeNotificationTarget(this.notificationTargetKey);
+      this.immediateReportExecutor.awaitTermination(REPORT_TIMEOUT_SECS, TimeUnit.SECONDS);
+      log.info(String.format("Flush out %s events before closing the reporter", this.reportingQueue.size()));
       report();
       this.closer.close();
     } catch (Exception e) {
-      LOGGER.warn("Exception when closing EventReporter", e);
+      log.warn("Exception when closing EventReporter", e);
     } finally {
       super.close();
     }
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java
index 061c1da..732f1ef 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java
@@ -25,6 +25,7 @@ import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.metrics.GobblinTrackingEvent;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.reporter.EventReporter;
@@ -36,6 +37,7 @@ import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
 /**
  * Reports {@link GobblinTrackingEvent} to a Kafka topic serialized as JSON.
  */
+@Slf4j
 public class KafkaEventReporter extends EventReporter {
 
   protected final AvroSerializer<GobblinTrackingEvent> serializer;
@@ -66,6 +68,7 @@ public class KafkaEventReporter extends EventReporter {
     }
 
     if (!events.isEmpty()) {
+      log.info("Pushing events to Kafka");
       this.kafkaPusher.pushMessages(events);
     }
 
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
index 60dfe7b..b7249c1 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -18,10 +18,8 @@ package org.apache.gobblin.service;
 
 import java.util.Arrays;
 import java.util.Comparator;
-import java.util.HashSet;
 import java.util.Map;
 
-import java.util.Set;
 import org.apache.commons.lang3.StringEscapeUtils;
 
 import com.linkedin.data.template.StringMap;
@@ -103,18 +101,12 @@ public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHan
     StringBuilder message = new StringBuilder("Flow was not compiled successfully.");
     if (!flowSpec.getCompilationErrors().isEmpty()) {
       message.append(" Compilation errors encountered (Sorted by relevance): ");
-      Object[] errors = flowSpec.getCompilationErrors().toArray();
+      FlowSpec.CompilationError[] errors = flowSpec.getCompilationErrors().stream().distinct().toArray(FlowSpec.CompilationError[]::new);
       Arrays.sort(errors, Comparator.comparingInt(c -> ((FlowSpec.CompilationError)c).errorPriority));
-      // This is to avoid we print same error multi times.
-      Set<String> errorSet = new HashSet<>();
       int errorId = 0;
-      for (Object er: errors) {
-        FlowSpec.CompilationError error = (FlowSpec.CompilationError)er;
-        if (!errorSet.contains(error.errorMessage)) {
-          message.append("\n").append(String.format("ERROR[%s]", errorId)).append(error.errorMessage);
-          errorSet.add(error.errorMessage);
-          errorId++;
-        }
+      for (FlowSpec.CompilationError error: errors) {
+        message.append("\n").append(String.format("ERROR[%s]", errorId)).append(error.errorMessage);
+        errorId++;
       }
     }
     return message.toString();
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
index 37eb584..5675da1 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
@@ -125,20 +125,20 @@ public class FlowSpec implements Configurable, Spec {
       throw new RuntimeException("Unable to create a FlowSpec URI: " + e, e);
     }
   }
-  public CompilationError getCompilationError(String src, String dst, String errorMessage) {
-    return new CompilationError(src, dst, errorMessage);
+  public void addCompilationError(String src, String dst, String errorMessage) {
+    this.compilationErrors.add(new CompilationError(getConfig(), src, dst, errorMessage));
   }
 
-
-  public class CompilationError {
+  @EqualsAndHashCode
+  public static class CompilationError {
     public int errorPriority;
     public String errorMessage;
-    CompilationError(String src, String dst, String errorMessage) {
+    CompilationError(Config config, String src, String dst, String errorMessage) {
       errorPriority = 0;
-      if (!src.equals(ConfigUtils.getString(getConfig(), ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, ""))){
+      if (!src.equals(ConfigUtils.getString(config, ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, ""))){
         errorPriority++;
       }
-      if (!ConfigUtils.getStringList(getConfig(), ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY)
+      if (!ConfigUtils.getStringList(config, ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY)
           .containsAll(Arrays.asList(StringUtils.split(dst, ",")))){
         errorPriority++;
       }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index a55d656..99580e9 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -191,15 +191,13 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
 
     DataNode sourceNode = this.flowGraph.getNode(source);
     if (sourceNode == null) {
-      flowSpec.getCompilationErrors()
-          .add(flowSpec.getCompilationError(source, destination, String.format("Flowgraph does not have a node with id %s", source)));
+      flowSpec.addCompilationError(source, destination, String.format("Flowgraph does not have a node with id %s", source));
       return null;
     }
     List<String> destNodeIds = ConfigUtils.getStringList(flowSpec.getConfig(), ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY);
     List<DataNode> destNodes = destNodeIds.stream().map(this.flowGraph::getNode).collect(Collectors.toList());
     if (destNodes.contains(null)) {
-      flowSpec.getCompilationErrors()
-          .add(flowSpec.getCompilationError(source, destination, String.format("Flowgraph does not have a node with id %s", destNodeIds.get(destNodes.indexOf(null)))));
+      flowSpec.addCompilationError(source, destination, String.format("Flowgraph does not have a node with id %s", destNodeIds.get(destNodes.indexOf(null))));
       return null;
     }
     log.info(String.format("Compiling flow for source: %s and destination: %s", source, destination));
@@ -218,12 +216,12 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
               String message = String.format("Data movement is not authorized for flow: %s, source: %s, destination: %s",
                   flowSpec.getUri().toString(), source, destination);
               log.error(message);
-              datasetFlowSpec.getCompilationErrors().add(datasetFlowSpec.getCompilationError(source, destination, message));
+              datasetFlowSpec.addCompilationError(source, destination, message);
               return null;
             }
           } catch (Exception e) {
             Instrumented.markMeter(flowCompilationFailedMeter);
-            datasetFlowSpec.getCompilationErrors().add(datasetFlowSpec.getCompilationError(source, destination, Throwables.getStackTraceAsString(e)));
+            datasetFlowSpec.addCompilationError(source, destination, Throwables.getStackTraceAsString(e));
             return null;
           }
         }
@@ -242,7 +240,7 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
         log.info(message);
 
         if (!flowSpec.getCompilationErrors().stream().anyMatch(compilationError -> compilationError.errorPriority == 0)) {
-          flowSpec.getCompilationErrors().add(flowSpec.getCompilationError(source, destination, message));
+          flowSpec.addCompilationError(source, destination, message);
         }
         return null;
       }
@@ -250,7 +248,7 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
       Instrumented.markMeter(flowCompilationFailedMeter);
       String message = String.format("Exception encountered while compiling flow for source: %s and destination: %s, %s", source, destination, Throwables.getStackTraceAsString(e));
       log.error(message, e);
-      flowSpec.getCompilationErrors().add(flowSpec.getCompilationError(source, destination, message));
+      flowSpec.addCompilationError(source, destination, message);
       return null;
     } finally {
       this.rwLock.readLock().unlock();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
index 8c9fa8f..4dc71f4 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
@@ -210,7 +210,7 @@ public abstract class AbstractPathFinder implements PathFinder {
             try {
               flowEdge.getFlowTemplate().tryResolving(mergedConfig, datasetDescriptorPair.getLeft(), datasetDescriptorPair.getRight());
             } catch (JobTemplate.TemplateException | ConfigException | SpecNotFoundException e) {
-              this.flowSpec.getCompilationErrors().add(flowSpec.getCompilationError(flowEdge.getSrc(), flowEdge.getDest(), "Error compiling edge " + flowEdge.toString() + ": " + e.toString()));
+              flowSpec.addCompilationError(flowEdge.getSrc(), flowEdge.getDest(), "Error compiling edge " + flowEdge.toString() + ": " + e.toString());
               continue;
             }