You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text version.
Posted to common-commits@hadoop.apache.org by jl...@apache.org on 2018/01/24 20:45:08 UTC

hadoop git commit: MAPREDUCE-7015. Possible race condition in JHS if the job is not loaded. Contributed by Peter Bacsko

Repository: hadoop
Updated Branches:
  refs/heads/trunk 55c32776b -> cff9edd4b


MAPREDUCE-7015. Possible race condition in JHS if the job is not loaded. Contributed by Peter Bacsko


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cff9edd4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cff9edd4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cff9edd4

Branch: refs/heads/trunk
Commit: cff9edd4b514bdcfe22cd49964e3707fb78ab876
Parents: 55c3277
Author: Jason Lowe <jl...@apache.org>
Authored: Wed Jan 24 14:44:07 2018 -0600
Committer: Jason Lowe <jl...@apache.org>
Committed: Wed Jan 24 14:44:07 2018 -0600

----------------------------------------------------------------------
 .../mapreduce/v2/hs/CachedHistoryStorage.java   |  8 +++++-
 .../mapreduce/v2/hs/HistoryFileManager.java     | 30 ++++++++++++++++----
 .../hadoop/mapreduce/v2/hs/TestJobHistory.java  | 26 +++++++++++++++++
 3 files changed, 57 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/cff9edd4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java
index b001ae4..69f4831 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java
@@ -173,9 +173,14 @@ public class CachedHistoryStorage extends AbstractService implements
     HistoryFileInfo fileInfo;
 
     fileInfo = hsManager.getFileInfo(jobId);
+
     if (fileInfo == null) {
       throw new HSFileRuntimeException("Unable to find job " + jobId);
-    } else if (fileInfo.isDeleted()) {
+    }
+
+    fileInfo.waitUntilMoved();
+
+    if (fileInfo.isDeleted()) {
       throw new HSFileRuntimeException("Cannot load deleted job " + jobId);
     } else {
       return fileInfo.loadJob();
@@ -211,6 +216,7 @@ public class CachedHistoryStorage extends AbstractService implements
       for (HistoryFileInfo mi : hsManager.getAllFileInfo()) {
         if (mi != null) {
           JobId id = mi.getJobId();
+          mi.waitUntilMoved();
           result.put(id, new PartialJob(mi.getJobIndexInfo(), id));
         }
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cff9edd4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
index b418db7..a07ca26 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
@@ -452,6 +452,8 @@ public class HistoryFileManager extends AbstractService {
       } catch (Throwable t) {
         LOG.error("Error while trying to move a job to done", t);
         this.state = HistoryInfoState.MOVE_FAILED;
+      } finally {
+        notifyAll();
       }
     }
 
@@ -485,12 +487,16 @@ public class HistoryFileManager extends AbstractService {
     }
     
     protected synchronized void delete() throws IOException {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("deleting " + historyFile + " and " + confFile);
+      try {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("deleting " + historyFile + " and " + confFile);
+        }
+        state = HistoryInfoState.DELETED;
+        doneDirFc.delete(doneDirFc.makeQualified(historyFile), false);
+        doneDirFc.delete(doneDirFc.makeQualified(confFile), false);
+      } finally {
+        notifyAll();
       }
-      state = HistoryInfoState.DELETED;
-      doneDirFc.delete(doneDirFc.makeQualified(historyFile), false);
-      doneDirFc.delete(doneDirFc.makeQualified(confFile), false);
     }
 
     public JobIndexInfo getJobIndexInfo() {
@@ -517,6 +523,17 @@ public class HistoryFileManager extends AbstractService {
           jobIndexInfo.getNumMaps();
       return (maxTasksForLoadedJob > 0) && (totalTasks > maxTasksForLoadedJob);
     }
+
+    public synchronized void waitUntilMoved() {
+      while (isMovePending() && !didMoveFail()) {
+        try {
+          wait();
+        } catch (InterruptedException e) {
+          LOG.warn("Waiting has been interrupted");
+          throw new RuntimeException(e);
+        }
+      }
+    }
   }
 
   private SerialNumberIndex serialNumberIndex = null;
@@ -956,6 +973,7 @@ public class HistoryFileManager extends AbstractService {
           if (LOG.isDebugEnabled()) {
             LOG.debug("Scheduling move to done of " +found);
           }
+
           moveToDoneExecutor.execute(new Runnable() {
             @Override
             public void run() {
@@ -1193,5 +1211,5 @@ public class HistoryFileManager extends AbstractService {
   @VisibleForTesting
   void setMaxHistoryAge(long newValue){
     maxHistoryAge=newValue;
-  } 
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cff9edd4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java
index 936c772..9f36477 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java
@@ -446,6 +446,32 @@ public class TestJobHistory {
   }
 
   @Test
+  public void testCachedStorageWaitsForFileMove() throws IOException {
+    HistoryFileManager historyManager = mock(HistoryFileManager.class);
+    jobHistory = spy(new JobHistory());
+    doReturn(historyManager).when(jobHistory).createHistoryFileManager();
+
+    Configuration conf = new Configuration();
+    jobHistory.init(conf);
+    jobHistory.start();
+
+    CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
+        .getHistoryStorage());
+
+    Job job  = mock(Job.class);
+    JobId jobId  = mock(JobId.class);
+    when(job.getID()).thenReturn(jobId);
+    when(job.getTotalMaps()).thenReturn(10);
+    when(job.getTotalReduces()).thenReturn(2);
+    HistoryFileInfo fileInfo = mock(HistoryFileInfo.class);
+    when(historyManager.getFileInfo(eq(jobId))).thenReturn(fileInfo);
+    when(fileInfo.loadJob()).thenReturn(job);
+
+    storage.getFullJob(jobId);
+    verify(fileInfo).waitUntilMoved();
+  }
+
+  @Test
   public void testRefreshLoadedJobCacheUnSupportedOperation() {
     jobHistory = spy(new JobHistory());
     HistoryStorage storage = new HistoryStorage() {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org