You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2018/01/24 22:48:14 UTC
[13/42] hadoop git commit: MAPREDUCE-7015. Possible race condition in JHS if the job is not loaded. Contributed by Peter Bacsko
MAPREDUCE-7015. Possible race condition in JHS if the job is not loaded. Contributed by Peter Bacsko
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cff9edd4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cff9edd4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cff9edd4
Branch: refs/heads/YARN-6592
Commit: cff9edd4b514bdcfe22cd49964e3707fb78ab876
Parents: 55c3277
Author: Jason Lowe <jl...@apache.org>
Authored: Wed Jan 24 14:44:07 2018 -0600
Committer: Jason Lowe <jl...@apache.org>
Committed: Wed Jan 24 14:44:07 2018 -0600
----------------------------------------------------------------------
.../mapreduce/v2/hs/CachedHistoryStorage.java | 8 +++++-
.../mapreduce/v2/hs/HistoryFileManager.java | 30 ++++++++++++++++----
.../hadoop/mapreduce/v2/hs/TestJobHistory.java | 26 +++++++++++++++++
3 files changed, 57 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cff9edd4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java
index b001ae4..69f4831 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java
@@ -173,9 +173,14 @@ public class CachedHistoryStorage extends AbstractService implements
HistoryFileInfo fileInfo;
fileInfo = hsManager.getFileInfo(jobId);
+
if (fileInfo == null) {
throw new HSFileRuntimeException("Unable to find job " + jobId);
- } else if (fileInfo.isDeleted()) {
+ }
+
+ fileInfo.waitUntilMoved();
+
+ if (fileInfo.isDeleted()) {
throw new HSFileRuntimeException("Cannot load deleted job " + jobId);
} else {
return fileInfo.loadJob();
@@ -211,6 +216,7 @@ public class CachedHistoryStorage extends AbstractService implements
for (HistoryFileInfo mi : hsManager.getAllFileInfo()) {
if (mi != null) {
JobId id = mi.getJobId();
+ mi.waitUntilMoved();
result.put(id, new PartialJob(mi.getJobIndexInfo(), id));
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cff9edd4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
index b418db7..a07ca26 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
@@ -452,6 +452,8 @@ public class HistoryFileManager extends AbstractService {
} catch (Throwable t) {
LOG.error("Error while trying to move a job to done", t);
this.state = HistoryInfoState.MOVE_FAILED;
+ } finally {
+ notifyAll();
}
}
@@ -485,12 +487,16 @@ public class HistoryFileManager extends AbstractService {
}
protected synchronized void delete() throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("deleting " + historyFile + " and " + confFile);
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("deleting " + historyFile + " and " + confFile);
+ }
+ state = HistoryInfoState.DELETED;
+ doneDirFc.delete(doneDirFc.makeQualified(historyFile), false);
+ doneDirFc.delete(doneDirFc.makeQualified(confFile), false);
+ } finally {
+ notifyAll();
}
- state = HistoryInfoState.DELETED;
- doneDirFc.delete(doneDirFc.makeQualified(historyFile), false);
- doneDirFc.delete(doneDirFc.makeQualified(confFile), false);
}
public JobIndexInfo getJobIndexInfo() {
@@ -517,6 +523,17 @@ public class HistoryFileManager extends AbstractService {
jobIndexInfo.getNumMaps();
return (maxTasksForLoadedJob > 0) && (totalTasks > maxTasksForLoadedJob);
}
+
+ public synchronized void waitUntilMoved() {
+ while (isMovePending() && !didMoveFail()) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ LOG.warn("Waiting has been interrupted");
+ throw new RuntimeException(e);
+ }
+ }
+ }
}
private SerialNumberIndex serialNumberIndex = null;
@@ -956,6 +973,7 @@ public class HistoryFileManager extends AbstractService {
if (LOG.isDebugEnabled()) {
LOG.debug("Scheduling move to done of " +found);
}
+
moveToDoneExecutor.execute(new Runnable() {
@Override
public void run() {
@@ -1193,5 +1211,5 @@ public class HistoryFileManager extends AbstractService {
@VisibleForTesting
void setMaxHistoryAge(long newValue){
maxHistoryAge=newValue;
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cff9edd4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java
index 936c772..9f36477 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java
@@ -446,6 +446,32 @@ public class TestJobHistory {
}
@Test
+ public void testCachedStorageWaitsForFileMove() throws IOException {
+ HistoryFileManager historyManager = mock(HistoryFileManager.class);
+ jobHistory = spy(new JobHistory());
+ doReturn(historyManager).when(jobHistory).createHistoryFileManager();
+
+ Configuration conf = new Configuration();
+ jobHistory.init(conf);
+ jobHistory.start();
+
+ CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
+ .getHistoryStorage());
+
+ Job job = mock(Job.class);
+ JobId jobId = mock(JobId.class);
+ when(job.getID()).thenReturn(jobId);
+ when(job.getTotalMaps()).thenReturn(10);
+ when(job.getTotalReduces()).thenReturn(2);
+ HistoryFileInfo fileInfo = mock(HistoryFileInfo.class);
+ when(historyManager.getFileInfo(eq(jobId))).thenReturn(fileInfo);
+ when(fileInfo.loadJob()).thenReturn(job);
+
+ storage.getFullJob(jobId);
+ verify(fileInfo).waitUntilMoved();
+ }
+
+ @Test
public void testRefreshLoadedJobCacheUnSupportedOperation() {
jobHistory = spy(new JobHistory());
HistoryStorage storage = new HistoryStorage() {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org