Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2009/05/11 14:39:41 UTC
svn commit: r773542 - in /hadoop/core/trunk: ./
src/core/org/apache/hadoop/security/ src/core/org/apache/hadoop/util/
src/hdfs/org/apache/hadoop/hdfs/server/namenode/
src/mapred/org/apache/hadoop/mapred/
src/mapred/org/apache/hadoop/mapred/tools/ src/t...
Author: ddas
Date: Mon May 11 12:39:40 2009
New Revision: 773542
URL: http://svn.apache.org/viewvc?rev=773542&view=rev
Log:
HADOOP-5643. Adds a way to decommission TaskTrackers while the JobTracker is running. Contributed by Amar Kamat.
Added:
hadoop/core/trunk/src/core/org/apache/hadoop/security/PermissionChecker.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/core/org/apache/hadoop/util/HostsFileReader.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PermissionChecker.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/AdminOperationsProtocol.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/tools/MRAdmin.java
hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
hadoop/core/trunk/src/test/mapred-site.xml
hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java
hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java
hadoop/core/trunk/src/webapps/job/jobtracker.jsp
hadoop/core/trunk/src/webapps/job/machines.jsp
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=773542&r1=773541&r2=773542&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon May 11 12:39:40 2009
@@ -116,6 +116,9 @@
HADOOP-5745. Allow setting the default value of maxRunningJobs for all
pools. (dhruba via matei)
+ HADOOP-5643. Adds a way to decommission TaskTrackers while the JobTracker
+ is running. (Amar Kamat via ddas)
+
IMPROVEMENTS
HADOOP-4565. Added CombineFileInputFormat to use data locality information
Added: hadoop/core/trunk/src/core/org/apache/hadoop/security/PermissionChecker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/security/PermissionChecker.java?rev=773542&view=auto
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/security/PermissionChecker.java (added)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/security/PermissionChecker.java Mon May 11 12:39:40 2009
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/** Perform permission checking. */
+public class PermissionChecker {
+ static final Log LOG = LogFactory.getLog(UserGroupInformation.class);
+
+ public final String user;
+ protected final Set<String> groups = new HashSet<String>();
+ public final boolean isSuper;
+
+ /**
+ * Checks if the caller has the required permission.
+ * @param owner username of the owner
+ * @param supergroup supergroup that the owner belongs to
+ */
+ public PermissionChecker(String owner, String supergroup
+ ) throws AccessControlException{
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUGI();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ugi=" + ugi);
+ }
+
+ if (ugi != null) {
+ user = ugi.getUserName();
+ groups.addAll(Arrays.asList(ugi.getGroupNames()));
+ isSuper = user.equals(owner) || groups.contains(supergroup);
+ }
+ else {
+ throw new AccessControlException("ugi = null");
+ }
+ }
+
+ /**
+ * Check if the caller's groups contain the given group.
+ * @param group group to check
+ */
+ public boolean containsGroup(String group) {return groups.contains(group);}
+
+ /**
+ * Verify that the caller has the required permission. This will result in
+ * an exception if the caller is not allowed to access the resource.
+ * @param owner owner of the system
+ * @param supergroup supergroup of the system
+ */
+ public static void checkSuperuserPrivilege(UserGroupInformation owner,
+ String supergroup)
+ throws AccessControlException {
+ PermissionChecker checker =
+ new PermissionChecker(owner.getUserName(), supergroup);
+ if (!checker.isSuper) {
+ throw new AccessControlException("Access denied for user "
+ + checker.user + ". Superuser privilege is required");
+ }
+ }
+}
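The static helper above is the piece both the NameNode and, after this commit, the JobTracker call before privileged operations. A minimal sketch of the calling pattern, assuming a hypothetical daemon-side class; "owner" and "supergroup" stand in for whatever identity and group the daemon captured at startup:

    import org.apache.hadoop.security.AccessControlException;
    import org.apache.hadoop.security.PermissionChecker;
    import org.apache.hadoop.security.UserGroupInformation;

    // Hypothetical guard class; not part of the commit.
    class AdminGuard {
      private final UserGroupInformation owner;  // daemon owner, captured at startup
      private final String supergroup;           // e.g. from the daemon's configuration

      AdminGuard(UserGroupInformation owner, String supergroup) {
        this.owner = owner;
        this.supergroup = supergroup;
      }

      void doPrivilegedRefresh() throws AccessControlException {
        // Throws unless the calling user is the owner or is in the supergroup.
        PermissionChecker.checkSuperuserPrivilege(owner, supergroup);
        // ... privileged work would go here ...
      }
    }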
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/util/HostsFileReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/util/HostsFileReader.java?rev=773542&r1=773541&r2=773542&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/util/HostsFileReader.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/util/HostsFileReader.java Mon May 11 12:39:40 2009
@@ -22,13 +22,18 @@
import java.util.Set;
import java.util.HashSet;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
-// Keeps track of which datanodes are allowed to connect to the namenode.
+// Keeps track of which datanodes/tasktrackers are allowed to connect to the
+// namenode/jobtracker.
public class HostsFileReader {
private Set<String> includes;
private Set<String> excludes;
private String includesFile;
private String excludesFile;
+
+ private static final Log LOG = LogFactory.getLog(HostsFileReader.class);
public HostsFileReader(String inFile,
String exFile) throws IOException {
@@ -40,7 +45,11 @@
}
private void readFileToSet(String filename, Set<String> set) throws IOException {
- FileInputStream fis = new FileInputStream(new File(filename));
+ File file = new File(filename);
+ if (!file.exists()) {
+ return;
+ }
+ FileInputStream fis = new FileInputStream(file);
BufferedReader reader = null;
try {
reader = new BufferedReader(new InputStreamReader(fis));
@@ -64,30 +73,36 @@
}
public synchronized void refresh() throws IOException {
- includes.clear();
- excludes.clear();
-
+ LOG.info("Refreshing hosts (include/exclude) list");
if (!includesFile.equals("")) {
- readFileToSet(includesFile, includes);
+ Set<String> newIncludes = new HashSet<String>();
+ readFileToSet(includesFile, newIncludes);
+ // switch the new hosts that are to be included
+ includes = newIncludes;
}
if (!excludesFile.equals("")) {
- readFileToSet(excludesFile, excludes);
+ Set<String> newExcludes = new HashSet<String>();
+ readFileToSet(excludesFile, newExcludes);
+ // switch the excluded hosts
+ excludes = newExcludes;
}
}
- public Set<String> getHosts() {
+ public synchronized Set<String> getHosts() {
return includes;
}
- public Set<String> getExcludedHosts() {
+ public synchronized Set<String> getExcludedHosts() {
return excludes;
}
public synchronized void setIncludesFile(String includesFile) {
+ LOG.info("Setting the includes file to " + includesFile);
this.includesFile = includesFile;
}
public synchronized void setExcludesFile(String excludesFile) {
+ LOG.info("Setting the excludes file to " + excludesFile);
this.excludesFile = excludesFile;
}
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=773542&r1=773541&r2=773542&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Mon May 11 12:39:40 2009
@@ -30,6 +30,7 @@
import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMetrics;
import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.PermissionChecker;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.*;
@@ -447,7 +448,7 @@
* Dump all metadata into specified file
*/
synchronized void metaSave(String filename) throws IOException {
- checkSuperuserPrivilege();
+ checkAccess();
File file = new File(System.getProperty("hadoop.log.dir"),
filename);
PrintWriter out = new PrintWriter(new BufferedWriter(
@@ -490,7 +491,7 @@
*/
synchronized BlocksWithLocations getBlocks(DatanodeID datanode, long size)
throws IOException {
- checkSuperuserPrivilege();
+ checkAccess();
DatanodeDescriptor node = getDatanode(datanode);
if (node == null) {
@@ -570,7 +571,7 @@
*/
public synchronized void setOwner(String src, String username, String group
) throws IOException {
- PermissionChecker pc = checkOwner(src);
+ FSPermissionChecker pc = checkOwner(src);
if (!pc.isSuper) {
if (username != null && !pc.user.equals(username)) {
throw new AccessControlException("Non-super user cannot change owner.");
@@ -1442,7 +1443,7 @@
*/
void setQuota(String path, long nsQuota, long dsQuota) throws IOException {
if (isPermissionEnabled) {
- checkSuperuserPrivilege();
+ checkAccess();
}
dir.setQuota(path, nsQuota, dsQuota);
@@ -2464,7 +2465,7 @@
public synchronized DatanodeInfo[] datanodeReport( DatanodeReportType type
) throws AccessControlException {
- checkSuperuserPrivilege();
+ checkAccess();
ArrayList<DatanodeDescriptor> results = getDatanodeListForReport(type);
DatanodeInfo[] arr = new DatanodeInfo[results.size()];
@@ -2483,7 +2484,7 @@
* @throws IOException if
*/
synchronized void saveNamespace() throws AccessControlException, IOException {
- checkSuperuserPrivilege();
+ checkAccess();
if(!isInSafeMode()) {
throw new IOException("Safe mode should be turned ON " +
"in order to create namespace image.");
@@ -2499,7 +2500,7 @@
* @throws AccessControlException if superuser privilege is violated.
*/
synchronized boolean restoreFailedStorage(String arg) throws AccessControlException {
- checkSuperuserPrivilege();
+ checkAccess();
// if it is disabled - enable it and vice versa.
if(arg.equals("check"))
@@ -2675,7 +2676,7 @@
* 4. Removed from exclude --> stop decommission.
*/
public void refreshNodes(Configuration conf) throws IOException {
- checkSuperuserPrivilege();
+ checkAccess();
// Reread the config to get dfs.hosts and dfs.hosts.exclude filenames.
// Update the file names and refresh internal includes and excludes list
if (conf == null)
@@ -2709,7 +2710,7 @@
}
void finalizeUpgrade() throws IOException {
- checkSuperuserPrivilege();
+ checkAccess();
getFSImage().finalizeUpgrade();
}
@@ -3160,7 +3161,7 @@
boolean setSafeMode(SafeModeAction action) throws IOException {
if (action != SafeModeAction.SAFEMODE_GET) {
- checkSuperuserPrivilege();
+ checkAccess();
switch(action) {
case SAFEMODE_LEAVE: // leave safe mode
leaveSafeMode(false);
@@ -3336,49 +3337,45 @@
return new PermissionStatus(fsOwner.getUserName(), supergroup, permission);
}
- private PermissionChecker checkOwner(String path) throws AccessControlException {
+ private FSPermissionChecker checkOwner(String path) throws AccessControlException {
return checkPermission(path, true, null, null, null, null);
}
- private PermissionChecker checkPathAccess(String path, FsAction access
+ private FSPermissionChecker checkPathAccess(String path, FsAction access
) throws AccessControlException {
return checkPermission(path, false, null, null, access, null);
}
- private PermissionChecker checkParentAccess(String path, FsAction access
+ private FSPermissionChecker checkParentAccess(String path, FsAction access
) throws AccessControlException {
return checkPermission(path, false, null, access, null, null);
}
- private PermissionChecker checkAncestorAccess(String path, FsAction access
+ private FSPermissionChecker checkAncestorAccess(String path, FsAction access
) throws AccessControlException {
return checkPermission(path, false, access, null, null, null);
}
- private PermissionChecker checkTraverse(String path
+ private FSPermissionChecker checkTraverse(String path
) throws AccessControlException {
return checkPermission(path, false, null, null, null, null);
}
- private void checkSuperuserPrivilege() throws AccessControlException {
+ private void checkAccess() throws AccessControlException {
if (isPermissionEnabled) {
- PermissionChecker pc = new PermissionChecker(
- fsOwner.getUserName(), supergroup);
- if (!pc.isSuper) {
- throw new AccessControlException("Superuser privilege is required");
- }
+ PermissionChecker.checkSuperuserPrivilege(fsOwner, supergroup);
}
}
/**
* Check whether current user have permissions to access the path.
* For more details of the parameters, see
- * {@link PermissionChecker#checkPermission(String, INodeDirectory, boolean, FsAction, FsAction, FsAction, FsAction)}.
+ * {@link FSPermissionChecker#checkPermission(String, INodeDirectory, boolean, FsAction, FsAction, FsAction, FsAction)}.
*/
- private PermissionChecker checkPermission(String path, boolean doCheckOwner,
+ private FSPermissionChecker checkPermission(String path, boolean doCheckOwner,
FsAction ancestorAccess, FsAction parentAccess, FsAction access,
FsAction subAccess) throws AccessControlException {
- PermissionChecker pc = new PermissionChecker(
+ FSPermissionChecker pc = new FSPermissionChecker(
fsOwner.getUserName(), supergroup);
if (!pc.isSuper) {
dir.waitForReady();
Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java?rev=773542&view=auto
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java (added)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java Mon May 11 12:39:40 2009
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.util.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.PermissionChecker;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/** Perform permission checking in {@link FSNamesystem}. */
+class FSPermissionChecker extends PermissionChecker {
+ static final Log LOG = LogFactory.getLog(UserGroupInformation.class);
+
+ FSPermissionChecker(String fsOwner, String supergroup
+ ) throws AccessControlException{
+ super(fsOwner, supergroup);
+ }
+
+ /**
+ * Check whether the current user has permission to access the path.
+ * Traverse is always checked.
+ *
+ * Parent path means the parent directory for the path.
+ * Ancestor path means the last (the closest) existing ancestor directory
+ * of the path.
+ * Note that if the parent path exists,
+ * then the parent path and the ancestor path are the same.
+ *
+ * For example, suppose the path is "/foo/bar/baz".
+ * No matter baz is a file or a directory,
+ * the parent path is "/foo/bar".
+ * If bar exists, then the ancestor path is also "/foo/bar".
+ * If bar does not exist and foo exists,
+ * then the ancestor path is "/foo".
+ * Further, if both foo and bar do not exist,
+ * then the ancestor path is "/".
+ *
+ * @param doCheckOwner Require user to be the owner of the path?
+ * @param ancestorAccess The access required by the ancestor of the path.
+ * @param parentAccess The access required by the parent of the path.
+ * @param access The access required by the path.
+ * @param subAccess If path is a directory,
+ * it is the access required of the path and all the sub-directories.
+ * If path is not a directory, there is no effect.
+ * @throws AccessControlException
+ */
+ void checkPermission(String path, INodeDirectory root, boolean doCheckOwner,
+ FsAction ancestorAccess, FsAction parentAccess, FsAction access,
+ FsAction subAccess) throws AccessControlException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ACCESS CHECK: " + this
+ + ", doCheckOwner=" + doCheckOwner
+ + ", ancestorAccess=" + ancestorAccess
+ + ", parentAccess=" + parentAccess
+ + ", access=" + access
+ + ", subAccess=" + subAccess);
+ }
+ // check if (parentAccess != null) && file exists, then check sb
+ synchronized(root) {
+ INode[] inodes = root.getExistingPathINodes(path);
+ int ancestorIndex = inodes.length - 2;
+ for(; ancestorIndex >= 0 && inodes[ancestorIndex] == null;
+ ancestorIndex--);
+ checkTraverse(inodes, ancestorIndex);
+
+ if(parentAccess != null && parentAccess.implies(FsAction.WRITE)
+ && inodes[inodes.length - 1] != null)
+ checkStickyBit(inodes[inodes.length - 2], inodes[inodes.length - 1]);
+
+ if (ancestorAccess != null && inodes.length > 1) {
+ check(inodes, ancestorIndex, ancestorAccess);
+ }
+ if (parentAccess != null && inodes.length > 1) {
+ check(inodes, inodes.length - 2, parentAccess);
+ }
+ if (access != null) {
+ check(inodes[inodes.length - 1], access);
+ }
+ if (subAccess != null) {
+ checkSubAccess(inodes[inodes.length - 1], subAccess);
+ }
+ if (doCheckOwner) {
+ checkOwner(inodes[inodes.length - 1]);
+ }
+ }
+ }
+
+ private void checkOwner(INode inode) throws AccessControlException {
+ if (inode != null && user.equals(inode.getUserName())) {
+ return;
+ }
+ throw new AccessControlException("Permission denied");
+ }
+
+ private void checkTraverse(INode[] inodes, int last
+ ) throws AccessControlException {
+ for(int j = 0; j <= last; j++) {
+ check(inodes[j], FsAction.EXECUTE);
+ }
+ }
+
+ private void checkSubAccess(INode inode, FsAction access
+ ) throws AccessControlException {
+ if (inode == null || !inode.isDirectory()) {
+ return;
+ }
+
+ Stack<INodeDirectory> directories = new Stack<INodeDirectory>();
+ for(directories.push((INodeDirectory)inode); !directories.isEmpty(); ) {
+ INodeDirectory d = directories.pop();
+ check(d, access);
+
+ for(INode child : d.getChildren()) {
+ if (child.isDirectory()) {
+ directories.push((INodeDirectory)child);
+ }
+ }
+ }
+ }
+
+ private void check(INode[] inodes, int i, FsAction access
+ ) throws AccessControlException {
+ check(i >= 0? inodes[i]: null, access);
+ }
+
+ private void check(INode inode, FsAction access
+ ) throws AccessControlException {
+ if (inode == null) {
+ return;
+ }
+ FsPermission mode = inode.getFsPermission();
+
+ if (user.equals(inode.getUserName())) { //user class
+ if (mode.getUserAction().implies(access)) { return; }
+ }
+ else if (groups.contains(inode.getGroupName())) { //group class
+ if (mode.getGroupAction().implies(access)) { return; }
+ }
+ else { //other class
+ if (mode.getOtherAction().implies(access)) { return; }
+ }
+ throw new AccessControlException("Permission denied: user=" + user
+ + ", access=" + access + ", inode=" + inode);
+ }
+
+ private void checkStickyBit(INode parent, INode inode) throws AccessControlException {
+ if(!parent.getFsPermission().getStickyBit()) {
+ return;
+ }
+
+ // If this user is the directory owner, return
+ if(parent.getUserName().equals(user)) {
+ return;
+ }
+
+ // if this user is the file owner, return
+ if(inode.getUserName().equals(user)) {
+ return;
+ }
+
+ throw new AccessControlException("Permission denied by sticky bit setting:" +
+ " user=" + user + ", inode=" + inode);
+ }
+}
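The check(INode, FsAction) method above leans on FsAction.implies for the user/group/other decision. For reference, FsAction forms an ordered permission lattice, so for example (standard Hadoop enum values):

    import org.apache.hadoop.fs.permission.FsAction;

    public class FsActionDemo {
      public static void main(String[] args) {
        // ALL implies every action; READ_WRITE implies READ and WRITE,
        // but READ alone does not imply WRITE.
        System.out.println(FsAction.ALL.implies(FsAction.READ_EXECUTE)); // true
        System.out.println(FsAction.READ_WRITE.implies(FsAction.READ));  // true
        System.out.println(FsAction.READ.implies(FsAction.WRITE));       // false
      }
    }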
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PermissionChecker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PermissionChecker.java?rev=773542&r1=773541&r2=773542&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PermissionChecker.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PermissionChecker.java Mon May 11 12:39:40 2009
@@ -1,202 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode;
-
-import java.util.*;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.permission.*;
-import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
-
-/** Perform permission checking in {@link FSNamesystem}. */
-class PermissionChecker {
- static final Log LOG = LogFactory.getLog(UserGroupInformation.class);
-
- final String user;
- private final Set<String> groups = new HashSet<String>();
- final boolean isSuper;
-
- PermissionChecker(String fsOwner, String supergroup
- ) throws AccessControlException{
- UserGroupInformation ugi = UserGroupInformation.getCurrentUGI();
- if (LOG.isDebugEnabled()) {
- LOG.debug("ugi=" + ugi);
- }
-
- if (ugi != null) {
- user = ugi.getUserName();
- groups.addAll(Arrays.asList(ugi.getGroupNames()));
- isSuper = user.equals(fsOwner) || groups.contains(supergroup);
- }
- else {
- throw new AccessControlException("ugi = null");
- }
- }
-
- boolean containsGroup(String group) {return groups.contains(group);}
-
- /**
- * Check whether current user have permissions to access the path.
- * Traverse is always checked.
- *
- * Parent path means the parent directory for the path.
- * Ancestor path means the last (the closest) existing ancestor directory
- * of the path.
- * Note that if the parent path exists,
- * then the parent path and the ancestor path are the same.
- *
- * For example, suppose the path is "/foo/bar/baz".
- * No matter baz is a file or a directory,
- * the parent path is "/foo/bar".
- * If bar exists, then the ancestor path is also "/foo/bar".
- * If bar does not exist and foo exists,
- * then the ancestor path is "/foo".
- * Further, if both foo and bar do not exist,
- * then the ancestor path is "/".
- *
- * @param doCheckOwner Require user to be the owner of the path?
- * @param ancestorAccess The access required by the ancestor of the path.
- * @param parentAccess The access required by the parent of the path.
- * @param access The access required by the path.
- * @param subAccess If path is a directory,
- * it is the access required of the path and all the sub-directories.
- * If path is not a directory, there is no effect.
- * @return a PermissionChecker object which caches data for later use.
- * @throws AccessControlException
- */
- void checkPermission(String path, INodeDirectory root, boolean doCheckOwner,
- FsAction ancestorAccess, FsAction parentAccess, FsAction access,
- FsAction subAccess) throws AccessControlException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("ACCESS CHECK: " + this
- + ", doCheckOwner=" + doCheckOwner
- + ", ancestorAccess=" + ancestorAccess
- + ", parentAccess=" + parentAccess
- + ", access=" + access
- + ", subAccess=" + subAccess);
- }
- // check if (parentAccess != null) && file exists, then check sb
- synchronized(root) {
- INode[] inodes = root.getExistingPathINodes(path);
- int ancestorIndex = inodes.length - 2;
- for(; ancestorIndex >= 0 && inodes[ancestorIndex] == null;
- ancestorIndex--);
- checkTraverse(inodes, ancestorIndex);
-
- if(parentAccess != null && parentAccess.implies(FsAction.WRITE)
- && inodes[inodes.length - 1] != null)
- checkStickyBit(inodes[inodes.length - 2], inodes[inodes.length - 1]);
-
- if (ancestorAccess != null && inodes.length > 1) {
- check(inodes, ancestorIndex, ancestorAccess);
- }
- if (parentAccess != null && inodes.length > 1) {
- check(inodes, inodes.length - 2, parentAccess);
- }
- if (access != null) {
- check(inodes[inodes.length - 1], access);
- }
- if (subAccess != null) {
- checkSubAccess(inodes[inodes.length - 1], subAccess);
- }
- if (doCheckOwner) {
- checkOwner(inodes[inodes.length - 1]);
- }
- }
- }
-
- private void checkOwner(INode inode) throws AccessControlException {
- if (inode != null && user.equals(inode.getUserName())) {
- return;
- }
- throw new AccessControlException("Permission denied");
- }
-
- private void checkTraverse(INode[] inodes, int last
- ) throws AccessControlException {
- for(int j = 0; j <= last; j++) {
- check(inodes[j], FsAction.EXECUTE);
- }
- }
-
- private void checkSubAccess(INode inode, FsAction access
- ) throws AccessControlException {
- if (inode == null || !inode.isDirectory()) {
- return;
- }
-
- Stack<INodeDirectory> directories = new Stack<INodeDirectory>();
- for(directories.push((INodeDirectory)inode); !directories.isEmpty(); ) {
- INodeDirectory d = directories.pop();
- check(d, access);
-
- for(INode child : d.getChildren()) {
- if (child.isDirectory()) {
- directories.push((INodeDirectory)child);
- }
- }
- }
- }
-
- private void check(INode[] inodes, int i, FsAction access
- ) throws AccessControlException {
- check(i >= 0? inodes[i]: null, access);
- }
-
- private void check(INode inode, FsAction access
- ) throws AccessControlException {
- if (inode == null) {
- return;
- }
- FsPermission mode = inode.getFsPermission();
-
- if (user.equals(inode.getUserName())) { //user class
- if (mode.getUserAction().implies(access)) { return; }
- }
- else if (groups.contains(inode.getGroupName())) { //group class
- if (mode.getGroupAction().implies(access)) { return; }
- }
- else { //other class
- if (mode.getOtherAction().implies(access)) { return; }
- }
- throw new AccessControlException("Permission denied: user=" + user
- + ", access=" + access + ", inode=" + inode);
- }
-
- private void checkStickyBit(INode parent, INode inode) throws AccessControlException {
- if(!parent.getFsPermission().getStickyBit()) {
- return;
- }
-
- // If this user is the directory owner, return
- if(parent.getUserName().equals(user)) {
- return;
- }
-
- // if this user is the file owner, return
- if(inode.getUserName().equals(user)) {
- return;
- }
-
- throw new AccessControlException("Permission denied by sticky bit setting:" +
- " user=" + user + ", inode=" + inode);
- }
-}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/AdminOperationsProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/AdminOperationsProtocol.java?rev=773542&r1=773541&r2=773542&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/AdminOperationsProtocol.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/AdminOperationsProtocol.java Mon May 11 12:39:40 2009
@@ -30,11 +30,17 @@
/**
* Version 1: Initial version. Added refreshQueueAcls.
* Version 2: Added node refresh facility.
*/
- public static final long versionID = 1L;
+ public static final long versionID = 2L;
/**
* Refresh the queue acls in use currently.
*/
void refreshQueueAcls() throws IOException;
+
+ /**
+ * Refresh the node list at the {@link JobTracker}
+ */
+ void refreshNodes() throws IOException;
}
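The versionID bump from 1L to 2L is not cosmetic: Hadoop RPC compares the client's compiled-in protocol version against the server's when the proxy is created, so adding refreshNodes() without the bump would let stale clients talk to new servers across a silently mismatched interface. A simplified, self-contained illustration of that version gate (the real check lives in the RPC layer, not in this protocol):

    import java.io.IOException;

    public class VersionGateDemo {
      // Simplified stand-in for the RPC-layer version check.
      static void checkVersion(long clientVersion, long serverVersion)
          throws IOException {
        if (clientVersion != serverVersion) {
          throw new IOException("Protocol version mismatch: client="
              + clientVersion + ", server=" + serverVersion);
        }
      }

      public static void main(String[] args) throws IOException {
        checkVersion(2L, 2L); // fine after this commit
        checkVersion(1L, 2L); // an old client would be rejected here
      }
    }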
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java?rev=773542&r1=773541&r2=773542&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java Mon May 11 12:39:40 2009
@@ -61,6 +61,7 @@
private Collection<String> activeTrackers = new ArrayList<String>();
private Collection<String> blacklistedTrackers = new ArrayList<String>();
private int numBlacklistedTrackers;
+ private int numExcludedNodes;
private long ttExpiryInterval;
private int map_tasks;
private int reduce_tasks;
@@ -87,8 +88,19 @@
ClusterStatus(int trackers, int blacklists, long ttExpiryInterval,
int maps, int reduces,
int maxMaps, int maxReduces, JobTracker.State state) {
+ this(trackers, blacklists, ttExpiryInterval, maps, reduces, maxMaps,
+ maxReduces, state, 0);
+ }
+
+ /**
+ * @param numDecommissionedNodes number of decommissioned trackers
+ */
+ ClusterStatus(int trackers, int blacklists, long ttExpiryInterval,
+ int maps, int reduces, int maxMaps, int maxReduces,
+ JobTracker.State state, int numDecommissionedNodes) {
numActiveTrackers = trackers;
numBlacklistedTrackers = blacklists;
+ this.numExcludedNodes = numDecommissionedNodes;
this.ttExpiryInterval = ttExpiryInterval;
map_tasks = maps;
reduce_tasks = reduces;
@@ -116,8 +128,19 @@
long ttExpiryInterval,
int maps, int reduces, int maxMaps, int maxReduces,
JobTracker.State state) {
+ this(activeTrackers, blacklistedTrackers, ttExpiryInterval, maps, reduces,
+ maxMaps, maxReduces, state, 0);
+ }
+
+ /**
+ * @param numDecommissionNodes number of decommissioned trackers
+ */
+ ClusterStatus(Collection<String> activeTrackers,
+ Collection<String> blacklistedTrackers, long ttExpiryInterval,
+ int maps, int reduces, int maxMaps, int maxReduces,
+ JobTracker.State state, int numDecommissionNodes) {
this(activeTrackers.size(), blacklistedTrackers.size(), ttExpiryInterval,
- maps, reduces, maxMaps, maxReduces, state);
+ maps, reduces, maxMaps, maxReduces, state, numDecommissionNodes);
this.activeTrackers = activeTrackers;
this.blacklistedTrackers = blacklistedTrackers;
}
@@ -160,6 +183,14 @@
}
/**
+ * Get the number of excluded hosts in the cluster.
+ * @return the number of excluded hosts in the cluster.
+ */
+ public int getNumExcludedNodes() {
+ return numExcludedNodes;
+ }
+
+ /**
* Get the tasktracker expiry interval for the cluster
* @return the expiry interval in msec
*/
@@ -252,6 +283,7 @@
Text.writeString(out, tracker);
}
}
+ out.writeInt(numExcludedNodes);
out.writeLong(ttExpiryInterval);
out.writeInt(map_tasks);
out.writeInt(reduce_tasks);
@@ -279,6 +311,7 @@
blacklistedTrackers.add(name);
}
}
+ numExcludedNodes = in.readInt();
ttExpiryInterval = in.readLong();
map_tasks = in.readInt();
reduce_tasks = in.readInt();
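Note how the new field is threaded through both write() and readFields() at the same position in the stream: Writable serialization carries no field tags, so the two methods must agree on field order exactly. A minimal illustration (not the real ClusterStatus; field names borrowed from the diff):

    import java.io.*;

    // Minimal Writable-style pair: write() and readFields() must mirror
    // each other field for field.
    class ExcludedNodesRecord {
      int numExcludedNodes;
      long ttExpiryInterval;

      void write(DataOutput out) throws IOException {
        out.writeInt(numExcludedNodes);  // new field first...
        out.writeLong(ttExpiryInterval); // ...then the rest, in order
      }

      void readFields(DataInput in) throws IOException {
        numExcludedNodes = in.readInt(); // read back in the same order
        ttExpiryInterval = in.readLong();
      }
    }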
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=773542&r1=773541&r2=773542&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Mon May 11 12:39:40 2009
@@ -43,6 +43,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import javax.security.auth.login.LoginException;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -68,7 +70,9 @@
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.ScriptBasedMapping;
import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.PermissionChecker;
import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.ConfiguredPolicy;
@@ -356,6 +360,10 @@
faultyTrackers.numBlacklistedTrackers -= 1;
}
updateTaskTrackerStatus(trackerName, null);
+
+ // remove the mapping from the hosts list
+ String hostname = newProfile.getHost();
+ hostnameToTrackerName.get(hostname).remove(trackerName);
} else {
// Update time by inserting latest profile
trackerExpiryQueue.add(newProfile);
@@ -1446,6 +1454,12 @@
Map<String, Node> hostnameToNodeMap =
Collections.synchronizedMap(new TreeMap<String, Node>());
+ // (hostname --> Set(trackername))
+ // This is used to keep track of all trackers running on one host. While
+ // decommissioning the host, all the trackers on the host will be lost.
+ Map<String, Set<String>> hostnameToTrackerName =
+ Collections.synchronizedMap(new TreeMap<String, Set<String>>());
+
// Number of resolved entries
int numResolved;
@@ -1508,6 +1522,8 @@
FileSystem fs = null;
Path systemDir = null;
private JobConf conf;
+ private final UserGroupInformation mrOwner;
+ private final String supergroup;
private QueueManager queueManager;
@@ -1515,6 +1531,16 @@
* Start the JobTracker process, listen on the indicated port
*/
JobTracker(JobConf conf) throws IOException, InterruptedException {
+ // find the owner of the process
+ try {
+ mrOwner = UnixUserGroupInformation.login(conf);
+ } catch (LoginException e) {
+ throw new IOException(StringUtils.stringifyException(e));
+ }
+ supergroup = conf.get("mapred.permissions.supergroup", "supergroup");
+ LOG.info("Starting jobtracker with owner as " + mrOwner.getUserName()
+ + " and supergroup as " + supergroup);
+
//
// Grab some static constants
//
@@ -2336,11 +2362,23 @@
*/
private void addNewTracker(TaskTrackerStatus status) {
trackerExpiryQueue.add(status);
+
// Register the tracker if its not registered
+ String hostname = status.getHost();
if (getNode(status.getTrackerName()) == null) {
// Making the network location resolution inline ..
- resolveAndAddToTopology(status.getHost());
+ resolveAndAddToTopology(hostname);
}
+
+ // add it to the set of tracker per host
+ Set<String> trackers = hostnameToTrackerName.get(hostname);
+ if (trackers == null) {
+ trackers = Collections.synchronizedSet(new HashSet<String>());
+ hostnameToTrackerName.put(hostname, trackers);
+ }
+ LOG.info("Adding tracker " + status.getTrackerName() + " to host "
+ + hostname);
+ trackers.add(status.getTrackerName());
}
public Node resolveAndAddToTopology(String name) {
@@ -3039,7 +3077,8 @@
totalReduces,
totalMapTaskCapacity,
totalReduceTaskCapacity,
- state);
+ state, getExcludedNodes().size()
+ );
} else {
return new ClusterStatus(taskTrackers.size() -
getBlacklistedTrackerCount(),
@@ -3049,7 +3088,7 @@
totalReduces,
totalMapTaskCapacity,
totalReduceTaskCapacity,
- state);
+ state, getExcludedNodes().size());
}
}
}
@@ -3537,6 +3576,64 @@
}
}
+ /**
+ * Rereads the config to get hosts and exclude list file names.
+ * Rereads the files to update the hosts and exclude lists.
+ */
+ public synchronized void refreshNodes() throws IOException {
+ // check access
+ PermissionChecker.checkSuperuserPrivilege(mrOwner, supergroup);
+
+ // Reread the config to get mapred.hosts and mapred.hosts.exclude filenames.
+ // Update the file names and refresh internal includes and excludes list
+ LOG.info("Refreshing hosts information");
+ Configuration conf = new Configuration();
+
+ hostsReader.updateFileNames(conf.get("mapred.hosts",""),
+ conf.get("mapred.hosts.exclude", ""));
+ hostsReader.refresh();
+
+ Set<String> excludeSet = new HashSet<String>();
+ for(Map.Entry<String, TaskTrackerStatus> eSet : taskTrackers.entrySet()) {
+ String trackerName = eSet.getKey();
+ TaskTrackerStatus status = eSet.getValue();
+ // Tracker is not allowed: either not in the hosts list, or explicitly excluded
+ if (!inHostsList(status) || inExcludedHostsList(status)) {
+ excludeSet.add(status.getHost()); // add to rejected trackers
+ }
+ }
+ decommissionNodes(excludeSet);
+ }
+
+ // main decommission
+ private synchronized void decommissionNodes(Set<String> hosts)
+ throws IOException {
+ LOG.info("Decommissioning " + hosts.size() + " nodes");
+ // create a list of tracker hostnames
+ synchronized (taskTrackers) {
+ synchronized (trackerExpiryQueue) {
+ for (String host : hosts) {
+ LOG.info("Decommissioning host " + host);
+ Set<String> trackers = hostnameToTrackerName.remove(host);
+ if (trackers != null) {
+ for (String tracker : trackers) {
+ LOG.info("Losing tracker " + tracker + " on host " + host);
+ lostTaskTracker(tracker); // lose the tracker
+ updateTaskTrackerStatus(tracker, null);
+ }
+ }
+ LOG.info("Host " + host + " is ready for decommissioning");
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns a set of excluded nodes.
+ */
+ Collection<String> getExcludedNodes() {
+ return hostsReader.getExcludedHosts();
+ }
/**
* Get the localized job file path on the job trackers local file system
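The decommission path above hinges on the new hostnameToTrackerName map: several trackers can share one physical host, so excluding a host must find and lose every tracker running on it. A simplified sketch of that bookkeeping, with names following the diff and the surrounding JobTracker locking omitted:

    import java.util.*;

    class TrackerRegistry {
      // hostname -> set of tracker names running on that host
      private final Map<String, Set<String>> hostnameToTrackerName =
          Collections.synchronizedMap(new TreeMap<String, Set<String>>());

      void addTracker(String hostname, String trackerName) {
        Set<String> trackers = hostnameToTrackerName.get(hostname);
        if (trackers == null) {
          trackers = Collections.synchronizedSet(new HashSet<String>());
          hostnameToTrackerName.put(hostname, trackers);
        }
        trackers.add(trackerName);
      }

      // Returns every tracker that must be treated as lost when the
      // host is decommissioned.
      Set<String> removeHost(String hostname) {
        Set<String> lost = hostnameToTrackerName.remove(hostname);
        return lost == null ? Collections.<String>emptySet() : lost;
      }
    }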
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/tools/MRAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/tools/MRAdmin.java?rev=773542&r1=773541&r2=773542&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/tools/MRAdmin.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/tools/MRAdmin.java Mon May 11 12:39:40 2009
@@ -30,6 +30,7 @@
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -53,7 +54,8 @@
private static void printHelp(String cmd) {
String summary = "hadoop mradmin is the command to execute Map-Reduce administrative commands.\n" +
"The full syntax is: \n\n" +
- "hadoop mradmin [-refreshServiceAcl] [-refreshQueueAcls] [-help [cmd]]\n";
+ "hadoop mradmin [-refreshServiceAcl] [-refreshQueueAcls] [-help [cmd]] "
+ + "[-refreshNodes]\n";
String refreshServiceAcl = "-refreshServiceAcl: Reload the service-level authorization policy file\n" +
"\t\tJobtracker will reload the authorization policy file.\n";
@@ -62,6 +64,9 @@
"-refreshQueueAcls: Reload the queue acls\n"
+ "\t\tJobTracker will reload the mapred-queue-acls.xml file.\n";
+ String refreshNodes =
+ "-refreshNodes: Refresh the hosts information at the jobtracker.\n";
+
String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
"\t\tis specified.\n";
@@ -69,12 +74,15 @@
System.out.println(refreshServiceAcl);
} else if ("refreshQueueAcls".equals(cmd)) {
System.out.println(refreshQueueAcls);
+ } else if ("refreshNodes".equals(cmd)) {
+ System.out.println(refreshNodes);
} else if ("help".equals(cmd)) {
System.out.println(help);
} else {
System.out.println(summary);
System.out.println(refreshServiceAcl);
System.out.println(refreshQueueAcls);
+ System.out.println(refreshNodes);
System.out.println(help);
System.out.println();
ToolRunner.printGenericCommandUsage(System.out);
@@ -91,10 +99,13 @@
System.err.println("Usage: java MRAdmin" + " [-refreshServiceAcl]");
} else if ("-refreshQueueAcls".equals(cmd)) {
System.err.println("Usage: java MRAdmin" + " [-refreshQueueAcls]");
+ } else if ("-refreshNodes".equals(cmd)) {
+ System.err.println("Usage: java MRAdmin" + " [-refreshNodes]");
} else {
System.err.println("Usage: java MRAdmin");
System.err.println(" [-refreshServiceAcl]");
System.err.println(" [-refreshQueueAcls]");
+ System.err.println(" [-refreshNodes]");
System.err.println(" [-help [cmd]]");
System.err.println();
ToolRunner.printGenericCommandUsage(System.err);
@@ -151,6 +162,31 @@
return 0;
}
+ /**
+ * Command to ask the jobtracker to reread the hosts and excluded hosts
+ * files.
+ * Usage: java MRAdmin -refreshNodes
+ * @exception IOException
+ */
+ private int refreshNodes() throws IOException {
+ // Get the current configuration
+ Configuration conf = getConf();
+
+ // Create the client
+ AdminOperationsProtocol adminOperationsProtocol =
+ (AdminOperationsProtocol)
+ RPC.getProxy(AdminOperationsProtocol.class,
+ AdminOperationsProtocol.versionID,
+ JobTracker.getAddress(conf), getUGI(conf), conf,
+ NetUtils.getSocketFactory(conf,
+ AdminOperationsProtocol.class));
+
+ // Refresh the node list at the JobTracker
+ adminOperationsProtocol.refreshNodes();
+
+ return 0;
+ }
+
@Override
public int run(String[] args) throws Exception {
if (args.length < 1) {
@@ -165,7 +201,8 @@
//
// verify that we have enough command line parameters
//
- if ("-refreshServiceAcl".equals(cmd) || "-refreshQueueAcls".equals(cmd)) {
+ if ("-refreshServiceAcl".equals(cmd) || "-refreshQueueAcls".equals(cmd)
+ || "-refreshNodes".equals(cmd)) {
if (args.length != 1) {
printUsage(cmd);
return exitCode;
@@ -178,6 +215,8 @@
exitCode = refreshAuthorizationPolicy();
} else if ("-refreshQueueAcls".equals(cmd)) {
exitCode = refreshQueueAcls();
+ } else if ("-refreshNodes".equals(cmd)) {
+ exitCode = refreshNodes();
} else if ("-help".equals(cmd)) {
if (i < args.length) {
printUsage(args[i]);
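Operationally, the new switch is invoked as "hadoop mradmin -refreshNodes" after editing the file named by mapred.hosts.exclude. For completeness, a sketch of driving the same tool programmatically through ToolRunner; the wrapper class here is hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.tools.MRAdmin;
    import org.apache.hadoop.util.ToolRunner;

    public class RefreshNodesDriver {
      public static void main(String[] args) throws Exception {
        // Equivalent to running: hadoop mradmin -refreshNodes
        int exitCode = ToolRunner.run(new Configuration(), new MRAdmin(),
                                      new String[] { "-refreshNodes" });
        System.exit(exitCode);
      }
    }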
Modified: hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java?rev=773542&r1=773541&r2=773542&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java (original)
+++ hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java Mon May 11 12:39:40 2009
@@ -55,11 +55,11 @@
ByteArrayOutputStream bStream = new ByteArrayOutputStream();
PrintStream newOut = new PrintStream(bStream, true);
System.setOut(newOut);
- ((Log4JLogger)PermissionChecker.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)FSPermissionChecker.LOG).getLogger().setLevel(Level.ALL);
int errCode = ToolRunner.run(new DFSck(conf), path);
if (checkErrorCode)
assertEquals(expectedErrCode, errCode);
- ((Log4JLogger)PermissionChecker.LOG).getLogger().setLevel(Level.INFO);
+ ((Log4JLogger)FSPermissionChecker.LOG).getLogger().setLevel(Level.INFO);
System.setOut(oldOut);
return bStream.toString();
}
Modified: hadoop/core/trunk/src/test/mapred-site.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred-site.xml?rev=773542&r1=773541&r2=773542&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/mapred-site.xml (original)
+++ hadoop/core/trunk/src/test/mapred-site.xml Mon May 11 12:39:40 2009
@@ -9,5 +9,10 @@
<name>io.sort.mb</name>
<value>10</value>
</property>
+<property>
+ <name>mapred.hosts.exclude</name>
+ <value>hosts.exclude</value>
+ <description></description>
+</property>
</configuration>
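The exclude file named above is plain text read by HostsFileReader, one hostname per line. A hypothetical hosts.exclude, with invented hostnames for illustration:

    tracker-host-01.example.com
    tracker-host-02.example.com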
Modified: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java?rev=773542&r1=773541&r2=773542&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java Mon May 11 12:39:40 2009
@@ -54,6 +54,7 @@
private String namenode;
private UnixUserGroupInformation ugi = null;
private JobConf conf;
+ private int numTrackerToExclude;
private JobConf job;
@@ -262,13 +263,16 @@
JobClient client;
try {
client = new JobClient(job);
- while(client.getClusterStatus().getTaskTrackers()<taskTrackerList.size()) {
+ ClusterStatus status = client.getClusterStatus();
+ while(status.getTaskTrackers() + numTrackerToExclude
+ < taskTrackerList.size()) {
for(TaskTrackerRunner runner : taskTrackerList) {
if(runner.isDead) {
throw new RuntimeException("TaskTracker is dead");
}
}
Thread.sleep(1000);
+ status = client.getClusterStatus();
}
}
catch (IOException ex) {
@@ -409,6 +413,14 @@
int numTaskTrackers, String namenode,
int numDir, String[] racks, String[] hosts, UnixUserGroupInformation ugi,
JobConf conf) throws IOException {
+ this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir,
+ racks, hosts, ugi, conf, 0);
+ }
+
+ public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
+ int numTaskTrackers, String namenode,
+ int numDir, String[] racks, String[] hosts, UnixUserGroupInformation ugi,
+ JobConf conf, int numTrackerToExclude) throws IOException {
if (racks != null && racks.length < numTaskTrackers) {
LOG.error("Invalid number of racks specified. It should be at least " +
"equal to the number of tasktrackers");
@@ -443,6 +455,7 @@
this.namenode = namenode;
this.ugi = ugi;
this.conf = conf; // this is the conf the mr starts with
+ this.numTrackerToExclude = numTrackerToExclude;
// start the jobtracker
startJobTracker();
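The test harness change mirrors the server side: when some trackers are expected to be excluded, the readiness loop must stop waiting for them. A condensed, self-contained sketch of the adjusted wait condition, with names following the diff:

    class ClusterReadiness {
      // The cluster is "up" once live trackers plus the trackers we
      // expect to exclude account for every tracker that was started.
      static boolean isUp(int liveTrackers, int numTrackerToExclude,
                          int trackersStarted) {
        return liveTrackers + numTrackerToExclude >= trackersStarted;
      }

      public static void main(String[] args) {
        System.out.println(isUp(3, 1, 4)); // true: one tracker is excluded
        System.out.println(isUp(2, 1, 4)); // false: still waiting for one
      }
    }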
Modified: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java?rev=773542&r1=773541&r2=773542&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java (original)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java Mon May 11 12:39:40 2009
@@ -32,16 +32,14 @@
* A JUnit test to test Mini Map-Reduce Cluster with Mini-DFS.
*/
public class TestMiniMRWithDFSWithDistinctUsers extends TestCase {
- static final long now = System.currentTimeMillis();
static final UnixUserGroupInformation DFS_UGI = createUGI("dfs", true);
static final UnixUserGroupInformation PI_UGI = createUGI("pi", false);
static final UnixUserGroupInformation WC_UGI = createUGI("wc", false);
static UnixUserGroupInformation createUGI(String name, boolean issuper) {
- String username = name + now;
- String group = issuper? "supergroup": username;
+ String group = issuper? "supergroup": name;
return UnixUserGroupInformation.createImmutable(
- new String[]{username, group});
+ new String[]{name, group});
}
static JobConf createJobConf(MiniMRCluster mr, UnixUserGroupInformation ugi) {
Modified: hadoop/core/trunk/src/webapps/job/jobtracker.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/job/jobtracker.jsp?rev=773542&r1=773541&r2=773542&view=diff
==============================================================================
--- hadoop/core/trunk/src/webapps/job/jobtracker.jsp (original)
+++ hadoop/core/trunk/src/webapps/job/jobtracker.jsp Mon May 11 12:39:40 2009
@@ -51,7 +51,8 @@
"<th>Total Submissions</th>" +
"<th>Nodes</th><th>Map Task Capacity</th>" +
"<th>Reduce Task Capacity</th><th>Avg. Tasks/Node</th>" +
- "<th>Blacklisted Nodes</th></tr>\n");
+ "<th>Blacklisted Nodes</th>" +
+ "<th>Excluded Nodes</th></tr>\n");
out.print("<tr><td>" + status.getMapTasks() + "</td><td>" +
status.getReduceTasks() + "</td><td>" +
tracker.getTotalSubmissions() +
@@ -62,6 +63,8 @@
"</td><td>" + tasksPerNode +
"</td><td><a href=\"machines.jsp?type=blacklisted\">" +
status.getBlacklistedTrackers() + "</a>" +
+ "</td><td><a href=\"machines.jsp?type=excluded\">" +
+ status.getNumExcludedNodes() + "</a>" +
"</td></tr></table>\n");
out.print("<br>");
Modified: hadoop/core/trunk/src/webapps/job/machines.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/job/machines.jsp?rev=773542&r1=773541&r2=773542&view=diff
==============================================================================
--- hadoop/core/trunk/src/webapps/job/machines.jsp (original)
+++ hadoop/core/trunk/src/webapps/job/machines.jsp Mon May 11 12:39:40 2009
@@ -39,10 +39,13 @@
JobTracker tracker) throws IOException {
Collection c;
if (("blacklisted").equals(type)) {
+ out.println("<h2>Blacklisted Task Trackers</h2>");
c = tracker.blacklistedTaskTrackers();
} else if (("active").equals(type)) {
+ out.println("<h2>Active Task Trackers</h2>");
c = tracker.activeTaskTrackers();
} else {
+ out.println("<h2>Task Trackers</h2>");
c = tracker.taskTrackers();
}
if (c.size() == 0) {
@@ -93,6 +96,26 @@
}
}
+ public void generateTableForExcludedNodes(JspWriter out, JobTracker tracker)
+ throws IOException {
+ // excluded nodes
+ out.println("<h2>Excluded Nodes</h2>");
+ Collection<String> d = tracker.getExcludedNodes();
+ if (d.size() == 0) {
+ out.print("There are currently no excluded hosts.");
+ } else {
+ out.print("<center>\n");
+ out.print("<table border=\"2\" cellpadding=\"5\" cellspacing=\"2\">\n");
+ out.print("<tr>");
+ out.print("<td><b>Host Name</b></td></tr>\n");
+ for (Iterator it = d.iterator(); it.hasNext(); ) {
+ String dt = (String)it.next();
+ out.print("<tr><td>" + dt + "</td></tr>\n");
+ }
+ out.print("</table>\n");
+ out.print("</center>\n");
+ }
+ }
%>
<html>
@@ -102,9 +125,12 @@
<body>
<h1><a href="jobtracker.jsp"><%=trackerName%></a> Hadoop Machine List</h1>
-<h2>Task Trackers</h2>
<%
- generateTaskTrackerTable(out, type, tracker);
+ if (("excluded").equals(type)) {
+ generateTableForExcludedNodes(out, tracker);
+ } else {
+ generateTaskTrackerTable(out, type, tracker);
+ }
%>
<%