You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/08/21 20:59:34 UTC
incubator-gobblin git commit: [GOBBLIN-212] Fix for exception handling of TaskStateCollectorServiceHandler
Repository: incubator-gobblin
Updated Branches:
refs/heads/master c5caa9822 -> 784d7106f
[GOBBLIN-212] Fix for exception handling of TaskStateCollectorServiceHandler
Closes #2064 from autumnust/inline-hive-fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/784d7106
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/784d7106
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/784d7106
Branch: refs/heads/master
Commit: 784d7106f64ef4594e2005bcfb25e2ed5574a6a7
Parents: c5caa98
Author: Lei Sun <au...@gmail.com>
Authored: Mon Aug 21 13:59:24 2017 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Aug 21 13:59:24 2017 -0700
----------------------------------------------------------------------
.../configuration/ConfigurationKeys.java | 5 +++
...RegTaskStateCollectorServiceHandlerImpl.java | 14 +++----
.../gobblin/runtime/SafeDatasetCommit.java | 8 ++--
.../runtime/TaskStateCollectorService.java | 42 ++++++++++++++++----
.../TaskStateCollectorServiceHandler.java | 5 ++-
.../runtime/TaskStateCollectorServiceTest.java | 2 +-
6 files changed, 54 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/784d7106/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 3336426..c986fd6 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -579,6 +579,11 @@ public class ConfigurationKeys {
public static final String TASK_STATE_COLLECTOR_HANDLER_CLASS = "task.state.collector.handler.class";
/**
+ * Set to true so that the job still proceeds if TaskStateCollectorService fails.
+ */
+ public static final String JOB_PROCEED_ON_TASK_STATE_COLLECOTR_SERVICE_FAILURE = "job.proceed.onTaskStateCollectorServiceFailure";
+
+ /**
* Configuration properties for email settings.
*/
public static final String ALERT_EMAIL_ENABLED_KEY = "email.alert.enabled";
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/784d7106/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/HiveRegTaskStateCollectorServiceHandlerImpl.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/HiveRegTaskStateCollectorServiceHandlerImpl.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/HiveRegTaskStateCollectorServiceHandlerImpl.java
index ecbd1a5..32ad1b9 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/HiveRegTaskStateCollectorServiceHandlerImpl.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/HiveRegTaskStateCollectorServiceHandlerImpl.java
@@ -20,9 +20,11 @@ package org.apache.gobblin.runtime;
import java.io.IOException;
import java.util.Collection;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.publisher.HiveRegistrationPublisher;
+import lombok.extern.slf4j.Slf4j;
/**
* A {@link TaskStateCollectorServiceHandler} implementation that execute hive registration on driver level.
@@ -31,22 +33,18 @@ import org.apache.gobblin.publisher.HiveRegistrationPublisher;
* if a single batch of hive registration finishes within a minute, the latency can be hidden by the gap between two run
* of {@link TaskStateCollectorService}.
*/
-
+@Slf4j
public class HiveRegTaskStateCollectorServiceHandlerImpl implements TaskStateCollectorServiceHandler {
private HiveRegistrationPublisher hiveRegHandler;
- public HiveRegTaskStateCollectorServiceHandlerImpl(JobState jobState){
+ public HiveRegTaskStateCollectorServiceHandlerImpl(JobState jobState) {
hiveRegHandler = new HiveRegistrationPublisher(jobState);
}
@Override
- public void handle(Collection<? extends WorkUnitState> taskStates) {
- try {
- this.hiveRegHandler.publishData(taskStates);
- }catch (IOException ioe){
- throw new RuntimeException("Hive-registration pushling of data in TaskStateCollector run into IOException:", ioe);
- }
+ public void handle(Collection<? extends WorkUnitState> taskStates) throws IOException {
+ this.hiveRegHandler.publishData(taskStates);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/784d7106/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index 9521575..c7bd90f 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -370,10 +370,12 @@ final class SafeDatasetCommit implements Callable<Void> {
/**
* Sets the {@link ConfigurationKeys#TASK_FAILURE_EXCEPTION_KEY} for each given {@link TaskState} to the given
* {@link Throwable}.
+ *
+ * This method is public so that this exception-recording routine can be reused elsewhere as well.
*/
- private static void setTaskFailureException(Collection<TaskState> taskStates, Throwable t) {
- for (TaskState taskState : taskStates) {
- taskState.setTaskFailureException(t);
+ public static void setTaskFailureException(Collection<? extends WorkUnitState> taskStates, Throwable t) {
+ for (WorkUnitState taskState : taskStates) {
+ ((TaskState) taskState).setTaskFailureException(t);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/784d7106/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
index 6f19243..0318f67 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
@@ -24,6 +24,9 @@ import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,6 +54,7 @@ import org.apache.gobblin.metastore.StateStore;
*
* @author Yinan Li
*/
+@Slf4j
public class TaskStateCollectorService extends AbstractScheduledService {
private static final Logger LOGGER = LoggerFactory.getLogger(TaskStateCollectorService.class);
@@ -74,9 +78,18 @@ public class TaskStateCollectorService extends AbstractScheduledService {
* A typical example to plug here is hive registration:
* We do hive registration everytime there are available taskStates deserialized from storage, on the driver level.
*/
- public final Optional<TaskStateCollectorServiceHandler> optionalTaskCollectorHandler;
+ @Getter
+ private final Optional<TaskStateCollectorServiceHandler> optionalTaskCollectorHandler;
private final Closer handlerCloser = Closer.create();
+ private boolean isJobProceedOnCollectorServiceFailure;
+
+ /**
+ * By default, whether or not {@link TaskStateCollectorService} finishes successfully does not affect
+ * whether the job proceeds.
+ */
+ private static final boolean defaultPolicyOnCollectorServiceFailure = true;
+
public TaskStateCollectorService(Properties jobProps, JobState jobState, EventBus eventBus,
StateStore<TaskState> taskStateStore, Path outputTaskStateDir) {
this.jobState = jobState;
@@ -91,21 +104,24 @@ public class TaskStateCollectorService extends AbstractScheduledService {
Integer.parseInt(jobProps.getProperty(ConfigurationKeys.TASK_STATE_COLLECTOR_INTERVAL_SECONDS,
Integer.toString(ConfigurationKeys.DEFAULT_TASK_STATE_COLLECTOR_INTERVAL_SECONDS)));
- if (jobProps.containsKey(ConfigurationKeys.TASK_STATE_COLLECTOR_HANDLER_CLASS)){
+ if (!StringUtils.isBlank(jobProps.getProperty(ConfigurationKeys.TASK_STATE_COLLECTOR_HANDLER_CLASS))) {
String handlerTypeName = jobProps.getProperty(ConfigurationKeys.TASK_STATE_COLLECTOR_HANDLER_CLASS);
- try{
+ try {
ClassAliasResolver<TaskStateCollectorServiceHandler.TaskStateCollectorServiceHandlerFactory> aliasResolver =
new ClassAliasResolver<>(TaskStateCollectorServiceHandler.TaskStateCollectorServiceHandlerFactory.class);
TaskStateCollectorServiceHandler.TaskStateCollectorServiceHandlerFactory handlerFactory =
aliasResolver.resolveClass(handlerTypeName).newInstance();
optionalTaskCollectorHandler = Optional.of(handlerCloser.register(handlerFactory.createHandler(this.jobState)));
- } catch (ReflectiveOperationException rfe){
+ } catch (ReflectiveOperationException rfe) {
throw new RuntimeException("Could not construct TaskCollectorHandler " + handlerTypeName, rfe);
}
- }
- else{
+ } else {
optionalTaskCollectorHandler = Optional.absent();
}
+
+ isJobProceedOnCollectorServiceFailure =
+ jobState.getPropAsBoolean(ConfigurationKeys.JOB_PROCEED_ON_TASK_STATE_COLLECOTR_SERVICE_FAILURE,
+ defaultPolicyOnCollectorServiceFailure);
}
@Override
@@ -190,9 +206,19 @@ public class TaskStateCollectorService extends AbstractScheduledService {
// Finish any addtional steps defined in handler on driver level.
// Currently implemented handler for Hive registration only.
- if (optionalTaskCollectorHandler.isPresent()){
+ if (optionalTaskCollectorHandler.isPresent()) {
LOGGER.info("Execute Pipelined TaskStateCollectorService Handler for " + taskStateQueue.size() + " tasks");
- optionalTaskCollectorHandler.get().handle(taskStateQueue);
+
+ try {
+ optionalTaskCollectorHandler.get().handle(taskStateQueue);
+ } catch (Throwable t) {
+ if (isJobProceedOnCollectorServiceFailure) {
+ log.error("Failed to commit dataset while job proceeds", t);
+ SafeDatasetCommit.setTaskFailureException(taskStateQueue, t);
+ } else {
+ throw new RuntimeException("Hive Registration as the TaskStateCollectorServiceHandler failed.", t);
+ }
+ }
}
// Notify the listeners for the completion of the tasks
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/784d7106/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorServiceHandler.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorServiceHandler.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorServiceHandler.java
index a35964c..7a45bc2 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorServiceHandler.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorServiceHandler.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.runtime;
+import java.io.IOException;
import org.apache.gobblin.configuration.WorkUnitState;
import java.io.Closeable;
import java.util.Collection;
@@ -32,12 +33,12 @@ public interface TaskStateCollectorServiceHandler extends Closeable {
/**
* Interface of handler factory.
*/
- interface TaskStateCollectorServiceHandlerFactory{
+ interface TaskStateCollectorServiceHandlerFactory {
TaskStateCollectorServiceHandler createHandler(JobState jobState);
}
/**
* Execute the actions of handler.
*/
- public void handle(Collection<? extends WorkUnitState> states) ;
+ public void handle(Collection<? extends WorkUnitState> states) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/784d7106/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskStateCollectorServiceTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskStateCollectorServiceTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskStateCollectorServiceTest.java
index 287d7f4..74f8f8c 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskStateCollectorServiceTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskStateCollectorServiceTest.java
@@ -109,7 +109,7 @@ public class TaskStateCollectorServiceTest {
props.setProperty(ConfigurationKeys.TASK_STATE_COLLECTOR_HANDLER_CLASS, "hivereg");
TaskStateCollectorService taskStateCollectorServiceHive = new TaskStateCollectorService(props, this.jobState, this.eventBus,
this.taskStateStore, new Path(this.outputTaskStateDir, JOB_ID + "_prime"));
- Assert.assertEquals(taskStateCollectorServiceHive.optionalTaskCollectorHandler.get().getClass().getName(),
+ Assert.assertEquals(taskStateCollectorServiceHive.getOptionalTaskCollectorHandler().get().getClass().getName(),
"org.apache.gobblin.runtime.HiveRegTaskStateCollectorServiceHandlerImpl");
taskStateCollectorServiceHive.shutDown();
return;