Posted to commits@gobblin.apache.org by wl...@apache.org on 2023/01/30 18:24:45 UTC

[gobblin] branch master updated: [GOBBLIN-1771] Clean up logs for dataset commit and file cleanup (#3631)

This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 667d79787 [GOBBLIN-1771] Clean up logs for dataset commit and file cleanup (#3631)
667d79787 is described below

commit 667d79787ae2f5ca2a22f5d023a6e89a00874835
Author: William Lo <lo...@gmail.com>
AuthorDate: Mon Jan 30 10:24:38 2023 -0800

    [GOBBLIN-1771] Clean up logs for dataset commit and file cleanup (#3631)
    
    * Clean up logs for dataset commit and file cleanup
    
    * Fix grammatical error
---
 .../org/apache/gobblin/runtime/SafeDatasetCommit.java    | 16 +++++++++++-----
 .../java/org/apache/gobblin/util/JobLauncherUtils.java   | 10 +++++-----
 2 files changed, 16 insertions(+), 10 deletions(-)

diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index f649e1402..ffd2683a2 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -19,7 +19,9 @@ package org.apache.gobblin.runtime;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Callable;
 
 import org.apache.commons.lang.StringUtils;
@@ -317,7 +319,7 @@ final class SafeDatasetCommit implements Callable<Void> {
         datasetState.setState(JobState.RunningState.FAILED);
         datasetState.incrementJobFailures();
         Optional<String> taskStateException = taskState.getTaskFailureException();
-        log.warn("At least one task did not committed successfully. Setting dataset state to FAILED. "
+        log.warn("At least one task did not get committed successfully. Setting dataset state to FAILED. "
             + (taskStateException.isPresent() ? taskStateException.get() : "Exception not set."));
         return;
       }
@@ -382,6 +384,7 @@ final class SafeDatasetCommit implements Callable<Void> {
   }
 
   private void finalizeDatasetState(JobState.DatasetState datasetState, String datasetUrn) {
+    Set<String> taskErrors = new HashSet<>();
     for (TaskState taskState : datasetState.getTaskStates()) {
       // Backoff the actual high watermark to the low watermark for each task that has not been committed
       if (taskState.getWorkingState() != WorkUnitState.WorkingState.COMMITTED) {
@@ -393,13 +396,16 @@ final class SafeDatasetCommit implements Callable<Void> {
           // 2. Otherwise, the processing of the dataset is considered successful even if some tasks for the
           //    dataset failed to be committed.
           datasetState.setState(JobState.RunningState.FAILED);
-          Optional<String> taskStateException = taskState.getTaskFailureException();
-          log.warn("At least one task did not committed successfully. Setting dataset state to FAILED. {}",
-              taskStateException.isPresent() ? taskStateException.get() : "Exception not set.");
+          String taskStateException = taskState.getTaskFailureException().isPresent() ? taskState.getTaskFailureException().get() : "Exception not set.";
+          // Only print out the unique exceptions to avoid needless logging duplication on large datasets
+          if (!taskErrors.contains(taskStateException)) {
+            taskErrors.add(taskStateException);
+            log.warn("At least one task in {} did not get committed successfully. Setting dataset state to FAILED. {}", datasetUrn,
+                taskStateException);
+          }
         }
       }
     }
-
     datasetState.setId(datasetUrn);
   }
 
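The change above deduplicates failure logging: distinct task-failure messages are collected in a HashSet, and a warning is emitted only the first time each message is seen, so a dataset with thousands of identically failing tasks produces one log line instead of thousands. A minimal, self-contained sketch of the same pattern follows (the TaskResult record and the messages are hypothetical stand-ins, not Gobblin classes; System.err replaces the slf4j logger so the example runs on its own):

    import java.util.HashSet;
    import java.util.List;
    import java.util.Optional;
    import java.util.Set;

    public class UniqueErrorLogger {

      // Hypothetical stand-in for a task's terminal state; not a Gobblin API.
      record TaskResult(boolean committed, Optional<String> failureException) {}

      static void logFailures(String datasetUrn, List<TaskResult> results) {
        Set<String> seenErrors = new HashSet<>();
        for (TaskResult result : results) {
          if (result.committed()) {
            continue;
          }
          String error = result.failureException().orElse("Exception not set.");
          // Set.add returns false for duplicates, so each distinct message is logged once.
          if (seenErrors.add(error)) {
            System.err.printf("At least one task in %s did not get committed successfully. %s%n",
                datasetUrn, error);
          }
        }
      }

      public static void main(String[] args) {
        logFailures("hypothetical:dataset:urn", List.of(
            new TaskResult(false, Optional.of("IOException: quota exceeded")),
            new TaskResult(false, Optional.of("IOException: quota exceeded")),
            new TaskResult(true, Optional.empty())));
        // Prints the quota-exceeded warning once, not twice.
      }
    }

Note that the committed code uses an explicit contains-then-add pair; Set.add's boolean return value expresses the same check in a single call.
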
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
index 93ea4a26d..401d159be 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
@@ -27,8 +27,6 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 
-import lombok.extern.slf4j.Slf4j;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -41,6 +39,8 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Lists;
 import com.google.common.io.Closer;
 
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.source.workunit.MultiWorkUnit;
@@ -144,7 +144,7 @@ public class JobLauncherUtils {
     HadoopUtils.deletePath(fs, jobStagingPath, true);
 
     if (fs.exists(jobStagingPath.getParent()) && fs.listStatus(jobStagingPath.getParent()).length == 0) {
-      logger.info("Deleting directory " + jobStagingPath.getParent());
+      logger.debug("Deleting directory " + jobStagingPath.getParent());
       HadoopUtils.deletePath(fs, jobStagingPath.getParent(), true);
     }
 
@@ -153,14 +153,14 @@ public class JobLauncherUtils {
     HadoopUtils.deletePath(fs, jobOutputPath, true);
 
     if (fs.exists(jobOutputPath.getParent()) && fs.listStatus(jobOutputPath.getParent()).length == 0) {
-      logger.info("Deleting directory " + jobOutputPath.getParent());
+      logger.debug("Deleting directory " + jobOutputPath.getParent());
       HadoopUtils.deletePath(fs, jobOutputPath.getParent(), true);
     }
 
     if (state.contains(ConfigurationKeys.ROW_LEVEL_ERR_FILE)) {
       if (state.getPropAsBoolean(ConfigurationKeys.CLEAN_ERR_DIR, ConfigurationKeys.DEFAULT_CLEAN_ERR_DIR)) {
         Path jobErrPath = new Path(state.getProp(ConfigurationKeys.ROW_LEVEL_ERR_FILE));
-        log.info("Cleaning up err directory : " + jobErrPath);
+        log.debug("Cleaning up err directory : " + jobErrPath);
         HadoopUtils.deleteIfExists(fs, jobErrPath, true);
       }
     }
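
The JobLauncherUtils changes demote routine staging-, output-, and err-directory cleanup messages from info to debug, since they fire on every job and add little value at the default level. With slf4j (which the class gets via Lombok's @Slf4j), the parameterized form also skips string formatting entirely when debug is disabled. A minimal sketch under those assumptions (the deleteIfEmpty helper is hypothetical, not a Gobblin method):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class CleanupLogging {

      private static final Logger logger = LoggerFactory.getLogger(CleanupLogging.class);

      // Hypothetical helper mirroring the cleanup paths in JobLauncherUtils.
      static void deleteIfEmpty(String parentDir, int entryCount) {
        if (entryCount == 0) {
          // debug, not info: per-job cleanup chatter would otherwise flood production logs.
          // The {} placeholder defers formatting until the level is known to be enabled.
          logger.debug("Deleting directory {}", parentDir);
          // ... the actual HadoopUtils.deletePath call would go here ...
        }
      }
    }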