You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/11/06 00:08:14 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-940] Add synchronization on workunit persistency before Helix job launching
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new bf33b58 [GOBBLIN-940] Add synchronization on workunit persistency before Helix job launching
bf33b58 is described below
commit bf33b58d450d1f35e5f5cc147159710815210cd0
Author: autumnust <le...@linkedin.com>
AuthorDate: Tue Nov 5 16:08:04 2019 -0800
[GOBBLIN-940] Add synchronization on workunit persistency before Helix job launching
Closes #2789 from autumnust/synchronizedBeforeHelixSubmission
---
.../gobblin/cluster/GobblinHelixJobLauncher.java | 29 +++++++----
gobblin-utility/build.gradle | 19 +++-----
.../org/apache/gobblin/util/ParallelRunner.java | 20 +++++++-
.../apache/gobblin/util/ParallelRunnerTest.java | 57 +++++++++++++++-------
4 files changed, 85 insertions(+), 40 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index e820c1f..975037c 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -219,7 +219,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
if (helixMetrics.isPresent()) {
helixMetrics.get().submitMeter.mark();
}
- submitJobToHelix(createJob(workUnits));
+ submitJobToHelix(createHelixJob(workUnits));
if (helixMetrics.isPresent()) {
this.helixMetrics.get().updateTimeForHelixSubmit(submitStart);
}
@@ -271,7 +271,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
/**
* Create a job from a given batch of {@link WorkUnit}s.
*/
- private JobConfig.Builder createJob(List<WorkUnit> workUnits) throws IOException {
+ JobConfig.Builder createHelixJob(List<WorkUnit> workUnits) throws IOException {
Map<String, TaskConfig> taskConfigMap = Maps.newHashMap();
try (ParallelRunner stateSerDeRunner = new ParallelRunner(this.stateSerDeRunnerThreads, this.fs)) {
@@ -295,23 +295,34 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
SerializationUtils.serializeState(this.fs, jobStateFilePath, this.jobContext.getJobState());
}
- LOGGER.debug("GobblinHelixJobLauncher.createJob: jobStateFilePath {}, jobState {} jobProperties {}",
+ // Block on persistence of all workunits to be finished.
+ // It is necessary when underlying storage being slow and Helix activate task-execution before the workunit being persisted.
+ stateSerDeRunner.waitForTasks();
+
+ LOGGER.debug("GobblinHelixJobLauncher.createHelixJob: jobStateFilePath {}, jobState {} jobProperties {}",
jobStateFilePath, this.jobContext.getJobState().toString(), this.jobContext.getJobState().getProperties());
+
+ return translateGobblinJobConfigToHelixJobConfig(this.jobContext.getJobState(), workUnits, taskConfigMap);
}
+ }
+ /**
+ * Populate {@link JobConfig.Builder} with relevant gobblin job-configurations.
+ */
+ JobConfig.Builder translateGobblinJobConfigToHelixJobConfig(JobState gobblinJobState, List<WorkUnit> workUnits,
+ Map<String, TaskConfig> taskConfigMap) {
JobConfig.Builder jobConfigBuilder = new JobConfig.Builder();
// Helix task attempts = retries + 1 (fallback to general task retry for backward compatibility)
- jobConfigBuilder.setMaxAttemptsPerTask(this.jobContext.getJobState().getPropAsInt(
- GobblinClusterConfigurationKeys.HELIX_TASK_MAX_ATTEMPTS_KEY,
- this.jobContext.getJobState().getPropAsInt(
+ jobConfigBuilder.setMaxAttemptsPerTask(gobblinJobState.getPropAsInt(
+ GobblinClusterConfigurationKeys.HELIX_TASK_MAX_ATTEMPTS_KEY, gobblinJobState.getPropAsInt(
ConfigurationKeys.MAX_TASK_RETRIES_KEY,
ConfigurationKeys.DEFAULT_MAX_TASK_RETRIES)) + 1);
// Helix task timeout (fallback to general task timeout for backward compatibility)
- jobConfigBuilder.setTimeoutPerTask(this.jobContext.getJobState().getPropAsLong(
+ jobConfigBuilder.setTimeoutPerTask(gobblinJobState.getPropAsLong(
GobblinClusterConfigurationKeys.HELIX_TASK_TIMEOUT_SECONDS,
- this.jobContext.getJobState().getPropAsLong(
+ gobblinJobState.getPropAsLong(
ConfigurationKeys.TASK_TIMEOUT_SECONDS,
ConfigurationKeys.DEFAULT_TASK_TIMEOUT_SECONDS)) * 1000);
@@ -337,7 +348,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
jobConfigBuilder.setRebalanceRunningTask(true);
}
- jobConfigBuilder.setExpiry(this.jobContext.getJobState().getPropAsLong(
+ jobConfigBuilder.setExpiry(gobblinJobState.getPropAsLong(
GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS, GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS));
return jobConfigBuilder;
diff --git a/gobblin-utility/build.gradle b/gobblin-utility/build.gradle
index ad49820..485641b 100644
--- a/gobblin-utility/build.gradle
+++ b/gobblin-utility/build.gradle
@@ -59,20 +59,17 @@ dependencies {
}
configurations {
- compile {
- transitive = true
- }
- archives
+ compile {
+ transitive = true
+ }
+ archives
}
test {
- workingDir rootProject.rootDir
+ workingDir rootProject.rootDir
}
-
-
-ext.classification="library"
-
+ext.classification = "library"
task utilityTar(type: Tar) {
//there seems to be a bug in the Gradle signing module where X.tar.gz will generate
@@ -83,8 +80,8 @@ task utilityTar(type: Tar) {
compression = Compression.GZIP
into("lib") { from configurations.runtime }
- into("lib") { from "${project.rootDir}/build/${project.name}/libs/${project.name}.jar"}
- into("lib") { from "${project.rootDir}/build/${project.name}/libs/${project.name}-${project.version}.jar"}
+ into("lib") { from "${project.rootDir}/build/${project.name}/libs/${project.name}.jar" }
+ into("lib") { from "${project.rootDir}/build/${project.name}/libs/${project.name}-${project.version}.jar" }
into("bin") {
from "src/main/bash"
fileMode = 0755
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/ParallelRunner.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/ParallelRunner.java
index e2f4964..04cc21a 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/ParallelRunner.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/ParallelRunner.java
@@ -30,6 +30,8 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import org.apache.hadoop.conf.Configuration;
@@ -81,6 +83,8 @@ public class ParallelRunner implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(ParallelRunner.class);
+ private static final long DEFAULT_PARALLEL_RUNNER_WAIT_ON_FINISH_TIMEOUT = 10000;
+
public static final String PARALLEL_RUNNER_THREADS_KEY = "parallel.runner.threads";
public static final int DEFAULT_PARALLEL_RUNNER_THREADS = 10;
@@ -354,7 +358,7 @@ public class ParallelRunner implements Closeable {
* Wait for all submitted tasks to complete. The {@link ParallelRunner} can be reused after this call.
* @throws IOException
*/
- public void waitForTasks() throws IOException {
+ public void waitForTasks(long timeoutInMills) throws IOException {
// Wait for all submitted tasks to complete
boolean wasInterrupted = false;
IOException exception = null;
@@ -363,7 +367,7 @@ public class ParallelRunner implements Closeable {
if (wasInterrupted) {
future.getFuture().cancel(true);
} else {
- future.getFuture().get();
+ future.getFuture().get(timeoutInMills, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException ie) {
LOGGER.warn("Task was interrupted: " + future.getName());
@@ -376,6 +380,11 @@ public class ParallelRunner implements Closeable {
if (exception == null) {
exception = new IOException(ee.getCause());
}
+ } catch (TimeoutException te) {
+ LOGGER.warn("Tasks not fully finished before Parallel runner waiting until timeout due to:", te);
+ if (exception == null) {
+ exception = new IOException(te.getCause());
+ }
}
}
if (wasInterrupted) {
@@ -389,6 +398,13 @@ public class ParallelRunner implements Closeable {
futures.clear();
}
+ /**
+ * Wait until default timeout reached for all tasks under this parallel runner.
+ */
+ public void waitForTasks() throws IOException{
+ this.waitForTasks(DEFAULT_PARALLEL_RUNNER_WAIT_ON_FINISH_TIMEOUT);
+ }
+
@Override
public void close() throws IOException {
try {
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/ParallelRunnerTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/ParallelRunnerTest.java
index d2cd06e..6e11944 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/ParallelRunnerTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/ParallelRunnerTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.gobblin.testing.AssertWithBackoff;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -191,9 +192,18 @@ public class ParallelRunnerTest {
Assert.assertEquals(actual.toString(), expected);
}
- @Test(groups = { "ignore" })
+ /**
+ * A test verifying when {@link ParallelRunner#close()} is called, everything inside runner has been collected.
+ * Positive case: Both files get mock-deleted and parallelRunner returns.
+ * Negative case: Using countdown-latch to specify causality between two deletion of files and construct a deadlock.
+ * The src2 deletion won't happen as it depends on src1's deletion and vice versa.
+ * {@link ParallelRunner#close()} will finally timeout and expect to catch the {@link java.util.concurrent.TimeoutException}.
+ */
public void testWaitsForFuturesWhenClosing() throws IOException, InterruptedException {
+
+ // Indicate if deadlock is being constructed.
final AtomicBoolean flag = new AtomicBoolean();
+ flag.set(true);
final CountDownLatch latch1 = new CountDownLatch(1);
final CountDownLatch latch2 = new CountDownLatch(1);
Path src1 = new Path("/src/file1.txt");
@@ -203,23 +213,23 @@ public class ParallelRunnerTest {
Mockito.when(fs.delete(src1, true)).thenAnswer(new Answer<Boolean>() {
@Override
public Boolean answer(InvocationOnMock invocation) throws Throwable {
+ if (flag.get()) {
+ latch2.await();
+ }
latch1.countDown();
- return false;
+ return true;
}
});
Mockito.when(fs.exists(src2)).thenReturn(true);
+
+ /** Will make deletion of files from parallelRunner to be timeout after 5 seconds.*/
+ final int timeout = 5000;
Mockito.when(fs.delete(src2, true)).thenAnswer(new Answer<Boolean>() {
@Override
public Boolean answer(InvocationOnMock invocation) throws Throwable {
- latch1.await();
- long end = System.currentTimeMillis() + 70000;
- while (System.currentTimeMillis() < end) {
- try {
- Thread.sleep(Math.max(1, end - System.currentTimeMillis()));
- } catch (Exception ignored) {
- }
+ if (flag.get()) {
+ latch1.await();
}
- flag.set(true);
latch2.countDown();
return true;
}
@@ -228,19 +238,30 @@ public class ParallelRunnerTest {
boolean caughtException = false;
ParallelRunner parallelRunner = new ParallelRunner(2, fs);
try {
- parallelRunner.deletePath(src1, true);
parallelRunner.deletePath(src2, true);
- System.out.println(System.currentTimeMillis() + ": START - ParallelRunner.close()");
- parallelRunner.close();
+ parallelRunner.deletePath(src1, true);
+ parallelRunner.waitForTasks(timeout);
} catch (IOException e) {
caughtException = true;
}
Assert.assertTrue(caughtException);
- System.out.println(System.currentTimeMillis() + ": END - ParallelRunner.close()");
- System.out.println(System.currentTimeMillis() + ": Waiting for unkillable task to finish...");
- latch2.await();
- System.out.println(System.currentTimeMillis() + ": Unkillable task completed.");
- Assert.assertTrue(flag.get());
+ Assert.assertEquals(latch2.getCount(), 1);
+ Assert.assertEquals(latch1.getCount(), 1);
+
+ // Remove deadlock
+ flag.set(false);
+ caughtException = false;
+ parallelRunner = new ParallelRunner(2, fs);
+ try {
+ parallelRunner.deletePath(src2, true);
+ parallelRunner.deletePath(src1, true);
+ parallelRunner.waitForTasks(timeout);
+ } catch (IOException e) {
+ caughtException = true;
+ }
+ Assert.assertFalse(caughtException);
+ Assert.assertEquals(latch2.getCount(), 0);
+ Assert.assertEquals(latch1.getCount(), 0);
}
@AfterClass