You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2021/06/23 20:49:55 UTC

[gobblin] branch master updated: [GOBBLIN-1451] Log better error messages from mappers

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 022b49f  [GOBBLIN-1451] Log better error messages from mappers
022b49f is described below

commit 022b49fdd99cebcc21aad9b6b7cde9df0b092537
Author: aprokofiev <ap...@linkedin.com>
AuthorDate: Wed Jun 23 13:49:50 2021 -0700

    [GOBBLIN-1451] Log better error messages from mappers
    
    Previously, the root cause of a problem was wrapped in several
    container exceptions that added little value in troubleshooting
    and confused the users. They frequently reported that the job
    failed with "RetryException" without looking at its root cause.
    
    With this change, we'll unwrap the exceptions from
    tasks and show
    the possible root cause at the top of the error
    message.
    
    We also shorten the boilerplate messages, so that the real root
    cause is shown to the users right away.
    
    Closes #3289 from aplex/exception-unwrapping
---
 .../org/apache/gobblin/writer/RetryWriter.java     | 19 +++++---
 .../gobblin/runtime/GobblinMultiTaskAttempt.java   | 11 +++--
 .../main/java/org/apache/gobblin/runtime/Task.java | 29 +++++++++----
 .../java/org/apache/gobblin/runtime/fork/Fork.java |  7 ++-
 .../runtime/util/ExceptionCleanupUtils.java        | 45 +++++++++++++++++++
 .../runtime/util/ExceptionCleanupUtilsTest.java    | 50 ++++++++++++++++++++++
 6 files changed, 138 insertions(+), 23 deletions(-)

diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/RetryWriter.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/RetryWriter.java
index 8e9b4be..73d0c8f 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/RetryWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/RetryWriter.java
@@ -16,6 +16,14 @@
  */
 package org.apache.gobblin.writer;
 
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.codahale.metrics.Meter;
 import com.github.rholder.retry.Attempt;
 import com.github.rholder.retry.RetryException;
@@ -26,10 +34,7 @@ import com.github.rholder.retry.StopStrategies;
 import com.github.rholder.retry.WaitStrategies;
 import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
-import java.io.IOException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
+
 import org.apache.gobblin.commit.SpeculativeAttemptAwareConstruct;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.exception.NonTransientException;
@@ -38,8 +43,6 @@ import org.apache.gobblin.metrics.GobblinMetrics;
 import org.apache.gobblin.records.ControlMessageHandler;
 import org.apache.gobblin.stream.RecordEnvelope;
 import org.apache.gobblin.util.FinalState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Retry writer follows decorator pattern that retries on inner writer's failure.
@@ -136,7 +139,9 @@ public class RetryWriter<D> extends WatermarkAwareWriterWrapper<D> implements Da
   private void callWithRetry(Callable<Void> callable) throws IOException {
     try {
       this.retryer.wrap(callable).call();
-    } catch (ExecutionException | RetryException e) {
+    } catch (RetryException e) {
+      throw new IOException(e.getLastFailedAttempt().getExceptionCause());
+    } catch (ExecutionException e) {
       throw new IOException(e);
     }
   }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index 9b39b95..dcecb07 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -311,11 +311,12 @@ public class GobblinMultiTaskAttempt {
     }
 
     if (hasTaskFailure) {
-      String errorMsg ="";
+      String errorMsg = String.format("Tasks in container %s failed", containerIdOptional.or(""));
       for (Task task : tasks) {
         if (task.getTaskState().contains(ConfigurationKeys.TASK_FAILURE_EXCEPTION_KEY)) {
-          errorMsg = String.format("Task %s failed due to exception: %s", task.getTaskId(),
-              task.getTaskState().getProp(ConfigurationKeys.TASK_FAILURE_EXCEPTION_KEY));
+          errorMsg = String.format("Task failed: %s (Gobblin task id %s, container id %s)",
+                                   task.getTaskState().getProp(ConfigurationKeys.TASK_FAILURE_EXCEPTION_KEY),
+                                   task.getTaskId(), containerIdOptional.or(""));
         }
 
         // If there are task failures then the tasks may be reattempted. Save a copy of the task state that is used
@@ -327,9 +328,7 @@ public class GobblinMultiTaskAttempt {
         }
       }
 
-      throw new IOException(
-          String.format("Not all tasks running in container %s completed successfully, last recorded exception[%s]",
-              containerIdOptional.or(""), errorMsg));
+      throw new IOException(errorMsg);
     }
   }
 
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
index 6b0e822..b46feba 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
@@ -80,6 +80,7 @@ import org.apache.gobblin.runtime.fork.AsynchronousFork;
 import org.apache.gobblin.runtime.fork.Fork;
 import org.apache.gobblin.runtime.fork.SynchronousFork;
 import org.apache.gobblin.runtime.task.TaskIFace;
