You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by dd...@apache.org on 2009/12/19 01:53:39 UTC

svn commit: r892411 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapred/tools/ src/test/mapred/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/security/

Author: ddas
Date: Sat Dec 19 00:53:39 2009
New Revision: 892411

URL: http://svn.apache.org/viewvc?rev=892411&view=rev
Log:
MAPREDUCE-1083. Changes in MapReduce so that group information of users can be refreshed in the JobTracker via command line. Contributed by Boris Shkolnik.

Added:
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/security/TestMapredGroupMappingServiceRefresh.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapReducePolicyProvider.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/tools/MRAdmin.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=892411&r1=892410&r2=892411&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Sat Dec 19 00:53:39 2009
@@ -80,6 +80,10 @@
     MAPREDUCE-967. TaskTracker does not need to fully unjar job jars.
     (Todd Lipcon via tomwhite)
 
+    MAPREDUCE-1083. Changes in MapReduce so that group information of users 
+    can be refreshed in the JobTracker via command line. 
+    (Boris Shkolnik via ddas)
+
   OPTIMIZATIONS
 
     MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=892411&r1=892410&r2=892411&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Sat Dec 19 00:53:39 2009
@@ -87,6 +87,7 @@
 import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.PermissionChecker;
+import org.apache.hadoop.security.RefreshUserToGroupMappingsProtocol;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -109,7 +110,7 @@
  *
  *******************************************************/
 public class JobTracker implements MRConstants, InterTrackerProtocol,
-    ClientProtocol, TaskTrackerManager,
+    ClientProtocol, TaskTrackerManager, RefreshUserToGroupMappingsProtocol,
     RefreshAuthorizationPolicyProtocol, AdminOperationsProtocol, JTConfig {
 
   static{
@@ -266,6 +267,8 @@
       return RefreshAuthorizationPolicyProtocol.versionID;
     } else if (protocol.equals(AdminOperationsProtocol.class.getName())){
       return AdminOperationsProtocol.versionID;
+    } else if (protocol.equals(RefreshUserToGroupMappingsProtocol.class.getName())){
+      return RefreshUserToGroupMappingsProtocol.versionID;
     } else {
       throw new IOException("Unknown protocol to job tracker: " + protocol);
     }
@@ -4094,6 +4097,15 @@
             limitMaxMemForReduceTasks).append(")"));
   }
 
+   
+  /** Logs the current user, then refreshes the user-to-groups mapping service looked up from conf. */
+  @Override
+  public void refreshUserToGroupsMappings(Configuration conf) throws IOException {
+    final String requester = UserGroupInformation.getCurrentUGI().getUserName();
+    LOG.info("Refreshing all user-to-groups mappings. Requested by user: " + requester);
+    SecurityUtil.getUserToGroupsMappingService(conf).refresh();
+  }
+  
   private boolean perTaskMemoryConfigurationSetOnJT() {
     if (limitMaxMemForMapTasks == JobConf.DISABLED_MEMORY_LIMIT
         || limitMaxMemForReduceTasks == JobConf.DISABLED_MEMORY_LIMIT

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapReducePolicyProvider.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapReducePolicyProvider.java?rev=892411&r1=892410&r2=892411&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapReducePolicyProvider.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapReducePolicyProvider.java Sat Dec 19 00:53:39 2009
@@ -18,6 +18,7 @@
 package org.apache.hadoop.mapred;
 
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.security.RefreshUserToGroupMappingsProtocol;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
 import org.apache.hadoop.security.authorize.Service;
@@ -36,6 +37,8 @@
                   TaskUmbilicalProtocol.class),
       new Service("security.refresh.policy.protocol.acl", 
                   RefreshAuthorizationPolicyProtocol.class),
+      new Service("security.refresh.usertogroups.mappings.protocol.acl", 
+                  RefreshUserToGroupMappingsProtocol.class),
   };
   
   @Override

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/tools/MRAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/tools/MRAdmin.java?rev=892411&r1=892410&r2=892411&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/tools/MRAdmin.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/tools/MRAdmin.java Sat Dec 19 00:53:39 2009
@@ -28,6 +28,7 @@
 import org.apache.hadoop.mapred.JobTracker;
 import org.apache.hadoop.mapred.AdminOperationsProtocol;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.RefreshUserToGroupMappingsProtocol;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
 import org.apache.hadoop.util.Tool;
