Posted to common-commits@hadoop.apache.org by ep...@apache.org on 2020/04/28 00:15:48 UTC

[hadoop] branch branch-2.10 updated: MAPREDUCE-7277. IndexCache totalMemoryUsed differs from cache contents. Contributed by Jon Eagles (jeagles).

This is an automated email from the ASF dual-hosted git repository.

epayne pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 568f185  MAPREDUCE-7277. IndexCache totalMemoryUsed differs from cache contents. Contributed by Jon Eagles (jeagles).
568f185 is described below

commit 568f185bf8c27887d3f4c2559b301275d4bfa486
Author: Eric E Payne <er...@verizonmedia.com>
AuthorDate: Tue Apr 28 00:14:21 2020 +0000

    MAPREDUCE-7277. IndexCache totalMemoryUsed differs from cache contents. Contributed by Jon Eagles (jeagles).
---
 .../java/org/apache/hadoop/mapred/IndexCache.java  | 101 +++++++++++++--------
 .../org/apache/hadoop/mapred/TestIndexCache.java   |  60 +++++++-----
 2 files changed, 100 insertions(+), 61 deletions(-)

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java
index c3db951..80cbcca 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java
@@ -22,17 +22,17 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class IndexCache {
 
   private final JobConf conf;
   private final int totalMemoryAllowed;
   private AtomicInteger totalMemoryUsed = new AtomicInteger();
-  private static final Log LOG = LogFactory.getLog(IndexCache.class);
+  private static final Logger LOG = LoggerFactory.getLogger(IndexCache.class);
 
   private final ConcurrentHashMap<String,IndexInformation> cache =
     new ConcurrentHashMap<String,IndexInformation>();
@@ -72,11 +72,12 @@ class IndexCache {
           try {
             info.wait();
           } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
             throw new IOException("Interrupted waiting for construction", e);
           }
         }
       }
-      LOG.debug("IndexCache HIT: MapId " + mapId + " found");
+      LOG.debug("IndexCache HIT: MapId {} found", mapId);
     }
 
     if (info.mapSpillRecord.size() == 0 ||
@@ -106,63 +107,91 @@ class IndexCache {
           try {
             info.wait();
           } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
             throw new IOException("Interrupted waiting for construction", e);
           }
         }
       }
-      LOG.debug("IndexCache HIT: MapId " + mapId + " found");
+      LOG.debug("IndexCache HIT: MapId {} found", mapId);
       return info;
     }
-    LOG.debug("IndexCache MISS: MapId " + mapId + " not found") ;
+    LOG.debug("IndexCache MISS: MapId {} not found", mapId);
     SpillRecord tmp = null;
+    boolean success = false;
     try { 
       tmp = new SpillRecord(indexFileName, conf, expectedIndexOwner);
-    } catch (Throwable e) { 
+      success = true;
+    } catch (Throwable e) {
       tmp = new SpillRecord(0);
       cache.remove(mapId);
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
       throw new IOException("Error Reading IndexFile", e);
-    } finally { 
-      synchronized (newInd) { 
+    } finally {
+      synchronized (newInd) {
         newInd.mapSpillRecord = tmp;
+        if (success) {
+          // Only add mapId to the queue for successful read and after added to
+          // the cache. Once in the queue, it is now eligible for removal once
+          // construction is finished.
+          queue.add(mapId);
+          if (totalMemoryUsed.addAndGet(newInd.getSize()) > totalMemoryAllowed) {
+            freeIndexInformation();
+          }
+        }
         newInd.notifyAll();
       } 
     } 
-    queue.add(mapId);
-    
-    if (totalMemoryUsed.addAndGet(newInd.getSize()) > totalMemoryAllowed) {
-      freeIndexInformation();
-    }
+
     return newInd;
   }
 
   /**
-   * This method removes the map from the cache if index information for this
-   * map is loaded(size>0), index information entry in cache will not be 
-   * removed if it is in the loading phrase(size=0), this prevents corruption  
-   * of totalMemoryUsed. It should be called when a map output on this tracker 
-   * is discarded.
+   * This method removes the map from the cache if it is present in the queue.
    * @param mapId The taskID of this map.
    */
-  public void removeMap(String mapId) {
-    IndexInformation info = cache.get(mapId);
-    if (info == null || isUnderConstruction(info)) {
+  public void removeMap(String mapId) throws IOException {
+    // Successfully removing the mapId from the queue enters into a contract
+    // that this thread will remove the corresponding mapId from the cache.
+    if (!queue.remove(mapId)) {
+      LOG.debug("Map ID {} not found in queue", mapId);
       return;
     }
-    info = cache.remove(mapId);
-    if (info != null) {
-      totalMemoryUsed.addAndGet(-info.getSize());
-      if (!queue.remove(mapId)) {
-        LOG.warn("Map ID" + mapId + " not found in queue!!");
+    removeMapInternal(mapId);
+  }
+
+  /** This method should only be called upon successful removal of mapId from
+   * the queue. The mapId will be removed from the cache and totalUsedMemory
+   * will be decremented.
+   * @param mapId the cache item to be removed
+   * @throws IOException
+   */
+  private void removeMapInternal(String mapId) throws IOException {
+    IndexInformation info = cache.remove(mapId);
+    if (info == null) {
+      // Inconsistent state as presence in queue implies presence in cache
+      LOG.warn("Map ID " + mapId + " not found in cache");
+      return;
+    }
+    try {
+      synchronized(info) {
+        while (isUnderConstruction(info)) {
+          info.wait();
+        }
+        totalMemoryUsed.getAndAdd(-info.getSize());
       }
-    } else {
-      LOG.info("Map ID " + mapId + " not found in cache");
+    } catch (InterruptedException e) {
+      totalMemoryUsed.getAndAdd(-info.getSize());
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupted waiting for construction", e);
     }
   }
 
   /**
-   * This method checks if cache and totolMemoryUsed is consistent.
+   * This method checks if cache and totalMemoryUsed is consistent.
    * It is only used for unit test.
-   * @return True if cache and totolMemoryUsed is consistent
+   * @return True if cache and totalMemoryUsed is consistent
    */
   boolean checkTotalMemoryUsed() {
     int totalSize = 0;
@@ -175,13 +204,13 @@ class IndexCache {
   /**
    * Bring memory usage below totalMemoryAllowed.
    */
-  private synchronized void freeIndexInformation() {
+  private synchronized void freeIndexInformation() throws IOException {
     while (totalMemoryUsed.get() > totalMemoryAllowed) {
-      String s = queue.remove();
-      IndexInformation info = cache.remove(s);
-      if (info != null) {
-        totalMemoryUsed.addAndGet(-info.getSize());
+      if(queue.isEmpty()) {
+        break;
       }
+      String mapId = queue.remove();
+      removeMapInternal(mapId);
     }
   }
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestIndexCache.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestIndexCache.java
index b6a2df0..ae97d97 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestIndexCache.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestIndexCache.java
@@ -21,6 +21,7 @@ import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.zip.CRC32;
 import java.util.zip.CheckedOutputStream;
 
@@ -210,23 +211,32 @@ public class TestIndexCache extends TestCase {
     final String user = 
       UserGroupInformation.getCurrentUser().getShortUserName();
     writeFile(fs, big, bytesPerFile, partsPerMap);
-    
+
+    // Capture if any runtime exception occurred
+    final AtomicBoolean failed = new AtomicBoolean();
+
     // run multiple times
     for (int i = 0; i < 20; ++i) {
       Thread getInfoThread = new Thread() {
         @Override
         public void run() {
           try {
-            cache.getIndexInformation("bigIndex", partsPerMap, big, user);
+            cache.getIndexInformation("bigIndex", 0, big, user);
           } catch (Exception e) {
             // should not be here
+            failed.set(true);
           }
         }
       };
       Thread removeMapThread = new Thread() {
         @Override
         public void run() {
-          cache.removeMap("bigIndex");
+          try {
+            cache.removeMap("bigIndex");
+          } catch (Exception e) {
+            // should not be here
+            failed.set(true);
+          }
         }
       };
       if (i%2==0) {
@@ -238,8 +248,9 @@ public class TestIndexCache extends TestCase {
       }
       getInfoThread.join();
       removeMapThread.join();
-      assertEquals(true, cache.checkTotalMemoryUsed());
-    }      
+      assertFalse("An unexpected exception", failed.get());
+      assertTrue(cache.checkTotalMemoryUsed());
+    }
   }
   
   public void testCreateRace() throws Exception {
@@ -254,6 +265,9 @@ public class TestIndexCache extends TestCase {
       UserGroupInformation.getCurrentUser().getShortUserName();
     writeFile(fs, racy, bytesPerFile, partsPerMap);
 
+    // Capture if any runtime exception occurred
+    final AtomicBoolean failed = new AtomicBoolean();
+
     // run multiple instances
     Thread[] getInfoThreads = new Thread[50];
     for (int i = 0; i < 50; i++) {
@@ -261,10 +275,15 @@ public class TestIndexCache extends TestCase {
         @Override
         public void run() {
           try {
-            cache.getIndexInformation("racyIndex", partsPerMap, racy, user);
-            cache.removeMap("racyIndex");
+            while (!Thread.currentThread().isInterrupted()) {
+              cache.getIndexInformation("racyIndex", 0, racy, user);
+              cache.removeMap("racyIndex");
+            }
           } catch (Exception e) {
-            // should not be here
+            if (!Thread.currentThread().isInterrupted()) {
+              // should not be here
+              failed.set(true);
+            }
           }
         }
       };
@@ -274,21 +293,13 @@ public class TestIndexCache extends TestCase {
       getInfoThreads[i].start();
     }
 
-    final Thread mainTestThread = Thread.currentThread();
-
-    Thread timeoutThread = new Thread() {
-      @Override
-      public void run() {
-        try {
-          Thread.sleep(15000);
-          mainTestThread.interrupt();
-        } catch (InterruptedException ie) {
-          // we are done;
-        }
-      }
-    };
+    // The duration to keep the threads testing
+    Thread.sleep(5000);
 
     for (int i = 0; i < 50; i++) {
+      getInfoThreads[i].interrupt();
+    }
+    for (int i = 0; i < 50; i++) {
       try {
         getInfoThreads[i].join();
       } catch (InterruptedException ie) {
@@ -296,10 +307,9 @@ public class TestIndexCache extends TestCase {
         fail("Unexpectedly long delay during concurrent cache entry creations");
       }
     }
-    // stop the timeoutThread. If we get interrupted before stopping, there
-    // must be something wrong, although it wasn't a deadlock. No need to
-    // catch and swallow.
-    timeoutThread.interrupt();
+    assertFalse("An unexpected exception", failed.get());
+    assertTrue("Total memory used does not represent contents of the cache",
+        cache.checkTotalMemoryUsed());
   }
 
   private static void checkRecord(IndexRecord rec, long fill) {
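
Note on the change above: the patch replaces the old "decrement if the entry happens to be in the cache" bookkeeping with a queue-ownership contract. A mapId is enqueued, and its size charged to totalMemoryUsed, only after its SpillRecord has been read successfully; whichever thread subsequently wins queue.remove(mapId), whether removeMap() or the eviction loop in freeIndexInformation(), becomes solely responsible for removing the entry from the cache and releasing its size. The following is a minimal, self-contained sketch of that pattern, not the Hadoop class itself: the names (BoundedSizeCache, put, remove, evictIfNeeded, checkUsedBytes) are illustrative only, and the wait-for-construction handshake that IndexCache performs on partially built entries is omitted for brevity.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified sketch of the queue-ownership accounting pattern adopted by the patch.
class BoundedSizeCache {
  private final int maxBytes;
  private final AtomicInteger usedBytes = new AtomicInteger();
  private final ConcurrentHashMap<String, byte[]> cache = new ConcurrentHashMap<>();
  // FIFO of keys that are fully constructed and already charged to usedBytes.
  private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();

  BoundedSizeCache(int maxBytes) {
    this.maxBytes = maxBytes;
  }

  void put(String key, byte[] value) {
    if (cache.putIfAbsent(key, value) != null) {
      return; // another thread already inserted this key
    }
    // Charge the entry before publishing it on the queue; once the key is
    // queued, any thread may claim it for removal.
    usedBytes.addAndGet(value.length);
    queue.add(key);
    if (usedBytes.get() > maxBytes) {
      evictIfNeeded();
    }
  }

  void remove(String key) {
    // Winning queue.remove() is the contract: exactly one thread gets to
    // remove the entry from the map and release its accounted size.
    if (!queue.remove(key)) {
      return;
    }
    byte[] value = cache.remove(key);
    if (value != null) {
      usedBytes.addAndGet(-value.length);
    }
  }

  private void evictIfNeeded() {
    while (usedBytes.get() > maxBytes) {
      String victim = queue.poll();
      if (victim == null) {
        break;
      }
      byte[] value = cache.remove(victim);
      if (value != null) {
        usedBytes.addAndGet(-value.length);
      }
    }
  }

  // Same idea as IndexCache#checkTotalMemoryUsed(): once all threads are
  // quiescent, the counter must equal the sum of the sizes still in the map.
  boolean checkUsedBytes() {
    int total = 0;
    for (byte[] v : cache.values()) {
      total += v.length;
    }
    return total == usedBytes.get();
  }

  public static void main(String[] args) {
    BoundedSizeCache c = new BoundedSizeCache(10);
    c.put("map_0", new byte[8]);
    c.put("map_1", new byte[8]); // exceeds the limit, so map_0 is evicted
    c.remove("map_1");
    System.out.println("consistent = " + c.checkUsedBytes()); // expected: true
  }
}

Because every charge is tied to exactly one queue insertion and every release to exactly one successful queue removal, the counter and the cache contents agree once all threads finish, which is what the updated test asserts through checkTotalMemoryUsed().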