+import org.apache.gobblin.runtime.util.ExceptionCleanupUtils;
 import org.apache.gobblin.runtime.util.TaskMetrics;
 import org.apache.gobblin.source.extractor.Extractor;
 import org.apache.gobblin.source.extractor.JobCommitPolicy;
@@ -379,11 +380,15 @@ public class Task implements TaskIFace {
             filter(not(Fork::isSucceeded)).map(x -> x.getIndex()).collect(Collectors.toList());
         ForkThrowableHolder holder = Task.getForkThrowableHolder(this.taskState.getTaskBroker());
 
-        Exception e = null;
+        Throwable e = null;
         if (!holder.isEmpty()) {
-          e = holder.getAggregatedException(failedForksId, this.taskId);
+          if (failedForksId.size() == 1 && holder.getThrowable(failedForksId.get(0)).isPresent()) {
+            e = holder.getThrowable(failedForksId.get(0)).get();
+          }else{
+            e = holder.getAggregatedException(failedForksId, this.taskId);
+          }
         }
-        throw e == null ? new RuntimeException("Some forks failed") : new RuntimeException("Forks failed with exception:", e);
+        throw e == null ? new RuntimeException("Some forks failed") : e;
       }
 
       //TODO: Move these to explicit shutdown phase
@@ -561,13 +566,16 @@ public class Task implements TaskIFace {
   }
 
   protected void failTask(Throwable t) {
-    LOG.error(String.format("Task %s failed", this.taskId), t);
+    Throwable cleanedException = ExceptionCleanupUtils.removeEmptyWrappers(t);
+
+    LOG.error(String.format("Task %s failed", this.taskId), cleanedException);
     this.taskState.setWorkingState(WorkUnitState.WorkingState.FAILED);
-    this.taskState.setProp(ConfigurationKeys.TASK_FAILURE_EXCEPTION_KEY, Throwables.getStackTraceAsString(t));
+    this.taskState
+        .setProp(ConfigurationKeys.TASK_FAILURE_EXCEPTION_KEY, Throwables.getStackTraceAsString(cleanedException));
 
     // Send task failure event
     FailureEventBuilder failureEvent = new FailureEventBuilder(FAILED_TASK_EVENT);
-    failureEvent.setRootCause(t);
+    failureEvent.setRootCause(cleanedException);
     failureEvent.addMetadata(TASK_STATE, this.taskState.toString());
     failureEvent.addAdditionalMetadata(this.taskEventMetadataGenerator.getMetadata(this.taskState, failureEvent.getName()));
     failureEvent.submit(taskContext.getTaskMetrics().getMetricContext());
@@ -923,11 +931,16 @@ public class Task implements TaskIFace {
         if (this.taskState.getWorkingState() != WorkUnitState.WorkingState.FAILED) {
           this.taskState.setWorkingState(WorkUnitState.WorkingState.SUCCESSFUL);
         }
-      } else {
+      }
+      else {
         ForkThrowableHolder holder = Task.getForkThrowableHolder(this.taskState.getTaskBroker());
         LOG.info("Holder for this task {} is {}", this.taskId, holder);
         if (!holder.isEmpty()) {
-          failTask(holder.getAggregatedException(failedForkIds, this.taskId));
+          if (failedForkIds.size() == 1 && holder.getThrowable(failedForkIds.get(0)).isPresent()) {
+            failTask(holder.getThrowable(failedForkIds.get(0)).get());
+          } else {
+            failTask(holder.getAggregatedException(failedForkIds, this.taskId));
+          }
         } else {
           // just in case there are some corner cases where Fork throw an exception but doesn't add into holder
           failTask(new ForkException("Fork branches " + failedForkIds + " failed for task " + this.taskId));
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
index f9bc862..9c58e6c 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
@@ -56,6 +56,7 @@ import org.apache.gobblin.runtime.Task;
 import org.apache.gobblin.runtime.TaskContext;
 import org.apache.gobblin.runtime.TaskExecutor;
 import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.util.ExceptionCleanupUtils;
 import org.apache.gobblin.runtime.util.ForkMetrics;
 import org.apache.gobblin.state.ConstructState;
 import org.apache.gobblin.stream.ControlMessage;
@@ -265,11 +266,13 @@ public class Fork<S, D> implements Closeable, FinalState, RecordStreamConsumer<S
 
       compareAndSetForkState(ForkState.RUNNING, ForkState.SUCCEEDED);
     } catch (Throwable t) {
+      Throwable cleanedUpException = ExceptionCleanupUtils.removeEmptyWrappers(t);
+
       // Set throwable to holder first because AsynchronousFork::putRecord can pull the throwable when it detects ForkState.FAILED status.
       ForkThrowableHolder holder = Task.getForkThrowableHolder(this.broker);
-      holder.setThrowable(this.getIndex(), t);
+      holder.setThrowable(this.getIndex(), cleanedUpException);
       this.forkState.set(ForkState.FAILED);
-      this.logger.error(String.format("Fork %d of task %s failed to process data records. Set throwable in holder %s", this.index, this.taskId, holder), t);
+      this.logger.error(String.format("Fork %d of task %s failed.", this.index, this.taskId), cleanedUpException);
     } finally {
       this.cleanup();
     }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/ExceptionCleanupUtils.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/ExceptionCleanupUtils.java
new file mode 100644
index 0000000..e5727e1
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/ExceptionCleanupUtils.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.util;
+
+public final class ExceptionCleanupUtils {
+
+  private ExceptionCleanupUtils() {
+  }
+
+  /**
+   * Removes exceptions that wrap another exception without providing a message of their own.
+   *
+   * When an interface declares a checked exception and an implementation wants to propagate a different type,
+   * the implementation can wrap the original exception; for example, the Gobblin codebase frequently wraps
+   * exceptions into new IOException(cause). As a result, users see large stack traces where the real error is
+   * hidden below several wrappers. A wrapper is removed when its message equals its cause's toString();
+   * the first exception that does not match is returned, and a null input yields null.
+   * */
+  public static Throwable removeEmptyWrappers(Throwable exception) {
+    if (exception == null) {
+      return null;
+    }
+
+    if (exception.getCause() != null && exception.getCause().toString().equals(exception.getMessage())) {
+      return removeEmptyWrappers(exception.getCause());
+    }
+
+    return exception;
+  }
+}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/util/ExceptionCleanupUtilsTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/util/ExceptionCleanupUtilsTest.java
new file mode 100644
index 0000000..979bb28
--- /dev/null
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/util/ExceptionCleanupUtilsTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.util;
+
+import java.io.IOException;
+
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+@Test(groups = {"gobblin.runtime"})
+public class ExceptionCleanupUtilsTest {
+  // Covers removeEmptyWrappers: message-less wrappers (single or nested) are stripped; wrappers with a message stay.
+  @Test
+  public void canRemoveEmptyWrapper() {
+    Exception exception = new IOException(new IllegalArgumentException("root cause"));
+    Throwable rootCause = ExceptionCleanupUtils.removeEmptyWrappers(exception);
+    assertEquals(rootCause.getClass(), IllegalArgumentException.class);
+  }
+
+  @Test
+  public void canRemoveMultipleEmptyWrappers() {
+    Exception exception = new IOException(new IOException(new IllegalArgumentException("root cause")));
+    Throwable unwrapped = ExceptionCleanupUtils.removeEmptyWrappers(exception);
+    assertEquals(unwrapped.getClass(), IllegalArgumentException.class);
+  }
+
+  @Test
+  public void willNotRemoveExceptionWithMessage() {
+    Exception exception = new IOException("test message", new IllegalArgumentException("root cause"));
+    Throwable unwrapped = ExceptionCleanupUtils.removeEmptyWrappers(exception);
+    assertEquals(unwrapped.getClass(), IOException.class);
+  }
+}
\ No newline at end of file