@@ -53,8 +54,8 @@
   private static void printHelp(String cmd) {
     String summary = "hadoop mradmin is the command to execute Map-Reduce administrative commands.\n" +
     "The full syntax is: \n\n" +
-    "hadoop mradmin [-refreshServiceAcl] [-refreshQueues] [-help [cmd]] "
-    + "[-refreshNodes]\n"; 
+    "hadoop mradmin [-refreshServiceAcl] [-refreshQueues] " +
+    "[-refreshNodes] [-refreshUserToGroupsMappings] [-help [cmd]]\n"; 
 
   String refreshServiceAcl = "-refreshServiceAcl: Reload the service-level authorization policy file\n" +
     "\t\tJobtracker will reload the authorization policy file.\n";
@@ -63,6 +64,9 @@
         "-refreshQueues: Reload the queue acls and state.\n"
             + "\t\tJobTracker will reload the mapred-queues.xml file.\n";
 
+  String refreshUserToGroupsMappings = 
+    "-refreshUserToGroupsMappings: Refresh user-to-groups mappings\n";
+  
   String refreshNodes =
     "-refreshNodes: Refresh the hosts information at the jobtracker.\n";
   
@@ -73,6 +77,8 @@
     System.out.println(refreshServiceAcl);
   } else if ("refreshQueues".equals(cmd)) {
     System.out.println(refreshQueues);
+  } else if ("refreshUserToGroupsMappings".equals(cmd)) {
+    System.out.println(refreshUserToGroupsMappings);
   }  else if ("refreshNodes".equals(cmd)) {
     System.out.println(refreshNodes);
   } else if ("help".equals(cmd)) {
@@ -98,12 +104,15 @@
       System.err.println("Usage: java MRAdmin" + " [-refreshServiceAcl]");
     } else if ("-refreshQueues".equals(cmd)) {
       System.err.println("Usage: java MRAdmin" + " [-refreshQueues]");
+    } else if ("-refreshUserToGroupsMappings".equals(cmd)) {
+      System.err.println("Usage: java MRAdmin" + " [-refreshUserToGroupsMappings]");
     } else if ("-refreshNodes".equals(cmd)) {
       System.err.println("Usage: java MRAdmin" + " [-refreshNodes]");
     } else {
       System.err.println("Usage: java MRAdmin");
       System.err.println("           [-refreshServiceAcl]");
       System.err.println("           [-refreshQueues]");
+      System.err.println("           [-refreshUserToGroupsMappings]");
       System.err.println("           [-refreshNodes]");
       System.err.println("           [-help [cmd]]");
       System.err.println();
@@ -142,6 +151,29 @@
     return 0;
   }
 
+  /**
+   * Asks the {@link JobTracker} to refresh its user-to-groups mappings by
+   * calling it through a {@link RefreshUserToGroupMappingsProtocol} proxy.
+   * @return 0 once the remote call has returned; failures are thrown instead
+   * @throws IOException if raised while creating the proxy or by the remote call
+   */
+  private int refreshUserToGroupsMappings() throws IOException {
+    // Configuration of this tool; also passed as the argument of the refresh call
+    final Configuration conf = getConf();
+
+    // RPC proxy for the JobTracker address taken from conf
+    final RefreshUserToGroupMappingsProtocol jobTrackerProxy =
+      (RefreshUserToGroupMappingsProtocol) RPC.getProxy(
+          RefreshUserToGroupMappingsProtocol.class,
+          RefreshUserToGroupMappingsProtocol.versionID,
+          JobTracker.getAddress(conf),
+          getUGI(conf), conf,
+          NetUtils.getSocketFactory(conf, RefreshUserToGroupMappingsProtocol.class));
+
+    jobTrackerProxy.refreshUserToGroupsMappings(conf);
+    return 0;
+  }
+  
   private int refreshQueues() throws IOException {
     // Get the current configuration
     Configuration conf = getConf();
@@ -196,12 +228,11 @@
     int exitCode = -1;
     int i = 0;
     String cmd = args[i++];
-
     //
     // verify that we have enough command line parameters
     //
-    if ("-refreshServiceAcl".equals(cmd) || "-refreshQueues".equals(cmd)
-        || "-refreshNodes".equals(cmd)) {
+    if ("-refreshServiceAcl".equals(cmd) || "-refreshQueues".equals(cmd) ||
+        "-refreshNodes".equals(cmd) || "-refreshUserToGroupsMappings".equals(cmd)) {
       if (args.length != 1) {
         printUsage(cmd);
         return exitCode;
@@ -214,6 +245,8 @@
         exitCode = refreshAuthorizationPolicy();
       } else if ("-refreshQueues".equals(cmd)) {
         exitCode = refreshQueues();
+      } else if ("-refreshUserToGroupsMappings".equals(cmd)) {
+        exitCode = refreshUserToGroupsMappings();
       } else if ("-refreshNodes".equals(cmd)) {
         exitCode = refreshNodes();
       } else if ("-help".equals(cmd)) {

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java?rev=892411&r1=892410&r2=892411&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java Sat Dec 19 00:53:39 2009
@@ -22,7 +22,9 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 
 import junit.framework.TestCase;
@@ -31,6 +33,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -65,6 +68,14 @@
                             int numExcluded, Configuration conf) 
   throws IOException {
     try {
+   // create fake mapping for the groups
+      Map<String, String[]> u2g_map = new HashMap<String, String[]> (1);
+      u2g_map.put("user1", new String[] {"user1" });
+      u2g_map.put("user2", new String[] {"user2" });
+      u2g_map.put("user3", new String[] {"abc" });
+      u2g_map.put("user4", new String[] {"supergroup" });
+      DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
+      
       conf.setBoolean("dfs.replication.considerLoad", false);
       
       // prepare hosts info
@@ -171,7 +182,7 @@
     // refresh with super user
     success = false;
     UserGroupInformation ugi_super =
-      TestMiniMRWithDFSWithDistinctUsers.createUGI("user2", true);
+      TestMiniMRWithDFSWithDistinctUsers.createUGI("user4", true);
     client = getClient(conf, ugi_super);
     try {
       client.refreshNodes();

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/security/TestMapredGroupMappingServiceRefresh.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/security/TestMapredGroupMappingServiceRefresh.java?rev=892411&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/security/TestMapredGroupMappingServiceRefresh.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/security/TestMapredGroupMappingServiceRefresh.java Sat Dec 19 00:53:39 2009
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.tools.MRAdmin;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that <code>mradmin -refreshUserToGroupsMappings</code> makes the
+ * cached user-to-groups mappings get looked up again, using mini DFS and
+ * Map-Reduce clusters.
+ */
+public class TestMapredGroupMappingServiceRefresh {
+  private MiniDFSCluster cluster;
+  // kept as a field so that tearDown() can shut it down again
+  private MiniMRCluster mrCluster;
+  JobConf config;
+  private static final long groupRefreshTimeoutSec = 2;
+  private static final Log LOG = LogFactory
+      .getLog(TestMapredGroupMappingServiceRefresh.class);
+  
+  /**
+   * Group provider that returns different group names on every call, so a
+   * repeated answer seen by the test cannot have come from a fresh lookup.
+   */
+  public static class MockUnixGroupsMapping implements GroupMappingServiceProvider {
+    private int i=0;
+    
+    @Override
+    public List<String> getGroups(String user) throws IOException {
+      String g1 = user + (10 * i + 1);
+      String g2 = user + (10 * i + 2);
+      List<String> l = new ArrayList<String>(2);
+      l.add(g1);
+      l.add(g2);
+      i++;
+      return l;
+    }
+  }
+  
+  /** Starts a DFS and a Map-Reduce mini cluster configured with the mock group provider. */
+  @Before
+  public void setUp() throws Exception {
+    config = new JobConf(new Configuration());
+    
+    config.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
+        TestMapredGroupMappingServiceRefresh.MockUnixGroupsMapping.class,
+        GroupMappingServiceProvider.class);
+    config.setLong(CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS, 
+        groupRefreshTimeoutSec);
+    
+    LOG.info("GROUP MAPPING class name=" + 
+        config.getClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
+        ShellBasedUnixGroupsMapping.class,GroupMappingServiceProvider.class).
+        getName());
+    
+    String namenodeUrl = "hdfs://localhost:" + "0";
+    FileSystem.setDefaultUri(config, namenodeUrl);
+    
+    cluster = new MiniDFSCluster(0, config, 1, true, true, true,  null, null, 
+        null, null);
+    cluster.waitActive();
+    URI uri = cluster.getURI();
+    
+    mrCluster = new MiniMRCluster(0, uri.toString() , 
+      3, null, null, config);
+    
+    // point MRAdmin at the JobTracker that was just started
+    config.set(JTConfig.JT_IPC_ADDRESS, "localhost:"+mrCluster.getJobTrackerPort());
+  }
+
+  /** Shuts down both mini clusters, the Map-Reduce one first. */
+  @After
+  public void tearDown() throws Exception {
+    try {
+      if (mrCluster != null) {
+        mrCluster.shutdown();
+      }
+    } finally {
+      // runs even if the Map-Reduce shutdown throws, so DFS is not leaked
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+  
+  /**
+   * Looks up the current user's groups before and after the mradmin refresh
+   * command and again after the cache timeout, expecting new groups each time
+   * except for the repeated lookup made before the refresh.
+   */
+  @Test
+  public void testGroupMappingRefresh() throws Exception {
+    MRAdmin admin = new MRAdmin(config);
+    String [] args = new String[] { "-refreshUserToGroupsMappings" };
+    
+    Groups groups = SecurityUtil.getUserToGroupsMappingService(config);
+    String user = UnixUserGroupInformation.getUnixUserName();
+    System.out.println("first attempt:");
+    List<String> g1 = groups.getGroups(user);
+    String [] str_groups = new String [g1.size()];
+    g1.toArray(str_groups);
+    System.out.println(Arrays.toString(str_groups));
+    
+    System.out.println("second attempt, should be same:");
+    List<String> g2 = groups.getGroups(user);
+    g2.toArray(str_groups);
+    System.out.println(Arrays.toString(str_groups));
+    for(int i=0; i<g2.size(); i++) {
+      assertEquals("Should be same group ", g1.get(i), g2.get(i));
+    }
+    // run refresh command and make sure it reported success (exit code 0)
+    assertEquals("mradmin exit code ", 0, admin.run(args));
+    
+    System.out.println("third attempt(after refresh command), should be different:");
+    List<String> g3 = groups.getGroups(user);
+    g3.toArray(str_groups);
+    System.out.println(Arrays.toString(str_groups));
+    for(int i=0; i<g3.size(); i++) {
+      assertFalse("Should be different group ", g1.get(i).equals(g3.get(i)));
+    }
+    System.out.println("");
+    
+    // test time out
+    Thread.sleep(groupRefreshTimeoutSec*1100);
+    System.out.println("fourth attempt(after timeout), should be different:");
+    List<String> g4 = groups.getGroups(user);
+    g4.toArray(str_groups);
+    System.out.println(Arrays.toString(str_groups));
+    for(int i=0; i<g4.size(); i++) {
+      assertFalse("Should be different group ", g3.get(i).equals(g4.get(i)));
+    }
+  }
+}