Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2013/08/13 23:19:57 UTC
svn commit: r1513658 [3/4] - in
/hadoop/common/branches/HDFS-4949/hadoop-common-project:
hadoop-auth-examples/src/main/webapp/
hadoop-auth-examples/src/main/webapp/annonymous/
hadoop-auth-examples/src/main/webapp/kerberos/
hadoop-auth-examples/src/main...
Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/docs/releasenotes.html
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java:r1509426-1512447
Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/package.html
------------------------------------------------------------------------------
svn:eol-style = native
Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java Tue Aug 13 21:19:53 2013
@@ -96,7 +96,7 @@ public class CommonConfigurationKeys ext
public static final int IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT =
256 * 1024;
- /** Internal buffer size for Snappy compressor/decompressors */
+ /** Internal buffer size for Lz4 compressor/decompressors */
public static final String IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY =
"io.compression.codec.lz4.buffersize";
@@ -104,6 +104,14 @@ public class CommonConfigurationKeys ext
public static final int IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT =
256 * 1024;
+ /** Use lz4hc (slow but with a high compression ratio) for lz4 compression */
+ public static final String IO_COMPRESSION_CODEC_LZ4_USELZ4HC_KEY =
+ "io.compression.codec.lz4.use.lz4hc";
+
+ /** Default value for IO_COMPRESSION_CODEC_LZ4_USELZ4HC_KEY */
+ public static final boolean IO_COMPRESSION_CODEC_LZ4_USELZ4HC_DEFAULT =
+ false;
+
/**
* Service Authorization
*/
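The two new constants only add an opt-in switch; plain lz4 stays the default. A minimal sketch of enabling the high-compression path from client code (class and method names here are illustrative, not part of this commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;

public class Lz4HcConfigSketch {
  public static Configuration newLz4HcConf() {
    Configuration conf = new Configuration();
    // Opt in to the slower, higher-ratio lz4hc path (defaults to false).
    conf.setBoolean(
        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_USELZ4HC_KEY, true);
    // The existing 256 KB buffer key is untouched; shown only for context.
    conf.setInt(
        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY, 256 * 1024);
    return conf;
  }
}

Since lz4hc emits the regular lz4 block format, data written with the flag enabled remains readable by the unchanged decompressor.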
Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java Tue Aug 13 21:19:53 2013
@@ -258,7 +258,7 @@ public final class FileContext {
* Hence this method is not called makeAbsolute() and
* has been deliberately declared private.
*/
- private Path fixRelativePart(Path p) {
+ Path fixRelativePart(Path p) {
if (p.isUriPathAbsolute()) {
return p;
} else {
@@ -1905,7 +1905,7 @@ public final class FileContext {
public FileStatus[] globStatus(Path pathPattern)
throws AccessControlException, UnsupportedFileSystemException,
IOException {
- return globStatus(pathPattern, DEFAULT_FILTER);
+ return new Globber(FileContext.this, pathPattern, DEFAULT_FILTER).glob();
}
/**
@@ -1934,154 +1934,7 @@ public final class FileContext {
public FileStatus[] globStatus(final Path pathPattern,
final PathFilter filter) throws AccessControlException,
UnsupportedFileSystemException, IOException {
- URI uri = getFSofPath(fixRelativePart(pathPattern)).getUri();
-
- String filename = pathPattern.toUri().getPath();
-
- List<String> filePatterns = GlobExpander.expand(filename);
- if (filePatterns.size() == 1) {
- Path absPathPattern = fixRelativePart(pathPattern);
- return globStatusInternal(uri, new Path(absPathPattern.toUri()
- .getPath()), filter);
- } else {
- List<FileStatus> results = new ArrayList<FileStatus>();
- for (String iFilePattern : filePatterns) {
- Path iAbsFilePattern = fixRelativePart(new Path(iFilePattern));
- FileStatus[] files = globStatusInternal(uri, iAbsFilePattern, filter);
- for (FileStatus file : files) {
- results.add(file);
- }
- }
- return results.toArray(new FileStatus[results.size()]);
- }
- }
-
- /**
- *
- * @param uri for all the inPathPattern
- * @param inPathPattern - without the scheme & authority (take from uri)
- * @param filter
- *
- * @return an array of FileStatus objects
- *
- * @throws AccessControlException If access is denied
- * @throws IOException If an I/O error occurred
- */
- private FileStatus[] globStatusInternal(final URI uri,
- final Path inPathPattern, final PathFilter filter)
- throws AccessControlException, IOException
- {
- Path[] parents = new Path[1];
- int level = 0;
-
- assert(inPathPattern.toUri().getScheme() == null &&
- inPathPattern.toUri().getAuthority() == null &&
- inPathPattern.isUriPathAbsolute());
-
-
- String filename = inPathPattern.toUri().getPath();
-
- // path has only zero component
- if (filename.isEmpty() || Path.SEPARATOR.equals(filename)) {
- Path p = inPathPattern.makeQualified(uri, null);
- return getFileStatus(new Path[]{p});
- }
-
- // path has at least one component
- String[] components = filename.split(Path.SEPARATOR);
-
- // Path is absolute, first component is "/" hence first component
- // is the uri root
- parents[0] = new Path(new Path(uri), new Path("/"));
- level = 1;
-
- // glob the paths that match the parent path, ie. [0, components.length-1]
- boolean[] hasGlob = new boolean[]{false};
- Path[] relParentPaths =
- globPathsLevel(parents, components, level, hasGlob);
- FileStatus[] results;
-
- if (relParentPaths == null || relParentPaths.length == 0) {
- results = null;
- } else {
- // fix the pathes to be abs
- Path[] parentPaths = new Path [relParentPaths.length];
- for(int i=0; i<relParentPaths.length; i++) {
- parentPaths[i] = relParentPaths[i].makeQualified(uri, null);
- }
-
- // Now work on the last component of the path
- GlobFilter fp =
- new GlobFilter(components[components.length - 1], filter);
- if (fp.hasPattern()) { // last component has a pattern
- // list parent directories and then glob the results
- try {
- results = listStatus(parentPaths, fp);
- } catch (FileNotFoundException e) {
- results = null;
- }
- hasGlob[0] = true;
- } else { // last component does not have a pattern
- // get all the path names
- ArrayList<Path> filteredPaths =
- new ArrayList<Path>(parentPaths.length);
- for (int i = 0; i < parentPaths.length; i++) {
- parentPaths[i] = new Path(parentPaths[i],
- components[components.length - 1]);
- if (fp.accept(parentPaths[i])) {
- filteredPaths.add(parentPaths[i]);
- }
- }
- // get all their statuses
- results = getFileStatus(
- filteredPaths.toArray(new Path[filteredPaths.size()]));
- }
- }
-
- // Decide if the pathPattern contains a glob or not
- if (results == null) {
- if (hasGlob[0]) {
- results = new FileStatus[0];
- }
- } else {
- if (results.length == 0) {
- if (!hasGlob[0]) {
- results = null;
- }
- } else {
- Arrays.sort(results);
- }
- }
- return results;
- }
-
- /*
- * For a path of N components, return a list of paths that match the
- * components [<code>level</code>, <code>N-1</code>].
- */
- private Path[] globPathsLevel(Path[] parents, String[] filePattern,
- int level, boolean[] hasGlob) throws AccessControlException,
- FileNotFoundException, IOException {
- if (level == filePattern.length - 1) {
- return parents;
- }
- if (parents == null || parents.length == 0) {
- return null;
- }
- GlobFilter fp = new GlobFilter(filePattern[level]);
- if (fp.hasPattern()) {
- try {
- parents = FileUtil.stat2Paths(listStatus(parents, fp));
- } catch (FileNotFoundException e) {
- parents = null;
- }
- hasGlob[0] = true;
- } else {
- for (int i = 0; i < parents.length; i++) {
- parents[i] = new Path(parents[i], filePattern[level]);
- }
- }
- return globPathsLevel(parents, filePattern, level + 1, hasGlob);
+ return new Globber(FileContext.this, pathPattern, filter).glob();
}
/**
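FileContext keeps the same globStatus contract; only the matching logic moves into the shared org.apache.hadoop.fs.Globber. A caller-side sketch, assuming the usual FileContext.util() entry point (class name and sample pattern are illustrative):

import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

public class FileContextGlobSketch {
  public static void main(String[] args) throws Exception {
    FileContext fc = FileContext.getFileContext();
    // Same glob syntax as before; the matching now runs through Globber.
    FileStatus[] matches = fc.util().globStatus(new Path("/tmp/logs/2013-08-*/*.log"));
    if (matches != null) {
      for (FileStatus status : matches) {
        System.out.println(status.getPath());
      }
    }
  }
}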
Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java Tue Aug 13 21:19:53 2013
@@ -1619,7 +1619,7 @@ public abstract class FileSystem extends
* @throws IOException
*/
public FileStatus[] globStatus(Path pathPattern) throws IOException {
- return globStatus(pathPattern, DEFAULT_FILTER);
+ return new Globber(this, pathPattern, DEFAULT_FILTER).glob();
}
/**
@@ -1637,126 +1637,7 @@ public abstract class FileSystem extends
*/
public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
throws IOException {
- String filename = pathPattern.toUri().getPath();
- List<FileStatus> allMatches = null;
-
- List<String> filePatterns = GlobExpander.expand(filename);
- for (String filePattern : filePatterns) {
- Path path = new Path(filePattern.isEmpty() ? Path.CUR_DIR : filePattern);
- List<FileStatus> matches = globStatusInternal(path, filter);
- if (matches != null) {
- if (allMatches == null) {
- allMatches = matches;
- } else {
- allMatches.addAll(matches);
- }
- }
- }
-
- FileStatus[] results = null;
- if (allMatches != null) {
- results = allMatches.toArray(new FileStatus[allMatches.size()]);
- } else if (filePatterns.size() > 1) {
- // no matches with multiple expansions is a non-matching glob
- results = new FileStatus[0];
- }
- return results;
- }
-
- // sort gripes because FileStatus Comparable isn't parameterized...
- @SuppressWarnings("unchecked")
- private List<FileStatus> globStatusInternal(Path pathPattern,
- PathFilter filter) throws IOException {
- boolean patternHasGlob = false; // pathPattern has any globs
- List<FileStatus> matches = new ArrayList<FileStatus>();
-
- // determine starting point
- int level = 0;
- String baseDir = Path.CUR_DIR;
- if (pathPattern.isAbsolute()) {
- level = 1; // need to skip empty item at beginning of split list
- baseDir = Path.SEPARATOR;
- }
-
- // parse components and determine if it's a glob
- String[] components = null;
- GlobFilter[] filters = null;
- String filename = pathPattern.toUri().getPath();
- if (!filename.isEmpty() && !Path.SEPARATOR.equals(filename)) {
- components = filename.split(Path.SEPARATOR);
- filters = new GlobFilter[components.length];
- for (int i=level; i < components.length; i++) {
- filters[i] = new GlobFilter(components[i]);
- patternHasGlob |= filters[i].hasPattern();
- }
- if (!patternHasGlob) {
- baseDir = unquotePathComponent(filename);
- components = null; // short through to filter check
- }
- }
-
- // seed the parent directory path, return if it doesn't exist
- try {
- matches.add(getFileStatus(new Path(baseDir)));
- } catch (FileNotFoundException e) {
- return patternHasGlob ? matches : null;
- }
-
- // skip if there are no components other than the basedir
- if (components != null) {
- // iterate through each path component
- for (int i=level; (i < components.length) && !matches.isEmpty(); i++) {
- List<FileStatus> children = new ArrayList<FileStatus>();
- for (FileStatus match : matches) {
- // don't look for children in a file matched by a glob
- if (!match.isDirectory()) {
- continue;
- }
- try {
- if (filters[i].hasPattern()) {
- // get all children matching the filter
- FileStatus[] statuses = listStatus(match.getPath(), filters[i]);
- children.addAll(Arrays.asList(statuses));
- } else {
- // the component does not have a pattern
- String component = unquotePathComponent(components[i]);
- Path child = new Path(match.getPath(), component);
- children.add(getFileStatus(child));
- }
- } catch (FileNotFoundException e) {
- // don't care
- }
- }
- matches = children;
- }
- }
- // remove anything that didn't match the filter
- if (!matches.isEmpty()) {
- Iterator<FileStatus> iter = matches.iterator();
- while (iter.hasNext()) {
- if (!filter.accept(iter.next().getPath())) {
- iter.remove();
- }
- }
- }
- // no final paths, if there were any globs return empty list
- if (matches.isEmpty()) {
- return patternHasGlob ? matches : null;
- }
- Collections.sort(matches);
- return matches;
- }
-
- /**
- * The glob filter builds a regexp per path component. If the component
- * does not contain a shell metachar, then it falls back to appending the
- * raw string to the list of built up paths. This raw path needs to have
- * the quoting removed. Ie. convert all occurances of "\X" to "X"
- * @param name of the path component
- * @return the unquoted path component
- */
- private String unquotePathComponent(String name) {
- return name.replaceAll("\\\\(.)", "$1");
+ return new Globber(this, pathPattern, filter).glob();
}
/**
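FileSystem.globStatus is refactored the same way, so FileSystem and FileContext now share one glob implementation. A sketch with an explicit PathFilter, which Globber still applies after pattern matching (class name, paths, and the filter are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class FileSystemGlobSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    // Drop temporary/hidden entries; the filter is still applied after matching.
    PathFilter noHidden = new PathFilter() {
      @Override
      public boolean accept(Path p) {
        return !p.getName().startsWith("_");
      }
    };
    FileStatus[] matches = fs.globStatus(new Path("/data/2013/*/part-*"), noHidden);
    if (matches != null) {
      for (FileStatus status : matches) {
        System.out.println(status.getPath() + "\t" + status.getLen());
      }
    }
  }
}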
Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemLinkResolver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemLinkResolver.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemLinkResolver.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemLinkResolver.java Tue Aug 13 21:19:53 2013
@@ -73,7 +73,9 @@ public abstract class FileSystemLinkReso
int count = 0;
T in = null;
Path p = path;
- FileSystem fs = FileSystem.getFSofPath(p, filesys.getConf());
+ // Assumes path belongs to this FileSystem.
+ // Callers validate this by passing paths through FileSystem#checkPath
+ FileSystem fs = filesys;
for (boolean isLink = true; isLink;) {
try {
in = doCall(p);
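A hedged sketch of the caller pattern this loop serves: doCall() runs the operation, and next() is invoked when an UnresolvedLinkException shows the path crosses a symlink into another FileSystem. The getFileStatus bodies and the helper name are placeholders for whatever operation a concrete FileSystem resolves; they are not taken from this commit:

import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemLinkResolver;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnresolvedLinkException;

public class LinkResolverSketch {
  static FileStatus statThroughLinks(final FileSystem startFs, final Path path)
      throws IOException {
    return new FileSystemLinkResolver<FileStatus>() {
      @Override
      public FileStatus doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        // Runs against the FileSystem that resolve() was handed, per the new assumption.
        return startFs.getFileStatus(p);
      }
      @Override
      public FileStatus next(final FileSystem fs, final Path p) throws IOException {
        // Re-dispatched here when the path crosses a symlink into another FileSystem.
        return fs.getFileStatus(p);
      }
    }.resolve(startFs, path);
  }
}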
Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java Tue Aug 13 21:19:53 2013
@@ -142,7 +142,28 @@ public class FileUtil {
}
return deleteImpl(dir, true);
}
-
+
+ /**
+ * Returns the target of the given symlink. Returns the empty string if
+ * the given path does not refer to a symlink or there is an error
+ * accessing the symlink.
+ * @param f File representing the symbolic link.
+ * @return The target of the symbolic link, empty string on error or if not
+ * a symlink.
+ */
+ public static String readLink(File f) {
+ /* NB: Use readSymbolicLink in java.nio.file.Path once available. Could
+ * use getCanonicalPath in File to get the target of the symlink but that
+ * does not indicate if the given path refers to a symlink.
+ */
+ try {
+ return Shell.execCommand(
+ Shell.getReadlinkCommand(f.toString())).trim();
+ } catch (IOException x) {
+ return "";
+ }
+ }
+
/*
* Pure-Java implementation of "chmod +rwx f".
*/
@@ -737,15 +758,18 @@ public class FileUtil {
* On Windows, when symlink creation fails due to security
* setting, we will log a warning. The return code in this
* case is 2.
+ *
* @param target the target for symlink
* @param linkname the symlink
- * @return value returned by the command
+ * @return 0 on success
*/
public static int symLink(String target, String linkname) throws IOException{
// Run the input paths through Java's File so that they are converted to the
// native OS form
- File targetFile = new File(target);
- File linkFile = new File(linkname);
+ File targetFile = new File(
+ Path.getPathWithoutSchemeAndAuthority(new Path(target)).toString());
+ File linkFile = new File(
+ Path.getPathWithoutSchemeAndAuthority(new Path(linkname)).toString());
// If not on Java7+, copy a file instead of creating a symlink since
// Java6 has close to no support for symlinks on Windows. Specifically
@@ -757,9 +781,16 @@ public class FileUtil {
// is symlinked under userlogs and userlogs are generated afterwards).
if (Shell.WINDOWS && !Shell.isJava7OrAbove() && targetFile.isFile()) {
try {
- LOG.info("FileUtil#symlink: On Java6, copying file instead "
- + linkname + " -> " + target);
- org.apache.commons.io.FileUtils.copyFile(targetFile, linkFile);
+ LOG.warn("FileUtil#symlink: On Windows+Java6, copying file instead " +
+ "of creating a symlink. Copying " + target + " -> " + linkname);
+
+ if (!linkFile.getParentFile().exists()) {
+ LOG.warn("Parent directory " + linkFile.getParent() +
+ " does not exist.");
+ return 1;
+ } else {
+ org.apache.commons.io.FileUtils.copyFile(targetFile, linkFile);
+ }
} catch (IOException ex) {
LOG.warn("FileUtil#symlink failed to copy the file with error: "
+ ex.getMessage());
@@ -769,10 +800,23 @@ public class FileUtil {
return 0;
}
- String[] cmd = Shell.getSymlinkCommand(targetFile.getPath(),
- linkFile.getPath());
- ShellCommandExecutor shExec = new ShellCommandExecutor(cmd);
+ String[] cmd = Shell.getSymlinkCommand(
+ targetFile.toString(),
+ linkFile.toString());
+
+ ShellCommandExecutor shExec;
try {
+ if (Shell.WINDOWS &&
+ linkFile.getParentFile() != null &&
+ !new Path(target).isAbsolute()) {
+ // Relative links on Windows must be resolvable at the time of
+ // creation. To ensure this we run the shell command in the directory
+ // of the link.
+ //
+ shExec = new ShellCommandExecutor(cmd, linkFile.getParentFile());
+ } else {
+ shExec = new ShellCommandExecutor(cmd);
+ }
shExec.execute();
} catch (Shell.ExitCodeException ec) {
int returnVal = ec.getExitCode();
@@ -795,7 +839,7 @@ public class FileUtil {
}
return shExec.getExitCode();
}
-
+
/**
* Change the permissions on a filename.
* @param filename the name of the file to change
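A small sketch of the new helper pair from caller code; the paths are illustrative, and FileUtil.symLink still shells out (or copies on Windows+Java6), so the return code must be checked:

import java.io.File;
import java.io.IOException;
import org.apache.hadoop.fs.FileUtil;

public class SymlinkSketch {
  public static void main(String[] args) throws IOException {
    // 0 means success; anything else is the shell's (or the copy fallback's) error code.
    int rc = FileUtil.symLink("/tmp/data/current", "/tmp/data/link");
    if (rc != 0) {
      throw new IOException("symlink failed with exit code " + rc);
    }
    // readLink() returns the link target, or "" if the path is not a symlink
    // or cannot be read.
    String target = FileUtil.readLink(new File("/tmp/data/link"));
    System.out.println("link points at: " + target);
  }
}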
Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java Tue Aug 13 21:19:53 2013
@@ -682,31 +682,13 @@ public class RawLocalFileSystem extends
if (createParent) {
mkdirs(link.getParent());
}
- // NB: Use createSymbolicLink in java.nio.file.Path once available
- try {
- Shell.execCommand(Shell.getSymlinkCommand(
- Path.getPathWithoutSchemeAndAuthority(target).toString(),
- Path.getPathWithoutSchemeAndAuthority(makeAbsolute(link)).toString()));
- } catch (IOException x) {
- throw new IOException("Unable to create symlink: "+x.getMessage());
- }
- }
- /**
- * Returns the target of the given symlink. Returns the empty string if
- * the given path does not refer to a symlink or there is an error
- * accessing the symlink.
- */
- private String readLink(Path p) {
- /* NB: Use readSymbolicLink in java.nio.file.Path once available. Could
- * use getCanonicalPath in File to get the target of the symlink but that
- * does not indicate if the given path refers to a symlink.
- */
- try {
- final String path = p.toUri().getPath();
- return Shell.execCommand(Shell.READ_LINK_COMMAND, path).trim();
- } catch (IOException x) {
- return "";
+ // NB: Use createSymbolicLink in java.nio.file.Path once available
+ int result = FileUtil.symLink(target.toString(),
+ makeAbsolute(link).toString());
+ if (result != 0) {
+ throw new IOException("Error " + result + " creating symlink " +
+ link + " to " + target);
}
}
@@ -729,7 +711,7 @@ public class RawLocalFileSystem extends
}
private FileStatus getFileLinkStatusInternal(final Path f) throws IOException {
- String target = readLink(f);
+ String target = FileUtil.readLink(new File(f.toString()));
try {
FileStatus fs = getFileStatus(f);
Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/local/RawLocalFs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/local/RawLocalFs.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/local/RawLocalFs.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/local/RawLocalFs.java Tue Aug 13 21:19:53 2013
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.fs.local;
+import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
@@ -28,12 +29,12 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.DelegateToFileSystem;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.FsConstants;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.util.Shell;
/**
* The RawLocalFs implementation of AbstractFileSystem.
@@ -75,47 +76,29 @@ public class RawLocalFs extends Delegate
@Override
public boolean supportsSymlinks() {
return true;
- }
-
+ }
+
@Override
- public void createSymlink(Path target, Path link, boolean createParent)
+ public void createSymlink(Path target, Path link, boolean createParent)
throws IOException {
final String targetScheme = target.toUri().getScheme();
if (targetScheme != null && !"file".equals(targetScheme)) {
throw new IOException("Unable to create symlink to non-local file "+
- "system: "+target.toString());
+ "system: "+target.toString());
}
+
if (createParent) {
mkdir(link.getParent(), FsPermission.getDirDefault(), true);
}
+
// NB: Use createSymbolicLink in java.nio.file.Path once available
- try {
- Shell.execCommand(Shell.getSymlinkCommand(
- Path.getPathWithoutSchemeAndAuthority(target).toString(),
- Path.getPathWithoutSchemeAndAuthority(link).toString()));
- } catch (IOException x) {
- throw new IOException("Unable to create symlink: "+x.getMessage());
+ int result = FileUtil.symLink(target.toString(), link.toString());
+ if (result != 0) {
+ throw new IOException("Error " + result + " creating symlink " +
+ link + " to " + target);
}
}
- /**
- * Returns the target of the given symlink. Returns the empty string if
- * the given path does not refer to a symlink or there is an error
- * acessing the symlink.
- */
- private String readLink(Path p) {
- /* NB: Use readSymbolicLink in java.nio.file.Path once available. Could
- * use getCanonicalPath in File to get the target of the symlink but that
- * does not indicate if the given path refers to a symlink.
- */
- try {
- final String path = p.toUri().getPath();
- return Shell.execCommand(Shell.READ_LINK_COMMAND, path).trim();
- } catch (IOException x) {
- return "";
- }
- }
-
/**
* Return a FileStatus representing the given path. If the path refers
* to a symlink return a FileStatus representing the link rather than
@@ -123,7 +106,7 @@ public class RawLocalFs extends Delegate
*/
@Override
public FileStatus getFileLinkStatus(final Path f) throws IOException {
- String target = readLink(f);
+ String target = FileUtil.readLink(new File(f.toString()));
try {
FileStatus fs = getFileStatus(f);
// If f refers to a regular file or directory
Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/package.html
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3/package.html
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/package.html
------------------------------------------------------------------------------
svn:eol-style = native
Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java Tue Aug 13 21:19:53 2013
@@ -18,18 +18,16 @@
package org.apache.hadoop.fs.shell;
-import java.io.File;
-import java.io.IOException;
+import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.PathIsDirectoryException;
+import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
/** Various commands for copy files */
@@ -44,6 +42,7 @@ class CopyCommands {
factory.addClass(CopyToLocal.class, "-copyToLocal");
factory.addClass(Get.class, "-get");
factory.addClass(Put.class, "-put");
+ factory.addClass(AppendToFile.class, "-appendToFile");
}
/** merge multiple files together */
@@ -235,4 +234,93 @@ class CopyCommands {
public static final String USAGE = Get.USAGE;
public static final String DESCRIPTION = "Identical to the -get command.";
}
+
+ /**
+ * Append the contents of one or more local files to a remote
+ * file.
+ */
+ public static class AppendToFile extends CommandWithDestination {
+ public static final String NAME = "appendToFile";
+ public static final String USAGE = "<localsrc> ... <dst>";
+ public static final String DESCRIPTION =
+ "Appends the contents of all the given local files to the\n" +
+ "given dst file. The dst file will be created if it does\n" +
+ "not exist. If <localSrc> is -, then the input is read\n" +
+ "from stdin.";
+
+ private static final int DEFAULT_IO_LENGTH = 1024 * 1024;
+ boolean readStdin = false;
+
+ // commands operating on local paths have no need for glob expansion
+ @Override
+ protected List<PathData> expandArgument(String arg) throws IOException {
+ List<PathData> items = new LinkedList<PathData>();
+ if (arg.equals("-")) {
+ readStdin = true;
+ } else {
+ try {
+ items.add(new PathData(new URI(arg), getConf()));
+ } catch (URISyntaxException e) {
+ if (Path.WINDOWS) {
+ // Unlike URI, PathData knows how to parse Windows drive-letter paths.
+ items.add(new PathData(arg, getConf()));
+ } else {
+ throw new IOException("Unexpected URISyntaxException: " + e.toString());
+ }
+ }
+ }
+ return items;
+ }
+
+ @Override
+ protected void processOptions(LinkedList<String> args)
+ throws IOException {
+
+ if (args.size() < 2) {
+ throw new IOException("missing destination argument");
+ }
+
+ getRemoteDestination(args);
+ super.processOptions(args);
+ }
+
+ @Override
+ protected void processArguments(LinkedList<PathData> args)
+ throws IOException {
+
+ if (!dst.exists) {
+ dst.fs.create(dst.path, false).close();
+ }
+
+ InputStream is = null;
+ FSDataOutputStream fos = dst.fs.append(dst.path);
+
+ try {
+ if (readStdin) {
+ if (args.size() == 0) {
+ IOUtils.copyBytes(System.in, fos, DEFAULT_IO_LENGTH);
+ } else {
+ throw new IOException(
+ "stdin (-) must be the sole input argument when present");
+ }
+ }
+
+ // Read in each input file and write to the target.
+ for (PathData source : args) {
+ is = new FileInputStream(source.toFile());
+ IOUtils.copyBytes(is, fos, DEFAULT_IO_LENGTH);
+ IOUtils.closeStream(is);
+ is = null;
+ }
+ } finally {
+ if (is != null) {
+ IOUtils.closeStream(is);
+ }
+
+ if (fos != null) {
+ IOUtils.closeStream(fos);
+ }
+ }
+ }
+ }
}
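From the shell this registers as, for example, "hadoop fs -appendToFile part1.log part2.log /user/example/merged.log", with "-" in place of the local sources to read from stdin. The equivalent programmatic pattern, mirroring processArguments() above, looks roughly like this (paths are illustrative, and the target FileSystem must support append):

import java.io.FileInputStream;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class AppendSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    Path dst = new Path("/user/example/merged.log");
    if (!fs.exists(dst)) {
      fs.create(dst, false).close();          // same create-then-append pattern as the command
    }
    FSDataOutputStream out = fs.append(dst);
    InputStream in = null;
    try {
      in = new FileInputStream("/tmp/local-part.log");
      IOUtils.copyBytes(in, out, 1024 * 1024); // 1 MB copy buffer, like DEFAULT_IO_LENGTH
    } finally {
      IOUtils.closeStream(in);
      IOUtils.closeStream(out);
    }
  }
}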
Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/lib/package.html
------------------------------------------------------------------------------
svn:eol-style = native
Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java Tue Aug 13 21:19:53 2013
@@ -107,7 +107,7 @@ public class Lz4Codec implements Configu
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT);
- int compressionOverhead = Math.max((int)(bufferSize * 0.01), 10);
+ int compressionOverhead = bufferSize/255 + 16;
return new BlockCompressorStream(out, compressor, bufferSize,
compressionOverhead);
@@ -140,7 +140,10 @@ public class Lz4Codec implements Configu
int bufferSize = conf.getInt(
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT);
- return new Lz4Compressor(bufferSize);
+ boolean useLz4HC = conf.getBoolean(
+ CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_USELZ4HC_KEY,
+ CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_USELZ4HC_DEFAULT);
+ return new Lz4Compressor(bufferSize, useLz4HC);
}
/**
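The old overhead, max(1% of the buffer, 10) bytes, can fall short of lz4's true worst case on incompressible input; bufferSize/255 + 16 matches the standard lz4 compress-bound formula, so BlockCompressorStream always has room for a block that does not compress at all. A small illustration (class and method names are illustrative):

public class Lz4BoundSketch {
  // Worst-case size of an lz4 block produced from inputSize bytes,
  // i.e. the overhead the codec now reserves: inputSize/255 + 16 extra.
  static int maxCompressedLength(int inputSize) {
    return inputSize + inputSize / 255 + 16;
  }

  public static void main(String[] args) {
    int bufferSize = 256 * 1024;                         // codec default
    System.out.println(maxCompressedLength(bufferSize)); // 263188 = 262144 + 1028 + 16
  }
}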
Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Compressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Compressor.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Compressor.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Compressor.java Tue Aug 13 21:19:53 2013
@@ -52,6 +52,7 @@ public class Lz4Compressor implements Co
private long bytesRead = 0L;
private long bytesWritten = 0L;
+ private final boolean useLz4HC;
static {
if (NativeCodeLoader.isNativeCodeLoaded()) {
@@ -72,8 +73,11 @@ public class Lz4Compressor implements Co
* Creates a new compressor.
*
* @param directBufferSize size of the direct buffer to be used.
+ * @param useLz4HC use high compression ratio version of lz4,
+ * which trades CPU for compression ratio.
*/
- public Lz4Compressor(int directBufferSize) {
+ public Lz4Compressor(int directBufferSize, boolean useLz4HC) {
+ this.useLz4HC = useLz4HC;
this.directBufferSize = directBufferSize;
uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
@@ -82,6 +86,15 @@ public class Lz4Compressor implements Co
}
/**
+ * Creates a new compressor.
+ *
+ * @param directBufferSize size of the direct buffer to be used.
+ */
+ public Lz4Compressor(int directBufferSize) {
+ this(directBufferSize, false);
+ }
+
+ /**
* Creates a new compressor with the default buffer size.
*/
public Lz4Compressor() {
@@ -227,7 +240,7 @@ public class Lz4Compressor implements Co
}
// Compress data
- n = compressBytesDirect();
+ n = useLz4HC ? compressBytesDirectHC() : compressBytesDirect();
compressedDirectBuf.limit(n);
uncompressedDirectBuf.clear(); // lz4 consumes all buffer input
@@ -297,5 +310,7 @@ public class Lz4Compressor implements Co
private native int compressBytesDirect();
+ private native int compressBytesDirectHC();
+
public native static String getLibraryName();
}
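The one-argument constructor keeps its old behavior by delegating with useLz4HC = false. A sketch of choosing between the two, assuming the native hadoop lz4 library is loadable (the wrapper class is illustrative):

import org.apache.hadoop.io.compress.lz4.Lz4Compressor;

public class Lz4CompressorChoice {
  public static Lz4Compressor newCompressor(int bufferSize, boolean highCompression) {
    // The two-argument overload is the new API; the old single-argument
    // constructor now simply delegates with useLz4HC = false.
    return highCompression
        ? new Lz4Compressor(bufferSize, true)
        : new Lz4Compressor(bufferSize);
  }
}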
Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/package.html
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/package.html
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/avro/package.html
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/package.html
------------------------------------------------------------------------------
svn:eol-style = native
Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Tue Aug 13 21:19:53 2013
@@ -18,10 +18,11 @@
package org.apache.hadoop.ipc;
-import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
+import static org.apache.hadoop.ipc.RpcConstants.*;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FilterInputStream;
@@ -52,6 +53,7 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.atomic.AtomicLong;
import javax.net.SocketFactory;
+import javax.security.sasl.Sasl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -87,6 +89,7 @@ import org.apache.hadoop.util.Reflection
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.CodedOutputStream;
@@ -380,6 +383,7 @@ public class Client {
private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
private boolean doPing; //do we need to send ping message
private int pingInterval; // how often sends ping to the server in msecs
+ private ByteArrayOutputStream pingRequest; // ping message
// currently active calls
private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
@@ -405,6 +409,15 @@ public class Client {
this.maxRetriesOnSocketTimeouts = remoteId.getMaxRetriesOnSocketTimeouts();
this.tcpNoDelay = remoteId.getTcpNoDelay();
this.doPing = remoteId.getDoPing();
+ if (doPing) {
+ // construct a RPC header with the callId as the ping callId
+ pingRequest = new ByteArrayOutputStream();
+ RpcRequestHeaderProto pingHeader = ProtoUtil
+ .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
+ OperationProto.RPC_FINAL_PACKET, PING_CALL_ID,
+ RpcConstants.INVALID_RETRY_COUNT, clientId);
+ pingHeader.writeDelimitedTo(pingRequest);
+ }
this.pingInterval = remoteId.getPingInterval();
this.serviceClass = serviceClass;
if (LOG.isDebugEnabled()) {
@@ -700,6 +713,7 @@ public class Client {
}
});
} catch (Exception ex) {
+ authMethod = saslRpcClient.getAuthMethod();
if (rand == null) {
rand = new Random();
}
@@ -711,6 +725,9 @@ public class Client {
// Sasl connect is successful. Let's set up Sasl i/o streams.
inStream = saslRpcClient.getInputStream(inStream);
outStream = saslRpcClient.getOutputStream(outStream);
+ // for testing
+ remoteId.saslQop =
+ (String)saslRpcClient.getNegotiatedProperty(Sasl.QOP);
} else if (UserGroupInformation.isSecurityEnabled() &&
!fallbackAllowed) {
throw new IOException("Server asks us to fall back to SIMPLE " +
@@ -720,12 +737,16 @@ public class Client {
}
if (doPing) {
- this.in = new DataInputStream(new BufferedInputStream(
- new PingInputStream(inStream)));
- } else {
- this.in = new DataInputStream(new BufferedInputStream(inStream));
+ inStream = new PingInputStream(inStream);
+ }
+ this.in = new DataInputStream(new BufferedInputStream(inStream));
+
+ // SASL may have already buffered the stream
+ if (!(outStream instanceof BufferedOutputStream)) {
+ outStream = new BufferedOutputStream(outStream);
}
- this.out = new DataOutputStream(new BufferedOutputStream(outStream));
+ this.out = new DataOutputStream(outStream);
+
writeConnectionContext(remoteId, authMethod);
// update last activity time
@@ -905,7 +926,8 @@ public class Client {
if ( curTime - lastActivity.get() >= pingInterval) {
lastActivity.set(curTime);
synchronized (out) {
- out.writeInt(RpcConstants.PING_CALL_ID);
+ out.writeInt(pingRequest.size());
+ pingRequest.writeTo(out);
out.flush();
}
}
@@ -1455,6 +1477,7 @@ public class Client {
private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
private final boolean doPing; //do we need to send ping message
private final int pingInterval; // how often sends ping to the server in msecs
+ private String saslQop; // here for testing
ConnectionId(InetSocketAddress address, Class<?> protocol,
UserGroupInformation ticket, int rpcTimeout, int maxIdleTime,
@@ -1509,6 +1532,11 @@ public class Client {
return pingInterval;
}
+ @VisibleForTesting
+ String getSaslQop() {
+ return saslQop;
+ }
+
static ConnectionId getConnectionId(InetSocketAddress addr,
Class<?> protocol, UserGroupInformation ticket, int rpcTimeout,
Configuration conf) throws IOException {
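Pings used to be a bare PING_CALL_ID int on the wire; they are now a full, length-prefixed RPC request whose header carries PING_CALL_ID, which is what lets a SASL-wrapped connection ping like any other call. A sketch of the frame the Connection builds once and reuses, mirroring the hunk above (the wrapper class is illustrative; the protobuf and ProtoUtil calls are as used in Client.java):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.RpcConstants;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
import org.apache.hadoop.util.ProtoUtil;

public class PingFrameSketch {
  // Build the ping frame once, as the Connection constructor now does.
  static ByteArrayOutputStream buildPing(byte[] clientId) throws IOException {
    ByteArrayOutputStream pingRequest = new ByteArrayOutputStream();
    RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
        RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET,
        RpcConstants.PING_CALL_ID, RpcConstants.INVALID_RETRY_COUNT, clientId);
    header.writeDelimitedTo(pingRequest);
    return pingRequest;
  }

  static void sendPing(DataOutputStream out, ByteArrayOutputStream pingRequest)
      throws IOException {
    out.writeInt(pingRequest.size());  // length prefix, same framing as any request
    pingRequest.writeTo(out);
    out.flush();
  }
}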
Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientId.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientId.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientId.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientId.java Tue Aug 13 21:19:53 2013
@@ -33,6 +33,7 @@ public class ClientId {
/** The byte array of a UUID should be 16 */
public static final int BYTE_LENGTH = 16;
+ private static final int shiftWidth = 8;
/**
* Return clientId as byte[]
@@ -53,15 +54,25 @@ public class ClientId {
}
// otherwise should be 16 bytes
Preconditions.checkArgument(clientId.length == BYTE_LENGTH);
+ long msb = getMsb(clientId);
+ long lsb = getLsb(clientId);
+ return (new UUID(msb, lsb)).toString();
+ }
+
+ public static long getMsb(byte[] clientId) {
long msb = 0;
- long lsb = 0;
- for (int i = 0; i < 8; i++) {
- msb = (msb << 8) | (clientId[i] & 0xff);
+ for (int i = 0; i < BYTE_LENGTH/2; i++) {
+ msb = (msb << shiftWidth) | (clientId[i] & 0xff);
}
- for (int i = 8; i < 16; i++) {
- lsb = (lsb << 8) | (clientId[i] & 0xff);
+ return msb;
+ }
+
+ public static long getLsb(byte[] clientId) {
+ long lsb = 0;
+ for (int i = BYTE_LENGTH/2; i < BYTE_LENGTH; i++) {
+ lsb = (lsb << shiftWidth) | (clientId[i] & 0xff);
}
- return (new UUID(msb, lsb)).toString();
+ return lsb;
}
/** Convert from clientId string byte[] representation of clientId */
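The new accessors let RetryCache reuse the same big-endian packing instead of duplicating the loops. A round-trip sketch (the wrapper class is illustrative; getClientId() and toString() are the existing ClientId helpers):

import java.util.UUID;
import org.apache.hadoop.ipc.ClientId;

public class ClientIdRoundTrip {
  public static void main(String[] args) {
    byte[] clientId = ClientId.getClientId();   // 16 bytes backing a random UUID
    // The two new helpers expose the packing RetryCache needs for its cache key.
    UUID uuid = new UUID(ClientId.getMsb(clientId), ClientId.getLsb(clientId));
    System.out.println(uuid + " == " + ClientId.toString(clientId));
  }
}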
Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java Tue Aug 13 21:19:53 2013
@@ -70,16 +70,8 @@ public class RetryCache {
"Invalid clientId - length is " + clientId.length
+ " expected length " + ClientId.BYTE_LENGTH);
// Convert UUID bytes to two longs
- long tmp = 0;
- for (int i=0; i<8; i++) {
- tmp = (tmp << 8) | (clientId[i] & 0xff);
- }
- clientIdMsb = tmp;
- tmp = 0;
- for (int i=8; i<16; i++) {
- tmp = (tmp << 8) | (clientId[i] & 0xff);
- }
- clientIdLsb = tmp;
+ clientIdMsb = ClientId.getMsb(clientId);
+ clientIdLsb = ClientId.getLsb(clientId);
this.callId = callId;
this.expirationTime = expirationTime;
}
Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java Tue Aug 13 21:19:53 2013
@@ -27,13 +27,13 @@ public class RpcConstants {
// Hidden Constructor
}
- public static final int PING_CALL_ID = -1;
+ public static final int AUTHORIZATION_FAILED_CALL_ID = -1;
+ public static final int INVALID_CALL_ID = -2;
+ public static final int CONNECTION_CONTEXT_CALL_ID = -3;
+ public static final int PING_CALL_ID = -4;
public static final byte[] DUMMY_CLIENT_ID = new byte[0];
- public static final int INVALID_CALL_ID = -2;
-
- public static final int CONNECTION_CONTEXT_CALL_ID = -3;
public static final int INVALID_RETRY_COUNT = -1;
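Negative call ids remain reserved for out-of-band traffic; the reshuffle moves ping from -1 to -4 so that -1 can unambiguously mean an authorization-failed response. A sketch of the resulting layout (the describe() helper is illustrative):

import org.apache.hadoop.ipc.RpcConstants;

public class ReservedCallIds {
  static String describe(int callId) {
    switch (callId) {
      case RpcConstants.AUTHORIZATION_FAILED_CALL_ID: return "authorization failed"; // -1
      case RpcConstants.INVALID_CALL_ID:              return "invalid";              // -2
      case RpcConstants.CONNECTION_CONTEXT_CALL_ID:   return "connection context";   // -3
      case RpcConstants.PING_CALL_ID:                 return "ping";                 // -4
      default: return callId >= 0 ? "normal rpc" : "unknown out-of-band id";
    }
  }
}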
Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Tue Aug 13 21:19:53 2013
@@ -72,8 +72,9 @@ import org.apache.hadoop.fs.CommonConfig
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
-import static org.apache.hadoop.ipc.RpcConstants.CURRENT_VERSION;
-import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
+import static org.apache.hadoop.ipc.RpcConstants.*;
+
+import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseMessageWrapper;
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper;
import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.RPC.VersionMismatch;
@@ -1177,9 +1178,7 @@ public abstract class Server {
public UserGroupInformation attemptingUser = null; // user name before auth
// Fake 'call' for failed authorization response
- private static final int AUTHORIZATION_FAILED_CALLID = -1;
-
- private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID,
+ private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALL_ID,
RpcConstants.INVALID_RETRY_COUNT, null, this);
private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();
@@ -1276,8 +1275,28 @@ public abstract class Server {
}
}
- private RpcSaslProto saslReadAndProcess(DataInputStream dis) throws
- WrappedRpcServerException, InterruptedException {
+ private void saslReadAndProcess(DataInputStream dis) throws
+ WrappedRpcServerException, IOException, InterruptedException {
+ final RpcSaslProto saslMessage =
+ decodeProtobufFromStream(RpcSaslProto.newBuilder(), dis);
+ switch (saslMessage.getState()) {
+ case WRAP: {
+ if (!saslContextEstablished || !useWrap) {
+ throw new WrappedRpcServerException(
+ RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+ new SaslException("Server is not wrapping data"));
+ }
+ // loops over decoded data and calls processOneRpc
+ unwrapPacketAndProcessRpcs(saslMessage.getToken().toByteArray());
+ break;
+ }
+ default:
+ saslProcess(saslMessage);
+ }
+ }
+
+ private void saslProcess(RpcSaslProto saslMessage)
+ throws WrappedRpcServerException, IOException, InterruptedException {
if (saslContextEstablished) {
throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
@@ -1286,7 +1305,7 @@ public abstract class Server {
RpcSaslProto saslResponse = null;
try {
try {
- saslResponse = processSaslMessage(dis);
+ saslResponse = processSaslMessage(saslMessage);
} catch (IOException e) {
IOException sendToClient = e;
Throwable cause = e;
@@ -1310,8 +1329,6 @@ public abstract class Server {
LOG.debug("SASL server context established. Negotiated QoP is "
+ saslServer.getNegotiatedProperty(Sasl.QOP));
}
- String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
- useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
user = getAuthorizedUgi(saslServer.getAuthorizationID());
if (LOG.isDebugEnabled()) {
LOG.debug("SASL server successfully authenticated client: " + user);
@@ -1326,13 +1343,21 @@ public abstract class Server {
throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_UNAUTHORIZED, ioe);
}
- return saslResponse;
+ // send back response if any, may throw IOException
+ if (saslResponse != null) {
+ doSaslReply(saslResponse);
+ }
+ // do NOT enable wrapping until the last auth response is sent
+ if (saslContextEstablished) {
+ String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
+ // SASL wrapping is only used if the connection has a QOP, and
+ // the value is not auth. ex. auth-int & auth-conf
+ useWrap = (qop != null && !"auth".equalsIgnoreCase(qop));
+ }
}
- private RpcSaslProto processSaslMessage(DataInputStream dis)
+ private RpcSaslProto processSaslMessage(RpcSaslProto saslMessage)
throws IOException, InterruptedException {
- final RpcSaslProto saslMessage =
- decodeProtobufFromStream(RpcSaslProto.newBuilder(), dis);
RpcSaslProto saslResponse = null;
final SaslState state = saslMessage.getState(); // required
switch (state) {
@@ -1517,11 +1542,6 @@ public abstract class Server {
if (data == null) {
dataLengthBuffer.flip();
dataLength = dataLengthBuffer.getInt();
- if ((dataLength == RpcConstants.PING_CALL_ID) && (!useWrap)) {
- // covers the !useSasl too
- dataLengthBuffer.clear();
- return 0; // ping message
- }
checkDataLength(dataLength);
data = ByteBuffer.allocate(dataLength);
}
@@ -1532,7 +1552,7 @@ public abstract class Server {
dataLengthBuffer.clear();
data.flip();
boolean isHeaderRead = connectionContextRead;
- processRpcRequestPacket(data.array());
+ processOneRpc(data.array());
data = null;
if (!isHeaderRead) {
continue;
@@ -1695,29 +1715,19 @@ public abstract class Server {
}
/**
- * Process a RPC Request - if SASL wrapping is enabled, unwrap the
- * requests and process each one, else directly process the request
- * @param buf - single request or SASL wrapped requests
- * @throws IOException - connection failed to authenticate or authorize,
- * or the request could not be decoded into a Call
+ * Process a wrapped RPC Request - unwrap the SASL packet and process
+ * each embedded RPC request
+ * @param buf - SASL wrapped request of one or more RPCs
+ * @throws IOException - SASL packet cannot be unwrapped
* @throws InterruptedException
*/
- private void processRpcRequestPacket(byte[] buf)
- throws WrappedRpcServerException, IOException, InterruptedException {
- if (saslContextEstablished && useWrap) {
- if (LOG.isDebugEnabled())
- LOG.debug("Have read input token of size " + buf.length
- + " for processing by saslServer.unwrap()");
- final byte[] plaintextData = saslServer.unwrap(buf, 0, buf.length);
- // loops over decoded data and calls processOneRpc
- unwrapPacketAndProcessRpcs(plaintextData);
- } else {
- processOneRpc(buf);
- }
- }
-
private void unwrapPacketAndProcessRpcs(byte[] inBuf)
throws WrappedRpcServerException, IOException, InterruptedException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Have read input token of size " + inBuf.length
+ + " for processing by saslServer.unwrap()");
+ }
+ inBuf = saslServer.unwrap(inBuf, 0, inBuf.length);
ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(
inBuf));
// Read all RPCs contained in the inBuf, even partial ones
@@ -1732,13 +1742,6 @@ public abstract class Server {
if (unwrappedData == null) {
unwrappedDataLengthBuffer.flip();
int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
-
- if (unwrappedDataLength == RpcConstants.PING_CALL_ID) {
- if (LOG.isDebugEnabled())
- LOG.debug("Received ping message");
- unwrappedDataLengthBuffer.clear();
- continue; // ping message
- }
unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
}
@@ -1906,11 +1909,9 @@ public abstract class Server {
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
"SASL protocol not requested by client");
}
- RpcSaslProto response = saslReadAndProcess(dis);
- // send back response if any, may throw IOException
- if (response != null) {
- doSaslReply(response);
- }
+ saslReadAndProcess(dis);
+ } else if (callId == PING_CALL_ID) {
+ LOG.debug("Received ping message");
} else {
throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
@@ -1924,7 +1925,7 @@ public abstract class Server {
*/
private void authorizeConnection() throws WrappedRpcServerException {
try {
- // If auth method is DIGEST, the token was obtained by the
+ // If auth method is TOKEN, the token was obtained by the
// real user for the effective user, therefore not required to
// authorize real user. doAs is allowed only for simple or kerberos
// authentication
@@ -2389,9 +2390,21 @@ public abstract class Server {
LOG.debug("Adding saslServer wrapped token of size " + token.length
+ " as call response.");
response.reset();
- DataOutputStream saslOut = new DataOutputStream(response);
- saslOut.writeInt(token.length);
- saslOut.write(token, 0, token.length);
+ // rebuild with sasl header and payload
+ RpcResponseHeaderProto saslHeader = RpcResponseHeaderProto.newBuilder()
+ .setCallId(AuthProtocol.SASL.callId)
+ .setStatus(RpcStatusProto.SUCCESS)
+ .build();
+ RpcSaslProto saslMessage = RpcSaslProto.newBuilder()
+ .setState(SaslState.WRAP)
+ .setToken(ByteString.copyFrom(token, 0, token.length))
+ .build();
+ RpcResponseMessageWrapper saslResponse =
+ new RpcResponseMessageWrapper(saslHeader, saslMessage);
+
+ DataOutputStream out = new DataOutputStream(response);
+ out.writeInt(saslResponse.getLength());
+ saslResponse.write(out);
}
}
Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/package.html
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/file/package.html
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/ganglia/package.html
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/package.html
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/spi/package.html
------------------------------------------------------------------------------
svn:eol-style = native
Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java Tue Aug 13 21:19:53 2013
@@ -381,7 +381,7 @@ public class MetricsSystemImpl extends M
private void snapshotMetrics(MetricsSourceAdapter sa,
MetricsBufferBuilder bufferBuilder) {
long startTime = Time.now();
- bufferBuilder.add(sa.name(), sa.getMetrics(collector, false));
+ bufferBuilder.add(sa.name(), sa.getMetrics(collector, true));
collector.clear();
snapshotStat.add(Time.now() - startTime);
LOG.debug("Snapshotted source "+ sa.name());
Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/package.html
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/record/compiler/generated/package.html
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/record/compiler/package.html
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/record/package.html
------------------------------------------------------------------------------
svn:eol-style = native
Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java Tue Aug 13 21:19:53 2013
@@ -20,15 +20,20 @@ package org.apache.hadoop.security;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import java.io.FilterInputStream;
+import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.regex.Pattern;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
@@ -47,6 +52,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.GlobPattern;
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcRequestMessageWrapper;
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseMessageWrapper;
import org.apache.hadoop.ipc.RPC.RpcKind;
@@ -67,6 +73,7 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.security.token.TokenSelector;
import org.apache.hadoop.util.ProtoUtil;
+import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
/**
* A utility class that encapsulates SASL logic for RPC client
@@ -82,6 +89,7 @@ public class SaslRpcClient {
private final Configuration conf;
private SaslClient saslClient;
+ private AuthMethod authMethod;
private static final RpcRequestHeaderProto saslHeader = ProtoUtil
.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
@@ -106,6 +114,24 @@ public class SaslRpcClient {
this.conf = conf;
}
+ @VisibleForTesting
+ @InterfaceAudience.Private
+ public Object getNegotiatedProperty(String key) {
+ return (saslClient != null) ? saslClient.getNegotiatedProperty(key) : null;
+ }
+
+
+ // the RPC Client has an inelegant way of handling expiration of TGTs
+ // acquired via a keytab. any connection failure causes a relogin, so
+ // the Client needs to know what authMethod was being attempted if an
+ // exception occurs. the SASL prep for a kerberos connection should
+ // ideally relogin if necessary instead of exposing this detail to the
+ // Client
+ @InterfaceAudience.Private
+ public AuthMethod getAuthMethod() {
+ return authMethod;
+ }
+
/**
* Instantiate a sasl client for the first supported auth type in the
* given list. The auth type must be defined, enabled, and the user
@@ -256,9 +282,8 @@ public class SaslRpcClient {
* @return String of the server's principal
* @throws IOException - error determining configured principal
*/
-
- // try to get the configured principal for the remote server
- private String getServerPrincipal(SaslAuth authType) throws IOException {
+ @VisibleForTesting
+ String getServerPrincipal(SaslAuth authType) throws IOException {
KerberosInfo krbInfo = SecurityUtil.getKerberosInfo(protocol, conf);
LOG.debug("Get kerberos info proto:"+protocol+" info:"+krbInfo);
if (krbInfo == null) { // protocol has no support for kerberos
@@ -270,28 +295,37 @@ public class SaslRpcClient {
"Can't obtain server Kerberos config key from protocol="
+ protocol.getCanonicalName());
}
- // construct the expected principal from the config
- String confPrincipal = SecurityUtil.getServerPrincipal(
- conf.get(serverKey), serverAddr.getAddress());
- if (confPrincipal == null || confPrincipal.isEmpty()) {
- throw new IllegalArgumentException(
- "Failed to specify server's Kerberos principal name");
- }
- // ensure it looks like a host-based service principal
- KerberosName name = new KerberosName(confPrincipal);
- if (name.getHostName() == null) {
- throw new IllegalArgumentException(
- "Kerberos principal name does NOT have the expected hostname part: "
- + confPrincipal);
+ // construct the server-advertised principal for comparison
+ String serverPrincipal = new KerberosPrincipal(
+ authType.getProtocol() + "/" + authType.getServerId()).getName();
+ boolean isPrincipalValid = false;
+
+ // use the pattern if defined
+ String serverKeyPattern = conf.get(serverKey + ".pattern");
+ if (serverKeyPattern != null && !serverKeyPattern.isEmpty()) {
+ Pattern pattern = GlobPattern.compile(serverKeyPattern);
+ isPrincipalValid = pattern.matcher(serverPrincipal).matches();
+ } else {
+ // check that the server advertised principal matches our conf
+ String confPrincipal = SecurityUtil.getServerPrincipal(
+ conf.get(serverKey), serverAddr.getAddress());
+ if (confPrincipal == null || confPrincipal.isEmpty()) {
+ throw new IllegalArgumentException(
+ "Failed to specify server's Kerberos principal name");
+ }
+ KerberosName name = new KerberosName(confPrincipal);
+ if (name.getHostName() == null) {
+ throw new IllegalArgumentException(
+ "Kerberos principal name does NOT have the expected hostname part: "
+ + confPrincipal);
+ }
+ isPrincipalValid = serverPrincipal.equals(confPrincipal);
}
- // check that the server advertised principal matches our conf
- KerberosPrincipal serverPrincipal = new KerberosPrincipal(
- authType.getProtocol() + "/" + authType.getServerId());
- if (!serverPrincipal.getName().equals(confPrincipal)) {
+ if (!isPrincipalValid) {
throw new IllegalArgumentException(
"Server has invalid Kerberos principal: " + serverPrincipal);
}
- return confPrincipal;
+ return serverPrincipal;
}
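
As a usage sketch, the new "<serverKey>.pattern" lookup above lets a client accept any server-advertised principal matching a glob instead of requiring an exact match against the configured principal. Assuming a hypothetical serverPrincipal key of "example.kerberos.principal" (the real key comes from the protocol's KerberosInfo annotation), the check amounts to:

import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.GlobPattern;

public class PrincipalPatternSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // hypothetical config key and glob; any NameNode principal in the domain matches
    conf.set("example.kerberos.principal.pattern", "nn/*.example.com@EXAMPLE.COM");

    // principal the server advertised during SASL negotiation (illustrative)
    String serverPrincipal = "nn/nn1.example.com@EXAMPLE.COM";

    Pattern p = GlobPattern.compile(conf.get("example.kerberos.principal.pattern"));
    System.out.println("accepted: " + p.matcher(serverPrincipal).matches()); // true
  }
}

When no ".pattern" key is set, the code falls back to the exact comparison against the configured principal resolved through SecurityUtil, as before.
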
@@ -312,8 +346,9 @@ public class SaslRpcClient {
DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(
outS));
- // redefined if/when a SASL negotiation completes
- AuthMethod authMethod = AuthMethod.SIMPLE;
+ // redefined if/when a SASL negotiation starts, can be queried if the
+ // negotiation fails
+ authMethod = AuthMethod.SIMPLE;
sendSaslMessage(outStream, negotiateRequest);
@@ -350,6 +385,7 @@ public class SaslRpcClient {
case NEGOTIATE: {
// create a compatible SASL client, throws if no supported auths
SaslAuth saslAuthType = selectSaslClient(saslMessage.getAuthsList());
+ // define auth being attempted, caller can query if connect fails
authMethod = AuthMethod.valueOf(saslAuthType.getMethod());
byte[] responseToken = null;
@@ -463,38 +499,141 @@ public class SaslRpcClient {
return response;
}
+ private boolean useWrap() {
+ // getNegotiatedProperty throws if client isn't complete
+ String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
+ // SASL wrapping is only used if the connection has a QOP, and
+ // the value is not auth, e.g. auth-int or auth-conf
+ return qop != null && !"auth".equalsIgnoreCase(qop);
+ }
+
/**
- * Get a SASL wrapped InputStream. Can be called only after saslConnect() has
- * been called.
+ * Get SASL wrapped InputStream if SASL QoP requires unwrapping,
+ * otherwise return original stream. Can be called only after
+ * saslConnect() has been called.
*
- * @param in
- * the InputStream to wrap
- * @return a SASL wrapped InputStream
+ * @param in - InputStream used to make the connection
+ * @return InputStream that may be using SASL unwrap
* @throws IOException
*/
public InputStream getInputStream(InputStream in) throws IOException {
- if (!saslClient.isComplete()) {
- throw new IOException("Sasl authentication exchange hasn't completed yet");
+ if (useWrap()) {
+ in = new WrappedInputStream(in);
}
- return new SaslInputStream(in, saslClient);
+ return in;
}
/**
- * Get a SASL wrapped OutputStream. Can be called only after saslConnect() has
- * been called.
+ * Get SASL wrapped OutputStream if SASL QoP requires wrapping,
+ * otherwise return original stream. Can be called only after
+ * saslConnect() has been called.
*
- * @param out
- * the OutputStream to wrap
- * @return a SASL wrapped OutputStream
+ * @param out - OutputStream used to make the connection
+ * @return OutputStream that may be using SASL wrap
* @throws IOException
*/
public OutputStream getOutputStream(OutputStream out) throws IOException {
- if (!saslClient.isComplete()) {
- throw new IOException("Sasl authentication exchange hasn't completed yet");
+ if (useWrap()) {
+ // the client and server negotiate a maximum buffer size that can be
+ // wrapped
+ String maxBuf = (String)saslClient.getNegotiatedProperty(Sasl.RAW_SEND_SIZE);
+ out = new BufferedOutputStream(new WrappedOutputStream(out),
+ Integer.parseInt(maxBuf));
+ }
+ return out;
+ }
+
+ // ideally this should be folded into the RPC decoding loop but it's
+ // currently split across Client and SaslRpcClient...
+ class WrappedInputStream extends FilterInputStream {
+ private ByteBuffer unwrappedRpcBuffer = ByteBuffer.allocate(0);
+ public WrappedInputStream(InputStream in) throws IOException {
+ super(in);
+ }
+
+ @Override
+ public int read() throws IOException {
+ byte[] b = new byte[1];
+ int n = read(b, 0, 1);
+ return (n != -1) ? (b[0] & 0xff) : -1;
+ }
+
+ @Override
+ public int read(byte b[]) throws IOException {
+ return read(b, 0, b.length);
+ }
+
+ @Override
+ public int read(byte[] buf, int off, int len) throws IOException {
+ synchronized(unwrappedRpcBuffer) {
+ // fill the buffer with the next RPC message
+ if (unwrappedRpcBuffer.remaining() == 0) {
+ readNextRpcPacket();
+ }
+ // satisfy as much of the request as possible
+ int readLen = Math.min(len, unwrappedRpcBuffer.remaining());
+ unwrappedRpcBuffer.get(buf, off, readLen);
+ return readLen;
+ }
+ }
+
+ // all messages must be RPC SASL wrapped, else an exception is thrown
+ private void readNextRpcPacket() throws IOException {
+ LOG.debug("reading next wrapped RPC packet");
+ DataInputStream dis = new DataInputStream(in);
+ int rpcLen = dis.readInt();
+ byte[] rpcBuf = new byte[rpcLen];
+ dis.readFully(rpcBuf);
+
+ // decode the RPC header
+ ByteArrayInputStream bis = new ByteArrayInputStream(rpcBuf);
+ RpcResponseHeaderProto.Builder headerBuilder =
+ RpcResponseHeaderProto.newBuilder();
+ headerBuilder.mergeDelimitedFrom(bis);
+
+ boolean isWrapped = false;
+ // Must be SASL wrapped, verify and decode.
+ if (headerBuilder.getCallId() == AuthProtocol.SASL.callId) {
+ RpcSaslProto.Builder saslMessage = RpcSaslProto.newBuilder();
+ saslMessage.mergeDelimitedFrom(bis);
+ if (saslMessage.getState() == SaslState.WRAP) {
+ isWrapped = true;
+ byte[] token = saslMessage.getToken().toByteArray();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("unwrapping token of length:" + token.length);
+ }
+ token = saslClient.unwrap(token, 0, token.length);
+ unwrappedRpcBuffer = ByteBuffer.wrap(token);
+ }
+ }
+ if (!isWrapped) {
+ throw new SaslException("Server sent non-wrapped response");
+ }
}
- return new SaslOutputStream(out, saslClient);
}
+ class WrappedOutputStream extends FilterOutputStream {
+ public WrappedOutputStream(OutputStream out) throws IOException {
+ super(out);
+ }
+ @Override
+ public void write(byte[] buf, int off, int len) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("wrapping token of length:" + len);
+ }
+ buf = saslClient.wrap(buf, off, len);
+ RpcSaslProto saslMessage = RpcSaslProto.newBuilder()
+ .setState(SaslState.WRAP)
+ .setToken(ByteString.copyFrom(buf, 0, buf.length))
+ .build();
+ RpcRequestMessageWrapper request =
+ new RpcRequestMessageWrapper(saslHeader, saslMessage);
+ DataOutputStream dob = new DataOutputStream(out);
+ dob.writeInt(request.getLength());
+ request.write(dob);
+ }
+ }
+
/** Release resources used by wrapped saslClient */
public void dispose() throws SaslException {
if (saslClient != null) {
@@ -550,4 +689,4 @@ public class SaslRpcClient {
}
}
}
-}
\ No newline at end of file
+}
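
The WrappedInputStream/WrappedOutputStream pair added above puts a simple length-prefixed frame around each SASL-protected payload. Stripped of the RpcResponseHeaderProto/RpcSaslProto decoding that the real code also performs, the per-packet unwrap step is roughly (a sketch, not the patch's exact logic):

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

import javax.security.sasl.SaslClient;

class UnwrapSketch {
  // reads one length-prefixed wrapped packet and returns the unwrapped bytes
  static ByteBuffer readWrappedPacket(InputStream in, SaslClient saslClient)
      throws IOException {
    DataInputStream dis = new DataInputStream(in);
    int len = dis.readInt();        // 4-byte length prefix written by the peer
    byte[] wrapped = new byte[len];
    dis.readFully(wrapped);         // the SASL-wrapped token
    // undo the integrity/privacy protection negotiated during saslConnect()
    byte[] clear = saslClient.unwrap(wrapped, 0, wrapped.length);
    return ByteBuffer.wrap(clear);
  }
}

The outbound side mirrors this: saslClient.wrap() produces the token, which is carried in an RpcSaslProto WRAP message preceded by its length.
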
Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcServer.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcServer.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcServer.java Tue Aug 13 21:19:53 2013
@@ -47,7 +47,6 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.Server.Connection;
-import org.apache.hadoop.security.authentication.util.KerberosName;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
@@ -104,12 +103,12 @@ public class SaslRpcServer {
String fullName = UserGroupInformation.getCurrentUser().getUserName();
if (LOG.isDebugEnabled())
LOG.debug("Kerberos principal name is " + fullName);
- KerberosName krbName = new KerberosName(fullName);
- serverId = krbName.getHostName();
- if (serverId == null) {
- serverId = "";
- }
- protocol = krbName.getServiceName();
+ // don't use KerberosName because we don't want auth_to_local
+ String[] parts = fullName.split("[/@]", 2);
+ protocol = parts[0];
+ // should verify service host is present here rather than in create()
+ // but lazy tests are using a UGI that isn't a SPN...
+ serverId = (parts.length < 2) ? "" : parts[1];
break;
}
default:
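
For reference, the split above only separates the service name from the remainder of the principal; a couple of illustrative inputs (real values come from UserGroupInformation):

public class PrincipalSplitSketch {
  public static void main(String[] args) {
    String spn = "nn/nn1.example.com";            // host-based service principal
    String[] parts = spn.split("[/@]", 2);
    System.out.println(parts[0]);                 // "nn"              -> protocol
    System.out.println(parts[1]);                 // "nn1.example.com" -> serverId

    String plainUser = "alice";                   // a UGI that is not an SPN
    parts = plainUser.split("[/@]", 2);
    System.out.println(parts.length);             // 1 -> serverId falls back to ""
  }
}
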
Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java Tue Aug 13 21:19:53 2013
@@ -123,6 +123,12 @@ abstract public class Shell {
: new String[] { "ln", "-s", target, link };
}
+ /** Return a command to read the target of a symbolic link */
+ public static String[] getReadlinkCommand(String link) {
+ return WINDOWS ? new String[] { WINUTILS, "readlink", link }
+ : new String[] { "readlink", link };
+ }
+
/** Return a command for determining if process with specified pid is alive. */
public static String[] getCheckProcessIsAliveCommand(String pid) {
return Shell.WINDOWS ?
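
A usage sketch for the new helper: the returned command array can be handed to Shell's existing exec helpers to resolve a link target in a platform-neutral way (the path below is illustrative):

import java.io.IOException;

import org.apache.hadoop.util.Shell;

public class ReadlinkSketch {
  public static void main(String[] args) throws IOException {
    String link = "/tmp/example-link";  // illustrative path
    // runs "readlink <link>" on Unix, or "winutils readlink <link>" on Windows
    String target = Shell.execCommand(Shell.getReadlinkCommand(link)).trim();
    System.out.println(link + " -> " + target);
  }
}
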
Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/package.html
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/overview.html
------------------------------------------------------------------------------
svn:eol-style = native
Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/native/native.vcxproj
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/native/native.vcxproj?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/native/native.vcxproj (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/native/native.vcxproj Tue Aug 13 21:19:53 2013
@@ -72,6 +72,7 @@
</ItemDefinitionGroup>
<ItemGroup>
<ClCompile Include="src\org\apache\hadoop\io\compress\lz4\lz4.c" />
+ <ClCompile Include="src\org\apache\hadoop\io\compress\lz4\lz4hc.c" />
<ClCompile Include="src\org\apache\hadoop\io\compress\lz4\Lz4Compressor.c" />
<ClCompile Include="src\org\apache\hadoop\io\compress\lz4\Lz4Decompressor.c" />
<ClCompile Include="src\org\apache\hadoop\io\nativeio\file_descriptor.c" />
Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/native/native.vcxproj.filters
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/native/native.vcxproj.filters?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/native/native.vcxproj.filters (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/native/native.vcxproj.filters Tue Aug 13 21:19:53 2013
@@ -51,6 +51,9 @@
<ClCompile Include="src\org\apache\hadoop\io\compress\lz4\lz4.c">
<Filter>Source Files</Filter>
</ClCompile>
+ <ClCompile Include="src\org\apache\hadoop\io\compress\lz4\lz4hc.c">
+ <Filter>Source Files</Filter>
+ </ClCompile>
<ClCompile Include="src\org\apache\hadoop\io\compress\lz4\Lz4Compressor.c">
<Filter>Source Files</Filter>
</ClCompile>
Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Compressor.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Compressor.c?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Compressor.c (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Compressor.c Tue Aug 13 21:19:53 2013
@@ -23,21 +23,9 @@
#ifdef UNIX
#include "config.h"
#endif // UNIX
+#include "lz4.h"
+#include "lz4hc.h"
-//****************************
-// Simple Functions
-//****************************
-
-extern int LZ4_compress (const char* source, char* dest, int isize);
-
-/*
-LZ4_compress() :
- return : the number of bytes in compressed buffer dest
- note : destination buffer must be already allocated.
- To avoid any problem, size it to handle worst cases situations (input data not compressible)
- Worst case size is : "inputsize + 0.4%", with "0.4%" being at least 8 bytes.
-
-*/
static jfieldID Lz4Compressor_clazz;
static jfieldID Lz4Compressor_uncompressedDirectBuf;
@@ -107,5 +95,45 @@ JNIEXPORT jstring JNICALL
Java_org_apache_hadoop_io_compress_lz4_Lz4Compressor_getLibraryName(
JNIEnv *env, jclass class
) {
- return (*env)->NewStringUTF(env, "revision:43");
+ return (*env)->NewStringUTF(env, "revision:99");
+}
+
+JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_lz4_Lz4Compressor_compressBytesDirectHC
+(JNIEnv *env, jobject thisj){
+ const char* uncompressed_bytes = NULL;
+ char* compressed_bytes = NULL;
+
+ // Get members of Lz4Compressor
+ jobject clazz = (*env)->GetStaticObjectField(env, thisj, Lz4Compressor_clazz);
+ jobject uncompressed_direct_buf = (*env)->GetObjectField(env, thisj, Lz4Compressor_uncompressedDirectBuf);
+ jint uncompressed_direct_buf_len = (*env)->GetIntField(env, thisj, Lz4Compressor_uncompressedDirectBufLen);
+ jobject compressed_direct_buf = (*env)->GetObjectField(env, thisj, Lz4Compressor_compressedDirectBuf);
+ jint compressed_direct_buf_len = (*env)->GetIntField(env, thisj, Lz4Compressor_directBufferSize);
+
+ // Get the input direct buffer
+ LOCK_CLASS(env, clazz, "Lz4Compressor");
+ uncompressed_bytes = (const char*)(*env)->GetDirectBufferAddress(env, uncompressed_direct_buf);
+ UNLOCK_CLASS(env, clazz, "Lz4Compressor");
+
+ if (uncompressed_bytes == 0) {
+ return (jint)0;
+ }
+
+ // Get the output direct buffer
+ LOCK_CLASS(env, clazz, "Lz4Compressor");
+ compressed_bytes = (char *)(*env)->GetDirectBufferAddress(env, compressed_direct_buf);
+ UNLOCK_CLASS(env, clazz, "Lz4Compressor");
+
+ if (compressed_bytes == 0) {
+ return (jint)0;
+ }
+
+ compressed_direct_buf_len = LZ4_compressHC(uncompressed_bytes, compressed_bytes, uncompressed_direct_buf_len);
+ if (compressed_direct_buf_len < 0){
+ THROW(env, "java/lang/InternalError", "LZ4_compressHC failed");
+ }
+
+ (*env)->SetIntField(env, thisj, Lz4Compressor_uncompressedDirectBufLen, 0);
+
+ return (jint)compressed_direct_buf_len;
}
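
On the Java side the new compressBytesDirectHC entry point is not called directly by applications; it is reached through the ordinary codec API. A rough usage sketch with Lz4Codec (requires the hadoop native library to be loaded; whether the HC variant is taken depends on configuration, which is outside this sketch):

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.util.ReflectionUtils;

public class Lz4Sketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Lz4Codec codec = ReflectionUtils.newInstance(Lz4Codec.class, conf);

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    CompressionOutputStream out = codec.createOutputStream(baos);
    out.write("example payload".getBytes("UTF-8"));
    out.close();
    System.out.println("compressed to " + baos.size() + " bytes");
  }
}
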
Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.c?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.c (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.c Tue Aug 13 21:19:53 2013
@@ -22,18 +22,7 @@
#ifdef UNIX
#include "config.h"
#endif // UNIX
-
-int LZ4_uncompress_unknownOutputSize(const char* source, char* dest, int isize, int maxOutputSize);
-
-/*
-LZ4_uncompress_unknownOutputSize() :
- isize : is the input size, therefore the compressed size
- maxOutputSize : is the size of the destination buffer (which must be already allocated)
- return : the number of bytes decoded in the destination buffer (necessarily <= maxOutputSize)
- If the source stream is malformed, the function will stop decoding and return a negative result, indicating the byte position of the faulty instruction
- This version never writes beyond dest + maxOutputSize, and is therefore protected against malicious data packets
- note : This version is a bit slower than LZ4_uncompress
-*/
+#include "lz4.h"
static jfieldID Lz4Decompressor_clazz;
@@ -89,7 +78,7 @@ JNIEXPORT jint JNICALL Java_org_apache_h
return (jint)0;
}
- uncompressed_direct_buf_len = LZ4_uncompress_unknownOutputSize(compressed_bytes, uncompressed_bytes, compressed_direct_buf_len, uncompressed_direct_buf_len);
+ uncompressed_direct_buf_len = LZ4_decompress_safe(compressed_bytes, uncompressed_bytes, compressed_direct_buf_len, uncompressed_direct_buf_len);
if (uncompressed_direct_buf_len < 0) {
THROW(env, "java/lang/InternalError", "LZ4_uncompress_unknownOutputSize failed.");
}