You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/08/21 20:59:34 UTC

incubator-gobblin git commit: [GOBBLIN-212] Fix for exception handling of TaskStateCollectorServiceHandler

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master c5caa9822 -> 784d7106f


[GOBBLIN-212] Fix for exception handling of TaskStateCollectorServiceHandler

Closes #2064 from autumnust/inline-hive-fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/784d7106
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/784d7106
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/784d7106

Branch: refs/heads/master
Commit: 784d7106f64ef4594e2005bcfb25e2ed5574a6a7
Parents: c5caa98
Author: Lei Sun <au...@gmail.com>
Authored: Mon Aug 21 13:59:24 2017 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Aug 21 13:59:24 2017 -0700

----------------------------------------------------------------------
 .../configuration/ConfigurationKeys.java        |  5 +++
 ...RegTaskStateCollectorServiceHandlerImpl.java | 14 +++----
 .../gobblin/runtime/SafeDatasetCommit.java      |  8 ++--
 .../runtime/TaskStateCollectorService.java      | 42 ++++++++++++++++----
 .../TaskStateCollectorServiceHandler.java       |  5 ++-
 .../runtime/TaskStateCollectorServiceTest.java  |  2 +-
 6 files changed, 54 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/784d7106/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 3336426..c986fd6 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -579,6 +579,11 @@ public class ConfigurationKeys {
   public static final String TASK_STATE_COLLECTOR_HANDLER_CLASS = "task.state.collector.handler.class";
 
   /**
+   * Set to true so that the job still proceeds if TaskStateCollectorService fails.
+   */
+  public static final String JOB_PROCEED_ON_TASK_STATE_COLLECOTR_SERVICE_FAILURE = "job.proceed.onTaskStateCollectorServiceFailure";
+
+  /**
    * Configuration properties for email settings.
    */
   public static final String ALERT_EMAIL_ENABLED_KEY = "email.alert.enabled";

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/784d7106/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/HiveRegTaskStateCollectorServiceHandlerImpl.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/HiveRegTaskStateCollectorServiceHandlerImpl.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/HiveRegTaskStateCollectorServiceHandlerImpl.java
index ecbd1a5..32ad1b9 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/HiveRegTaskStateCollectorServiceHandlerImpl.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/HiveRegTaskStateCollectorServiceHandlerImpl.java
@@ -20,9 +20,11 @@ package org.apache.gobblin.runtime;
 import java.io.IOException;
 import java.util.Collection;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.publisher.HiveRegistrationPublisher;
 
+import lombok.extern.slf4j.Slf4j;
 
 /**
  * A {@link TaskStateCollectorServiceHandler} implementation that execute hive registration on driver level.
@@ -31,22 +33,18 @@ import org.apache.gobblin.publisher.HiveRegistrationPublisher;
  * if a single batch of hive registration finishes within a minute, the latency can be hidden by the gap between two run
  * of {@link TaskStateCollectorService}.
  */
-
+@Slf4j
 public class HiveRegTaskStateCollectorServiceHandlerImpl implements TaskStateCollectorServiceHandler {
 
   private HiveRegistrationPublisher hiveRegHandler;
 
-  public HiveRegTaskStateCollectorServiceHandlerImpl(JobState jobState){
+  public HiveRegTaskStateCollectorServiceHandlerImpl(JobState jobState) {
     hiveRegHandler = new HiveRegistrationPublisher(jobState);
   }
 
   @Override
-  public void handle(Collection<? extends WorkUnitState> taskStates) {
-    try {
-      this.hiveRegHandler.publishData(taskStates);
-    }catch (IOException ioe){
-      throw new RuntimeException("Hive-registration pushling of data in TaskStateCollector run into IOException:", ioe);
-    }
+  public void handle(Collection<? extends WorkUnitState> taskStates) throws IOException {
+    this.hiveRegHandler.publishData(taskStates);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/784d7106/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index 9521575..c7bd90f 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -370,10 +370,12 @@ final class SafeDatasetCommit implements Callable<Void> {
   /**
    * Sets the {@link ConfigurationKeys#TASK_FAILURE_EXCEPTION_KEY} for each given {@link TaskState} to the given
    * {@link Throwable}.
+   *
+   * This method is public because this exception-recording routine can be reused in other places as well.
    */
-  private static void setTaskFailureException(Collection<TaskState> taskStates, Throwable t) {
-    for (TaskState taskState : taskStates) {
-      taskState.setTaskFailureException(t);
+  public static void setTaskFailureException(Collection<? extends WorkUnitState> taskStates, Throwable t) {
+    for (WorkUnitState taskState : taskStates) {
+      ((TaskState) taskState).setTaskFailureException(t);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/784d7106/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
index 6f19243..0318f67 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
@@ -24,6 +24,9 @@ import java.util.Queue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,6 +54,7 @@ import org.apache.gobblin.metastore.StateStore;
  *
  * @author Yinan Li
  */
+@Slf4j
 public class TaskStateCollectorService extends AbstractScheduledService {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TaskStateCollectorService.class);
@@ -74,9 +78,18 @@ public class TaskStateCollectorService extends AbstractScheduledService {
    * A typical example to plug here is hive registration:
    * We do hive registration everytime there are available taskStates deserialized from storage, on the driver level.
    */
-  public final Optional<TaskStateCollectorServiceHandler> optionalTaskCollectorHandler;
+  @Getter
+  private final Optional<TaskStateCollectorServiceHandler> optionalTaskCollectorHandler;
   private final Closer handlerCloser = Closer.create();
 
+  private boolean isJobProceedOnCollectorServiceFailure;
+
+  /**
+   * By default, whether {@link TaskStateCollectorService} finishes successfully or not does not affect
+   * whether the job proceeds.
+   */
+  private static final boolean defaultPolicyOnCollectorServiceFailure = true;
+
   public TaskStateCollectorService(Properties jobProps, JobState jobState, EventBus eventBus,
       StateStore<TaskState> taskStateStore, Path outputTaskStateDir) {
     this.jobState = jobState;
@@ -91,21 +104,24 @@ public class TaskStateCollectorService extends AbstractScheduledService {
         Integer.parseInt(jobProps.getProperty(ConfigurationKeys.TASK_STATE_COLLECTOR_INTERVAL_SECONDS,
             Integer.toString(ConfigurationKeys.DEFAULT_TASK_STATE_COLLECTOR_INTERVAL_SECONDS)));
 
-    if (jobProps.containsKey(ConfigurationKeys.TASK_STATE_COLLECTOR_HANDLER_CLASS)){
+    if (!StringUtils.isBlank(jobProps.getProperty(ConfigurationKeys.TASK_STATE_COLLECTOR_HANDLER_CLASS))) {
       String handlerTypeName = jobProps.getProperty(ConfigurationKeys.TASK_STATE_COLLECTOR_HANDLER_CLASS);
-      try{
+      try {
         ClassAliasResolver<TaskStateCollectorServiceHandler.TaskStateCollectorServiceHandlerFactory> aliasResolver =
             new ClassAliasResolver<>(TaskStateCollectorServiceHandler.TaskStateCollectorServiceHandlerFactory.class);
         TaskStateCollectorServiceHandler.TaskStateCollectorServiceHandlerFactory handlerFactory =
             aliasResolver.resolveClass(handlerTypeName).newInstance();
         optionalTaskCollectorHandler = Optional.of(handlerCloser.register(handlerFactory.createHandler(this.jobState)));
-      } catch (ReflectiveOperationException rfe){
+      } catch (ReflectiveOperationException rfe) {
         throw new RuntimeException("Could not construct TaskCollectorHandler " + handlerTypeName, rfe);
       }
-    }
-    else{
+    } else {
       optionalTaskCollectorHandler = Optional.absent();
     }
+
+    isJobProceedOnCollectorServiceFailure =
+        jobState.getPropAsBoolean(ConfigurationKeys.JOB_PROCEED_ON_TASK_STATE_COLLECOTR_SERVICE_FAILURE,
+            defaultPolicyOnCollectorServiceFailure);
   }
 
   @Override
@@ -190,9 +206,19 @@ public class TaskStateCollectorService extends AbstractScheduledService {
 
     // Finish any addtional steps defined in handler on driver level.
     // Currently implemented handler for Hive registration only.
-    if (optionalTaskCollectorHandler.isPresent()){
+    if (optionalTaskCollectorHandler.isPresent()) {
       LOGGER.info("Execute Pipelined TaskStateCollectorService Handler for " + taskStateQueue.size() + " tasks");
-      optionalTaskCollectorHandler.get().handle(taskStateQueue);
+
+      try {
+        optionalTaskCollectorHandler.get().handle(taskStateQueue);
+      } catch (Throwable t) {
+        if (isJobProceedOnCollectorServiceFailure) {
+          log.error("Failed to commit dataset while job proceeds", t);
+          SafeDatasetCommit.setTaskFailureException(taskStateQueue, t);
+        } else {
+          throw new RuntimeException("Hive Registration as the TaskStateCollectorServiceHandler failed.", t);
+        }
+      }
     }
 
     // Notify the listeners for the completion of the tasks

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/784d7106/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorServiceHandler.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorServiceHandler.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorServiceHandler.java
index a35964c..7a45bc2 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorServiceHandler.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorServiceHandler.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.runtime;
 
+import java.io.IOException;
 import org.apache.gobblin.configuration.WorkUnitState;
 import java.io.Closeable;
 import java.util.Collection;
@@ -32,12 +33,12 @@ public interface TaskStateCollectorServiceHandler extends Closeable {
   /**
    * Interface of handler factory.
    */
-  interface TaskStateCollectorServiceHandlerFactory{
+  interface TaskStateCollectorServiceHandlerFactory {
     TaskStateCollectorServiceHandler createHandler(JobState jobState);
   }
 
   /**
    * Execute the actions of handler.
    */
-  public void handle(Collection<? extends WorkUnitState> states)  ;
+  public void handle(Collection<? extends WorkUnitState> states) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/784d7106/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskStateCollectorServiceTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskStateCollectorServiceTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskStateCollectorServiceTest.java
index 287d7f4..74f8f8c 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskStateCollectorServiceTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskStateCollectorServiceTest.java
@@ -109,7 +109,7 @@ public class TaskStateCollectorServiceTest {
     props.setProperty(ConfigurationKeys.TASK_STATE_COLLECTOR_HANDLER_CLASS, "hivereg");
     TaskStateCollectorService taskStateCollectorServiceHive = new TaskStateCollectorService(props, this.jobState, this.eventBus,
         this.taskStateStore, new Path(this.outputTaskStateDir, JOB_ID + "_prime"));
-    Assert.assertEquals(taskStateCollectorServiceHive.optionalTaskCollectorHandler.get().getClass().getName(),
+    Assert.assertEquals(taskStateCollectorServiceHive.getOptionalTaskCollectorHandler().get().getClass().getName(),
         "org.apache.gobblin.runtime.HiveRegTaskStateCollectorServiceHandlerImpl");
     taskStateCollectorServiceHive.shutDown();
     return;