You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2014/12/15 19:36:33 UTC

[42/50] [abbrv] hadoop git commit: HADOOP-11238. Update the NameNode's Group Cache in the background when possible (Chris Li via Colin P. McCabe)

HADOOP-11238. Update the NameNode's Group Cache in the background when possible (Chris Li via Colin P. McCabe)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e5a69251
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e5a69251
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e5a69251

Branch: refs/heads/YARN-2139
Commit: e5a692519956aefb3a540ed0137b63cf598ac10d
Parents: c78e3a7
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Fri Dec 12 16:30:52 2014 -0800
Committer: Colin Patrick Mccabe <cm...@cloudera.com>
Committed: Fri Dec 12 16:30:52 2014 -0800

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |   3 +
 .../java/org/apache/hadoop/security/Groups.java | 193 ++++++++-------
 .../hadoop/security/TestGroupsCaching.java      | 236 +++++++++++++++++++
 3 files changed, 342 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5a69251/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 45f226f..1e59395 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -437,6 +437,9 @@ Release 2.7.0 - UNRELEASED
     HADOOP-11323. WritableComparator#compare keeps reference to byte array.
     (Wilfred Spiegelenburg via wang)
 
+    HADOOP-11238. Update the NameNode's Group Cache in the background when
+    possible (Chris Li via Colin P. McCabe)
+
   BUG FIXES
 
     HADOOP-11236. NFS: Fix javadoc warning in RpcProgram.java (Abhiraj Butala via harsh)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5a69251/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Groups.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Groups.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Groups.java
