You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by le...@apache.org on 2020/03/08 20:22:29 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1071] Retry
task initialization
This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new b011aae [GOBBLIN-1071] Retry task initialization
b011aae is described below
commit b011aaeddd976b742ddb04a667d233c9a698c451
Author: autumnust <le...@linkedin.com>
AuthorDate: Sun Mar 8 13:22:17 2020 -0700
[GOBBLIN-1071] Retry task initialization
Retry task initialization
Removing task-cancel blocking code (it will be
added in a separate PR)
Address comments
Fix travis failure
Fix travis failure II
Fix travis failure III
Closes #2909 from autumnust/retryTaskCreation
---
.../apache/gobblin/cluster/GobblinHelixTask.java | 52 ++++++++----
.../apache/gobblin/cluster/GobblinTaskRunner.java | 26 +++---
.../gobblin/cluster/InMemorySingleTaskRunner.java | 26 +++++-
.../cluster/InMemoryWuFailedSingleTask.java | 2 +-
.../gobblin/cluster/InMemoryWuSingleTask.java | 2 +-
.../org/apache/gobblin/cluster/SingleTask.java | 32 +++++---
.../apache/gobblin/cluster/SingleTaskRunner.java | 21 +++--
.../gobblin/cluster/GobblinHelixTaskTest.java | 96 +++++++++++++++-------
...estSingleTaskRerun.java => TestSingleTask.java} | 36 ++++++--
.../gobblin/runtime/GobblinMultiTaskAttempt.java | 65 ++++++++++++---
.../main/java/org/apache/gobblin/runtime/Task.java | 1 -
.../gobblin/runtime/util/RuntimeConstructs.java | 4 +
.../apache/gobblin/TaskErrorIntegrationTest.java | 76 +++++++++++++----
.../apache/gobblin/util/retry/RetryerFactory.java | 19 ++++-
.../apache/gobblin/yarn/GobblinYarnTaskRunner.java | 10 +--
15 files changed, 333 insertions(+), 135 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
index 122a8d5..21e420d 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
@@ -18,7 +18,11 @@
package org.apache.gobblin.cluster;
import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.retry.RetryerFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.helix.task.JobContext;
@@ -29,6 +33,7 @@ import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskResult;
import org.slf4j.MDC;
+import com.github.rholder.retry.Retryer;
import com.google.common.base.Throwables;
import com.google.common.io.Closer;
import com.typesafe.config.Config;
@@ -97,28 +102,37 @@ public class GobblinHelixTask implements Task {
builder.getAppWorkPath(),
this.jobId);
- Config dynamicConfig = builder.getDynamicConfig()
- .withValue(GobblinClusterConfigurationKeys.TASK_RUNNER_HOST_NAME_KEY, ConfigValueFactory.fromAnyRef(builder.getHostName()))
- .withValue(GobblinClusterConfigurationKeys.CONTAINER_ID_KEY, ConfigValueFactory.fromAnyRef(builder.getContainerId()))
- .withValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY, ConfigValueFactory.fromAnyRef(builder.getInstanceName()))
- .withValue(GobblinClusterConfigurationKeys.HELIX_JOB_ID_KEY, ConfigValueFactory.fromAnyRef(this.helixJobId))
- .withValue(GobblinClusterConfigurationKeys.HELIX_TASK_ID_KEY, ConfigValueFactory.fromAnyRef(this.helixTaskId));
-
Integer partitionNum = getPartitionForHelixTask(taskDriver);
-
if (partitionNum == null) {
throw new IllegalStateException(String.format("Task %s, job %s on instance %s has no partition assigned",
this.helixTaskId, builder.getInstanceName(), this.helixJobId));
}
- dynamicConfig = dynamicConfig.withValue(GobblinClusterConfigurationKeys.HELIX_PARTITION_ID_KEY, ConfigValueFactory.fromAnyRef(partitionNum));
- this.task = new SingleTask(this.jobId,
- this.workUnitFilePath,
- jobStateFilePath,
- builder.getFs(),
- taskAttemptBuilder,
- stateStores,
- dynamicConfig);
+ // Dynamic config is considered as part of JobState in SingleTask
+ // Important to distinguish between dynamicConfig and Config
+ final Config dynamicConfig = builder.getDynamicConfig()
+ .withValue(GobblinClusterConfigurationKeys.TASK_RUNNER_HOST_NAME_KEY, ConfigValueFactory.fromAnyRef(builder.getHostName()))
+ .withValue(GobblinClusterConfigurationKeys.CONTAINER_ID_KEY, ConfigValueFactory.fromAnyRef(builder.getContainerId()))
+ .withValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY, ConfigValueFactory.fromAnyRef(builder.getInstanceName()))
+ .withValue(GobblinClusterConfigurationKeys.HELIX_JOB_ID_KEY, ConfigValueFactory.fromAnyRef(this.helixJobId))
+ .withValue(GobblinClusterConfigurationKeys.HELIX_TASK_ID_KEY, ConfigValueFactory.fromAnyRef(this.helixTaskId))
+ .withValue(GobblinClusterConfigurationKeys.HELIX_PARTITION_ID_KEY, ConfigValueFactory.fromAnyRef(partitionNum));
+
+ Retryer<SingleTask> retryer = RetryerFactory.newInstance(builder.getConfig());
+
+ try {
+ this.task = retryer.call(new Callable<SingleTask>() {
+ @Override
+ public SingleTask call()
+ throws Exception {
+ return new SingleTask(jobId, workUnitFilePath, jobStateFilePath, builder.getFs(), taskAttemptBuilder,
+ stateStores,
+ dynamicConfig);
+ }
+ });
+ } catch (Exception e) {
+ throw new RuntimeException("Execution in creating a SingleTask-with-retry failed", e);
+ }
}
private void getInfoFromTaskConfig() {
@@ -170,7 +184,9 @@ public class GobblinHelixTask implements Task {
@Override
public void cancel() {
- log.warn("Gobblin helix task cancellation invoked.");
- this.task.cancel();
+ log.info("Gobblin helix task cancellation invoked.");
+ if (this.task != null ) {
+ this.task.cancel();
+ }
}
}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 4504cb5..d71dbd9 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -142,7 +142,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
protected final EventBus eventBus = new EventBus(GobblinTaskRunner.class.getSimpleName());
- protected final Config config;
+ protected final Config clusterConfig;
@Getter
protected final FileSystem fs;
@@ -171,8 +171,8 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
Configuration conf = HadoopUtils.newConfiguration();
this.fs = buildFileSystem(config, conf);
this.appWorkPath = initAppWorkDir(config, appWorkDirOptional);
- this.config = saveConfigToFile(config);
- this.clusterName = this.config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+ this.clusterConfig = saveConfigToFile(config);
+ this.clusterName = this.clusterConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
//Set system properties passed in via application config. As an example, Helix uses System#getProperty() for ZK configuration
// overrides such as sessionTimeout. In this case, the overrides specified
@@ -183,7 +183,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
this.containerMetrics = buildContainerMetrics();
- String builderStr = ConfigUtils.getString(this.config,
+ String builderStr = ConfigUtils.getString(this.clusterConfig,
GobblinClusterConfigurationKeys.TASK_RUNNER_SUITE_BUILDER,
TaskRunnerSuiteBase.Builder.class.getName());
@@ -196,7 +196,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
TaskRunnerSuiteBase.Builder builder = GobblinConstructorUtils.<TaskRunnerSuiteBase.Builder>invokeLongestConstructor(
new ClassAliasResolver(TaskRunnerSuiteBase.Builder.class)
- .resolveClass(builderStr), this.config);
+ .resolveClass(builderStr), this.clusterConfig);
TaskRunnerSuiteBase suite = builder.setAppWorkPath(this.appWorkPath)
.setContainerMetrics(this.containerMetrics)
@@ -238,13 +238,13 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
private void initHelixManager() {
String zkConnectionString =
- this.config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+ this.clusterConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
logger.info("Using ZooKeeper connection string: " + zkConnectionString);
if (this.isTaskDriver && this.dedicatedTaskDriverCluster) {
// This will create a Helix manager to receive the planning job
this.taskDriverHelixManager = Optional.of(HelixManagerFactory.getZKHelixManager(
- ConfigUtils.getString(this.config, GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY, ""),
+ ConfigUtils.getString(this.clusterConfig, GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY, ""),
this.helixInstanceName,
InstanceType.PARTICIPANT,
zkConnectionString));
@@ -301,7 +301,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
// Start metric reporting
if (this.containerMetrics.isPresent()) {
this.containerMetrics.get()
- .startMetricReportingWithFileSuffix(ConfigUtils.configToState(this.config),
+ .startMetricReportingWithFileSuffix(ConfigUtils.configToState(this.clusterConfig),
this.taskRunnerId);
}
@@ -355,9 +355,9 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
*/
protected List<Service> getServices() {
List<Service> serviceList = new ArrayList<>();
- if (ConfigUtils.getBoolean(this.config, GobblinClusterConfigurationKeys.CONTAINER_HEALTH_METRICS_SERVICE_ENABLED,
+ if (ConfigUtils.getBoolean(this.clusterConfig, GobblinClusterConfigurationKeys.CONTAINER_HEALTH_METRICS_SERVICE_ENABLED,
GobblinClusterConfigurationKeys.DEFAULT_CONTAINER_HEALTH_METRICS_SERVICE_ENABLED)) {
- serviceList.add(new ContainerHealthMetricsService(config));
+ serviceList.add(new ContainerHealthMetricsService(clusterConfig));
}
return serviceList;
}
@@ -392,7 +392,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
* the job with EXAMPLE_INSTANCE_TAG will remain in the ZK until an instance with EXAMPLE_INSTANCE_TAG was found.
*/
private void addInstanceTags() {
- List<String> tags = ConfigUtils.getStringList(this.config, GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY);
+ List<String> tags = ConfigUtils.getStringList(this.clusterConfig, GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY);
HelixManager receiverManager = getReceiverManager();
if (receiverManager.isConnected()) {
if (!tags.isEmpty()) {
@@ -449,10 +449,10 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
}
private Optional<ContainerMetrics> buildContainerMetrics() {
- Properties properties = ConfigUtils.configToProperties(this.config);
+ Properties properties = ConfigUtils.configToProperties(this.clusterConfig);
if (GobblinMetrics.isEnabled(properties)) {
return Optional.of(ContainerMetrics
- .get(ConfigUtils.configToState(config), this.applicationName, this.taskRunnerId));
+ .get(ConfigUtils.configToState(clusterConfig), this.applicationName, this.taskRunnerId));
} else {
return Optional.absent();
}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemorySingleTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemorySingleTaskRunner.java
index 7af586e..b411ff8 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemorySingleTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemorySingleTaskRunner.java
@@ -17,25 +17,43 @@
package org.apache.gobblin.cluster;
+import java.io.IOException;
+
import org.apache.gobblin.runtime.util.StateStores;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import com.google.common.annotations.VisibleForTesting;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
/**
* An taskRunner for in-memory {@link SingleTask} that can be switched to run a meant-to-failed task.
+ * This class is primarily used for testing purpose.
*/
public class InMemorySingleTaskRunner extends SingleTaskRunner {
+ // Inject configuration by calling set method.
+ private Config injectedConfig = ConfigFactory.empty();
+
public InMemorySingleTaskRunner(String clusterConfigFilePath, String jobId, String workUnitFilePath) {
super(clusterConfigFilePath, jobId, workUnitFilePath);
}
@Override
- protected SingleTask createTaskAttempt(TaskAttemptBuilder taskAttemptBuilder, FileSystem fs, StateStores stateStores,
- Path jobStateFilePath, boolean fail) {
+ protected SingleTask createSingleTaskHelper(TaskAttemptBuilder taskAttemptBuilder, FileSystem fs,
+ StateStores stateStores, Path jobStateFilePath, boolean fail)
+ throws IOException {
return !fail ? new InMemoryWuSingleTask(this.jobId, new Path(this.workUnitFilePath), jobStateFilePath, fs,
- taskAttemptBuilder, stateStores, GobblinClusterUtils.getDynamicConfig(this.clusterConfig))
+ taskAttemptBuilder, stateStores,
+ GobblinClusterUtils.getDynamicConfig(this.clusterConfig).withFallback(injectedConfig))
: new InMemoryWuFailedSingleTask(this.jobId, new Path(this.workUnitFilePath), jobStateFilePath, fs,
- taskAttemptBuilder, stateStores, GobblinClusterUtils.getDynamicConfig(this.clusterConfig));
+ taskAttemptBuilder, stateStores,
+ GobblinClusterUtils.getDynamicConfig(this.clusterConfig).withFallback(injectedConfig));
+ }
+
+ @VisibleForTesting
+ void setInjectedConfig(Config injectedConfig) {
+ this.injectedConfig = injectedConfig;
}
}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuFailedSingleTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuFailedSingleTask.java
index 75cbf61..43f6944 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuFailedSingleTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuFailedSingleTask.java
@@ -38,7 +38,7 @@ import com.typesafe.config.Config;
*/
public class InMemoryWuFailedSingleTask extends SingleTask {
public InMemoryWuFailedSingleTask(String jobId, Path workUnitFilePath, Path jobStateFilePath, FileSystem fs,
- TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config dynamicConfig) {
+ TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config dynamicConfig) throws IOException {
super(jobId, workUnitFilePath, jobStateFilePath, fs, taskAttemptBuilder, stateStores, dynamicConfig);
}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuSingleTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuSingleTask.java
index f60ba9a..2e66cd5 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuSingleTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuSingleTask.java
@@ -41,7 +41,7 @@ import com.typesafe.config.Config;
*/
public class InMemoryWuSingleTask extends SingleTask {
public InMemoryWuSingleTask(String jobId, Path workUnitFilePath, Path jobStateFilePath, FileSystem fs,
- TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config dynamicConfig) {
+ TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config dynamicConfig) throws IOException {
super(jobId, workUnitFilePath, jobStateFilePath, fs, taskAttemptBuilder, stateStores, dynamicConfig);
}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
index 16a64fa..be0f0de 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
@@ -50,8 +50,10 @@ import org.apache.gobblin.util.SerializationUtils;
public class SingleTask {
private static final Logger _logger = LoggerFactory.getLogger(SingleTask.class);
+ public static final String MAX_RETRY_WAITING_FOR_INIT_KEY = "maxRetryBlockedOnTaskAttemptInit";
+ public static final int DEFAULT_MAX_RETRY_WAITING_FOR_INIT = 2;
- private GobblinMultiTaskAttempt _taskattempt;
+ private GobblinMultiTaskAttempt _taskAttempt;
private String _jobId;
private Path _workUnitFilePath;
private Path _jobStateFilePath;
@@ -59,9 +61,14 @@ public class SingleTask {
private TaskAttemptBuilder _taskAttemptBuilder;
private StateStores _stateStores;
private Config _dynamicConfig;
+ private JobState _jobState;
+ /**
+ * Do all heavy-lifting of initialization in constructor which could be retried if failed,
+ * see the example in {@link GobblinHelixTask}.
+ */
SingleTask(String jobId, Path workUnitFilePath, Path jobStateFilePath, FileSystem fs,
- TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config dynamicConfig) {
+ TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config dynamicConfig) throws IOException {
_jobId = jobId;
_workUnitFilePath = workUnitFilePath;
_jobStateFilePath = jobStateFilePath;
@@ -69,30 +76,31 @@ public class SingleTask {
_taskAttemptBuilder = taskAttemptBuilder;
_stateStores = stateStores;
_dynamicConfig = dynamicConfig;
+ _jobState = getJobState();
}
public void run()
throws IOException, InterruptedException {
List<WorkUnit> workUnits = getWorkUnits();
- JobState jobState = getJobState();
// Add dynamic configuration to the job state
- _dynamicConfig.entrySet().forEach(e -> jobState.setProp(e.getKey(), e.getValue().unwrapped().toString()));
+ _dynamicConfig.entrySet().forEach(e -> _jobState.setProp(e.getKey(), e.getValue().unwrapped().toString()));
- Config jobConfig = getConfigFromJobState(jobState);
+ Config jobConfig = getConfigFromJobState(_jobState);
_logger.debug("SingleTask.run: jobId {} workUnitFilePath {} jobStateFilePath {} jobState {} jobConfig {}",
- _jobId, _workUnitFilePath, _jobStateFilePath, jobState, jobConfig);
+ _jobId, _workUnitFilePath, _jobStateFilePath, _jobState, jobConfig);
try (SharedResourcesBroker<GobblinScopeTypes> globalBroker = SharedResourcesBrokerFactory
.createDefaultTopLevelBroker(jobConfig, GobblinScopeTypes.GLOBAL.defaultScopeInstance())) {
- SharedResourcesBroker<GobblinScopeTypes> jobBroker = getJobBroker(jobState, globalBroker);
+ SharedResourcesBroker<GobblinScopeTypes> jobBroker = getJobBroker(_jobState, globalBroker);
+
+ _taskAttempt = _taskAttemptBuilder.build(workUnits.iterator(), _jobId, _jobState, jobBroker);
+ _taskAttempt.runAndOptionallyCommitTaskAttempt(GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE);
- _taskattempt = _taskAttemptBuilder.build(workUnits.iterator(), _jobId, jobState, jobBroker);
- _taskattempt.runAndOptionallyCommitTaskAttempt(GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE);
} finally {
_logger.info("Clearing all metrics object in cache.");
- _taskattempt.cleanMetrics();
+ _taskAttempt.cleanMetrics();
}
}
@@ -152,10 +160,10 @@ public class SingleTask {
}
public void cancel() {
- if (_taskattempt != null) {
+ if (_taskAttempt != null) {
try {
_logger.info("Task cancelled: Shutdown starting for tasks with jobId: {}", _jobId);
- _taskattempt.shutdownTasks();
+ _taskAttempt.shutdownTasks();
_logger.info("Task cancelled: Shutdown complete for tasks with jobId: {}", _jobId);
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted while shutting down task with jobId: " + _jobId, e);
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
index 73d535f..de97fec 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.ServiceManager;
@@ -53,7 +54,8 @@ class SingleTaskRunner {
protected final String workUnitFilePath;
protected final Config clusterConfig;
private final Path appWorkPath;
- private SingleTask task;
+ @VisibleForTesting
+ SingleTask task;
private TaskExecutor taskExecutor;
private GobblinHelixTaskStateTracker taskStateTracker;
private ServiceManager serviceManager;
@@ -83,9 +85,10 @@ class SingleTaskRunner {
shutdownServices();
}
- private void startServices() {
+ @VisibleForTesting
+ void startServices() {
logger.info("SingleTaskRunner start services.");
- getServices();
+ initServices();
this.serviceManager.startAsync();
try {
this.serviceManager.awaitHealthy(10, TimeUnit.SECONDS);
@@ -107,11 +110,11 @@ class SingleTaskRunner {
private void runTask(boolean fail)
throws IOException, InterruptedException {
logger.info("SingleTaskRunner running task.");
- getClusterSingleTask(fail);
+ initClusterSingleTask(fail);
this.task.run();
}
- private void getClusterSingleTask(boolean fail)
+ void initClusterSingleTask(boolean fail)
throws IOException {
final FileSystem fs = getFileSystem();
final StateStores stateStores = new StateStores(this.clusterConfig, this.appWorkPath,
@@ -123,11 +126,11 @@ class SingleTaskRunner {
final TaskAttemptBuilder taskAttemptBuilder = getTaskAttemptBuilder(stateStores);
- this.task = createTaskAttempt(taskAttemptBuilder, fs, stateStores, jobStateFilePath, fail);
+ this.task = createSingleTaskHelper(taskAttemptBuilder, fs, stateStores, jobStateFilePath, fail);
}
- protected SingleTask createTaskAttempt(TaskAttemptBuilder taskAttemptBuilder, FileSystem fs,
- StateStores stateStores, Path jobStateFilePath, boolean fail) {
+ protected SingleTask createSingleTaskHelper(TaskAttemptBuilder taskAttemptBuilder, FileSystem fs,
+ StateStores stateStores, Path jobStateFilePath, boolean fail) throws IOException {
return new SingleTask(this.jobId, new Path(this.workUnitFilePath), jobStateFilePath, fs,
taskAttemptBuilder, stateStores, GobblinClusterUtils.getDynamicConfig(this.clusterConfig));
}
@@ -140,7 +143,7 @@ class SingleTaskRunner {
return taskAttemptBuilder;
}
- private void getServices() {
+ private void initServices() {
final Properties properties = ConfigUtils.configToProperties(this.clusterConfig);
this.taskExecutor = new TaskExecutor(properties);
this.taskStateTracker = new GobblinHelixTaskStateTracker(properties);
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
index 24c597e..ce25fb9 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
@@ -23,6 +23,20 @@ import java.util.Map;
import java.util.Properties;
import org.apache.avro.Schema;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.example.simplejson.SimpleJsonConverter;
+import org.apache.gobblin.example.simplejson.SimpleJsonSource;
+import org.apache.gobblin.metastore.FsStateStore;
+import org.apache.gobblin.runtime.AbstractJobLauncher;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskExecutor;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.Id;
+import org.apache.gobblin.util.SerializationUtils;
+import org.apache.gobblin.util.retry.RetryerFactory;
+import org.apache.gobblin.writer.AvroDataWriterBuilder;
+import org.apache.gobblin.writer.Destination;
+import org.apache.gobblin.writer.WriterOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -44,20 +58,11 @@ import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.example.simplejson.SimpleJsonConverter;
-import org.apache.gobblin.example.simplejson.SimpleJsonSource;
-import org.apache.gobblin.metastore.FsStateStore;
-import org.apache.gobblin.runtime.AbstractJobLauncher;
-import org.apache.gobblin.runtime.JobState;
-import org.apache.gobblin.runtime.TaskExecutor;
-import org.apache.gobblin.source.workunit.WorkUnit;
-import org.apache.gobblin.util.Id;
-import org.apache.gobblin.util.SerializationUtils;
-import org.apache.gobblin.writer.AvroDataWriterBuilder;
-import org.apache.gobblin.writer.Destination;
-import org.apache.gobblin.writer.WriterOutputFormat;
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TIMES;
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TYPE;
+import static org.mockito.Mockito.when;
/**
@@ -96,7 +101,7 @@ public class GobblinHelixTaskTest {
this.taskExecutor = new TaskExecutor(configuration);
this.helixManager = Mockito.mock(HelixManager.class);
- Mockito.when(this.helixManager.getInstanceName()).thenReturn(GobblinHelixTaskTest.class.getSimpleName());
+ when(this.helixManager.getInstanceName()).thenReturn(GobblinHelixTaskTest.class.getSimpleName());
this.taskStateTracker = new GobblinHelixTaskStateTracker(new Properties());
this.localFs = FileSystem.getLocal(configuration);
@@ -143,21 +148,14 @@ public class GobblinHelixTaskTest {
TaskConfig taskConfig = new TaskConfig("", taskConfigMap, true);
TaskCallbackContext taskCallbackContext = Mockito.mock(TaskCallbackContext.class);
- Mockito.when(taskCallbackContext.getTaskConfig()).thenReturn(taskConfig);
- Mockito.when(taskCallbackContext.getManager()).thenReturn(this.helixManager);
- String helixJobId = Joiner.on("_").join(TestHelper.TEST_JOB_ID, TestHelper.TEST_JOB_ID);
- JobConfig jobConfig = Mockito.mock(JobConfig.class);
-
- Mockito.when(jobConfig.getJobId()).thenReturn(helixJobId);
- Mockito.when(taskCallbackContext.getJobConfig()).thenReturn(jobConfig);
- JobContext mockJobContext = Mockito.mock(JobContext.class);
- Map<String, Integer> taskIdPartitionMap = ImmutableMap.of(taskConfig.getId(), 0);
- Mockito.when(mockJobContext.getTaskIdPartitionMap()).thenReturn(taskIdPartitionMap);
-
- TaskDriver taskDriver = Mockito.mock(TaskDriver.class);
- Mockito.when(taskDriver.getJobContext(Mockito.anyString())).thenReturn(mockJobContext);
-
- TaskRunnerSuiteBase.Builder builder = new TaskRunnerSuiteBase.Builder(ConfigFactory.empty());
+ when(taskCallbackContext.getTaskConfig()).thenReturn(taskConfig);
+ when(taskCallbackContext.getManager()).thenReturn(this.helixManager);
+ TaskDriver taskDriver = createTaskDriverWithMockedAttributes(taskCallbackContext, taskConfig);
+
+ TaskRunnerSuiteBase.Builder builder = new TaskRunnerSuiteBase.Builder(ConfigFactory.empty()
+ .withValue(RETRY_TYPE, ConfigValueFactory.fromAnyRef(RetryerFactory.RetryType.FIXED_ATTEMPT.name()))
+ .withValue(RETRY_TIMES, ConfigValueFactory.fromAnyRef(2))
+ );
TaskRunnerSuiteBase sb = builder.setInstanceName("TestInstance")
.setApplicationName("TestApplication")
.setAppWorkPath(appWorkDir)
@@ -174,8 +172,46 @@ public class GobblinHelixTaskTest {
ConfigFactory.empty(),
Optional.of(taskDriver));
+ // Expect to go through.
this.gobblinHelixTask = (GobblinHelixTask) gobblinHelixTaskFactory.createNewTask(taskCallbackContext);
- Thread.sleep(1000);
+
+ // Mock the method getFs() which get called in SingleTask constructor, so that SingleTask could fail and trigger retry,
+ // which would also fail eventually with timeout.
+ TaskRunnerSuiteBase.Builder builderSpy = Mockito.spy(builder);
+ when(builderSpy.getFs()).thenThrow(new RuntimeException("failure on purpose"));
+ gobblinHelixTaskFactory =
+ new GobblinHelixTaskFactory(builderSpy,
+ sb.metricContext,
+ this.taskStateTracker,
+ ConfigFactory.empty(),
+ Optional.of(taskDriver));
+ try {
+ gobblinHelixTaskFactory.createNewTask(taskCallbackContext);
+ } catch (Exception e) {
+ Assert.assertTrue(e.getMessage().contains("Execution in creating a SingleTask-with-retry failed"));
+ return;
+ }
+ // Won't reach here.
+ Assert.fail();
+ }
+
+ /**
+ * To test against org.apache.gobblin.cluster.GobblinHelixTask#getPartitionForHelixTask(org.apache.helix.task.TaskDriver)
+ * we need to assign the right partition id for each helix task, which would be queried from taskDriver.
+ * This method encapsulate all mocking steps for taskDriver object to return expected value.
+ */
+ private TaskDriver createTaskDriverWithMockedAttributes(TaskCallbackContext taskCallbackContext,
+ TaskConfig taskConfig) {
+ String helixJobId = Joiner.on("_").join(TestHelper.TEST_JOB_ID, TestHelper.TEST_JOB_ID);
+ JobConfig jobConfig = Mockito.mock(JobConfig.class);
+ when(jobConfig.getJobId()).thenReturn(helixJobId);
+ when(taskCallbackContext.getJobConfig()).thenReturn(jobConfig);
+ JobContext mockJobContext = Mockito.mock(JobContext.class);
+ Map<String, Integer> taskIdPartitionMap = ImmutableMap.of(taskConfig.getId(), 0);
+ when(mockJobContext.getTaskIdPartitionMap()).thenReturn(taskIdPartitionMap);
+ TaskDriver taskDriver = Mockito.mock(TaskDriver.class);
+ when(taskDriver.getJobContext(Mockito.anyString())).thenReturn(mockJobContext);
+ return taskDriver;
}
@Test(dependsOnMethods = "testPrepareTask")
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTaskRerun.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTask.java
similarity index 68%
rename from gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTaskRerun.java
rename to gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTask.java
index 4fa3d74..2b6499c 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTaskRerun.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTask.java
@@ -17,9 +17,22 @@
package org.apache.gobblin.cluster;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+
+import org.apache.gobblin.testing.AssertWithBackoff;
import org.junit.Assert;
import org.testng.annotations.Test;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.ConfigFactory;
+
+import javax.annotation.Nullable;
+
+import static org.apache.gobblin.cluster.SingleTask.MAX_RETRY_WAITING_FOR_INIT_KEY;
+
/**
* Notes & Usage:
@@ -28,7 +41,18 @@ import org.testng.annotations.Test;
* 2. When needed to reproduce certain errors, replace org.apache.gobblin.cluster.DummySource.DummyExtractor or
* {@link DummySource} to plug in required logic.
*/
-public class TestSingleTaskRerun {
+public class TestSingleTask {
+
+ private InMemorySingleTaskRunner createInMemoryTaskRunner() {
+ final String clusterConfigPath = "clusterConf";
+ final String wuPath = "_workunits/store/workunit.wu";
+ String clusterConfPath = this.getClass().getClassLoader().getResource(clusterConfigPath).getPath();
+
+ InMemorySingleTaskRunner inMemorySingleTaskRunner = new InMemorySingleTaskRunner(clusterConfPath, "testJob",
+ this.getClass().getClassLoader().getResource(wuPath).getPath());
+
+ return inMemorySingleTaskRunner;
+ }
/**
* An in-memory {@link SingleTask} runner that could be used to simulate how it works in Gobblin-Cluster.
@@ -36,15 +60,9 @@ public class TestSingleTaskRerun {
* re-run it again.
*/
@Test
- public void testMetricObjectCasting()
+ public void testSingleTaskRerunAfterFailure()
throws Exception {
- final String clusterConfigPath = "clusterConf";
- final String wuPath = "_workunits/store/workunit.wu";
- String clusterConfPath = this.getClass().getClassLoader().getResource(clusterConfigPath).getPath();
-
- InMemorySingleTaskRunner inMemorySingleTaskRunner =
- new InMemorySingleTaskRunner(clusterConfPath, "testJob",
- this.getClass().getClassLoader().getResource(wuPath).getPath());
+ SingleTaskRunner inMemorySingleTaskRunner = createInMemoryTaskRunner();
try {
inMemorySingleTaskRunner.run(true);
} catch (Exception e) {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index 402a3ff..eded48c 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -28,20 +28,9 @@ import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
-import org.apache.gobblin.metrics.GobblinMetrics;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-
-import javax.annotation.Nullable;
-import lombok.Setter;
-
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.gobblin_scopes.TaskScopeInstance;
@@ -61,10 +50,30 @@ import org.apache.gobblin.runtime.task.TaskUtils;
import org.apache.gobblin.runtime.util.JobMetrics;
import org.apache.gobblin.runtime.util.TaskMetrics;
import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.Either;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.TaskEventMetadataUtils;
import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.gobblin.util.retry.RetryerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.rholder.retry.RetryException;
+import com.github.rholder.retry.Retryer;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import javax.annotation.Nullable;
+import lombok.Setter;
+
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_INTERVAL_MS;
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TIME_OUT_MS;
/**
@@ -393,7 +402,7 @@ public class GobblinMultiTaskAttempt {
Task task = null;
try {
countDownLatch.countUp();
- task = createTaskRunnable(workUnitState, countDownLatch);
+ task = createTaskWithRetry(workUnitState, countDownLatch);
this.taskStateTracker.registerNewTask(task);
task.setTaskFuture(this.taskExecutor.submit(task));
tasks.add(task);
@@ -456,6 +465,36 @@ public class GobblinMultiTaskAttempt {
}
}
+ /**
+ * Creates a {@link Task} for the given work unit, retrying on failure. Task initialization may depend on
+ * unstable external connections that can heal on retry, so creation is wrapped in a {@link Retryer} for
+ * the sake of fault tolerance.
+ *
+ * The retryer is built from the job properties (so e.g. the retry type and attempt count can be configured
+ * there), but the retry time-out (1 minute) and interval (2 seconds) are always overridden here.
+ *
+ * @param workUnitState the work unit to create a task for
+ * @param countDownLatch latch passed through to the created task
+ * @return the created task
+ * @throws RuntimeException the throwable of the last failed attempt, or of a non-retryable failure, is
+ * rethrown as-is when unchecked (or an Error) and wrapped in a RuntimeException when checked
+ */
+ private Task createTaskWithRetry(WorkUnitState workUnitState, CountDownLatch countDownLatch) {
+ Config config = ConfigUtils.propertiesToConfig(this.jobState.getProperties())
+ .withValue(RETRY_TIME_OUT_MS, ConfigValueFactory.fromAnyRef(TimeUnit.MINUTES.toMillis(1L)))
+ .withValue(RETRY_INTERVAL_MS, ConfigValueFactory.fromAnyRef(TimeUnit.SECONDS.toMillis(2L)));
+ Retryer<Task> retryer = RetryerFactory.newInstance(config);
+ // Counts attempts (the first one included), mostly for logging. An AtomicInteger is used because the
+ // lambda below cannot reassign a captured local variable.
+ final AtomicInteger attempts = new AtomicInteger(0);
+
+ try {
+ return retryer.call(() -> {
+ log.info("Task creation attempt {}", attempts.incrementAndGet());
+ return createTaskRunnable(workUnitState, countDownLatch);
+ });
+ } catch (RetryException re) {
+ log.error("Fatal exception creating Task after {} attempt(s)", attempts.get());
+ throw Throwables.propagate(re.getLastFailedAttempt().getExceptionCause());
+ } catch (ExecutionException ee) {
+ // The retryer wraps a non-retryable failure in an ExecutionException; unwrap it so callers see the same
+ // throwable they would have seen without the retry wrapper (consistent with the branch above).
+ Throwable cause = ee.getCause() != null ? ee.getCause() : ee;
+ log.error("Non-retryable exception creating Task on attempt {}", attempts.get());
+ throw Throwables.propagate(cause);
+ }
+ }
+
public void runAndOptionallyCommitTaskAttempt(CommitPolicy multiTaskAttemptCommitPolicy)
throws IOException, InterruptedException {
run();
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
index ecb7c66..3bc36be 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
@@ -237,7 +237,6 @@ public class Task implements TaskIFace {
this.shutdownLatch = new CountDownLatch(1);
// Setup Streaming constructs
-
if (isStreamingTask()) {
Extractor underlyingExtractor = this.taskContext.getRawSourceExtractor();
if (!(underlyingExtractor instanceof StreamingExtractor)) {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/RuntimeConstructs.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/RuntimeConstructs.java
index 5060bac..f4ec28b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/RuntimeConstructs.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/RuntimeConstructs.java
@@ -20,6 +20,10 @@ package org.apache.gobblin.runtime.util;
import org.apache.gobblin.runtime.fork.Fork;
+/**
+ * @deprecated because it has no essential usage within this repository.
+ */
+@Deprecated
public enum RuntimeConstructs {
FORK("Fork", Fork.class);
diff --git a/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskErrorIntegrationTest.java b/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskErrorIntegrationTest.java
index 9110d20..473719f 100644
--- a/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskErrorIntegrationTest.java
+++ b/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskErrorIntegrationTest.java
@@ -24,14 +24,24 @@ import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.instrumented.extractor.InstrumentedExtractor;
import org.apache.gobblin.publisher.DataPublisher;
import org.apache.gobblin.publisher.NoopPublisher;
+import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.TaskContext;
import org.apache.gobblin.runtime.task.BaseAbstractTask;
import org.apache.gobblin.runtime.task.TaskFactory;
import org.apache.gobblin.runtime.task.TaskIFace;
import org.apache.gobblin.runtime.task.TaskUtils;
+import org.apache.gobblin.source.Source;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.retry.RetryerFactory;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -43,15 +53,8 @@ import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.SourceState;
-import org.apache.gobblin.configuration.WorkUnitState;
-import org.apache.gobblin.instrumented.extractor.InstrumentedExtractor;
-import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
-import org.apache.gobblin.source.Source;
-import org.apache.gobblin.source.extractor.DataRecordException;
-import org.apache.gobblin.source.extractor.Extractor;
-import org.apache.gobblin.source.workunit.WorkUnit;
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TIMES;
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TYPE;
@Test (singleThreaded = true)
@@ -67,7 +70,7 @@ public class TaskErrorIntegrationTest extends BMNGRunner {
/**
* Test that an extractor that raises an error on creation results in a log message from {@link GobblinMultiTaskAttempt}
- * and does not hang
+ * and does not hang.
* @throws Exception
*/
@Test
@@ -82,6 +85,9 @@ public class TaskErrorIntegrationTest extends BMNGRunner {
jobProperties.setProperty(ConfigurationKeys.SOURCE_CLASS_KEY, BaseTestSource.class.getName());
jobProperties.setProperty(TestExtractor.RAISE_ERROR, "true");
+ jobProperties.setProperty(RETRY_TYPE, RetryerFactory.RetryType.FIXED_ATTEMPT.name());
+ // Disable retry
+ jobProperties.setProperty(RETRY_TIMES, "1");
GobblinLocalJobLauncherUtils.invokeLocalJobLauncher(jobProperties);
@@ -92,6 +98,31 @@ public class TaskErrorIntegrationTest extends BMNGRunner {
}
/**
+ * Test that when extractor creation fails as above, the retry kicks in and lets the extractor heal itself on a
+ * subsequent attempt.
+ */
+ @Test
+ public void extractorCreationErrorWithRetry() throws Exception {
+ Properties jobProperties =
+ GobblinLocalJobLauncherUtils.getJobProperties("runtime_test/skip_workunits_test.properties");
+
+ jobProperties.setProperty(ConfigurationKeys.SOURCE_CLASS_KEY, BaseTestSource.class.getName());
+ jobProperties.setProperty(TestExtractor.RAISE_ERROR, "true");
+ jobProperties.setProperty(RETRY_TYPE, RetryerFactory.RetryType.FIXED_ATTEMPT.name());
+ jobProperties.setProperty(TestExtractor.ENABLE_RETRY_FLIP, "true");
+ // Allow up to two attempts; construction is expected to succeed on the second one.
+ jobProperties.setProperty(RETRY_TIMES, "2");
+
+ // Any exception or error escaping the launcher fails the test. Letting it propagate (instead of catching it
+ // and calling Assert.fail()) keeps the real cause and stack trace in the test report.
+ GobblinLocalJobLauncherUtils.invokeLocalJobLauncher(jobProperties);
+ }
+
+ /**
* Test that a task submission error results in a log message from {@link GobblinMultiTaskAttempt}
* and does not hang
* @throws Exception
@@ -129,26 +160,39 @@ public class TaskErrorIntegrationTest extends BMNGRunner {
Properties jobProperties =
GobblinLocalJobLauncherUtils.getJobProperties("runtime_test/skip_workunits_test.properties");
jobProperties.setProperty(ConfigurationKeys.SOURCE_CLASS_KEY, CustomizedTaskTestSource.class.getName());
+ // To demonstrate that a task-creation failure is caught in the test setting, retry is disabled for task creation.
+ jobProperties.setProperty(RETRY_TIMES, "1");
+ jobProperties.setProperty(RETRY_TYPE, RetryerFactory.RetryType.FIXED_ATTEMPT.name());
GobblinLocalJobLauncherUtils.invokeLocalJobLauncher(jobProperties);
- Assert.assertTrue(testAppender.events.stream().anyMatch(e -> e.getRenderedMessage()
- .startsWith("Encountering memory error")));
-
+ Assert.assertTrue(testAppender.events.stream().anyMatch(e -> e.getRenderedMessage().contains("Encountering memory error")));
logger.removeAppender(testAppender);
}
/**
- * Test extractor that can be configured to raise an exception on construction
+ * Test extractor that can be configured to raise an exception on construction, or, with the retry flip
+ * enabled, to heal itself on every even-numbered construction attempt.
*/
public static class TestExtractor<S, D> extends InstrumentedExtractor<S, D> {
private static final String RAISE_ERROR = "raiseError";
+ // Number of constructor invocations so far (starting at 1), shared by all instances since it is static.
+ // It is mutable, so it is named as a field rather than as a constant.
+ private static int constructionCount = 1;
+ private static final String ENABLE_RETRY_FLIP = "enableRetry";
public TestExtractor(WorkUnitState workUnitState) {
super(workUnitState);
- if (workUnitState.getPropAsBoolean(RAISE_ERROR, false)) {
- throw new RuntimeException(EXCEPTION_MESSAGE);
+ try {
+ // With the flip enabled, every even-numbered construction succeeds, so when RAISE_ERROR is also set,
+ // consecutive constructions alternate between failing and succeeding.
+ if (workUnitState.getPropAsBoolean(ENABLE_RETRY_FLIP, false) && constructionCount % 2 == 0) {
+ return;
+ }
+
+ if (workUnitState.getPropAsBoolean(RAISE_ERROR, false)) {
+ throw new RuntimeException(EXCEPTION_MESSAGE);
+ }
+ } finally {
+ // Increment on every exit path (normal return or exception) so that each construction is counted.
+ constructionCount += 1;
}
}
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/retry/RetryerFactory.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/retry/RetryerFactory.java
index e793bdd..97b033d 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/retry/RetryerFactory.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/retry/RetryerFactory.java
@@ -38,7 +38,7 @@ import org.apache.gobblin.exception.NonTransientException;
* It's recommended to use with ConfigBuilder so that with State and with prefix of the config key,
* user can easily instantiate Retryer.
*
- * @see GoogleAnalyticsUnsampledExtractor for some examples.
+ * See {@code GoogleAnalyticsUnsampledExtractor} for some examples.
*
* @param <T>
*/
@@ -48,6 +48,8 @@ public class RetryerFactory<T> {
public static final String RETRY_INTERVAL_MS = "interval_ms";
public static final String RETRY_TIME_OUT_MS = "time_out_ms";
public static final String RETRY_TYPE = "retry_type";
+ // Maximum number of attempts (the first one included) for FIXED_ATTEMPT; must be greater than or equal to 1.
+ public static final String RETRY_TIMES = "retry_times";
private static final Predicate<Throwable> RETRY_EXCEPTION_PREDICATE;
private static final Config DEFAULTS;
@@ -64,13 +66,15 @@ public class RetryerFactory<T> {
.put(RETRY_INTERVAL_MS, TimeUnit.SECONDS.toMillis(30L))
.put(RETRY_MULTIPLIER, 2L)
.put(RETRY_TYPE, RetryType.EXPONENTIAL.name())
+ .put(RETRY_TIMES, 2)
.build();
DEFAULTS = ConfigFactory.parseMap(configMap);
}
+ /** Retry strategies this factory can build; EXPONENTIAL is the default (see DEFAULTS). */
public static enum RetryType {
+ /** Exponential wait between attempts, stopping once RETRY_TIME_OUT_MS has elapsed. */
EXPONENTIAL,
- FIXED;
+ /** Fixed-wait strategy; see newFixedRetryer. */
+ FIXED,
+ /** Fixed wait of RETRY_INTERVAL_MS between attempts, stopping after RETRY_TIMES attempts. */
+ FIXED_ATTEMPT;
}
/**
@@ -90,6 +94,8 @@ public class RetryerFactory<T> {
return newExponentialRetryer(config);
case FIXED:
return newFixedRetryer(config);
+ case FIXED_ATTEMPT:
+ return newFixedAttemptBoundRetryer(config);
default:
throw new IllegalArgumentException(type + " is not supported");
}
@@ -104,7 +110,6 @@ public class RetryerFactory<T> {
}
private static <T> Retryer<T> newExponentialRetryer(Config config) {
-
return RetryerBuilder.<T> newBuilder()
.retryIfException(RETRY_EXCEPTION_PREDICATE)
.withWaitStrategy(WaitStrategies.exponentialWait(config.getLong(RETRY_MULTIPLIER),
@@ -113,4 +118,12 @@ public class RetryerFactory<T> {
.withStopStrategy(StopStrategies.stopAfterDelay(config.getLong(RETRY_TIME_OUT_MS), TimeUnit.MILLISECONDS))
.build();
}
+
+ /**
+ * Builds a {@link Retryer} that retries exceptions accepted by RETRY_EXCEPTION_PREDICATE, pauses a fixed
+ * RETRY_INTERVAL_MS milliseconds between attempts, and gives up after RETRY_TIMES attempts in total.
+ */
+ private static <T> Retryer<T> newFixedAttemptBoundRetryer(Config config) {
+ RetryerBuilder<T> builder = RetryerBuilder.<T> newBuilder().retryIfException(RETRY_EXCEPTION_PREDICATE);
+ builder.withWaitStrategy(WaitStrategies.fixedWait(config.getLong(RETRY_INTERVAL_MS), TimeUnit.MILLISECONDS));
+ builder.withStopStrategy(StopStrategies.stopAfterAttempt(config.getInt(RETRY_TIMES)));
+ return builder.build();
+ }
}
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
index 6e8c72f..5b3fa7c 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
@@ -70,16 +70,16 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
services.addAll(super.getServices());
if (UserGroupInformation.isSecurityEnabled()) {
LOGGER.info("Adding YarnContainerSecurityManager since security is enabled");
- services.add(new YarnContainerSecurityManager(this.config, this.fs, this.eventBus));
+ services.add(new YarnContainerSecurityManager(this.clusterConfig, this.fs, this.eventBus));
}
- if (config.hasPath(GobblinYarnConfigurationKeys.LOGS_SINK_ROOT_DIR_KEY)) {
+ if (clusterConfig.hasPath(GobblinYarnConfigurationKeys.LOGS_SINK_ROOT_DIR_KEY)) {
GobblinYarnLogSource gobblinYarnLogSource = new GobblinYarnLogSource();
- String containerLogDir = config.getString(GobblinYarnConfigurationKeys.LOGS_SINK_ROOT_DIR_KEY);
+ String containerLogDir = clusterConfig.getString(GobblinYarnConfigurationKeys.LOGS_SINK_ROOT_DIR_KEY);
if (gobblinYarnLogSource.isLogSourcePresent()) {
try {
- services.add(gobblinYarnLogSource.buildLogCopier(this.config, this.taskRunnerId, this.fs,
+ services.add(gobblinYarnLogSource.buildLogCopier(this.clusterConfig, this.taskRunnerId, this.fs,
new Path(containerLogDir, GobblinClusterUtils.getAppWorkDirPath(this.applicationName, this.applicationId))));
} catch (Exception e) {
LOGGER.warn("Cannot add LogCopier service to the service manager due to {}", e);
@@ -195,7 +195,7 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
if (!Strings.isNullOrEmpty(helixInstanceTags)) {
config = config.withValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, ConfigValueFactory.fromAnyRef(helixInstanceTags));
}
-
+
GobblinTaskRunner gobblinTaskRunner =
new GobblinYarnTaskRunner(applicationName, helixInstanceName, containerId, config,
Optional.<Path>absent());