You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2013/08/13 23:19:57 UTC

svn commit: r1513658 [3/4] - in /hadoop/common/branches/HDFS-4949/hadoop-common-project: hadoop-auth-examples/src/main/webapp/ hadoop-auth-examples/src/main/webapp/annonymous/ hadoop-auth-examples/src/main/webapp/kerberos/ hadoop-auth-examples/src/main...

Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/docs/releasenotes.html
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java:r1509426-1512447

Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java Tue Aug 13 21:19:53 2013
@@ -96,7 +96,7 @@ public class CommonConfigurationKeys ext
   public static final int IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT =
       256 * 1024;
 
-  /** Internal buffer size for Snappy compressor/decompressors */
+  /** Internal buffer size for Lz4 compressor/decompressors */
   public static final String IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY =
       "io.compression.codec.lz4.buffersize";
 
@@ -104,6 +104,14 @@ public class CommonConfigurationKeys ext
   public static final int IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT =
       256 * 1024;
 
+  /** Use lz4hc (slow but with high compression ratio) for lz4 compression */
+  public static final String IO_COMPRESSION_CODEC_LZ4_USELZ4HC_KEY =
+      "io.compression.codec.lz4.use.lz4hc";
+
+  /** Default value for IO_COMPRESSION_CODEC_LZ4_USELZ4HC_KEY */
+  public static final boolean IO_COMPRESSION_CODEC_LZ4_USELZ4HC_DEFAULT =
+      false;
+
   /**
    * Service Authorization
    */

Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java Tue Aug 13 21:19:53 2013
@@ -258,7 +258,7 @@ public final class FileContext {
    * Hence this method is not called makeAbsolute() and 
    * has been deliberately declared private.
    */
-  private Path fixRelativePart(Path p) {
+  Path fixRelativePart(Path p) {
     if (p.isUriPathAbsolute()) {
       return p;
     } else {
@@ -1905,7 +1905,7 @@ public final class FileContext {
     public FileStatus[] globStatus(Path pathPattern)
         throws AccessControlException, UnsupportedFileSystemException,
         IOException {
-      return globStatus(pathPattern, DEFAULT_FILTER);
+      return new Globber(FileContext.this, pathPattern, DEFAULT_FILTER).glob();
     }
     
     /**
@@ -1934,154 +1934,7 @@ public final class FileContext {
     public FileStatus[] globStatus(final Path pathPattern,
         final PathFilter filter) throws AccessControlException,
         UnsupportedFileSystemException, IOException {
-      URI uri = getFSofPath(fixRelativePart(pathPattern)).getUri();
-
-      String filename = pathPattern.toUri().getPath();
-
-      List<String> filePatterns = GlobExpander.expand(filename);
-      if (filePatterns.size() == 1) {
-        Path absPathPattern = fixRelativePart(pathPattern);
-        return globStatusInternal(uri, new Path(absPathPattern.toUri()
-            .getPath()), filter);
-      } else {
-        List<FileStatus> results = new ArrayList<FileStatus>();
-        for (String iFilePattern : filePatterns) {
-          Path iAbsFilePattern = fixRelativePart(new Path(iFilePattern));
-          FileStatus[] files = globStatusInternal(uri, iAbsFilePattern, filter);
-          for (FileStatus file : files) {
-            results.add(file);
-          }
-        }
-        return results.toArray(new FileStatus[results.size()]);
-      }
-    }
-
-    /**
-     * 
-     * @param uri for all the inPathPattern
-     * @param inPathPattern - without the scheme & authority (take from uri)
-     * @param filter
-     *
-     * @return an array of FileStatus objects
-     *
-     * @throws AccessControlException If access is denied
-     * @throws IOException If an I/O error occurred
-     */
-    private FileStatus[] globStatusInternal(final URI uri,
-        final Path inPathPattern, final PathFilter filter)
-        throws AccessControlException, IOException
-      {
-      Path[] parents = new Path[1];
-      int level = 0;
-      
-      assert(inPathPattern.toUri().getScheme() == null &&
-          inPathPattern.toUri().getAuthority() == null && 
-          inPathPattern.isUriPathAbsolute());
-
-      
-      String filename = inPathPattern.toUri().getPath();
-      
-      // path has only zero component
-      if (filename.isEmpty() || Path.SEPARATOR.equals(filename)) {
-        Path p = inPathPattern.makeQualified(uri, null);
-        return getFileStatus(new Path[]{p});
-      }
-
-      // path has at least one component
-      String[] components = filename.split(Path.SEPARATOR);
-      
-      // Path is absolute, first component is "/" hence first component
-      // is the uri root
-      parents[0] = new Path(new Path(uri), new Path("/"));
-      level = 1;
-
-      // glob the paths that match the parent path, ie. [0, components.length-1]
-      boolean[] hasGlob = new boolean[]{false};
-      Path[] relParentPaths = 
-        globPathsLevel(parents, components, level, hasGlob);
-      FileStatus[] results;
-      
-      if (relParentPaths == null || relParentPaths.length == 0) {
-        results = null;
-      } else {
-        // fix the pathes to be abs
-        Path[] parentPaths = new Path [relParentPaths.length]; 
-        for(int i=0; i<relParentPaths.length; i++) {
-          parentPaths[i] = relParentPaths[i].makeQualified(uri, null);
-        }
-        
-        // Now work on the last component of the path
-        GlobFilter fp = 
-                    new GlobFilter(components[components.length - 1], filter);
-        if (fp.hasPattern()) { // last component has a pattern
-          // list parent directories and then glob the results
-          try {
-            results = listStatus(parentPaths, fp);
-          } catch (FileNotFoundException e) {
-            results = null;
-          }
-          hasGlob[0] = true;
-        } else { // last component does not have a pattern
-          // get all the path names
-          ArrayList<Path> filteredPaths = 
-                                      new ArrayList<Path>(parentPaths.length);
-          for (int i = 0; i < parentPaths.length; i++) {
-            parentPaths[i] = new Path(parentPaths[i],
-              components[components.length - 1]);
-            if (fp.accept(parentPaths[i])) {
-              filteredPaths.add(parentPaths[i]);
-            }
-          }
-          // get all their statuses
-          results = getFileStatus(
-              filteredPaths.toArray(new Path[filteredPaths.size()]));
-        }
-      }
-
-      // Decide if the pathPattern contains a glob or not
-      if (results == null) {
-        if (hasGlob[0]) {
-          results = new FileStatus[0];
-        }
-      } else {
-        if (results.length == 0) {
-          if (!hasGlob[0]) {
-            results = null;
-          }
-        } else {
-          Arrays.sort(results);
-        }
-      }
-      return results;
-    }
-
-    /*
-     * For a path of N components, return a list of paths that match the
-     * components [<code>level</code>, <code>N-1</code>].
-     */
-    private Path[] globPathsLevel(Path[] parents, String[] filePattern,
-        int level, boolean[] hasGlob) throws AccessControlException,
-        FileNotFoundException, IOException {
-      if (level == filePattern.length - 1) {
-        return parents;
-      }
-      if (parents == null || parents.length == 0) {
-        return null;
-      }
-      GlobFilter fp = new GlobFilter(filePattern[level]);
-      if (fp.hasPattern()) {
-        try {
-          parents = FileUtil.stat2Paths(listStatus(parents, fp));
-        } catch (FileNotFoundException e) {
-          parents = null;
-        }
-        hasGlob[0] = true;
-      } else {
-        for (int i = 0; i < parents.length; i++) {
-          parents[i] = new Path(parents[i], filePattern[level]);
-        }
-      }
-      return globPathsLevel(parents, filePattern, level + 1, hasGlob);
+      return new Globber(FileContext.this, pathPattern, filter).glob();
     }
 
     /**

Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java Tue Aug 13 21:19:53 2013
@@ -1619,7 +1619,7 @@ public abstract class FileSystem extends
    * @throws IOException
    */
   public FileStatus[] globStatus(Path pathPattern) throws IOException {
-    return globStatus(pathPattern, DEFAULT_FILTER);
+    return new Globber(this, pathPattern, DEFAULT_FILTER).glob();
   }
   
   /**
@@ -1637,126 +1637,7 @@ public abstract class FileSystem extends
    */
   public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
       throws IOException {
-    String filename = pathPattern.toUri().getPath();
-    List<FileStatus> allMatches = null;
-    
-    List<String> filePatterns = GlobExpander.expand(filename);
-    for (String filePattern : filePatterns) {
-      Path path = new Path(filePattern.isEmpty() ? Path.CUR_DIR : filePattern);
-      List<FileStatus> matches = globStatusInternal(path, filter);
-      if (matches != null) {
-        if (allMatches == null) {
-          allMatches = matches;
-        } else {
-          allMatches.addAll(matches);
-        }
-      }
-    }
-    
-    FileStatus[] results = null;
-    if (allMatches != null) {
-      results = allMatches.toArray(new FileStatus[allMatches.size()]);
-    } else if (filePatterns.size() > 1) {
-      // no matches with multiple expansions is a non-matching glob 
-      results = new FileStatus[0];
-    }
-    return results;
-  }
-
-  // sort gripes because FileStatus Comparable isn't parameterized...
-  @SuppressWarnings("unchecked") 
-  private List<FileStatus> globStatusInternal(Path pathPattern,
-      PathFilter filter) throws IOException {
-    boolean patternHasGlob = false;       // pathPattern has any globs
-    List<FileStatus> matches = new ArrayList<FileStatus>();
-
-    // determine starting point
-    int level = 0;
-    String baseDir = Path.CUR_DIR;
-    if (pathPattern.isAbsolute()) {
-      level = 1; // need to skip empty item at beginning of split list
-      baseDir = Path.SEPARATOR;
-    }
-    
-    // parse components and determine if it's a glob
-    String[] components = null;
-    GlobFilter[] filters = null;
-    String filename = pathPattern.toUri().getPath();
-    if (!filename.isEmpty() && !Path.SEPARATOR.equals(filename)) {
-      components = filename.split(Path.SEPARATOR);
-      filters = new GlobFilter[components.length];
-      for (int i=level; i < components.length; i++) {
-        filters[i] = new GlobFilter(components[i]);
-        patternHasGlob |= filters[i].hasPattern();
-      }
-      if (!patternHasGlob) {
-        baseDir = unquotePathComponent(filename);
-        components = null; // short through to filter check
-      }
-    }
-    
-    // seed the parent directory path, return if it doesn't exist
-    try {
-      matches.add(getFileStatus(new Path(baseDir)));
-    } catch (FileNotFoundException e) {
-      return patternHasGlob ? matches : null;
-    }
-    
-    // skip if there are no components other than the basedir
-    if (components != null) {
-      // iterate through each path component
-      for (int i=level; (i < components.length) && !matches.isEmpty(); i++) {
-        List<FileStatus> children = new ArrayList<FileStatus>();
-        for (FileStatus match : matches) {
-          // don't look for children in a file matched by a glob
-          if (!match.isDirectory()) {
-            continue;
-          }
-          try {
-            if (filters[i].hasPattern()) {
-              // get all children matching the filter
-              FileStatus[] statuses = listStatus(match.getPath(), filters[i]);
-              children.addAll(Arrays.asList(statuses));
-            } else {
-              // the component does not have a pattern
-              String component = unquotePathComponent(components[i]);
-              Path child = new Path(match.getPath(), component);
-              children.add(getFileStatus(child));
-            }
-          } catch (FileNotFoundException e) {
-            // don't care
-          }
-        }
-        matches = children;
-      }
-    }
-    // remove anything that didn't match the filter
-    if (!matches.isEmpty()) {
-      Iterator<FileStatus> iter = matches.iterator();
-      while (iter.hasNext()) {
-        if (!filter.accept(iter.next().getPath())) {
-          iter.remove();
-        }
-      }
-    }
-    // no final paths, if there were any globs return empty list
-    if (matches.isEmpty()) {
-      return patternHasGlob ? matches : null;
-    }
-    Collections.sort(matches);
-    return matches;
-  }
-
-  /**
-   * The glob filter builds a regexp per path component.  If the component
-   * does not contain a shell metachar, then it falls back to appending the
-   * raw string to the list of built up paths.  This raw path needs to have
-   * the quoting removed.  Ie. convert all occurances of "\X" to "X"
-   * @param name of the path component
-   * @return the unquoted path component
-   */
-  private String unquotePathComponent(String name) {
-    return name.replaceAll("\\\\(.)", "$1");
+    return new Globber(this, pathPattern, filter).glob();
   }
   
   /**

Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemLinkResolver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemLinkResolver.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemLinkResolver.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemLinkResolver.java Tue Aug 13 21:19:53 2013
@@ -73,7 +73,9 @@ public abstract class FileSystemLinkReso
     int count = 0;
     T in = null;
     Path p = path;
-    FileSystem fs = FileSystem.getFSofPath(p, filesys.getConf());
+    // Assumes path belongs to this FileSystem.
+    // Callers validate this by passing paths through FileSystem#checkPath
+    FileSystem fs = filesys;
     for (boolean isLink = true; isLink;) {
       try {
         in = doCall(p);

Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java Tue Aug 13 21:19:53 2013
@@ -142,7 +142,28 @@ public class FileUtil {
     }
     return deleteImpl(dir, true);
   }
-  
+
+  /**
+   * Returns the target of the given symlink. Returns the empty string if
+   * the given path does not refer to a symlink or there is an error
+   * accessing the symlink.
+   * @param f File representing the symbolic link.
+   * @return The target of the symbolic link, empty string on error or if not
+   *         a symlink.
+   */
+  public static String readLink(File f) {
+    /* NB: Use readSymbolicLink in java.nio.file.Files once available. Could
+     * use getCanonicalPath in File to get the target of the symlink but that
+     * does not indicate if the given path refers to a symlink.
+     */
+    try {
+      return Shell.execCommand(
+          Shell.getReadlinkCommand(f.toString())).trim();
+    } catch (IOException x) {
+      return "";
+    }
+  }
+
   /*
    * Pure-Java implementation of "chmod +rwx f".
    */
@@ -737,15 +758,18 @@ public class FileUtil {
    * On Windows, when symlink creation fails due to security
    * setting, we will log a warning. The return code in this
    * case is 2.
+   *
    * @param target the target for symlink 
    * @param linkname the symlink
-   * @return value returned by the command
+   * @return 0 on success
    */
   public static int symLink(String target, String linkname) throws IOException{
     // Run the input paths through Java's File so that they are converted to the
     // native OS form
-    File targetFile = new File(target);
-    File linkFile = new File(linkname);
+    File targetFile = new File(
+        Path.getPathWithoutSchemeAndAuthority(new Path(target)).toString());
+    File linkFile = new File(
+        Path.getPathWithoutSchemeAndAuthority(new Path(linkname)).toString());
 
     // If not on Java7+, copy a file instead of creating a symlink since
     // Java6 has close to no support for symlinks on Windows. Specifically
@@ -757,9 +781,16 @@ public class FileUtil {
     // is symlinked under userlogs and userlogs are generated afterwards).
     if (Shell.WINDOWS && !Shell.isJava7OrAbove() && targetFile.isFile()) {
       try {
-        LOG.info("FileUtil#symlink: On Java6, copying file instead "
-            + linkname + " -> " + target);
-        org.apache.commons.io.FileUtils.copyFile(targetFile, linkFile);
+        LOG.warn("FileUtil#symlink: On Windows+Java6, copying file instead " +
+            "of creating a symlink. Copying " + target + " -> " + linkname);
+
+        if (!linkFile.getParentFile().exists()) {
+          LOG.warn("Parent directory " + linkFile.getParent() +
+              " does not exist.");
+          return 1;
+        } else {
+          org.apache.commons.io.FileUtils.copyFile(targetFile, linkFile);
+        }
       } catch (IOException ex) {
         LOG.warn("FileUtil#symlink failed to copy the file with error: "
             + ex.getMessage());
@@ -769,10 +800,23 @@ public class FileUtil {
       return 0;
     }
 
-    String[] cmd = Shell.getSymlinkCommand(targetFile.getPath(),
-        linkFile.getPath());
-    ShellCommandExecutor shExec = new ShellCommandExecutor(cmd);
+    String[] cmd = Shell.getSymlinkCommand(
+        targetFile.toString(),
+        linkFile.toString());
+
+    ShellCommandExecutor shExec;
     try {
+      if (Shell.WINDOWS &&
+          linkFile.getParentFile() != null &&
+          !new Path(target).isAbsolute()) {
+        // Relative links on Windows must be resolvable at the time of
+        // creation. To ensure this we run the shell command in the directory
+        // of the link.
+        //
+        shExec = new ShellCommandExecutor(cmd, linkFile.getParentFile());
+      } else {
+        shExec = new ShellCommandExecutor(cmd);
+      }
       shExec.execute();
     } catch (Shell.ExitCodeException ec) {
       int returnVal = ec.getExitCode();
@@ -795,7 +839,7 @@ public class FileUtil {
     }
     return shExec.getExitCode();
   }
-  
+
   /**
    * Change the permissions on a filename.
    * @param filename the name of the file to change

Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java Tue Aug 13 21:19:53 2013
@@ -682,31 +682,13 @@ public class RawLocalFileSystem extends 
     if (createParent) {
       mkdirs(link.getParent());
     }
-    // NB: Use createSymbolicLink in java.nio.file.Path once available
-    try {
-      Shell.execCommand(Shell.getSymlinkCommand(
-        Path.getPathWithoutSchemeAndAuthority(target).toString(),
-        Path.getPathWithoutSchemeAndAuthority(makeAbsolute(link)).toString()));
-    } catch (IOException x) {
-      throw new IOException("Unable to create symlink: "+x.getMessage());
-    }
-  }
 
-  /**
-   * Returns the target of the given symlink. Returns the empty string if
-   * the given path does not refer to a symlink or there is an error
-   * accessing the symlink.
-   */
-  private String readLink(Path p) {
-    /* NB: Use readSymbolicLink in java.nio.file.Path once available. Could
-     * use getCanonicalPath in File to get the target of the symlink but that
-     * does not indicate if the given path refers to a symlink.
-     */
-    try {
-      final String path = p.toUri().getPath();
-      return Shell.execCommand(Shell.READ_LINK_COMMAND, path).trim();
-    } catch (IOException x) {
-      return "";
+    // NB: Use createSymbolicLink in java.nio.file.Files once available
+    int result = FileUtil.symLink(target.toString(),
+        makeAbsolute(link).toString());
+    if (result != 0) {
+      throw new IOException("Error " + result + " creating symlink " +
+          link + " to " + target);
     }
   }
 
@@ -729,7 +711,7 @@ public class RawLocalFileSystem extends 
   }
 
   private FileStatus getFileLinkStatusInternal(final Path f) throws IOException {
-    String target = readLink(f);
+    String target = FileUtil.readLink(new File(f.toString()));
 
     try {
       FileStatus fs = getFileStatus(f);

Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/local/RawLocalFs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/local/RawLocalFs.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/local/RawLocalFs.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/local/RawLocalFs.java Tue Aug 13 21:19:53 2013
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.fs.local;
 
+import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
@@ -28,12 +29,12 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.AbstractFileSystem;
 import org.apache.hadoop.fs.DelegateToFileSystem;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.FsConstants;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.util.Shell;
 
 /**
  * The RawLocalFs implementation of AbstractFileSystem.
@@ -75,47 +76,29 @@ public class RawLocalFs extends Delegate
   @Override
   public boolean supportsSymlinks() {
     return true;
-  }  
-  
+  }
+
   @Override
-  public void createSymlink(Path target, Path link, boolean createParent) 
+  public void createSymlink(Path target, Path link, boolean createParent)
       throws IOException {
     final String targetScheme = target.toUri().getScheme();
     if (targetScheme != null && !"file".equals(targetScheme)) {
       throw new IOException("Unable to create symlink to non-local file "+
-                            "system: "+target.toString());
+          "system: "+target.toString());
     }
+
     if (createParent) {
       mkdir(link.getParent(), FsPermission.getDirDefault(), true);
     }
+
     // NB: Use createSymbolicLink in java.nio.file.Path once available
-    try {
-      Shell.execCommand(Shell.getSymlinkCommand(
-        Path.getPathWithoutSchemeAndAuthority(target).toString(),
-        Path.getPathWithoutSchemeAndAuthority(link).toString()));
-    } catch (IOException x) {
-      throw new IOException("Unable to create symlink: "+x.getMessage());
+    int result = FileUtil.symLink(target.toString(), link.toString());
+    if (result != 0) {
+      throw new IOException("Error " + result + " creating symlink " +
+          link + " to " + target);
     }
   }
 
-  /** 
-   * Returns the target of the given symlink. Returns the empty string if  
-   * the given path does not refer to a symlink or there is an error 
-   * acessing the symlink.
-   */
-  private String readLink(Path p) {
-    /* NB: Use readSymbolicLink in java.nio.file.Path once available. Could
-     * use getCanonicalPath in File to get the target of the symlink but that 
-     * does not indicate if the given path refers to a symlink.
-     */
-    try {
-      final String path = p.toUri().getPath();
-      return Shell.execCommand(Shell.READ_LINK_COMMAND, path).trim(); 
-    } catch (IOException x) {
-      return "";
-    }
-  }
-  
   /**
    * Return a FileStatus representing the given path. If the path refers 
    * to a symlink return a FileStatus representing the link rather than
@@ -123,7 +106,7 @@ public class RawLocalFs extends Delegate
    */
   @Override
   public FileStatus getFileLinkStatus(final Path f) throws IOException {
-    String target = readLink(f);
+    String target = FileUtil.readLink(new File(f.toString()));
     try {
       FileStatus fs = getFileStatus(f);
       // If f refers to a regular file or directory      

Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java Tue Aug 13 21:19:53 2013
@@ -18,18 +18,16 @@
 
 package org.apache.hadoop.fs.shell;
 
-import java.io.File;
-import java.io.IOException;
+import java.io.*;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.PathIsDirectoryException;
+import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.IOUtils;
 
 /** Various commands for copy files */
@@ -44,6 +42,7 @@ class CopyCommands {  
     factory.addClass(CopyToLocal.class, "-copyToLocal");
     factory.addClass(Get.class, "-get");
     factory.addClass(Put.class, "-put");
+    factory.addClass(AppendToFile.class, "-appendToFile");
   }
 
   /** merge multiple files together */
@@ -235,4 +234,93 @@ class CopyCommands {  
     public static final String USAGE = Get.USAGE;
     public static final String DESCRIPTION = "Identical to the -get command.";
   }
+
+  /**
+   *  Append the contents of one or more local files, or of stdin when
+   *  the only source argument is "-", to a remote file.
+   */
+  public static class AppendToFile extends CommandWithDestination {
+    public static final String NAME = "appendToFile";
+    public static final String USAGE = "<localsrc> ... <dst>";
+    public static final String DESCRIPTION =
+        "Appends the contents of all the given local files to the\n" +
+            "given dst file. The dst file will be created if it does\n" +
+            "not exist. If <localSrc> is -, then the input is read\n" +
+            "from stdin.";
+
+    private static final int DEFAULT_IO_LENGTH = 1024 * 1024; // copy buffer size
+    boolean readStdin = false; // set when "-" is seen among the sources
+
+    // commands operating on local paths have no need for glob expansion
+    @Override
+    protected List<PathData> expandArgument(String arg) throws IOException {
+      List<PathData> items = new LinkedList<PathData>();
+      if (arg.equals("-")) {
+        readStdin = true;
+      } else {
+        try {
+          items.add(new PathData(new URI(arg), getConf()));
+        } catch (URISyntaxException e) {
+          if (Path.WINDOWS) {
+            // Unlike URI, PathData knows how to parse Windows drive-letter paths.
+            items.add(new PathData(arg, getConf()));
+          } else {
+            throw new IOException("Unexpected URISyntaxException: " + e, e);
+          }
+        }
+      }
+      return items;
+    }
+
+    @Override
+    protected void processOptions(LinkedList<String> args)
+        throws IOException {
+      // At least one source and the destination are required.
+      if (args.size() < 2) {
+        throw new IOException("missing destination argument");
+      }
+
+      getRemoteDestination(args);
+      super.processOptions(args);
+    }
+
+    /** Appends stdin or every local source to dst, creating dst if missing. */
+    @Override
+    protected void processArguments(LinkedList<PathData> args)
+        throws IOException {
+      // Validate before touching dst so a bad invocation has no side effects.
+      if (readStdin && args.size() != 0) {
+        throw new IOException(
+            "stdin (-) must be the sole input argument when present");
+      }
+
+      if (!dst.exists) {
+        dst.fs.create(dst.path, false).close();
+      }
+
+      InputStream is = null;
+      FSDataOutputStream fos = dst.fs.append(dst.path);
+      try {
+        if (readStdin) {
+          IOUtils.copyBytes(System.in, fos, DEFAULT_IO_LENGTH);
+        }
+
+        // Read in each input file and write to the target.
+        for (PathData source : args) {
+          is = new FileInputStream(source.toFile());
+          IOUtils.copyBytes(is, fos, DEFAULT_IO_LENGTH);
+          IOUtils.closeStream(is);
+          is = null;
+        }
+
+        // Close explicitly: closeStream() would swallow a failed final flush.
+        fos.close();
+        fos = null;
+      } finally {
+        // Error path only; both calls are null-safe and ignore IOExceptions.
+        IOUtils.closeStream(is);
+        IOUtils.closeStream(fos);
+      }
+    }
+  }
 }

Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/lib/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java Tue Aug 13 21:19:53 2013
@@ -107,7 +107,7 @@ public class Lz4Codec implements Configu
         CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
         CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT);
 
-    int compressionOverhead = Math.max((int)(bufferSize * 0.01), 10);
+    int compressionOverhead = bufferSize/255 + 16;
 
     return new BlockCompressorStream(out, compressor, bufferSize,
         compressionOverhead);
@@ -140,7 +140,10 @@ public class Lz4Codec implements Configu
     int bufferSize = conf.getInt(
         CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
         CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT);
-    return new Lz4Compressor(bufferSize);
+    boolean useLz4HC = conf.getBoolean(
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_USELZ4HC_KEY,
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_USELZ4HC_DEFAULT);
+    return new Lz4Compressor(bufferSize, useLz4HC);
   }
 
   /**

Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Compressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Compressor.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Compressor.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Compressor.java Tue Aug 13 21:19:53 2013
@@ -52,6 +52,7 @@ public class Lz4Compressor implements Co
   private long bytesRead = 0L;
   private long bytesWritten = 0L;
 
+  private final boolean useLz4HC;
 
   static {
     if (NativeCodeLoader.isNativeCodeLoaded()) {
@@ -72,8 +73,11 @@ public class Lz4Compressor implements Co
    * Creates a new compressor.
    *
    * @param directBufferSize size of the direct buffer to be used.
+   * @param useLz4HC if true, use the high-compression (HC) variant of lz4,
+   *                 which spends more CPU time for a better compression ratio.
    */
-  public Lz4Compressor(int directBufferSize) {
+  public Lz4Compressor(int directBufferSize, boolean useLz4HC) {
+    this.useLz4HC = useLz4HC;
     this.directBufferSize = directBufferSize;
 
     uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
@@ -82,6 +86,15 @@ public class Lz4Compressor implements Co
   }
 
   /**
+   * Creates a new compressor with lz4 high compression (HC) disabled.
+   *
+   * @param directBufferSize size of the direct buffer to be used.
+   */
+  public Lz4Compressor(int directBufferSize) {
+    this(directBufferSize, false);
+  }
+
+  /**
    * Creates a new compressor with the default buffer size.
    */
   public Lz4Compressor() {
@@ -227,7 +240,7 @@ public class Lz4Compressor implements Co
     }
 
     // Compress data
-    n = compressBytesDirect();
+    n = useLz4HC ? compressBytesDirectHC() : compressBytesDirect();
     compressedDirectBuf.limit(n);
     uncompressedDirectBuf.clear(); // lz4 consumes all buffer input
 
@@ -297,5 +310,7 @@ public class Lz4Compressor implements Co
 
   private native int compressBytesDirect();
 
+  private native int compressBytesDirectHC();
+
   public native static String getLibraryName();
 }

Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/avro/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Tue Aug 13 21:19:53 2013
@@ -18,10 +18,11 @@
 
 package org.apache.hadoop.ipc;
 
-import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
+import static org.apache.hadoop.ipc.RpcConstants.*;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.FilterInputStream;
@@ -52,6 +53,7 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.net.SocketFactory;
+import javax.security.sasl.Sasl;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -87,6 +89,7 @@ import org.apache.hadoop.util.Reflection
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.CodedOutputStream;
@@ -380,6 +383,7 @@ public class Client {
     private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
     private boolean doPing; //do we need to send ping message
     private int pingInterval; // how often sends ping to the server in msecs
+    private ByteArrayOutputStream pingRequest; // ping message
     
     // currently active calls
     private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
@@ -405,6 +409,15 @@ public class Client {
       this.maxRetriesOnSocketTimeouts = remoteId.getMaxRetriesOnSocketTimeouts();
       this.tcpNoDelay = remoteId.getTcpNoDelay();
       this.doPing = remoteId.getDoPing();
+      if (doPing) {
+        // construct an RPC header with the callId as the ping callId
+        pingRequest = new ByteArrayOutputStream();
+        RpcRequestHeaderProto pingHeader = ProtoUtil
+            .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
+                OperationProto.RPC_FINAL_PACKET, PING_CALL_ID,
+                RpcConstants.INVALID_RETRY_COUNT, clientId);
+        pingHeader.writeDelimitedTo(pingRequest);
+      }
       this.pingInterval = remoteId.getPingInterval();
       this.serviceClass = serviceClass;
       if (LOG.isDebugEnabled()) {
@@ -700,6 +713,7 @@ public class Client {
                     }
                   });
             } catch (Exception ex) {
+              authMethod = saslRpcClient.getAuthMethod();
               if (rand == null) {
                 rand = new Random();
               }
@@ -711,6 +725,9 @@ public class Client {
               // Sasl connect is successful. Let's set up Sasl i/o streams.
               inStream = saslRpcClient.getInputStream(inStream);
               outStream = saslRpcClient.getOutputStream(outStream);
+              // record the negotiated QOP on the ConnectionId (for testing)
+              remoteId.saslQop =
+                  (String)saslRpcClient.getNegotiatedProperty(Sasl.QOP);
             } else if (UserGroupInformation.isSecurityEnabled() &&
                        !fallbackAllowed) {
               throw new IOException("Server asks us to fall back to SIMPLE " +
@@ -720,12 +737,16 @@ public class Client {
           }
         
           if (doPing) {
-            this.in = new DataInputStream(new BufferedInputStream(
-                new PingInputStream(inStream)));
-          } else {
-            this.in = new DataInputStream(new BufferedInputStream(inStream));
+            inStream = new PingInputStream(inStream);
+          }
+          this.in = new DataInputStream(new BufferedInputStream(inStream));
+
+          // SASL may have already buffered the stream
+          if (!(outStream instanceof BufferedOutputStream)) {
+            outStream = new BufferedOutputStream(outStream);
           }
-          this.out = new DataOutputStream(new BufferedOutputStream(outStream));
+          this.out = new DataOutputStream(outStream);
+          
           writeConnectionContext(remoteId, authMethod);
 
           // update last activity time
@@ -905,7 +926,8 @@ public class Client {
       if ( curTime - lastActivity.get() >= pingInterval) {
         lastActivity.set(curTime);
         synchronized (out) {
-          out.writeInt(RpcConstants.PING_CALL_ID);
+          out.writeInt(pingRequest.size());
+          pingRequest.writeTo(out);
           out.flush();
         }
       }
@@ -1455,6 +1477,7 @@ public class Client {
     private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
     private final boolean doPing; //do we need to send ping message
     private final int pingInterval; // how often sends ping to the server in msecs
+    private String saslQop; // here for testing
     
     ConnectionId(InetSocketAddress address, Class<?> protocol, 
                  UserGroupInformation ticket, int rpcTimeout, int maxIdleTime, 
@@ -1509,6 +1532,11 @@ public class Client {
       return pingInterval;
     }
     
+    @VisibleForTesting
+    String getSaslQop() {
+      return saslQop;
+    }
+    
     static ConnectionId getConnectionId(InetSocketAddress addr,
         Class<?> protocol, UserGroupInformation ticket, int rpcTimeout,
         Configuration conf) throws IOException {

Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientId.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientId.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientId.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientId.java Tue Aug 13 21:19:53 2013
@@ -33,6 +33,7 @@ public class ClientId {
   
   /** The byte array of a UUID should be 16 */ 
   public static final int BYTE_LENGTH = 16;
+  private static final int shiftWidth = 8;
   
   /**
    * Return clientId as byte[]
@@ -53,15 +54,25 @@ public class ClientId {
     }
     // otherwise should be 16 bytes
     Preconditions.checkArgument(clientId.length == BYTE_LENGTH);
+    long msb = getMsb(clientId);
+    long lsb = getLsb(clientId);
+    return (new UUID(msb, lsb)).toString();
+  }
+  
+  public static long getMsb(byte[] clientId) {
     long msb = 0;
-    long lsb = 0;
-    for (int i = 0; i < 8; i++) {
-      msb = (msb << 8) | (clientId[i] & 0xff);
+    for (int i = 0; i < BYTE_LENGTH/2; i++) {
+      msb = (msb << shiftWidth) | (clientId[i] & 0xff);
     }
-    for (int i = 8; i < 16; i++) {
-      lsb = (lsb << 8) | (clientId[i] & 0xff);
+    return msb;
+  }
+  
+  public static long getLsb(byte[] clientId) {
+    long lsb = 0;
+    for (int i = BYTE_LENGTH/2; i < BYTE_LENGTH; i++) {
+      lsb = (lsb << shiftWidth) | (clientId[i] & 0xff);
     }
-    return (new UUID(msb, lsb)).toString();
+    return lsb;
   }
   
   /** Convert from clientId string byte[] representation of clientId */

Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java Tue Aug 13 21:19:53 2013
@@ -70,16 +70,8 @@ public class RetryCache {
           "Invalid clientId - length is " + clientId.length
               + " expected length " + ClientId.BYTE_LENGTH);
       // Convert UUID bytes to two longs
-      long tmp = 0;
-      for (int i=0; i<8; i++) {
-        tmp = (tmp << 8) | (clientId[i] & 0xff);
-      }
-      clientIdMsb = tmp;
-      tmp = 0;
-      for (int i=8; i<16; i++) {
-        tmp = (tmp << 8) | (clientId[i] & 0xff);
-      }
-      clientIdLsb = tmp;
+      clientIdMsb = ClientId.getMsb(clientId);
+      clientIdLsb = ClientId.getLsb(clientId);
       this.callId = callId;
       this.expirationTime = expirationTime;
     }

Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java Tue Aug 13 21:19:53 2013
@@ -27,13 +27,13 @@ public class RpcConstants {
     // Hidden Constructor
   }
   
-  public static final int PING_CALL_ID = -1;
+  public static final int AUTHORIZATION_FAILED_CALL_ID = -1;
+  public static final int INVALID_CALL_ID = -2;
+  public static final int CONNECTION_CONTEXT_CALL_ID = -3;
+  public static final int PING_CALL_ID = -4;
   
   public static final byte[] DUMMY_CLIENT_ID = new byte[0];
   
-  public static final int INVALID_CALL_ID = -2;
-
-  public static final int CONNECTION_CONTEXT_CALL_ID = -3;
   
   public static final int INVALID_RETRY_COUNT = -1;
   

Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Tue Aug 13 21:19:53 2013
@@ -72,8 +72,9 @@ import org.apache.hadoop.fs.CommonConfig
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
-import static org.apache.hadoop.ipc.RpcConstants.CURRENT_VERSION;
-import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
+import static org.apache.hadoop.ipc.RpcConstants.*;
+
+import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseMessageWrapper;
 import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper;
 import org.apache.hadoop.ipc.RPC.RpcInvoker;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
@@ -1177,9 +1178,7 @@ public abstract class Server {
     public UserGroupInformation attemptingUser = null; // user name before auth
 
     // Fake 'call' for failed authorization response
-    private static final int AUTHORIZATION_FAILED_CALLID = -1;
-    
-    private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID,
+    private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALL_ID,
         RpcConstants.INVALID_RETRY_COUNT, null, this);
     private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();
     
@@ -1276,8 +1275,28 @@ public abstract class Server {
       }
     }
 
-    private RpcSaslProto saslReadAndProcess(DataInputStream dis) throws
-        WrappedRpcServerException, InterruptedException {
+    private void saslReadAndProcess(DataInputStream dis) throws
+    WrappedRpcServerException, IOException, InterruptedException {
+      final RpcSaslProto saslMessage =
+          decodeProtobufFromStream(RpcSaslProto.newBuilder(), dis);
+      switch (saslMessage.getState()) {
+        case WRAP: {
+          if (!saslContextEstablished || !useWrap) {
+            throw new WrappedRpcServerException(
+                RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+                new SaslException("Server is not wrapping data"));
+          }
+          // loops over decoded data and calls processOneRpc
+          unwrapPacketAndProcessRpcs(saslMessage.getToken().toByteArray());
+          break;
+        }
+        default:
+          saslProcess(saslMessage);
+      }
+    }
+
+    private void saslProcess(RpcSaslProto saslMessage)
+        throws WrappedRpcServerException, IOException, InterruptedException {
       if (saslContextEstablished) {
         throw new WrappedRpcServerException(
             RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
@@ -1286,7 +1305,7 @@ public abstract class Server {
       RpcSaslProto saslResponse = null;
       try {
         try {
-          saslResponse = processSaslMessage(dis);
+          saslResponse = processSaslMessage(saslMessage);
         } catch (IOException e) {
           IOException sendToClient = e;
           Throwable cause = e;
@@ -1310,8 +1329,6 @@ public abstract class Server {
             LOG.debug("SASL server context established. Negotiated QoP is "
                 + saslServer.getNegotiatedProperty(Sasl.QOP));
           }
-          String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
-          useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
           user = getAuthorizedUgi(saslServer.getAuthorizationID());
           if (LOG.isDebugEnabled()) {
             LOG.debug("SASL server successfully authenticated client: " + user);
@@ -1326,13 +1343,21 @@ public abstract class Server {
         throw new WrappedRpcServerException(
             RpcErrorCodeProto.FATAL_UNAUTHORIZED, ioe);
       }
-      return saslResponse; 
+      // send back response if any, may throw IOException
+      if (saslResponse != null) {
+        doSaslReply(saslResponse);
+      }
+      // do NOT enable wrapping until the last auth response is sent
+      if (saslContextEstablished) {
+        String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
+        // SASL wrapping is only used if the connection has a QOP, and
+        // the value is not auth, e.g. auth-int or auth-conf
+        useWrap = (qop != null && !"auth".equalsIgnoreCase(qop));        
+      }
     }
     
-    private RpcSaslProto processSaslMessage(DataInputStream dis)
+    private RpcSaslProto processSaslMessage(RpcSaslProto saslMessage)
         throws IOException, InterruptedException {
-      final RpcSaslProto saslMessage =
-          decodeProtobufFromStream(RpcSaslProto.newBuilder(), dis);
       RpcSaslProto saslResponse = null;
       final SaslState state = saslMessage.getState(); // required      
       switch (state) {
@@ -1517,11 +1542,6 @@ public abstract class Server {
         if (data == null) {
           dataLengthBuffer.flip();
           dataLength = dataLengthBuffer.getInt();
-          if ((dataLength == RpcConstants.PING_CALL_ID) && (!useWrap)) {
-            // covers the !useSasl too
-            dataLengthBuffer.clear();
-            return 0; // ping message
-          }
           checkDataLength(dataLength);
           data = ByteBuffer.allocate(dataLength);
         }
@@ -1532,7 +1552,7 @@ public abstract class Server {
           dataLengthBuffer.clear();
           data.flip();
           boolean isHeaderRead = connectionContextRead;
-          processRpcRequestPacket(data.array());
+          processOneRpc(data.array());
           data = null;
           if (!isHeaderRead) {
             continue;
@@ -1695,29 +1715,19 @@ public abstract class Server {
     }
     
     /**
-     * Process a RPC Request - if SASL wrapping is enabled, unwrap the
-     * requests and process each one, else directly process the request 
-     * @param buf - single request or SASL wrapped requests
-     * @throws IOException - connection failed to authenticate or authorize,
-     *   or the request could not be decoded into a Call
+     * Process a wrapped RPC Request - unwrap the SASL packet and process
+     * each embedded RPC request 
+     * @param buf - SASL wrapped request of one or more RPCs
+     * @throws IOException - SASL packet cannot be unwrapped
      * @throws InterruptedException
      */    
-    private void processRpcRequestPacket(byte[] buf)
-        throws WrappedRpcServerException, IOException, InterruptedException {
-      if (saslContextEstablished && useWrap) {
-        if (LOG.isDebugEnabled())
-          LOG.debug("Have read input token of size " + buf.length
-              + " for processing by saslServer.unwrap()");        
-        final byte[] plaintextData = saslServer.unwrap(buf, 0, buf.length);
-        // loops over decoded data and calls processOneRpc
-        unwrapPacketAndProcessRpcs(plaintextData);
-      } else {
-        processOneRpc(buf);
-      }
-    }
-    
     private void unwrapPacketAndProcessRpcs(byte[] inBuf)
         throws WrappedRpcServerException, IOException, InterruptedException {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Have read input token of size " + inBuf.length
+            + " for processing by saslServer.unwrap()");
+      }
+      inBuf = saslServer.unwrap(inBuf, 0, inBuf.length);
       ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(
           inBuf));
       // Read all RPCs contained in the inBuf, even partial ones
@@ -1732,13 +1742,6 @@ public abstract class Server {
         if (unwrappedData == null) {
           unwrappedDataLengthBuffer.flip();
           int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
-
-          if (unwrappedDataLength == RpcConstants.PING_CALL_ID) {
-            if (LOG.isDebugEnabled())
-              LOG.debug("Received ping message");
-            unwrappedDataLengthBuffer.clear();
-            continue; // ping message
-          }
           unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
         }
 
@@ -1906,11 +1909,9 @@ public abstract class Server {
               RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
               "SASL protocol not requested by client");
         }
-        RpcSaslProto response = saslReadAndProcess(dis);
-        // send back response if any, may throw IOException
-        if (response != null) {
-          doSaslReply(response);
-        }
+        saslReadAndProcess(dis);
+      } else if (callId == PING_CALL_ID) {
+        LOG.debug("Received ping message");
       } else {
         throw new WrappedRpcServerException(
             RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
@@ -1924,7 +1925,7 @@ public abstract class Server {
      */
     private void authorizeConnection() throws WrappedRpcServerException {
       try {
-        // If auth method is DIGEST, the token was obtained by the
+        // If auth method is TOKEN, the token was obtained by the
         // real user for the effective user, therefore not required to
         // authorize real user. doAs is allowed only for simple or kerberos
         // authentication
@@ -2389,9 +2390,21 @@ public abstract class Server {
         LOG.debug("Adding saslServer wrapped token of size " + token.length
             + " as call response.");
       response.reset();
-      DataOutputStream saslOut = new DataOutputStream(response);
-      saslOut.writeInt(token.length);
-      saslOut.write(token, 0, token.length);
+      // rebuild with sasl header and payload
+      RpcResponseHeaderProto saslHeader = RpcResponseHeaderProto.newBuilder()
+          .setCallId(AuthProtocol.SASL.callId)
+          .setStatus(RpcStatusProto.SUCCESS)
+          .build();
+      RpcSaslProto saslMessage = RpcSaslProto.newBuilder()
+          .setState(SaslState.WRAP)
+          .setToken(ByteString.copyFrom(token, 0, token.length))
+          .build();
+      RpcResponseMessageWrapper saslResponse =
+          new RpcResponseMessageWrapper(saslHeader, saslMessage);
+
+      DataOutputStream out = new DataOutputStream(response);
+      out.writeInt(saslResponse.getLength());
+      saslResponse.write(out);
     }
   }
   

Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/file/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/ganglia/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/spi/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java Tue Aug 13 21:19:53 2013
@@ -381,7 +381,7 @@ public class MetricsSystemImpl extends M
   private void snapshotMetrics(MetricsSourceAdapter sa,
                                MetricsBufferBuilder bufferBuilder) {
     long startTime = Time.now();
-    bufferBuilder.add(sa.name(), sa.getMetrics(collector, false));
+    bufferBuilder.add(sa.name(), sa.getMetrics(collector, true));
     collector.clear();
     snapshotStat.add(Time.now() - startTime);
     LOG.debug("Snapshotted source "+ sa.name());

Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/record/compiler/generated/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/record/compiler/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/record/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java Tue Aug 13 21:19:53 2013
@@ -20,15 +20,20 @@ package org.apache.hadoop.security;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.FilterInputStream;
+import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Pattern;
 
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.CallbackHandler;
@@ -47,6 +52,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.GlobPattern;
 import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcRequestMessageWrapper;
 import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseMessageWrapper;
 import org.apache.hadoop.ipc.RPC.RpcKind;
@@ -67,6 +73,7 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.security.token.TokenSelector;
 import org.apache.hadoop.util.ProtoUtil;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
 /**
  * A utility class that encapsulates SASL logic for RPC client
@@ -82,6 +89,7 @@ public class SaslRpcClient {
   private final Configuration conf;
 
   private SaslClient saslClient;
+  private AuthMethod authMethod;
   
   private static final RpcRequestHeaderProto saslHeader = ProtoUtil
       .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
@@ -106,6 +114,24 @@ public class SaslRpcClient {
     this.conf = conf;
   }
   
+  /**
+   * Look up a property negotiated by the SASL client.
+   *
+   * @param key name of the negotiated property
+   * @return the value reported by the SASL client, or null when no SASL
+   *         client exists
+   */
+  @VisibleForTesting
+  @InterfaceAudience.Private
+  public Object getNegotiatedProperty(String key) {
+    if (saslClient == null) {
+      return null;
+    }
+    return saslClient.getNegotiatedProperty(key);
+  }
+  
+
+  // Exposes the auth method being attempted for this connection.
+  // The RPC Client has an inelegant way of handling expiration of TGTs
+  // acquired via a keytab: any connection failure causes a relogin, so
+  // the Client needs to know which authMethod was being attempted when an
+  // exception occurs.  Ideally the SASL prep for a kerberos connection
+  // would relogin if necessary instead of exposing this detail to the Client.
+  @InterfaceAudience.Private
+  public AuthMethod getAuthMethod() {
+    return authMethod;
+  }
+  
   /**
    * Instantiate a sasl client for the first supported auth type in the
    * given list.  The auth type must be defined, enabled, and the user
@@ -256,9 +282,8 @@ public class SaslRpcClient {
    * @return String of the server's principal
    * @throws IOException - error determining configured principal
    */
-
-  // try to get the configured principal for the remote server
-  private String getServerPrincipal(SaslAuth authType) throws IOException {
+  @VisibleForTesting
+  String getServerPrincipal(SaslAuth authType) throws IOException {
     KerberosInfo krbInfo = SecurityUtil.getKerberosInfo(protocol, conf);
     LOG.debug("Get kerberos info proto:"+protocol+" info:"+krbInfo);
     if (krbInfo == null) { // protocol has no support for kerberos
@@ -270,28 +295,37 @@ public class SaslRpcClient {
           "Can't obtain server Kerberos config key from protocol="
               + protocol.getCanonicalName());
     }
-    // construct the expected principal from the config
-    String confPrincipal = SecurityUtil.getServerPrincipal(
-        conf.get(serverKey), serverAddr.getAddress());
-    if (confPrincipal == null || confPrincipal.isEmpty()) {
-      throw new IllegalArgumentException(
-          "Failed to specify server's Kerberos principal name");
-    }
-    // ensure it looks like a host-based service principal
-    KerberosName name = new KerberosName(confPrincipal);
-    if (name.getHostName() == null) {
-      throw new IllegalArgumentException(
-          "Kerberos principal name does NOT have the expected hostname part: "
-              + confPrincipal);
+    // construct server advertised principal for comparison
+    String serverPrincipal = new KerberosPrincipal(
+        authType.getProtocol() + "/" + authType.getServerId()).getName();
+    boolean isPrincipalValid = false;
+
+    // use the pattern if defined
+    String serverKeyPattern = conf.get(serverKey + ".pattern");
+    if (serverKeyPattern != null && !serverKeyPattern.isEmpty()) {
+      Pattern pattern = GlobPattern.compile(serverKeyPattern);
+      isPrincipalValid = pattern.matcher(serverPrincipal).matches();
+    } else {
+      // check that the server advertised principal matches our conf
+      String confPrincipal = SecurityUtil.getServerPrincipal(
+          conf.get(serverKey), serverAddr.getAddress());
+      if (confPrincipal == null || confPrincipal.isEmpty()) {
+        throw new IllegalArgumentException(
+            "Failed to specify server's Kerberos principal name");
+      }
+      KerberosName name = new KerberosName(confPrincipal);
+      if (name.getHostName() == null) {
+        throw new IllegalArgumentException(
+            "Kerberos principal name does NOT have the expected hostname part: "
+                + confPrincipal);
+      }
+      isPrincipalValid = serverPrincipal.equals(confPrincipal);
     }
-    // check that the server advertised principal matches our conf
-    KerberosPrincipal serverPrincipal = new KerberosPrincipal(
-        authType.getProtocol() + "/" + authType.getServerId());
-    if (!serverPrincipal.getName().equals(confPrincipal)) {
+    if (!isPrincipalValid) {
       throw new IllegalArgumentException(
           "Server has invalid Kerberos principal: " + serverPrincipal);
     }
-    return confPrincipal;
+    return serverPrincipal;
   }
   
 
@@ -312,8 +346,9 @@ public class SaslRpcClient {
     DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(
         outS));
     
-    // redefined if/when a SASL negotiation completes
-    AuthMethod authMethod = AuthMethod.SIMPLE;
+    // redefined if/when a SASL negotiation starts, can be queried if the
+    // negotiation fails
+    authMethod = AuthMethod.SIMPLE;
     
     sendSaslMessage(outStream, negotiateRequest);
     
@@ -350,6 +385,7 @@ public class SaslRpcClient {
         case NEGOTIATE: {
           // create a compatible SASL client, throws if no supported auths
           SaslAuth saslAuthType = selectSaslClient(saslMessage.getAuthsList());
+          // define auth being attempted, caller can query if connect fails
           authMethod = AuthMethod.valueOf(saslAuthType.getMethod());
           
           byte[] responseToken = null;
@@ -463,38 +499,141 @@ public class SaslRpcClient {
     return response;
   }
 
+  private boolean useWrap() {
+    // getNegotiatedProperty throws if client isn't complete
+    String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
+    // SASL wrapping is only used if the connection has a QOP, and
+    // the value is not auth.  ex. auth-int & auth-priv
+    return qop != null && !"auth".equalsIgnoreCase(qop);
+  }
+  
   /**
-   * Get a SASL wrapped InputStream. Can be called only after saslConnect() has
-   * been called.
+   * Get SASL wrapped InputStream if SASL QoP requires unwrapping,
+   * otherwise return original stream.  Can be called only after
+   * saslConnect() has been called.
    * 
-   * @param in
-   *          the InputStream to wrap
-   * @return a SASL wrapped InputStream
+   * @param in - InputStream used to make the connection
+   * @return InputStream that may be using SASL unwrap
    * @throws IOException
    */
   public InputStream getInputStream(InputStream in) throws IOException {
-    if (!saslClient.isComplete()) {
-      throw new IOException("Sasl authentication exchange hasn't completed yet");
+    if (useWrap()) {
+      in = new WrappedInputStream(in);
     }
-    return new SaslInputStream(in, saslClient);
+    return in;
   }
 
   /**
-   * Get a SASL wrapped OutputStream. Can be called only after saslConnect() has
-   * been called.
+   * Get SASL wrapped OutputStream if SASL QoP requires wrapping,
+   * otherwise return original stream.  Can be called only after
+   * saslConnect() has been called.
    * 
-   * @param out
-   *          the OutputStream to wrap
-   * @return a SASL wrapped OutputStream
+   * @param out - OutputStream used to make the connection
+   * @return OutputStream that may be using SASL wrap
    * @throws IOException
    */
   public OutputStream getOutputStream(OutputStream out) throws IOException {
-    if (!saslClient.isComplete()) {
-      throw new IOException("Sasl authentication exchange hasn't completed yet");
+    if (useWrap()) {
+      // the client and server negotiate a maximum buffer size that can be
+      // wrapped
+      String maxBuf = (String)saslClient.getNegotiatedProperty(Sasl.RAW_SEND_SIZE);
+      out = new BufferedOutputStream(new WrappedOutputStream(out),
+                                     Integer.parseInt(maxBuf));
+    }
+    return out;
+  }
+
+  // ideally this should be folded into the RPC decoding loop but it's
+  // currently split across Client and SaslRpcClient...
+  class WrappedInputStream extends FilterInputStream {
+    private ByteBuffer unwrappedRpcBuffer = ByteBuffer.allocate(0);
+    public WrappedInputStream(InputStream in) throws IOException {
+      super(in);
+    }
+    
+    /**
+     * Read a single unwrapped byte.
+     *
+     * @return the byte as an int in the range 0-255, or -1 if the bulk
+     *         read reports end of stream
+     * @throws IOException if the next wrapped packet cannot be read
+     */
+    @Override
+    public int read() throws IOException {
+      byte[] b = new byte[1];
+      int n;
+      // a packet may unwrap to zero bytes, in which case the bulk read
+      // returns 0; keep going until a byte (or end of stream) is seen
+      do {
+        n = read(b, 0, 1);
+      } while (n == 0);
+      // mask to avoid sign extension: a byte >= 0x80 would otherwise be
+      // returned as a negative int, and 0xff would look like EOF (-1)
+      return (n != -1) ? (b[0] & 0xff) : -1;
+    }
+    
+    /** Fill as much of the given array as the bulk read can satisfy. */
+    @Override
+    public int read(byte[] b) throws IOException {
+      final int length = b.length;
+      return read(b, 0, length);
+    }
+
+    @Override
+    public int read(byte[] buf, int off, int len) throws IOException {
+      synchronized(unwrappedRpcBuffer) {
+        // fill the buffer with the next RPC message
+        if (unwrappedRpcBuffer.remaining() == 0) {
+          readNextRpcPacket();
+        }
+        // satisfy as much of the request as possible
+        int readLen = Math.min(len, unwrappedRpcBuffer.remaining());
+        unwrappedRpcBuffer.get(buf, off, readLen);
+        return readLen;
+      }
+    }
+    
+    // reads one length-prefixed packet and replaces unwrappedRpcBuffer with its unwrapped token; all messages must be RPC SASL wrapped, else an exception is thrown
+    private void readNextRpcPacket() throws IOException {
+      LOG.debug("reading next wrapped RPC packet");
+      DataInputStream dis = new DataInputStream(in);
+      int rpcLen = dis.readInt();  // int length prefix of the packet
+      byte[] rpcBuf = new byte[rpcLen];
+      dis.readFully(rpcBuf);  // read the whole packet before decoding
+      
+      // decode the delimited RPC response header at the start of the packet
+      ByteArrayInputStream bis = new ByteArrayInputStream(rpcBuf);
+      RpcResponseHeaderProto.Builder headerBuilder =
+          RpcResponseHeaderProto.newBuilder();
+      headerBuilder.mergeDelimitedFrom(bis);
+      
+      boolean isWrapped = false;
+      // Must be SASL wrapped, verify and decode.
+      if (headerBuilder.getCallId() == AuthProtocol.SASL.callId) {
+        RpcSaslProto.Builder saslMessage = RpcSaslProto.newBuilder();
+        saslMessage.mergeDelimitedFrom(bis);  // SASL message follows the header
+        if (saslMessage.getState() == SaslState.WRAP) {
+          isWrapped = true;
+          byte[] token = saslMessage.getToken().toByteArray();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("unwrapping token of length:" + token.length);
+          }
+          token = saslClient.unwrap(token, 0, token.length);
+          unwrappedRpcBuffer = ByteBuffer.wrap(token);  // data served by subsequent read() calls
+        }
+      }
+      if (!isWrapped) {  // call id was not the SASL id, or state was not WRAP
+        throw new SaslException("Server sent non-wrapped response");
+      }
     }
-    return new SaslOutputStream(out, saslClient);
   }
 
+  /**
+   * Output stream that SASL-wraps each write and emits it to the
+   * underlying stream as a length-prefixed RPC SASL message with state
+   * WRAP.
+   */
+  class WrappedOutputStream extends FilterOutputStream {
+    public WrappedOutputStream(OutputStream out) throws IOException {
+      super(out);
+    }
+
+    /**
+     * Wrap and send a single byte.  Without this override the inherited
+     * FilterOutputStream.write(int) would hand the raw byte straight to
+     * the underlying stream, bypassing SASL wrapping.
+     */
+    @Override
+    public void write(int b) throws IOException {
+      write(new byte[] { (byte) b }, 0, 1);
+    }
+
+    /**
+     * Wrap len bytes of buf starting at off and send them as one RPC
+     * SASL message.
+     */
+    @Override
+    public void write(byte[] buf, int off, int len) throws IOException {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("wrapping token of length:" + len);
+      }
+      buf = saslClient.wrap(buf, off, len);
+      RpcSaslProto saslMessage = RpcSaslProto.newBuilder()
+          .setState(SaslState.WRAP)
+          .setToken(ByteString.copyFrom(buf, 0, buf.length))
+          .build();
+      RpcRequestMessageWrapper request =
+          new RpcRequestMessageWrapper(saslHeader, saslMessage);
+      DataOutputStream dob = new DataOutputStream(out);
+      // length prefix first, then the header + SASL message
+      dob.writeInt(request.getLength());
+      request.write(dob);
+    }
+  }
+  
   /** Release resources used by wrapped saslClient */
   public void dispose() throws SaslException {
     if (saslClient != null) {
@@ -550,4 +689,4 @@ public class SaslRpcClient {
       }
     }
   }
-}
\ No newline at end of file
+}

Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcServer.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcServer.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcServer.java Tue Aug 13 21:19:53 2013
@@ -47,7 +47,6 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.Server.Connection;
-import org.apache.hadoop.security.authentication.util.KerberosName;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
@@ -104,12 +103,12 @@ public class SaslRpcServer {
         String fullName = UserGroupInformation.getCurrentUser().getUserName();
         if (LOG.isDebugEnabled())
           LOG.debug("Kerberos principal name is " + fullName);
-        KerberosName krbName = new KerberosName(fullName);
-        serverId = krbName.getHostName();
-        if (serverId == null) {
-          serverId = "";
-        }
-        protocol = krbName.getServiceName();
+        // don't use KerberosName because we don't want auth_to_local
+        String[] parts = fullName.split("[/@]", 2);
+        protocol = parts[0];
+        // should verify service host is present here rather than in create()
+        // but lazy tests are using a UGI that isn't a SPN...
+        serverId = (parts.length < 2) ? "" : parts[1];
         break;
       }
       default:

Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java Tue Aug 13 21:19:53 2013
@@ -123,6 +123,12 @@ abstract public class Shell {
                    : new String[] { "ln", "-s", target, link };
   }
 
+  /** Return a command to read the target of a symbolic link. */
+  public static String[] getReadlinkCommand(String link) {
+    if (WINDOWS) {
+      return new String[] { WINUTILS, "readlink", link };
+    }
+    return new String[] { "readlink", link };
+  }
+
   /** Return a command for determining if process with specified pid is alive. */
   public static String[] getCheckProcessIsAliveCommand(String pid) {
     return Shell.WINDOWS ?

Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/overview.html
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/native/native.vcxproj
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/native/native.vcxproj?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/native/native.vcxproj (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/native/native.vcxproj Tue Aug 13 21:19:53 2013
@@ -72,6 +72,7 @@
   </ItemDefinitionGroup>
   <ItemGroup>
     <ClCompile Include="src\org\apache\hadoop\io\compress\lz4\lz4.c" />
+    <ClCompile Include="src\org\apache\hadoop\io\compress\lz4\lz4hc.c" />
     <ClCompile Include="src\org\apache\hadoop\io\compress\lz4\Lz4Compressor.c" />
     <ClCompile Include="src\org\apache\hadoop\io\compress\lz4\Lz4Decompressor.c" />
     <ClCompile Include="src\org\apache\hadoop\io\nativeio\file_descriptor.c" />

Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/native/native.vcxproj.filters
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/native/native.vcxproj.filters?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/native/native.vcxproj.filters (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/native/native.vcxproj.filters Tue Aug 13 21:19:53 2013
@@ -51,6 +51,9 @@
     <ClCompile Include="src\org\apache\hadoop\io\compress\lz4\lz4.c">
       <Filter>Source Files</Filter>
     </ClCompile>
+    <ClCompile Include="src\org\apache\hadoop\io\compress\lz4\lz4hc.c">
+      <Filter>Source Files</Filter>
+    </ClCompile>
     <ClCompile Include="src\org\apache\hadoop\io\compress\lz4\Lz4Compressor.c">
       <Filter>Source Files</Filter>
     </ClCompile>

Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Compressor.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Compressor.c?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Compressor.c (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Compressor.c Tue Aug 13 21:19:53 2013
@@ -23,21 +23,9 @@
 #ifdef UNIX
 #include "config.h"
 #endif // UNIX
+#include "lz4.h"
+#include "lz4hc.h"
 
-//****************************
-// Simple Functions
-//****************************
-
-extern int LZ4_compress   (const char* source, char* dest, int isize);
-
-/*
-LZ4_compress() :
- return : the number of bytes in compressed buffer dest
- note : destination buffer must be already allocated.
-  To avoid any problem, size it to handle worst cases situations (input data not compressible)
-  Worst case size is : "inputsize + 0.4%", with "0.4%" being at least 8 bytes.
-
-*/
 
 static jfieldID Lz4Compressor_clazz;
 static jfieldID Lz4Compressor_uncompressedDirectBuf;
@@ -107,5 +95,45 @@ JNIEXPORT jstring JNICALL
 Java_org_apache_hadoop_io_compress_lz4_Lz4Compressor_getLibraryName(
  JNIEnv *env, jclass class
  ) {
-  return (*env)->NewStringUTF(env, "revision:43");
+  return (*env)->NewStringUTF(env, "revision:99");
+}
+
+/*
+ * JNI implementation of Lz4Compressor.compressBytesDirectHC: compresses the
+ * contents of the uncompressed direct buffer into the compressed direct
+ * buffer with LZ4_compressHC and resets the uncompressed length field to 0.
+ *
+ * Returns the length reported by LZ4_compressHC, or 0 when either direct
+ * buffer address is unavailable or after an InternalError has been raised.
+ */
+JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_lz4_Lz4Compressor_compressBytesDirectHC
+(JNIEnv *env, jobject thisj){
+  const char* uncompressed_bytes = NULL;
+  char* compressed_bytes = NULL;
+
+  // Get members of Lz4Compressor
+  jobject clazz = (*env)->GetStaticObjectField(env, thisj, Lz4Compressor_clazz);
+  jobject uncompressed_direct_buf = (*env)->GetObjectField(env, thisj, Lz4Compressor_uncompressedDirectBuf);
+  jint uncompressed_direct_buf_len = (*env)->GetIntField(env, thisj, Lz4Compressor_uncompressedDirectBufLen);
+  jobject compressed_direct_buf = (*env)->GetObjectField(env, thisj, Lz4Compressor_compressedDirectBuf);
+  // NOTE(review): this capacity is read but never passed to LZ4_compressHC,
+  // so the output size is not bounded here -- confirm the caller sizes the
+  // output buffer for the worst case
+  jint compressed_direct_buf_len = (*env)->GetIntField(env, thisj, Lz4Compressor_directBufferSize);
+
+  // Get the input direct buffer
+  LOCK_CLASS(env, clazz, "Lz4Compressor");
+  uncompressed_bytes = (const char*)(*env)->GetDirectBufferAddress(env, uncompressed_direct_buf);
+  UNLOCK_CLASS(env, clazz, "Lz4Compressor");
+
+  if (uncompressed_bytes == 0) {
+    return (jint)0;
+  }
+
+  // Get the output direct buffer
+  LOCK_CLASS(env, clazz, "Lz4Compressor");
+  compressed_bytes = (char *)(*env)->GetDirectBufferAddress(env, compressed_direct_buf);
+  UNLOCK_CLASS(env, clazz, "Lz4Compressor");
+
+  if (compressed_bytes == 0) {
+    return (jint)0;
+  }
+
+  compressed_direct_buf_len = LZ4_compressHC(uncompressed_bytes, compressed_bytes, uncompressed_direct_buf_len);
+  if (compressed_direct_buf_len < 0){
+    THROW(env, "java/lang/InternalError", "LZ4_compressHC failed");
+    // NOTE(review): assumes THROW only raises the Java exception and does
+    // not itself return -- bail out so no further JNI calls are made while
+    // the exception is pending
+    return (jint)0;
+  }
+
+  (*env)->SetIntField(env, thisj, Lz4Compressor_uncompressedDirectBufLen, 0);
+
+  return (jint)compressed_direct_buf_len;
 }

Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.c?rev=1513658&r1=1513657&r2=1513658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.c (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.c Tue Aug 13 21:19:53 2013
@@ -22,18 +22,7 @@
 #ifdef UNIX
 #include "config.h"
 #endif // UNIX
-
-int LZ4_uncompress_unknownOutputSize(const char* source, char* dest, int isize, int maxOutputSize);
-
-/*
-LZ4_uncompress_unknownOutputSize() :
- isize  : is the input size, therefore the compressed size
- maxOutputSize : is the size of the destination buffer (which must be already allocated)
- return : the number of bytes decoded in the destination buffer (necessarily <= maxOutputSize)
-    If the source stream is malformed, the function will stop decoding and return a negative result, indicating the byte position of the faulty instruction
-    This version never writes beyond dest + maxOutputSize, and is therefore protected against malicious data packets
- note   : This version is a bit slower than LZ4_uncompress
-*/
+#include "lz4.h"
 
 
 static jfieldID Lz4Decompressor_clazz;
@@ -89,7 +78,7 @@ JNIEXPORT jint JNICALL Java_org_apache_h
     return (jint)0;
   }
 
-  uncompressed_direct_buf_len = LZ4_uncompress_unknownOutputSize(compressed_bytes, uncompressed_bytes, compressed_direct_buf_len, uncompressed_direct_buf_len);
+  uncompressed_direct_buf_len = LZ4_decompress_safe(compressed_bytes, uncompressed_bytes, compressed_direct_buf_len, uncompressed_direct_buf_len);
   if (uncompressed_direct_buf_len < 0) {
     THROW(env, "java/lang/InternalError", "LZ4_uncompress_unknownOutputSize failed.");
   }