index c500419..f3c5094 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Groups.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Groups.java
@@ -24,7 +24,13 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.base.Ticker;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -52,10 +58,11 @@ public class Groups {
   private static final Log LOG = LogFactory.getLog(Groups.class);
   
   private final GroupMappingServiceProvider impl;
-  
-  private final Map<String, CachedGroups> userToGroupsMap = 
-    new ConcurrentHashMap<String, CachedGroups>();
-  private final Map<String, List<String>> staticUserToGroupsMap = 
+
+  private final LoadingCache<String, List<String>> cache;
+  private final ConcurrentHashMap<String, Long> negativeCacheMask =
+    new ConcurrentHashMap<String, Long>();
+  private final Map<String, List<String>> staticUserToGroupsMap =
       new HashMap<String, List<String>>();
   private final long cacheTimeout;
   private final long negativeCacheTimeout;
@@ -66,7 +73,7 @@ public class Groups {
     this(conf, new Timer());
   }
 
-  public Groups(Configuration conf, Timer timer) {
+  public Groups(Configuration conf, final Timer timer) {
     impl = 
       ReflectionUtils.newInstance(
           conf.getClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING, 
@@ -86,6 +93,11 @@ public class Groups {
     parseStaticMapping(conf);
 
     this.timer = timer;
+    this.cache = CacheBuilder.newBuilder()
+      .refreshAfterWrite(cacheTimeout, TimeUnit.MILLISECONDS)
+      .ticker(new TimerToTickerAdapter(timer))
+      .expireAfterWrite(10 * cacheTimeout, TimeUnit.MILLISECONDS)
+      .build(new GroupCacheLoader());
 
     if(LOG.isDebugEnabled())
       LOG.debug("Group mapping impl=" + impl.getClass().getName() + 
@@ -123,78 +135,112 @@ public class Groups {
     }
   }
 
-  /**
-   * Determine whether the CachedGroups is expired.
-   * @param groups cached groups for one user.
-   * @return true if groups is expired from useToGroupsMap.
-   */
-  private boolean hasExpired(CachedGroups groups, long startMs) {
-    if (groups == null) {
-      return true;
-    }
-    long timeout = cacheTimeout;
-    if (isNegativeCacheEnabled() && groups.getGroups().isEmpty()) {
-      // This CachedGroups is in the negative cache, thus it should expire
-      // sooner.
-      timeout = negativeCacheTimeout;
-    }
-    return groups.getTimestamp() + timeout <= startMs;
-  }
-  
   private boolean isNegativeCacheEnabled() {
     return negativeCacheTimeout > 0;
   }
 
+  private IOException noGroupsForUser(String user) {
+    return new IOException("No groups found for user " + user);
+  }
+
   /**
    * Get the group memberships of a given user.
+   * If the user's groups are not cached, this method may block.
    * @param user User's name
    * @return the group memberships of the user
-   * @throws IOException
+   * @throws IOException if user does not exist
    */
-  public List<String> getGroups(String user) throws IOException {
+  public List<String> getGroups(final String user) throws IOException {
     // No need to lookup for groups of static users
     List<String> staticMapping = staticUserToGroupsMap.get(user);
     if (staticMapping != null) {
       return staticMapping;
     }
-    // Return cached value if available
-    CachedGroups groups = userToGroupsMap.get(user);
-    long startMs = timer.monotonicNow();
-    if (!hasExpired(groups, startMs)) {
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Returning cached groups for '" + user + "'");
-      }
-      if (groups.getGroups().isEmpty()) {
-        // Even with enabling negative cache, getGroups() has the same behavior
-        // that throws IOException if the groups for the user is empty.
-        throw new IOException("No groups found for user " + user);
+
+    // Check the negative cache first
+    if (isNegativeCacheEnabled()) {
+      Long expirationTime = negativeCacheMask.get(user);
+      if (expirationTime != null) {
+        if (timer.monotonicNow() < expirationTime) {
+          throw noGroupsForUser(user);
+        } else {
+          negativeCacheMask.remove(user, expirationTime);
+        }
       }
-      return groups.getGroups();
     }
 
-    // Create and cache user's groups
-    List<String> groupList = impl.getGroups(user);
-    long endMs = timer.monotonicNow();
-    long deltaMs = endMs - startMs ;
-    UserGroupInformation.metrics.addGetGroups(deltaMs);
-    if (deltaMs > warningDeltaMs) {
-      LOG.warn("Potential performance problem: getGroups(user=" + user +") " +
-          "took " + deltaMs + " milliseconds.");
+    try {
+      return cache.get(user);
+    } catch (ExecutionException e) {
+      throw (IOException)e.getCause();
     }
-    groups = new CachedGroups(groupList, endMs);
-    if (groups.getGroups().isEmpty()) {
-      if (isNegativeCacheEnabled()) {
-        userToGroupsMap.put(user, groups);
+  }
+
+  /**
+   * Convert millisecond times from hadoop's timer to guava's nanosecond ticker.
+   */
+  private static class TimerToTickerAdapter extends Ticker {
+    private Timer timer;
+
+    public TimerToTickerAdapter(Timer timer) {
+      this.timer = timer;
+    }
+
+    @Override
+    public long read() {
+      final long NANOSECONDS_PER_MS = 1000000;
+      return timer.monotonicNow() * NANOSECONDS_PER_MS;
+    }
+  }
+
+  /**
+   * Deals with loading data into the cache.
+   */
+  private class GroupCacheLoader extends CacheLoader<String, List<String>> {
+    /**
+     * This method will block if a cache entry doesn't exist, and
+     * any subsequent requests for the same user will wait on this
+     * request to return. If a user already exists in the cache, one
+     * caller reloads it while other callers get the cached value.
+     * @param user key of cache
+     * @return List of groups belonging to user
+     * @throws IOException to prevent caching negative entries
+     */
+    @Override
+    public List<String> load(String user) throws Exception {
+      List<String> groups = fetchGroupList(user);
+
+      if (groups.isEmpty()) {
+        if (isNegativeCacheEnabled()) {
+          long expirationTime = timer.monotonicNow() + negativeCacheTimeout;
+          negativeCacheMask.put(user, expirationTime);
+        }
+
+        // We throw here to prevent Cache from retaining an empty group
+        throw noGroupsForUser(user);
       }
-      throw new IOException("No groups found for user " + user);
+
+      return groups;
     }
-    userToGroupsMap.put(user, groups);
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Returning fetched groups for '" + user + "'");
+
+    /**
+     * Queries impl for groups belonging to the user. This could involve I/O and take a while.
+     */
+    private List<String> fetchGroupList(String user) throws IOException {
+      long startMs = timer.monotonicNow();
+      List<String> groupList = impl.getGroups(user);
+      long endMs = timer.monotonicNow();
+      long deltaMs = endMs - startMs ;
+      UserGroupInformation.metrics.addGetGroups(deltaMs);
+      if (deltaMs > warningDeltaMs) {
+        LOG.warn("Potential performance problem: getGroups(user=" + user +") " +
+          "took " + deltaMs + " milliseconds.");
+      }
+
+      return groupList;
     }
-    return groups.getGroups();
   }
-  
+
   /**
    * Refresh all user-to-groups mappings.
    */
@@ -205,7 +251,8 @@ public class Groups {
     } catch (IOException e) {
       LOG.warn("Error refreshing groups cache", e);
     }
-    userToGroupsMap.clear();
+    cache.invalidateAll();
+    negativeCacheMask.clear();
   }
 
   /**
@@ -221,40 +268,6 @@ public class Groups {
     }
   }
 
-  /**
-   * Class to hold the cached groups
-   */
-  private static class CachedGroups {
-    final long timestamp;
-    final List<String> groups;
-    
-    /**
-     * Create and initialize group cache
-     */
-    CachedGroups(List<String> groups, long timestamp) {
-      this.groups = groups;
-      this.timestamp = timestamp;
-    }
-
-    /**
-     * Returns time of last cache update
-     *
-     * @return time of last cache update
-     */
-    public long getTimestamp() {
-      return timestamp;
-    }
-
-    /**
-     * Get list of cached groups
-     *
-     * @return cached groups
-     */
-    public List<String> getGroups() {
-      return groups;
-    }
-  }
-
   private static Groups GROUPS = null;
   
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5a69251/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestGroupsCaching.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestGroupsCaching.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestGroupsCaching.java
index a814b0d..89e5b2d 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestGroupsCaching.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestGroupsCaching.java
@@ -51,6 +51,9 @@ public class TestGroupsCaching {
 
   @Before
   public void setup() {
+    FakeGroupMapping.resetRequestCount();
+    ExceptionalGroupMapping.resetRequestCount();
+
     conf = new Configuration();
     conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
       FakeGroupMapping.class,
@@ -61,16 +64,32 @@ public class TestGroupsCaching {
     // any to n mapping
     private static Set<String> allGroups = new HashSet<String>();
     private static Set<String> blackList = new HashSet<String>();
+    private static int requestCount = 0;
+    private static long getGroupsDelayMs = 0;
 
     @Override
     public List<String> getGroups(String user) throws IOException {
       LOG.info("Getting groups for " + user);
+      requestCount++;
+
+      delayIfNecessary();
+
       if (blackList.contains(user)) {
         return new LinkedList<String>();
       }
       return new LinkedList<String>(allGroups);
     }
 
+    private void delayIfNecessary() {
+      if (getGroupsDelayMs > 0) {
+        try {
+          Thread.sleep(getGroupsDelayMs);
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+
     @Override
     public void cacheGroupsRefresh() throws IOException {
       LOG.info("Cache is being refreshed.");
@@ -93,6 +112,36 @@ public class TestGroupsCaching {
       LOG.info("Adding " + user + " to the blacklist");
       blackList.add(user);
     }
+
+    public static int getRequestCount() {
+      return requestCount;
+    }
+
+    public static void resetRequestCount() {
+      requestCount = 0;
+    }
+
+    public static void setGetGroupsDelayMs(long delayMs) {
+      getGroupsDelayMs = delayMs;
+    }
+  }
+
+  public static class ExceptionalGroupMapping extends ShellBasedUnixGroupsMapping {
+    private static int requestCount = 0;
+
+    @Override
+    public List<String> getGroups(String user) throws IOException {
+      requestCount++;
+      throw new IOException("For test");
+    }
+
+    public static int getRequestCount() {
+      return requestCount;
+    }
+
+    public static void resetRequestCount() {
+      requestCount = 0;
+    }
   }
 
   @Test
@@ -219,4 +268,191 @@ public class TestGroupsCaching {
     // groups for the user is fetched.
     assertEquals(Arrays.asList(myGroups), groups.getGroups(user));
   }
+
+  @Test
+  public void testCachePreventsImplRequest() throws Exception {
+    // Disable negative cache.
+    conf.setLong(
+      CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_NEGATIVE_CACHE_SECS, 0);
+    Groups groups = new Groups(conf);
+    groups.cacheGroupsAdd(Arrays.asList(myGroups));
+    groups.refresh();
+    FakeGroupMapping.clearBlackList();
+
+    assertEquals(0, FakeGroupMapping.getRequestCount());
+
+    // First call hits the wire
+    assertTrue(groups.getGroups("me").size() == 2);
+    assertEquals(1, FakeGroupMapping.getRequestCount());
+
+    // Second call hits cache
+    assertTrue(groups.getGroups("me").size() == 2);
+    assertEquals(1, FakeGroupMapping.getRequestCount());
+  }
+
+  @Test
+  public void testExceptionsFromImplNotCachedInNegativeCache() {
+    conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
+      ExceptionalGroupMapping.class,
+      ShellBasedUnixGroupsMapping.class);
+    conf.setLong(CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_NEGATIVE_CACHE_SECS, 10000);
+    Groups groups = new Groups(conf);
+    groups.cacheGroupsAdd(Arrays.asList(myGroups));
+    groups.refresh();
+
+    assertEquals(0, ExceptionalGroupMapping.getRequestCount());
+
+    // First call should hit the wire
+    try {
+      groups.getGroups("anything");
+      fail("Should have thrown");
+    } catch (IOException e) {
+      // okay
+    }
+    assertEquals(1, ExceptionalGroupMapping.getRequestCount());
+
+    // Second call should hit the wire (no negative caching)
+    try {
+      groups.getGroups("anything");
+      fail("Should have thrown");
+    } catch (IOException e) {
+      // okay
+    }
+    assertEquals(2, ExceptionalGroupMapping.getRequestCount());
+  }
+
+  @Test
+  public void testOnlyOneRequestWhenNoEntryIsCached() throws Exception {
+    // Disable negative cache.
+    conf.setLong(
+      CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_NEGATIVE_CACHE_SECS, 0);
+    final Groups groups = new Groups(conf);
+    groups.cacheGroupsAdd(Arrays.asList(myGroups));
+    groups.refresh();
+    FakeGroupMapping.clearBlackList();
+    FakeGroupMapping.setGetGroupsDelayMs(100);
+
+    ArrayList<Thread> threads = new ArrayList<Thread>();
+    for (int i = 0; i < 10; i++) {
+      threads.add(new Thread() {
+        public void run() {
+          try {
+            assertEquals(2, groups.getGroups("me").size());
+          } catch (IOException e) {
+            fail("Should not happen");
+          }
+        }
+      });
+    }
+
+    // We start a bunch of threads who all see no cached value
+    for (Thread t : threads) {
+      t.start();
+    }
+
+    for (Thread t : threads) {
+      t.join();
+    }
+
+    // But only one thread should have made the request
+    assertEquals(1, FakeGroupMapping.getRequestCount());
+  }
+
+  @Test
+  public void testOnlyOneRequestWhenExpiredEntryExists() throws Exception {
+    conf.setLong(
+      CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS, 1);
+    FakeTimer timer = new FakeTimer();
+    final Groups groups = new Groups(conf, timer);
+    groups.cacheGroupsAdd(Arrays.asList(myGroups));
+    groups.refresh();
+    FakeGroupMapping.clearBlackList();
+    FakeGroupMapping.setGetGroupsDelayMs(100);
+
+    // We make an initial request to populate the cache
+    groups.getGroups("me");
+    int startingRequestCount = FakeGroupMapping.getRequestCount();
+
+    // Then expire that entry
+    timer.advance(400 * 1000);
+    Thread.sleep(100);
+
+    ArrayList<Thread> threads = new ArrayList<Thread>();
+    for (int i = 0; i < 10; i++) {
+      threads.add(new Thread() {
+        public void run() {
+          try {
+            assertEquals(2, groups.getGroups("me").size());
+          } catch (IOException e) {
+            fail("Should not happen");
+          }
+        }
+      });
+    }
+
+    // We start a bunch of threads who all see the cached value
+    for (Thread t : threads) {
+      t.start();
+    }
+
+    for (Thread t : threads) {
+      t.join();
+    }
+
+    // Only one extra request is made
+    assertEquals(startingRequestCount + 1, FakeGroupMapping.getRequestCount());
+  }
+
+  @Test
+  public void testCacheEntriesExpire() throws Exception {
+    conf.setLong(
+      CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS, 1);
+    FakeTimer timer = new FakeTimer();
+    final Groups groups = new Groups(conf, timer);
+    groups.cacheGroupsAdd(Arrays.asList(myGroups));
+    groups.refresh();
+    FakeGroupMapping.clearBlackList();
+
+    // We make an entry
+    groups.getGroups("me");
+    int startingRequestCount = FakeGroupMapping.getRequestCount();
+
+    timer.advance(20 * 1000);
+
+    // Cache entry has expired so it results in a new fetch
+    groups.getGroups("me");
+    assertEquals(startingRequestCount + 1, FakeGroupMapping.getRequestCount());
+  }
+
+  @Test
+  public void testNegativeCacheClearedOnRefresh() throws Exception {
+    conf.setLong(
+      CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_NEGATIVE_CACHE_SECS, 100);
+    final Groups groups = new Groups(conf);
+    groups.cacheGroupsAdd(Arrays.asList(myGroups));
+    groups.refresh();
+    FakeGroupMapping.clearBlackList();
+    FakeGroupMapping.addToBlackList("dne");
+
+    try {
+      groups.getGroups("dne");
+      fail("Should have failed to find this group");
+    } catch (IOException e) {
+      // pass
+    }
+
+    int startingRequestCount = FakeGroupMapping.getRequestCount();
+
+    groups.refresh();
+    FakeGroupMapping.addToBlackList("dne");
+
+    try {
+      List<String> g = groups.getGroups("dne");
+      fail("Should have failed to find this group");
+    } catch (IOException e) {
+      // pass
+    }
+
+    assertEquals(startingRequestCount + 1, FakeGroupMapping.getRequestCount());
+  }
 }