You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2014/07/12 04:24:50 UTC
svn commit: r1609878 [2/4] - in
/hadoop/common/branches/YARN-1051/hadoop-common-project: hadoop-auth/
hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/server/
hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/server...
Modified: hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java Sat Jul 12 02:24:40 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.ftp;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
+import java.net.ConnectException;
import java.net.URI;
import org.apache.commons.logging.Log;
@@ -33,11 +34,14 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Progressable;
/**
@@ -56,6 +60,12 @@ public class FTPFileSystem extends FileS
public static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;
public static final int DEFAULT_BLOCK_SIZE = 4 * 1024;
+ public static final String FS_FTP_USER_PREFIX = "fs.ftp.user.";
+ public static final String FS_FTP_HOST = "fs.ftp.host";
+ public static final String FS_FTP_HOST_PORT = "fs.ftp.host.port";
+ public static final String FS_FTP_PASSWORD_PREFIX = "fs.ftp.password.";
+ public static final String E_SAME_DIRECTORY_ONLY =
+ "only same directory renames are supported";
private URI uri;
@@ -75,11 +85,11 @@ public class FTPFileSystem extends FileS
super.initialize(uri, conf);
// get host information from uri (overrides info in conf)
String host = uri.getHost();
- host = (host == null) ? conf.get("fs.ftp.host", null) : host;
+ host = (host == null) ? conf.get(FS_FTP_HOST, null) : host;
if (host == null) {
throw new IOException("Invalid host specified");
}
- conf.set("fs.ftp.host", host);
+ conf.set(FS_FTP_HOST, host);
// get port information from uri, (overrides info in conf)
int port = uri.getPort();
@@ -96,11 +106,11 @@ public class FTPFileSystem extends FileS
}
}
String[] userPasswdInfo = userAndPassword.split(":");
- conf.set("fs.ftp.user." + host, userPasswdInfo[0]);
+ conf.set(FS_FTP_USER_PREFIX + host, userPasswdInfo[0]);
if (userPasswdInfo.length > 1) {
- conf.set("fs.ftp.password." + host, userPasswdInfo[1]);
+ conf.set(FS_FTP_PASSWORD_PREFIX + host, userPasswdInfo[1]);
} else {
- conf.set("fs.ftp.password." + host, null);
+ conf.set(FS_FTP_PASSWORD_PREFIX + host, null);
}
setConf(conf);
this.uri = uri;
@@ -115,23 +125,24 @@ public class FTPFileSystem extends FileS
private FTPClient connect() throws IOException {
FTPClient client = null;
Configuration conf = getConf();
- String host = conf.get("fs.ftp.host");
- int port = conf.getInt("fs.ftp.host.port", FTP.DEFAULT_PORT);
- String user = conf.get("fs.ftp.user." + host);
- String password = conf.get("fs.ftp.password." + host);
+ String host = conf.get(FS_FTP_HOST);
+ int port = conf.getInt(FS_FTP_HOST_PORT, FTP.DEFAULT_PORT);
+ String user = conf.get(FS_FTP_USER_PREFIX + host);
+ String password = conf.get(FS_FTP_PASSWORD_PREFIX + host);
client = new FTPClient();
client.connect(host, port);
int reply = client.getReplyCode();
if (!FTPReply.isPositiveCompletion(reply)) {
- throw new IOException("Server - " + host
- + " refused connection on port - " + port);
+ throw NetUtils.wrapException(host, port,
+ NetUtils.UNKNOWN_HOST, 0,
+ new ConnectException("Server response " + reply));
} else if (client.login(user, password)) {
client.setFileTransferMode(FTP.BLOCK_TRANSFER_MODE);
client.setFileType(FTP.BINARY_FILE_TYPE);
client.setBufferSize(DEFAULT_BUFFER_SIZE);
} else {
throw new IOException("Login failed on server - " + host + ", port - "
- + port);
+ + port + " as user '" + user + "'");
}
return client;
@@ -179,7 +190,7 @@ public class FTPFileSystem extends FileS
FileStatus fileStat = getFileStatus(client, absolute);
if (fileStat.isDirectory()) {
disconnect(client);
- throw new IOException("Path " + file + " is a directory.");
+ throw new FileNotFoundException("Path " + file + " is a directory.");
}
client.allocate(bufferSize);
Path parent = absolute.getParent();
@@ -214,12 +225,18 @@ public class FTPFileSystem extends FileS
final FTPClient client = connect();
Path workDir = new Path(client.printWorkingDirectory());
Path absolute = makeAbsolute(workDir, file);
- if (exists(client, file)) {
- if (overwrite) {
- delete(client, file);
+ FileStatus status;
+ try {
+ status = getFileStatus(client, file);
+ } catch (FileNotFoundException fnfe) {
+ status = null;
+ }
+ if (status != null) {
+ if (overwrite && !status.isDirectory()) {
+ delete(client, file, false);
} else {
disconnect(client);
- throw new IOException("File already exists: " + file);
+ throw new FileAlreadyExistsException("File already exists: " + file);
}
}
@@ -272,14 +289,13 @@ public class FTPFileSystem extends FileS
* Convenience method, so that we don't open a new connection when using this
* method from within another method. Otherwise every API invocation incurs
* the overhead of opening/closing a TCP connection.
+ * @throws IOException on IO problems other than FileNotFoundException
*/
- private boolean exists(FTPClient client, Path file) {
+ private boolean exists(FTPClient client, Path file) throws IOException {
try {
return getFileStatus(client, file) != null;
} catch (FileNotFoundException fnfe) {
return false;
- } catch (IOException ioe) {
- throw new FTPException("Failed to get file status", ioe);
}
}
@@ -294,12 +310,6 @@ public class FTPFileSystem extends FileS
}
}
- /** @deprecated Use delete(Path, boolean) instead */
- @Deprecated
- private boolean delete(FTPClient client, Path file) throws IOException {
- return delete(client, file, false);
- }
-
/**
* Convenience method, so that we don't open a new connection when using this
* method from within another method. Otherwise every API invocation incurs
@@ -310,9 +320,14 @@ public class FTPFileSystem extends FileS
Path workDir = new Path(client.printWorkingDirectory());
Path absolute = makeAbsolute(workDir, file);
String pathName = absolute.toUri().getPath();
- FileStatus fileStat = getFileStatus(client, absolute);
- if (fileStat.isFile()) {
- return client.deleteFile(pathName);
+ try {
+ FileStatus fileStat = getFileStatus(client, absolute);
+ if (fileStat.isFile()) {
+ return client.deleteFile(pathName);
+ }
+ } catch (FileNotFoundException e) {
+ //the file is not there
+ return false;
}
FileStatus[] dirEntries = listStatus(client, absolute);
if (dirEntries != null && dirEntries.length > 0 && !(recursive)) {
@@ -491,7 +506,7 @@ public class FTPFileSystem extends FileS
created = created && client.makeDirectory(pathName);
}
} else if (isFile(client, absolute)) {
- throw new IOException(String.format(
+ throw new ParentNotDirectoryException(String.format(
"Can't make directory for path %s since it is a file.", absolute));
}
return created;
@@ -528,6 +543,23 @@ public class FTPFileSystem extends FileS
}
/**
+ * Probe for a path being a parent of another
+ * @param parent parent path
+ * @param child possible child path
+ * @return true if the parent's path matches the start of the child's
+ */
+ private boolean isParentOf(Path parent, Path child) {
+ URI parentURI = parent.toUri();
+ String parentPath = parentURI.getPath();
+ if (!parentPath.endsWith("/")) {
+ parentPath += "/";
+ }
+ URI childURI = child.toUri();
+ String childPath = childURI.getPath();
+ return childPath.startsWith(parentPath);
+ }
+
+ /**
* Convenience method, so that we don't open a new connection when using this
* method from within another method. Otherwise every API invocation incurs
* the overhead of opening/closing a TCP connection.
@@ -544,20 +576,31 @@ public class FTPFileSystem extends FileS
Path absoluteSrc = makeAbsolute(workDir, src);
Path absoluteDst = makeAbsolute(workDir, dst);
if (!exists(client, absoluteSrc)) {
- throw new IOException("Source path " + src + " does not exist");
+ throw new FileNotFoundException("Source path " + src + " does not exist");
+ }
+ if (isDirectory(absoluteDst)) {
+ // destination is a directory: rename goes underneath it with the
+ // source name
+ absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
}
if (exists(client, absoluteDst)) {
- throw new IOException("Destination path " + dst
- + " already exist, cannot rename!");
+ throw new FileAlreadyExistsException("Destination path " + dst
+ + " already exists");
}
String parentSrc = absoluteSrc.getParent().toUri().toString();
String parentDst = absoluteDst.getParent().toUri().toString();
- String from = src.getName();
- String to = dst.getName();
+ if (isParentOf(absoluteSrc, absoluteDst)) {
+ throw new IOException("Cannot rename " + absoluteSrc + " under itself"
+ + " : "+ absoluteDst);
+ }
+
if (!parentSrc.equals(parentDst)) {
- throw new IOException("Cannot rename parent(source): " + parentSrc
- + ", parent(destination): " + parentDst);
+ throw new IOException("Cannot rename source: " + absoluteSrc
+ + " to " + absoluteDst
+ + " -"+ E_SAME_DIRECTORY_ONLY);
}
+ String from = absoluteSrc.getName();
+ String to = absoluteDst.getName();
client.changeWorkingDirectory(parentSrc);
boolean renamed = client.rename(from, to);
return renamed;
Modified: hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPInputStream.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPInputStream.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPInputStream.java Sat Jul 12 02:24:40 2014
@@ -103,7 +103,7 @@ public class FTPInputStream extends FSIn
@Override
public synchronized void close() throws IOException {
if (closed) {
- throw new IOException("Stream closed");
+ return;
}
super.close();
closed = true;
Modified: hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/permission/AclEntry.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/permission/AclEntry.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/permission/AclEntry.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/permission/AclEntry.java Sat Jul 12 02:24:40 2014
@@ -278,7 +278,7 @@ public class AclEntry {
}
if (includePermission) {
- if (split.length < index) {
+ if (split.length <= index) {
throw new HadoopIllegalArgumentException("Invalid <aclSpec> : "
+ aclStr);
}
Modified: hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3/S3FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3/S3FileSystem.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3/S3FileSystem.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3/S3FileSystem.java Sat Jul 12 02:24:40 2014
@@ -32,6 +32,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -226,7 +227,7 @@ public class S3FileSystem extends FileSy
if (overwrite) {
delete(file, true);
} else {
- throw new IOException("File already exists: " + file);
+ throw new FileAlreadyExistsException("File already exists: " + file);
}
} else {
Path parent = file.getParent();
Modified: hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java Sat Jul 12 02:24:40 2014
@@ -22,6 +22,7 @@ import static org.apache.hadoop.fs.s3nat
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
+import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
@@ -32,17 +33,19 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.s3.S3Credentials;
import org.apache.hadoop.fs.s3.S3Exception;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.AccessControlException;
import org.jets3t.service.S3Service;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.ServiceException;
import org.jets3t.service.StorageObjectsChunk;
+import org.jets3t.service.impl.rest.HttpException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.MultipartPart;
import org.jets3t.service.model.MultipartUpload;
@@ -51,6 +54,8 @@ import org.jets3t.service.model.S3Object
import org.jets3t.service.model.StorageObject;
import org.jets3t.service.security.AWSCredentials;
import org.jets3t.service.utils.MultipartUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
@InterfaceStability.Unstable
@@ -66,8 +71,8 @@ class Jets3tNativeFileSystemStore implem
private String serverSideEncryptionAlgorithm;
- public static final Log LOG =
- LogFactory.getLog(Jets3tNativeFileSystemStore.class);
+ public static final Logger LOG =
+ LoggerFactory.getLogger(Jets3tNativeFileSystemStore.class);
@Override
public void initialize(URI uri, Configuration conf) throws IOException {
@@ -79,7 +84,7 @@ class Jets3tNativeFileSystemStore implem
s3Credentials.getSecretAccessKey());
this.s3Service = new RestS3Service(awsCredentials);
} catch (S3ServiceException e) {
- handleS3ServiceException(e);
+ handleException(e);
}
multipartEnabled =
conf.getBoolean("fs.s3n.multipart.uploads.enabled", false);
@@ -115,16 +120,10 @@ class Jets3tNativeFileSystemStore implem
object.setMd5Hash(md5Hash);
}
s3Service.putObject(bucket, object);
- } catch (S3ServiceException e) {
- handleS3ServiceException(e);
+ } catch (ServiceException e) {
+ handleException(e, key);
} finally {
- if (in != null) {
- try {
- in.close();
- } catch (IOException e) {
- // ignore
- }
- }
+ IOUtils.closeStream(in);
}
}
@@ -147,10 +146,8 @@ class Jets3tNativeFileSystemStore implem
try {
mpUtils.uploadObjects(bucket.getName(), s3Service,
objectsToUploadAsMultipart, null);
- } catch (ServiceException e) {
- handleServiceException(e);
} catch (Exception e) {
- throw new S3Exception(e);
+ handleException(e, key);
}
}
@@ -163,8 +160,8 @@ class Jets3tNativeFileSystemStore implem
object.setContentLength(0);
object.setServerSideEncryptionAlgorithm(serverSideEncryptionAlgorithm);
s3Service.putObject(bucket, object);
- } catch (S3ServiceException e) {
- handleS3ServiceException(e);
+ } catch (ServiceException e) {
+ handleException(e, key);
}
}
@@ -172,20 +169,21 @@ class Jets3tNativeFileSystemStore implem
public FileMetadata retrieveMetadata(String key) throws IOException {
StorageObject object = null;
try {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Getting metadata for key: " + key + " from bucket:" + bucket.getName());
- }
+ LOG.debug("Getting metadata for key: {} from bucket: {}",
+ key, bucket.getName());
object = s3Service.getObjectDetails(bucket.getName(), key);
return new FileMetadata(key, object.getContentLength(),
object.getLastModifiedDate().getTime());
} catch (ServiceException e) {
- // Following is brittle. Is there a better way?
- if ("NoSuchKey".equals(e.getErrorCode())) {
- return null; //return null if key not found
+ try {
+ // process
+ handleException(e, key);
+ return null;
+ } catch (FileNotFoundException fnfe) {
+ // and downgrade missing files
+ return null;
}
- handleServiceException(e);
- return null; //never returned - keep compiler happy
} finally {
if (object != null) {
object.closeDataInputStream();
@@ -204,13 +202,12 @@ class Jets3tNativeFileSystemStore implem
@Override
public InputStream retrieve(String key) throws IOException {
try {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Getting key: " + key + " from bucket:" + bucket.getName());
- }
+ LOG.debug("Getting key: {} from bucket: {}",
+ key, bucket.getName());
S3Object object = s3Service.getObject(bucket.getName(), key);
return object.getDataInputStream();
} catch (ServiceException e) {
- handleServiceException(key, e);
+ handleException(e, key);
return null; //return null if key not found
}
}
@@ -228,15 +225,14 @@ class Jets3tNativeFileSystemStore implem
public InputStream retrieve(String key, long byteRangeStart)
throws IOException {
try {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Getting key: " + key + " from bucket:" + bucket.getName() + " with byteRangeStart: " + byteRangeStart);
- }
+ LOG.debug("Getting key: {} from bucket: {} with byteRangeStart: {}",
+ key, bucket.getName(), byteRangeStart);
S3Object object = s3Service.getObject(bucket, key, null, null, null,
null, byteRangeStart, null);
return object.getDataInputStream();
} catch (ServiceException e) {
- handleServiceException(key, e);
- return null; //return null if key not found
+ handleException(e, key);
+ return null;
}
}
@@ -254,17 +250,19 @@ class Jets3tNativeFileSystemStore implem
}
/**
- *
- * @return
- * This method returns null if the list could not be populated
- * due to S3 giving ServiceException
- * @throws IOException
+ * list objects
+ * @param prefix prefix
+ * @param delimiter delimiter
+ * @param maxListingLength max no. of entries
+ * @param priorLastKey last key in any previous search
+ * @return a list of matches
+ * @throws IOException on any reported failure
*/
private PartialListing list(String prefix, String delimiter,
int maxListingLength, String priorLastKey) throws IOException {
try {
- if (prefix.length() > 0 && !prefix.endsWith(PATH_DELIMITER)) {
+ if (!prefix.isEmpty() && !prefix.endsWith(PATH_DELIMITER)) {
prefix += PATH_DELIMITER;
}
StorageObjectsChunk chunk = s3Service.listObjectsChunked(bucket.getName(),
@@ -279,24 +277,20 @@ class Jets3tNativeFileSystemStore implem
}
return new PartialListing(chunk.getPriorLastKey(), fileMetadata,
chunk.getCommonPrefixes());
- } catch (S3ServiceException e) {
- handleS3ServiceException(e);
- return null; //never returned - keep compiler happy
} catch (ServiceException e) {
- handleServiceException(e);
- return null; //return null if list could not be populated
+ handleException(e, prefix);
+ return null; // never returned - keep compiler happy
}
}
@Override
public void delete(String key) throws IOException {
try {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Deleting key:" + key + "from bucket" + bucket.getName());
- }
+ LOG.debug("Deleting key: {} from bucket: {}",
+ key, bucket.getName());
s3Service.deleteObject(bucket, key);
} catch (ServiceException e) {
- handleServiceException(key, e);
+ handleException(e, key);
}
}
@@ -304,7 +298,7 @@ class Jets3tNativeFileSystemStore implem
try {
s3Service.renameObject(bucket.getName(), srcKey, new S3Object(dstKey));
} catch (ServiceException e) {
- handleServiceException(e);
+ handleException(e, srcKey);
}
}
@@ -329,7 +323,7 @@ class Jets3tNativeFileSystemStore implem
s3Service.copyObject(bucket.getName(), srcKey, bucket.getName(),
dstObject, false);
} catch (ServiceException e) {
- handleServiceException(srcKey, e);
+ handleException(e, srcKey);
}
}
@@ -364,19 +358,22 @@ class Jets3tNativeFileSystemStore implem
Collections.reverse(listedParts);
s3Service.multipartCompleteUpload(multipartUpload, listedParts);
} catch (ServiceException e) {
- handleServiceException(e);
+ handleException(e, srcObject.getKey());
}
}
@Override
public void purge(String prefix) throws IOException {
+ String key = "";
try {
- S3Object[] objects = s3Service.listObjects(bucket.getName(), prefix, null);
+ S3Object[] objects =
+ s3Service.listObjects(bucket.getName(), prefix, null);
for (S3Object object : objects) {
- s3Service.deleteObject(bucket, object.getKey());
+ key = object.getKey();
+ s3Service.deleteObject(bucket, key);
}
} catch (S3ServiceException e) {
- handleS3ServiceException(e);
+ handleException(e, key);
}
}
@@ -390,39 +387,97 @@ class Jets3tNativeFileSystemStore implem
sb.append(object.getKey()).append("\n");
}
} catch (S3ServiceException e) {
- handleS3ServiceException(e);
+ handleException(e);
}
System.out.println(sb);
}
- private void handleServiceException(String key, ServiceException e) throws IOException {
- if ("NoSuchKey".equals(e.getErrorCode())) {
- throw new FileNotFoundException("Key '" + key + "' does not exist in S3");
- } else {
- handleServiceException(e);
- }
+ /**
+ * Handle any service exception by translating it into an IOException
+ * @param e exception
+ * @throws IOException exception -always
+ */
+ private void handleException(Exception e) throws IOException {
+ throw processException(e, e, "");
}
+ /**
+ * Handle any service exception by translating it into an IOException
+ * @param e exception
+ * @param key key sought from object store
- private void handleS3ServiceException(S3ServiceException e) throws IOException {
- if (e.getCause() instanceof IOException) {
- throw (IOException) e.getCause();
- }
- else {
- if(LOG.isDebugEnabled()) {
- LOG.debug("S3 Error code: " + e.getS3ErrorCode() + "; S3 Error message: " + e.getS3ErrorMessage());
- }
- throw new S3Exception(e);
- }
+ * @throws IOException exception -always
+ */
+ private void handleException(Exception e, String key) throws IOException {
+ throw processException(e, e, key);
}
- private void handleServiceException(ServiceException e) throws IOException {
- if (e.getCause() instanceof IOException) {
- throw (IOException) e.getCause();
- }
- else {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Got ServiceException with Error code: " + e.getErrorCode() + ";and Error message: " + e.getErrorMessage());
- }
+ /**
+ * Handle any service exception by translating it into an IOException
+ * @param thrown exception
+ * @param original original exception -thrown if no other translation could
+ * be made
+ * @param key key sought from object store or "" for undefined
+ * @return an exception to throw. If isProcessingCause==true this may be null.
+ */
+ private IOException processException(Throwable thrown, Throwable original,
+ String key) {
+ IOException result;
+ if (thrown.getCause() != null) {
+ // recurse down
+ result = processException(thrown.getCause(), original, key);
+ } else if (thrown instanceof HttpException) {
+ // nested HttpException - examine error code and react
+ HttpException httpException = (HttpException) thrown;
+ String responseMessage = httpException.getResponseMessage();
+ int responseCode = httpException.getResponseCode();
+ String bucketName = "s3n://" + bucket.getName();
+ String text = String.format("%s : %03d : %s",
+ bucketName,
+ responseCode,
+ responseMessage);
+ String filename = !key.isEmpty() ? (bucketName + "/" + key) : text;
+ IOException ioe;
+ switch (responseCode) {
+ case 404:
+ result = new FileNotFoundException(filename);
+ break;
+ case 416: // invalid range
+ result = new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF
+ +": " + filename);
+ break;
+ case 403: //forbidden
+ result = new AccessControlException("Permission denied"
+ +": " + filename);
+ break;
+ default:
+ result = new IOException(text);
+ }
+ result.initCause(thrown);
+ } else if (thrown instanceof S3ServiceException) {
+ S3ServiceException se = (S3ServiceException) thrown;
+ LOG.debug(
+ "S3ServiceException: {}: {} : {}",
+ se.getS3ErrorCode(), se.getS3ErrorMessage(), se, se);
+ if ("InvalidRange".equals(se.getS3ErrorCode())) {
+ result = new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
+ } else {
+ result = new S3Exception(se);
+ }
+ } else if (thrown instanceof ServiceException) {
+ ServiceException se = (ServiceException) thrown;
+ LOG.debug("S3ServiceException: {}: {} : {}",
+ se.getErrorCode(), se.toString(), se, se);
+ result = new S3Exception(se);
+ } else if (thrown instanceof IOException) {
+ result = (IOException) thrown;
+ } else {
+ // here there is no exception derived yet.
+ // this means no inner cause, and no translation made yet.
+ // convert the original to an IOException -rather than just the
+ // exception at the base of the tree
+ result = new S3Exception(original);
}
+
+ return result;
}
}
Modified: hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java Sat Jul 12 02:24:40 2014
@@ -19,6 +19,7 @@
package org.apache.hadoop.fs.s3native;
import java.io.BufferedOutputStream;
+import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
@@ -37,15 +38,16 @@ import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BufferedFSInputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -55,6 +57,8 @@ import org.apache.hadoop.io.retry.RetryP
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* <p>
@@ -81,8 +85,8 @@ import org.apache.hadoop.util.Progressab
@InterfaceStability.Stable
public class NativeS3FileSystem extends FileSystem {
- public static final Log LOG =
- LogFactory.getLog(NativeS3FileSystem.class);
+ public static final Logger LOG =
+ LoggerFactory.getLogger(NativeS3FileSystem.class);
private static final String FOLDER_SUFFIX = "_$folder$";
static final String PATH_DELIMITER = Path.SEPARATOR;
@@ -97,6 +101,7 @@ public class NativeS3FileSystem extends
private long pos = 0;
public NativeS3FsInputStream(NativeFileSystemStore store, Statistics statistics, InputStream in, String key) {
+ Preconditions.checkNotNull(in, "Null input stream");
this.store = store;
this.statistics = statistics;
this.in = in;
@@ -105,13 +110,20 @@ public class NativeS3FileSystem extends
@Override
public synchronized int read() throws IOException {
- int result = -1;
+ int result;
try {
result = in.read();
} catch (IOException e) {
- LOG.info("Received IOException while reading '" + key + "', attempting to reopen.");
- seek(pos);
- result = in.read();
+ LOG.info("Received IOException while reading '{}', attempting to reopen",
+ key);
+ LOG.debug("{}", e, e);
+ try {
+ seek(pos);
+ result = in.read();
+ } catch (EOFException eof) {
+ LOG.debug("EOF on input stream read: {}", eof, eof);
+ result = -1;
+ }
}
if (result != -1) {
pos++;
@@ -124,12 +136,17 @@ public class NativeS3FileSystem extends
@Override
public synchronized int read(byte[] b, int off, int len)
throws IOException {
-
+ if (in == null) {
+ throw new EOFException("Cannot read closed stream");
+ }
int result = -1;
try {
result = in.read(b, off, len);
+ } catch (EOFException eof) {
+ throw eof;
} catch (IOException e) {
- LOG.info("Received IOException while reading '" + key + "', attempting to reopen.");
+ LOG.info( "Received IOException while reading '{}'," +
+ " attempting to reopen.", key);
seek(pos);
result = in.read(b, off, len);
}
@@ -143,17 +160,53 @@ public class NativeS3FileSystem extends
}
@Override
- public void close() throws IOException {
- in.close();
+ public synchronized void close() throws IOException {
+ closeInnerStream();
+ }
+
+ /**
+ * Close the inner stream if not null. Even if an exception
+ * is raised during the close, the field is set to null
+ * @throws IOException if raised by the close() operation.
+ */
+ private void closeInnerStream() throws IOException {
+ if (in != null) {
+ try {
+ in.close();
+ } finally {
+ in = null;
+ }
+ }
+ }
+
+ /**
+ * Update inner stream with a new stream and position
+ * @param newStream new stream -must not be null
+ * @param newpos new position
+ * @throws IOException IO exception on a failure to close the existing
+ * stream.
+ */
+ private synchronized void updateInnerStream(InputStream newStream, long newpos) throws IOException {
+ Preconditions.checkNotNull(newStream, "Null newstream argument");
+ closeInnerStream();
+ in = newStream;
+ this.pos = newpos;
}
@Override
- public synchronized void seek(long pos) throws IOException {
- in.close();
- LOG.info("Opening key '" + key + "' for reading at position '" + pos + "'");
- in = store.retrieve(key, pos);
- this.pos = pos;
+ public synchronized void seek(long newpos) throws IOException {
+ if (newpos < 0) {
+ throw new EOFException(
+ FSExceptionMessages.NEGATIVE_SEEK);
+ }
+ if (pos != newpos) {
+ // the seek is attempting to move the current position
+ LOG.debug("Opening key '{}' for reading at position '{}", key, newpos);
+ InputStream newStream = store.retrieve(key, newpos);
+ updateInnerStream(newStream, newpos);
+ }
}
+
@Override
public synchronized long getPos() throws IOException {
return pos;
@@ -214,7 +267,7 @@ public class NativeS3FileSystem extends
}
backupStream.close();
- LOG.info("OutputStream for key '" + key + "' closed. Now beginning upload");
+ LOG.info("OutputStream for key '{}' closed. Now beginning upload", key);
try {
byte[] md5Hash = digest == null ? null : digest.digest();
@@ -226,7 +279,7 @@ public class NativeS3FileSystem extends
super.close();
closed = true;
}
- LOG.info("OutputStream for key '" + key + "' upload complete");
+ LOG.info("OutputStream for key '{}' upload complete", key);
}
@Override
@@ -339,7 +392,7 @@ public class NativeS3FileSystem extends
Progressable progress) throws IOException {
if (exists(f) && !overwrite) {
- throw new IOException("File already exists:"+f);
+ throw new FileAlreadyExistsException("File already exists: " + f);
}
if(LOG.isDebugEnabled()) {
@@ -367,7 +420,7 @@ public class NativeS3FileSystem extends
String key = pathToKey(absolutePath);
if (status.isDirectory()) {
if (!recurse && listStatus(f).length > 0) {
- throw new IOException("Can not delete " + f + " at is a not empty directory and recurse option is false");
+ throw new IOException("Can not delete " + f + " as is a not empty directory and recurse option is false");
}
createParent(f);
@@ -538,7 +591,7 @@ public class NativeS3FileSystem extends
try {
FileStatus fileStatus = getFileStatus(f);
if (fileStatus.isFile()) {
- throw new IOException(String.format(
+ throw new FileAlreadyExistsException(String.format(
"Can't make directory for path '%s' since it is a file.", f));
}
@@ -556,7 +609,7 @@ public class NativeS3FileSystem extends
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
FileStatus fs = getFileStatus(f); // will throw if the file doesn't exist
if (fs.isDirectory()) {
- throw new IOException("'" + f + "' is a directory");
+ throw new FileNotFoundException("'" + f + "' is a directory");
}
LOG.info("Opening '" + f + "' for reading");
Path absolutePath = makeAbsolute(f);
Modified: hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java Sat Jul 12 02:24:40 2014
@@ -267,6 +267,9 @@ abstract class CommandWithDestination ex
dst.refreshStatus(); // need to update stat to know it exists now
}
super.recursePath(src);
+ if (dst.stat.isDirectory()) {
+ preserveAttributes(src, dst);
+ }
} finally {
dst = savedDst;
}
@@ -298,44 +301,7 @@ abstract class CommandWithDestination ex
try {
in = src.fs.open(src.path);
copyStreamToTarget(in, target);
- if (shouldPreserve(FileAttribute.TIMESTAMPS)) {
- target.fs.setTimes(
- target.path,
- src.stat.getModificationTime(),
- src.stat.getAccessTime());
- }
- if (shouldPreserve(FileAttribute.OWNERSHIP)) {
- target.fs.setOwner(
- target.path,
- src.stat.getOwner(),
- src.stat.getGroup());
- }
- if (shouldPreserve(FileAttribute.PERMISSION) ||
- shouldPreserve(FileAttribute.ACL)) {
- target.fs.setPermission(
- target.path,
- src.stat.getPermission());
- }
- if (shouldPreserve(FileAttribute.ACL)) {
- FsPermission perm = src.stat.getPermission();
- if (perm.getAclBit()) {
- List<AclEntry> srcEntries =
- src.fs.getAclStatus(src.path).getEntries();
- List<AclEntry> srcFullEntries =
- AclUtil.getAclFromPermAndEntries(perm, srcEntries);
- target.fs.setAcl(target.path, srcFullEntries);
- }
- }
- if (shouldPreserve(FileAttribute.XATTR)) {
- Map<String, byte[]> srcXAttrs = src.fs.getXAttrs(src.path);
- if (srcXAttrs != null) {
- Iterator<Entry<String, byte[]>> iter = srcXAttrs.entrySet().iterator();
- while (iter.hasNext()) {
- Entry<String, byte[]> entry = iter.next();
- target.fs.setXAttr(target.path, entry.getKey(), entry.getValue());
- }
- }
- }
+ preserveAttributes(src, target);
} finally {
IOUtils.closeStream(in);
}
@@ -365,6 +331,56 @@ abstract class CommandWithDestination ex
}
}
+ /**
+ * Preserve the attributes of the source to the target.
+ * The method calls {@link #shouldPreserve(FileAttribute)} to check what
+ * attribute to preserve.
+ * @param src source to preserve
+ * @param target where to preserve attributes
+ * @throws IOException if fails to preserve attributes
+ */
+ protected void preserveAttributes(PathData src, PathData target)
+ throws IOException {
+ if (shouldPreserve(FileAttribute.TIMESTAMPS)) {
+ target.fs.setTimes(
+ target.path,
+ src.stat.getModificationTime(),
+ src.stat.getAccessTime());
+ }
+ if (shouldPreserve(FileAttribute.OWNERSHIP)) {
+ target.fs.setOwner(
+ target.path,
+ src.stat.getOwner(),
+ src.stat.getGroup());
+ }
+ if (shouldPreserve(FileAttribute.PERMISSION) ||
+ shouldPreserve(FileAttribute.ACL)) {
+ target.fs.setPermission(
+ target.path,
+ src.stat.getPermission());
+ }
+ if (shouldPreserve(FileAttribute.ACL)) {
+ FsPermission perm = src.stat.getPermission();
+ if (perm.getAclBit()) {
+ List<AclEntry> srcEntries =
+ src.fs.getAclStatus(src.path).getEntries();
+ List<AclEntry> srcFullEntries =
+ AclUtil.getAclFromPermAndEntries(perm, srcEntries);
+ target.fs.setAcl(target.path, srcFullEntries);
+ }
+ }
+ if (shouldPreserve(FileAttribute.XATTR)) {
+ Map<String, byte[]> srcXAttrs = src.fs.getXAttrs(src.path);
+ if (srcXAttrs != null) {
+ Iterator<Entry<String, byte[]>> iter = srcXAttrs.entrySet().iterator();
+ while (iter.hasNext()) {
+ Entry<String, byte[]> entry = iter.next();
+ target.fs.setXAttr(target.path, entry.getKey(), entry.getValue());
+ }
+ }
+ }
+ }
+
// Helper filter filesystem that registers created files as temp files to
// be deleted on exit unless successfully renamed
private static class TargetFileSystem extends FilterFileSystem {
Modified: hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Ls.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Ls.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Ls.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Ls.java Sat Jul 12 02:24:40 2014
@@ -19,21 +19,16 @@
package org.apache.hadoop.fs.shell;
import java.io.IOException;
-import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.LinkedList;
-import java.util.Set;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import com.google.common.collect.Sets;
-
/**
* Get a listing of all files in that match the file patterns.
*/
@@ -70,7 +65,6 @@ class Ls extends FsCommand {
protected boolean dirRecurse;
protected boolean humanReadable = false;
- private Set<URI> aclNotSupportedFsSet = Sets.newHashSet();
protected String formatSize(long size) {
return humanReadable
Modified: hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/MoveCommands.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/MoveCommands.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/MoveCommands.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/MoveCommands.java Sat Jul 12 02:24:40 2014
@@ -104,6 +104,9 @@ class MoveCommands {
throw new PathIOException(src.toString(),
"Does not match target filesystem");
}
+ if (target.exists) {
+ throw new PathExistsException(target.toString());
+ }
if (!target.fs.rename(src.path, target.path)) {
// we have no way to know the actual error...
throw new PathIOException(src.toString());
Modified: hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java Sat Jul 12 02:24:40 2014
@@ -19,12 +19,13 @@
package org.apache.hadoop.io.compress.zlib;
import java.io.IOException;
+import java.util.zip.Checksum;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
-import org.apache.hadoop.util.PureJavaCrc32;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DoNotPool;
+import org.apache.hadoop.util.DataChecksum;
/**
* A {@link Decompressor} based on the popular gzip compressed file format.
@@ -54,7 +55,7 @@ public class BuiltInGzipDecompressor imp
private int headerBytesRead = 0;
private int trailerBytesRead = 0;
private int numExtraFieldBytesRemaining = -1;
- private PureJavaCrc32 crc = new PureJavaCrc32();
+ private Checksum crc = DataChecksum.newCrc32();
private boolean hasExtraField = false;
private boolean hasFilename = false;
private boolean hasComment = false;
Modified: hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java Sat Jul 12 02:24:40 2014
@@ -292,8 +292,6 @@ public class NativeIO {
static native void mlock_native(
ByteBuffer buffer, long len) throws NativeIOException;
- static native void munlock_native(
- ByteBuffer buffer, long len) throws NativeIOException;
/**
* Locks the provided direct ByteBuffer into memory, preventing it from
@@ -312,23 +310,6 @@ public class NativeIO {
}
mlock_native(buffer, len);
}
-
- /**
- * Unlocks a locked direct ByteBuffer, allowing it to swap out of memory.
- * This is a no-op if the ByteBuffer was not previously locked.
- *
- * See the munlock(2) man page for more information.
- *
- * @throws NativeIOException
- */
- public static void munlock(ByteBuffer buffer, long len)
- throws IOException {
- assertCodeLoaded();
- if (!buffer.isDirect()) {
- throw new IOException("Cannot munlock a non-direct ByteBuffer");
- }
- munlock_native(buffer, len);
- }
/**
* Unmaps the block from memory. See munmap(2).
@@ -570,6 +551,19 @@ public class NativeIO {
return access0(path, desiredAccess.accessRight());
}
+ /**
+ * Extends both the minimum and maximum working set size of the current
+ * process. This method gets the current minimum and maximum working set
+ * size, adds the requested amount to each and then sets the minimum and
+ * maximum working set size to the new values. Controlling the working set
+ * size of the process also controls the amount of memory it can lock.
+ *
+ * @param delta amount to increment minimum and maximum working set size
+ * @throws IOException for any error
+ * @see POSIX#mlock(ByteBuffer, long)
+ */
+ public static native void extendWorkingSetSize(long delta) throws IOException;
+
static {
if (NativeCodeLoader.isNativeCodeLoaded()) {
try {
Modified: hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Sat Jul 12 02:24:40 2014
@@ -379,6 +379,7 @@ public class Client {
private int maxIdleTime; //connections will be culled if it was idle for
//maxIdleTime msecs
private final RetryPolicy connectionRetryPolicy;
+ private final int maxRetriesOnSasl;
private int maxRetriesOnSocketTimeouts;
private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
private boolean doPing; //do we need to send ping message
@@ -406,6 +407,7 @@ public class Client {
this.rpcTimeout = remoteId.getRpcTimeout();
this.maxIdleTime = remoteId.getMaxIdleTime();
this.connectionRetryPolicy = remoteId.connectionRetryPolicy;
+ this.maxRetriesOnSasl = remoteId.getMaxRetriesOnSasl();
this.maxRetriesOnSocketTimeouts = remoteId.getMaxRetriesOnSocketTimeouts();
this.tcpNoDelay = remoteId.getTcpNoDelay();
this.doPing = remoteId.getDoPing();
@@ -693,7 +695,6 @@ public class Client {
LOG.debug("Connecting to "+server);
}
short numRetries = 0;
- final short MAX_RETRIES = 5;
Random rand = null;
while (true) {
setupConnection();
@@ -721,8 +722,8 @@ public class Client {
if (rand == null) {
rand = new Random();
}
- handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, rand,
- ticket);
+ handleSaslConnectionFailure(numRetries++, maxRetriesOnSasl, ex,
+ rand, ticket);
continue;
}
if (authMethod != AuthMethod.SIMPLE) {
@@ -1478,6 +1479,7 @@ public class Client {
private final int maxIdleTime; //connections will be culled if it was idle for
//maxIdleTime msecs
private final RetryPolicy connectionRetryPolicy;
+ private final int maxRetriesOnSasl;
// the max. no. of retries for socket connections on time out exceptions
private final int maxRetriesOnSocketTimeouts;
private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
@@ -1498,6 +1500,9 @@ public class Client {
this.maxIdleTime = conf.getInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT);
+ this.maxRetriesOnSasl = conf.getInt(
+ CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY,
+ CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_DEFAULT);
this.maxRetriesOnSocketTimeouts = conf.getInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT);
@@ -1531,6 +1536,10 @@ public class Client {
return maxIdleTime;
}
+ public int getMaxRetriesOnSasl() {
+ return maxRetriesOnSasl;
+ }
+
/** max connection retries on socket time outs */
public int getMaxRetriesOnSocketTimeouts() {
return maxRetriesOnSocketTimeouts;
Modified: hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSourceAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSourceAdapter.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSourceAdapter.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSourceAdapter.java Sat Jul 12 02:24:40 2014
@@ -30,6 +30,7 @@ import javax.management.ObjectName;
import javax.management.ReflectionException;
import static com.google.common.base.Preconditions.*;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -226,7 +227,13 @@ class MetricsSourceAdapter implements Dy
mbeanName = null;
}
}
+
+ @VisibleForTesting
+ ObjectName getMBeanName() {
+ return mbeanName;
+ }
+
private void updateInfoCache() {
LOG.debug("Updating info cache...");
infoCache = infoBuilder.reset(lastRecs).get();
Modified: hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java Sat Jul 12 02:24:40 2014
@@ -32,6 +32,7 @@ import javax.management.ObjectName;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.annotations.VisibleForTesting;
import java.util.Locale;
import static com.google.common.base.Preconditions.*;
@@ -573,6 +574,11 @@ public class MetricsSystemImpl extends M
return allSources.get(name);
}
+ @VisibleForTesting
+ MetricsSourceAdapter getSourceAdapter(String name) {
+ return sources.get(name);
+ }
+
private InitMode initMode() {
LOG.debug("from system property: "+ System.getProperty(MS_INIT_MODE_KEY));
LOG.debug("from environment variable: "+ System.getenv(MS_INIT_MODE_KEY));
Modified: hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/DefaultMetricsSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/DefaultMetricsSystem.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/DefaultMetricsSystem.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/DefaultMetricsSystem.java Sat Jul 12 02:24:40 2014
@@ -111,6 +111,11 @@ public enum DefaultMetricsSystem {
}
@InterfaceAudience.Private
+ public static void removeMBeanName(ObjectName name) {
+ INSTANCE.removeObjectName(name.toString());
+ }
+
+ @InterfaceAudience.Private
public static String sourceName(String name, boolean dupOK) {
return INSTANCE.newSourceName(name, dupOK);
}
@@ -126,6 +131,10 @@ public enum DefaultMetricsSystem {
}
}
+ synchronized void removeObjectName(String name) {
+ mBeanNames.map.remove(name);
+ }
+
synchronized String newSourceName(String name, boolean dupOK) {
if (sourceNames.map.containsKey(name)) {
if (dupOK) {
Modified: hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/GraphiteSink.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/GraphiteSink.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/GraphiteSink.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/GraphiteSink.java Sat Jul 12 02:24:40 2014
@@ -18,13 +18,18 @@
package org.apache.hadoop.metrics2.sink;
+import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
+import java.io.Closeable;
import java.net.Socket;
import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.metrics2.MetricsException;
import org.apache.hadoop.metrics2.MetricsRecord;
@@ -36,16 +41,14 @@ import org.apache.hadoop.metrics2.Metric
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
-public class GraphiteSink implements MetricsSink {
+public class GraphiteSink implements MetricsSink, Closeable {
+ private static final Log LOG = LogFactory.getLog(GraphiteSink.class);
private static final String SERVER_HOST_KEY = "server_host";
private static final String SERVER_PORT_KEY = "server_port";
private static final String METRICS_PREFIX = "metrics_prefix";
private Writer writer = null;
private String metricsPrefix = null;
-
- public void setWriter(Writer writer) {
- this.writer = writer;
- }
+ private Socket socket = null;
@Override
public void init(SubsetConfiguration conf) {
@@ -60,8 +63,8 @@ public class GraphiteSink implements Met
try {
// Open an connection to Graphite server.
- Socket socket = new Socket(serverHost, serverPort);
- setWriter(new OutputStreamWriter(socket.getOutputStream()));
+ socket = new Socket(serverHost, serverPort);
+ writer = new OutputStreamWriter(socket.getOutputStream());
} catch (Exception e) {
throw new MetricsException("Error creating connection, "
+ serverHost + ":" + serverPort, e);
@@ -99,7 +102,11 @@ public class GraphiteSink implements Met
}
try {
- writer.write(lines.toString());
+ if(writer != null){
+ writer.write(lines.toString());
+ } else {
+ throw new MetricsException("Writer in GraphiteSink is null!");
+ }
} catch (Exception e) {
throw new MetricsException("Error sending metrics", e);
}
@@ -113,4 +120,21 @@ public class GraphiteSink implements Met
throw new MetricsException("Error flushing metrics", e);
}
}
+
+ @Override
+ public void close() throws IOException {
+ try {
+ IOUtils.closeStream(writer);
+ writer = null;
+ LOG.info("writer in GraphiteSink is closed!");
+ } catch (Throwable e){
+ throw new MetricsException("Error closing writer", e);
+ } finally {
+ if (socket != null && !socket.isClosed()) {
+ socket.close();
+ socket = null;
+ LOG.info("socket in GraphiteSink is closed!");
+ }
+ }
+ }
}
Modified: hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/source/JvmMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/source/JvmMetrics.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/source/JvmMetrics.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/source/JvmMetrics.java Sat Jul 12 02:24:40 2014
@@ -38,6 +38,7 @@ import org.apache.hadoop.metrics2.lib.De
import org.apache.hadoop.metrics2.lib.Interns;
import static org.apache.hadoop.metrics2.source.JvmMetricsInfo.*;
import static org.apache.hadoop.metrics2.impl.MsInfo.*;
+import org.apache.hadoop.util.JvmPauseMonitor;
/**
* JVM and logging related metrics.
@@ -65,6 +66,7 @@ public class JvmMetrics implements Metri
ManagementFactory.getGarbageCollectorMXBeans();
final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
final String processName, sessionId;
+ private JvmPauseMonitor pauseMonitor = null;
final ConcurrentHashMap<String, MetricsInfo[]> gcInfoCache =
new ConcurrentHashMap<String, MetricsInfo[]>();
@@ -73,6 +75,10 @@ public class JvmMetrics implements Metri
this.sessionId = sessionId;
}
+ public void setPauseMonitor(final JvmPauseMonitor pauseMonitor) {
+ this.pauseMonitor = pauseMonitor;
+ }
+
public static JvmMetrics create(String processName, String sessionId,
MetricsSystem ms) {
return ms.register(JvmMetrics.name(), JvmMetrics.description(),
@@ -120,6 +126,15 @@ public class JvmMetrics implements Metri
}
rb.addCounter(GcCount, count)
.addCounter(GcTimeMillis, timeMillis);
+
+ if (pauseMonitor != null) {
+ rb.addCounter(GcNumWarnThresholdExceeded,
+ pauseMonitor.getNumGcWarnThreadholdExceeded());
+ rb.addCounter(GcNumInfoThresholdExceeded,
+ pauseMonitor.getNumGcInfoThresholdExceeded());
+ rb.addCounter(GcTotalExtraSleepTime,
+ pauseMonitor.getTotalGcExtraSleepTime());
+ }
}
private MetricsInfo[] getGcInfo(String gcName) {
Modified: hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/source/JvmMetricsInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/source/JvmMetricsInfo.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/source/JvmMetricsInfo.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/source/JvmMetricsInfo.java Sat Jul 12 02:24:40 2014
@@ -48,7 +48,10 @@ public enum JvmMetricsInfo implements Me
LogFatal("Total number of fatal log events"),
LogError("Total number of error log events"),
LogWarn("Total number of warning log events"),
- LogInfo("Total number of info log events");
+ LogInfo("Total number of info log events"),
+ GcNumWarnThresholdExceeded("Number of times that the GC warn threshold is exceeded"),
+ GcNumInfoThresholdExceeded("Number of times that the GC info threshold is exceeded"),
+ GcTotalExtraSleepTime("Total GC extra sleep time in milliseconds");
private final String desc;
Modified: hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/util/MBeans.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/util/MBeans.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/util/MBeans.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/util/MBeans.java Sat Jul 12 02:24:40 2014
@@ -84,6 +84,7 @@ public class MBeans {
} catch (Exception e) {
LOG.warn("Error unregistering "+ mbeanName, e);
}
+ DefaultMetricsSystem.removeMBeanName(mbeanName);
}
static private ObjectName getMBeanName(String serviceName, String nameName) {
Modified: hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/GroupMappingServiceProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/GroupMappingServiceProvider.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/GroupMappingServiceProvider.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/GroupMappingServiceProvider.java Sat Jul 12 02:24:40 2014
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
/**
* An interface for the implementation of a user-to-groups mapping service
@@ -30,6 +31,7 @@ import org.apache.hadoop.classification.
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface GroupMappingServiceProvider {
+ public static final String GROUP_MAPPING_CONFIG_PREFIX = CommonConfigurationKeysPublic.HADOOP_SECURITY_GROUP_MAPPING;
/**
* Get all various group memberships of a given user.
Modified: hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/AccessControlList.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/AccessControlList.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/AccessControlList.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/AccessControlList.java Sat Jul 12 02:24:40 2014
@@ -20,22 +20,21 @@ package org.apache.hadoop.security.autho
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.Arrays;
-import java.util.List;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.LinkedList;
-import java.util.ListIterator;
+import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.Groups;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
/**
* Class representing a configured access control list.
@@ -58,9 +57,9 @@ public class AccessControlList implement
private static final int INITIAL_CAPACITY = 256;
// Set of users who are granted access.
- private Set<String> users;
+ private Collection<String> users;
// Set of groups which are granted access
- private Set<String> groups;
+ private Collection<String> groups;
// Whether all users are granted access.
private boolean allAllowed;
@@ -82,37 +81,44 @@ public class AccessControlList implement
* @param aclString String representation of the ACL
*/
public AccessControlList(String aclString) {
- buildACL(aclString);
+ buildACL(aclString.split(" ", 2));
+ }
+
+ /**
+ * Construct a new ACL from String representation of users and groups
+ *
+ * The arguments are comma separated lists
+ *
+ * @param users comma separated list of users
+ * @param groups comma separated list of groups
+ */
+ public AccessControlList(String users, String groups) {
+ buildACL(new String[] {users, groups});
}
/**
- * Build ACL from the given string, format of the string is
- * user1,...,userN group1,...,groupN
+ * Build ACL from the given two Strings.
+ * The Strings contain comma separated values.
*
- * @param aclString build ACL from this string
+ * @param aclString build ACL from array of Strings
*/
- private void buildACL(String aclString) {
- users = new TreeSet<String>();
- groups = new TreeSet<String>();
- if (isWildCardACLValue(aclString)) {
- allAllowed = true;
- } else {
- allAllowed = false;
- String[] userGroupStrings = aclString.split(" ", 2);
-
- if (userGroupStrings.length >= 1) {
- List<String> usersList = new LinkedList<String>(
- Arrays.asList(userGroupStrings[0].split(",")));
- cleanupList(usersList);
- addToSet(users, usersList);
+ private void buildACL(String[] userGroupStrings) {
+ users = new HashSet<String>();
+ groups = new HashSet<String>();
+ for (String aclPart : userGroupStrings) {
+ if (aclPart != null && isWildCardACLValue(aclPart)) {
+ allAllowed = true;
+ break;
}
+ }
+ if (!allAllowed) {
+ if (userGroupStrings.length >= 1 && userGroupStrings[0] != null) {
+ users = StringUtils.getTrimmedStringCollection(userGroupStrings[0]);
+ }
- if (userGroupStrings.length == 2) {
- List<String> groupsList = new LinkedList<String>(
- Arrays.asList(userGroupStrings[1].split(",")));
- cleanupList(groupsList);
- addToSet(groups, groupsList);
- groupsMapping.cacheGroupsAdd(groupsList);
+ if (userGroupStrings.length == 2 && userGroupStrings[1] != null) {
+ groups = StringUtils.getTrimmedStringCollection(userGroupStrings[1]);
+ groupsMapping.cacheGroupsAdd(new LinkedList<String>(groups));
}
}
}
@@ -203,7 +209,7 @@ public class AccessControlList implement
* Get the names of users allowed for this service.
* @return the set of user names. the set must not be modified.
*/
- Set<String> getUsers() {
+ Collection<String> getUsers() {
return users;
}
@@ -211,7 +217,7 @@ public class AccessControlList implement
* Get the names of user groups allowed for this service.
* @return the set of group names. the set must not be modified.
*/
- Set<String> getGroups() {
+ Collection<String> getGroups() {
return groups;
}
@@ -229,36 +235,6 @@ public class AccessControlList implement
}
/**
- * Cleanup list, remove empty strings, trim leading/trailing spaces
- *
- * @param list clean this list
- */
- private static final void cleanupList(List<String> list) {
- ListIterator<String> i = list.listIterator();
- while(i.hasNext()) {
- String s = i.next();
- if(s.length() == 0) {
- i.remove();
- } else {
- s = s.trim();
- i.set(s);
- }
- }
- }
-
- /**
- * Add list to a set
- *
- * @param set add list to this set
- * @param list add items of this list to the set
- */
- private static final void addToSet(Set<String> set, List<String> list) {
- for(String s : list) {
- set.add(s);
- }
- }
-
- /**
* Returns descriptive way of users and groups that are part of this ACL.
* Use {@link #getAclString()} to get the exact String that can be given to
* the constructor of AccessControlList to create a new instance.
@@ -331,7 +307,7 @@ public class AccessControlList implement
@Override
public void readFields(DataInput in) throws IOException {
String aclString = Text.readString(in);
- buildACL(aclString);
+ buildACL(aclString.split(" ", 2));
}
/**
@@ -358,7 +334,7 @@ public class AccessControlList implement
*
* @param strings set of strings to concatenate
*/
- private String getString(Set<String> strings) {
+ private String getString(Collection<String> strings) {
StringBuilder sb = new StringBuilder(INITIAL_CAPACITY);
boolean first = true;
for(String str: strings) {
Modified: hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/DefaultImpersonationProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/DefaultImpersonationProvider.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/DefaultImpersonationProvider.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/DefaultImpersonationProvider.java Sat Jul 12 02:24:40 2014
@@ -18,18 +18,15 @@
package org.apache.hadoop.security.authorize;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.MachineList;
import com.google.common.annotations.VisibleForTesting;
@@ -39,44 +36,39 @@ public class DefaultImpersonationProvide
private static final String CONF_GROUPS = ".groups";
private static final String CONF_HADOOP_PROXYUSER = "hadoop.proxyuser.";
private static final String CONF_HADOOP_PROXYUSER_RE = "hadoop\\.proxyuser\\.";
- // list of users, groups and hosts per proxyuser
- private Map<String, Collection<String>> proxyUsers =
- new HashMap<String, Collection<String>>();
- private Map<String, Collection<String>> proxyGroups =
- new HashMap<String, Collection<String>>();
- private Map<String, Collection<String>> proxyHosts =
- new HashMap<String, Collection<String>>();
+ private static final String CONF_HADOOP_PROXYUSER_RE_USERS_GROUPS =
+ CONF_HADOOP_PROXYUSER_RE+"[^.]*(" + Pattern.quote(CONF_USERS) +
+ "|" + Pattern.quote(CONF_GROUPS) + ")";
+ private static final String CONF_HADOOP_PROXYUSER_RE_HOSTS =
+ CONF_HADOOP_PROXYUSER_RE+"[^.]*"+ Pattern.quote(CONF_HOSTS);
+ // acl and list of hosts per proxyuser
+ private Map<String, AccessControlList> proxyUserAcl =
+ new HashMap<String, AccessControlList>();
+ private static Map<String, MachineList> proxyHosts =
+ new HashMap<String, MachineList>();
private Configuration conf;
@Override
public void setConf(Configuration conf) {
this.conf = conf;
- // get all the new keys for users
- String regex = CONF_HADOOP_PROXYUSER_RE+"[^.]*\\"+CONF_USERS;
- Map<String,String> allMatchKeys = conf.getValByRegex(regex);
+ // get list of users and groups per proxyuser
+ Map<String,String> allMatchKeys =
+ conf.getValByRegex(CONF_HADOOP_PROXYUSER_RE_USERS_GROUPS);
for(Entry<String, String> entry : allMatchKeys.entrySet()) {
- Collection<String> users = StringUtils.getTrimmedStringCollection(entry.getValue());
- proxyUsers.put(entry.getKey(), users);
- }
-
- // get all the new keys for groups
- regex = CONF_HADOOP_PROXYUSER_RE+"[^.]*\\"+CONF_GROUPS;
- allMatchKeys = conf.getValByRegex(regex);
- for(Entry<String, String> entry : allMatchKeys.entrySet()) {
- Collection<String> groups = StringUtils.getTrimmedStringCollection(entry.getValue());
- proxyGroups.put(entry.getKey(), groups);
- //cache the groups. This is needed for NetGroups
- Groups.getUserToGroupsMappingService(conf).cacheGroupsAdd(
- new ArrayList<String>(groups));
+ String aclKey = getAclKey(entry.getKey());
+ if (!proxyUserAcl.containsKey(aclKey)) {
+ proxyUserAcl.put(aclKey, new AccessControlList(
+ allMatchKeys.get(aclKey + CONF_USERS) ,
+ allMatchKeys.get(aclKey + CONF_GROUPS)));
+ }
}
- // now hosts
- regex = CONF_HADOOP_PROXYUSER_RE+"[^.]*\\"+CONF_HOSTS;
- allMatchKeys = conf.getValByRegex(regex);
+ // get hosts per proxyuser
+ allMatchKeys = conf.getValByRegex(CONF_HADOOP_PROXYUSER_RE_HOSTS);
for(Entry<String, String> entry : allMatchKeys.entrySet()) {
proxyHosts.put(entry.getKey(),
- StringUtils.getTrimmedStringCollection(entry.getValue()));
+ new MachineList(entry.getValue()));
}
}
@@ -88,79 +80,34 @@ public class DefaultImpersonationProvide
@Override
public void authorize(UserGroupInformation user,
String remoteAddress) throws AuthorizationException {
-
- if (user.getRealUser() == null) {
+
+ UserGroupInformation realUser = user.getRealUser();
+ if (realUser == null) {
return;
}
- boolean userAuthorized = false;
- boolean ipAuthorized = false;
- UserGroupInformation superUser = user.getRealUser();
-
- Collection<String> allowedUsers = proxyUsers.get(
- getProxySuperuserUserConfKey(superUser.getShortUserName()));
-
- if (isWildcardList(allowedUsers)) {
- userAuthorized = true;
- } else if (allowedUsers != null && !allowedUsers.isEmpty()) {
- if (allowedUsers.contains(user.getShortUserName())) {
- userAuthorized = true;
- }
- }
-
- if (!userAuthorized){
- Collection<String> allowedUserGroups = proxyGroups.get(
- getProxySuperuserGroupConfKey(superUser.getShortUserName()));
-
- if (isWildcardList(allowedUserGroups)) {
- userAuthorized = true;
- } else if (allowedUserGroups != null && !allowedUserGroups.isEmpty()) {
- for (String group : user.getGroupNames()) {
- if (allowedUserGroups.contains(group)) {
- userAuthorized = true;
- break;
- }
- }
- }
-
- if (!userAuthorized) {
- throw new AuthorizationException("User: " + superUser.getUserName()
- + " is not allowed to impersonate " + user.getUserName());
- }
+
+ AccessControlList acl = proxyUserAcl.get(
+ CONF_HADOOP_PROXYUSER+realUser.getShortUserName());
+ if (acl == null || !acl.isUserAllowed(user)) {
+ throw new AuthorizationException("User: " + realUser.getUserName()
+ + " is not allowed to impersonate " + user.getUserName());
}
- Collection<String> ipList = proxyHosts.get(
- getProxySuperuserIpConfKey(superUser.getShortUserName()));
+ MachineList MachineList = proxyHosts.get(
+ getProxySuperuserIpConfKey(realUser.getShortUserName()));
- if (isWildcardList(ipList)) {
- ipAuthorized = true;
- } else if (ipList != null && !ipList.isEmpty()) {
- for (String allowedHost : ipList) {
- InetAddress hostAddr;
- try {
- hostAddr = InetAddress.getByName(allowedHost);
- } catch (UnknownHostException e) {
- continue;
- }
- if (hostAddr.getHostAddress().equals(remoteAddress)) {
- // Authorization is successful
- ipAuthorized = true;
- }
- }
- }
- if(!ipAuthorized) {
+ if(!MachineList.includes(remoteAddress)) {
throw new AuthorizationException("Unauthorized connection for super-user: "
- + superUser.getUserName() + " from IP " + remoteAddress);
+ + realUser.getUserName() + " from IP " + remoteAddress);
}
}
-
- /**
- * Return true if the configuration specifies the special configuration value
- * "*", indicating that any group or host list is allowed to use this configuration.
- */
- private boolean isWildcardList(Collection<String> list) {
- return (list != null) &&
- (list.size() == 1) &&
- (list.contains("*"));
+
+ private String getAclKey(String key) {
+ int endIndex = key.lastIndexOf(".");
+ if (endIndex != -1) {
+ return key.substring(0, endIndex);
+ }
+ return key;
}
/**
@@ -194,17 +141,22 @@ public class DefaultImpersonationProvide
}
@VisibleForTesting
- public Map<String, Collection<String>> getProxyUsers() {
- return proxyUsers;
- }
-
- @VisibleForTesting
public Map<String, Collection<String>> getProxyGroups() {
- return proxyGroups;
+ Map<String,Collection<String>> proxyGroups = new HashMap<String,Collection<String>>();
+ for(Entry<String, AccessControlList> entry : proxyUserAcl.entrySet()) {
+ proxyGroups.put(entry.getKey() + CONF_GROUPS, entry.getValue().getGroups());
+ }
+ return proxyGroups;
}
@VisibleForTesting
public Map<String, Collection<String>> getProxyHosts() {
- return proxyHosts;
+ Map<String, Collection<String>> tmpProxyHosts =
+ new HashMap<String, Collection<String>>();
+ for (Map.Entry<String, MachineList> proxyHostEntry :proxyHosts.entrySet()) {
+ tmpProxyHosts.put(proxyHostEntry.getKey(),
+ proxyHostEntry.getValue().getCollection());
+ }
+ return tmpProxyHosts;
}
}
Modified: hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java Sat Jul 12 02:24:40 2014
@@ -45,7 +45,7 @@ import com.google.common.annotations.Vis
public class ServiceAuthorizationManager {
private static final String HADOOP_POLICY_FILE = "hadoop-policy.xml";
- private Map<Class<?>, AccessControlList> protocolToAcl =
+ private volatile Map<Class<?>, AccessControlList> protocolToAcl =
new IdentityHashMap<Class<?>, AccessControlList>();
/**
@@ -114,7 +114,7 @@ public class ServiceAuthorizationManager
AUDITLOG.info(AUTHZ_SUCCESSFUL_FOR + user + " for protocol="+protocol);
}
- public synchronized void refresh(Configuration conf,
+ public void refresh(Configuration conf,
PolicyProvider provider) {
// Get the system property 'hadoop.policy.file'
String policyFile =
@@ -127,10 +127,14 @@ public class ServiceAuthorizationManager
}
@Private
- public synchronized void refreshWithLoadedConfiguration(Configuration conf,
+ public void refreshWithLoadedConfiguration(Configuration conf,
PolicyProvider provider) {
final Map<Class<?>, AccessControlList> newAcls =
new IdentityHashMap<Class<?>, AccessControlList>();
+
+ String defaultAcl = conf.get(
+ CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_DEFAULT_ACL,
+ AccessControlList.WILDCARD_ACL_VALUE);
// Parse the config file
Service[] services = provider.getServices();
@@ -139,7 +143,7 @@ public class ServiceAuthorizationManager
AccessControlList acl =
new AccessControlList(
conf.get(service.getServiceKey(),
- AccessControlList.WILDCARD_ACL_VALUE)
+ defaultAcl)
);
newAcls.put(service.getProtocol(), acl);
}
Modified: hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java Sat Jul 12 02:24:40 2014
@@ -22,6 +22,7 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.zip.CRC32;
import java.util.zip.Checksum;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -29,7 +30,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.fs.ChecksumException;
/**
- * This class provides inteface and utilities for processing checksums for
+ * This class provides interface and utilities for processing checksums for
* DFS data transfers.
*/
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@@ -72,6 +73,13 @@ public class DataChecksum implements Che
}
}
+ /**
+ * Create a Crc32 Checksum object. The implementation of the Crc32 algorithm
+ * is chosen depending on the platform.
+ */
+ public static Checksum newCrc32() {
+ return Shell.isJava7OrAbove()? new CRC32(): new PureJavaCrc32();
+ }
public static DataChecksum newDataChecksum(Type type, int bytesPerChecksum ) {
if ( bytesPerChecksum <= 0 ) {
@@ -82,7 +90,7 @@ public class DataChecksum implements Che
case NULL :
return new DataChecksum(type, new ChecksumNull(), bytesPerChecksum );
case CRC32 :
- return new DataChecksum(type, new PureJavaCrc32(), bytesPerChecksum );
+ return new DataChecksum(type, newCrc32(), bytesPerChecksum );
case CRC32C:
return new DataChecksum(type, new PureJavaCrc32C(), bytesPerChecksum);
default: