Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2013/10/30 23:22:15 UTC

svn commit: r1537330 [3/7] - in /hadoop/common/branches/YARN-321/hadoop-common-project: ./ hadoop-annotations/ hadoop-auth/ hadoop-common/ hadoop-common/dev-support/ hadoop-common/src/ hadoop-common/src/main/bin/ hadoop-common/src/main/conf/ hadoop-com...

Propchange: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java:r1531125
  Merged /hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java:r1519784-1537326

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java Wed Oct 30 22:21:59 2013
@@ -258,5 +258,9 @@ public class CommonConfigurationKeysPubl
   public static final String HADOOP_SSL_ENABLED_KEY = "hadoop.ssl.enabled";
   public static final boolean HADOOP_SSL_ENABLED_DEFAULT = false;
 
+
+  // HTTP policies to be used in configuration
+  public static final String HTTP_POLICY_HTTP_ONLY = "HTTP_ONLY";
+  public static final String HTTP_POLICY_HTTPS_ONLY = "HTTPS_ONLY";
 }
 

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java Wed Oct 30 22:21:59 2013
@@ -1,4 +1,5 @@
 /**
+ * 
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,17 +20,29 @@ package org.apache.hadoop.fs;
 
 import java.io.*;
 import java.nio.ByteBuffer;
+import java.util.EnumSet;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.fs.ByteBufferUtil;
+import org.apache.hadoop.util.IdentityHashStore;
 
 /** Utility that wraps a {@link FSInputStream} in a {@link DataInputStream}
  * and buffers input through a {@link BufferedInputStream}. */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public class FSDataInputStream extends DataInputStream
-    implements Seekable, PositionedReadable, Closeable,
-    ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead {
+    implements Seekable, PositionedReadable, Closeable, 
+      ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
+      HasEnhancedByteBufferAccess {
+  /**
+   * Map ByteBuffers that we have handed out to readers to ByteBufferPool 
+   * objects
+   */
+  private final IdentityHashStore<ByteBuffer, ByteBufferPool>
+    extendedReadBuffers
+      = new IdentityHashStore<ByteBuffer, ByteBufferPool>(0);
 
   public FSDataInputStream(InputStream in)
     throws IOException {
@@ -167,4 +180,45 @@ public class FSDataInputStream extends D
           "support setting the drop-behind caching setting.");
     }
   }
+
+  @Override
+  public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
+      EnumSet<ReadOption> opts) 
+          throws IOException, UnsupportedOperationException {
+    try {
+      return ((HasEnhancedByteBufferAccess)in).read(bufferPool,
+          maxLength, opts);
+    }
+    catch (ClassCastException e) {
+      ByteBuffer buffer = ByteBufferUtil.
+          fallbackRead(this, bufferPool, maxLength);
+      if (buffer != null) {
+        extendedReadBuffers.put(buffer, bufferPool);
+      }
+      return buffer;
+    }
+  }
+
+  private static final EnumSet<ReadOption> EMPTY_READ_OPTIONS_SET =
+      EnumSet.noneOf(ReadOption.class);
+
+  final public ByteBuffer read(ByteBufferPool bufferPool, int maxLength)
+          throws IOException, UnsupportedOperationException {
+    return read(bufferPool, maxLength, EMPTY_READ_OPTIONS_SET);
+  }
+  
+  @Override
+  public void releaseBuffer(ByteBuffer buffer) {
+    try {
+      ((HasEnhancedByteBufferAccess)in).releaseBuffer(buffer);
+    }
+    catch (ClassCastException e) {
+      ByteBufferPool bufferPool = extendedReadBuffers.remove(buffer);
+      if (bufferPool == null) {
+        throw new IllegalArgumentException("tried to release a buffer " +
+            "that was not created by this stream.");
+      }
+      bufferPool.putBuffer(buffer);
+    }
+  }
 }
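
For readers of this change, a minimal usage sketch of the new enhanced
byte-buffer read path (the FileSystem instance, the path, and the choice of
ElasticByteBufferPool are assumptions, not part of this commit):

    // Ask for up to 4 MB in a pooled (possibly zero-copy) buffer.
    FSDataInputStream in = fs.open(new Path("/data/example"));
    ByteBufferPool pool = new ElasticByteBufferPool();
    ByteBuffer buf = in.read(pool, 4 * 1024 * 1024);
    try {
      if (buf != null) {
        // consume buf.remaining() bytes here
      }
    } finally {
      if (buf != null) {
        in.releaseBuffer(buf);  // returns fallback-read buffers to the pool
      }
    }

If the wrapped stream does not implement HasEnhancedByteBufferAccess, the
ClassCastException path above falls back to ByteBufferUtil.fallbackRead and
records the buffer in extendedReadBuffers so releaseBuffer can return it.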

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java Wed Oct 30 22:21:59 2013
@@ -18,9 +18,11 @@
 package org.apache.hadoop.fs;
 
 import java.io.*;
+import java.nio.ByteBuffer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.ZeroCopyUnavailableException;
 
 /****************************************************************
  * FSInputStream is a generic old InputStream with a little bit

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java Wed Oct 30 22:21:59 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs;
 import java.io.Closeable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.lang.ref.WeakReference;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;
@@ -31,6 +32,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.IdentityHashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
@@ -2502,28 +2504,149 @@ public abstract class FileSystem extends
     }
   }
   
+  /**
+   * Tracks statistics about how many reads, writes, and so forth have been
+   * done in a FileSystem.
+   * 
+   * Since there is only one of these objects per FileSystem, there will 
+   * typically be many threads writing to this object.  Almost every operation
+   * on an open file will involve a write to this object.  In contrast, reading
+   * statistics is done infrequently by most programs, and not at all by others.
+   * Hence, this is optimized for writes.
+   * 
+   * Each thread writes to its own thread-local area of memory.  This removes 
+   * contention and allows us to scale up to many, many threads.  To read
+   * statistics, the reader thread totals up the contents of all of the 
+   * thread-local data areas.
+   */
   public static final class Statistics {
+    /**
+     * Statistics data.
+     * 
+     * There is only a single writer to thread-local StatisticsData objects.
+     * Hence, volatile is adequate here-- we do not need AtomicLong or similar
+     * to prevent lost updates.
+     * The Java specification guarantees that updates to volatile longs will
+     * be perceived as atomic with respect to other threads, which is all we
+     * need.
+     */
+    private static class StatisticsData {
+      volatile long bytesRead;
+      volatile long bytesWritten;
+      volatile int readOps;
+      volatile int largeReadOps;
+      volatile int writeOps;
+      /**
+       * Stores a weak reference to the thread owning this StatisticsData.
+       * This allows us to remove StatisticsData objects that pertain to
+       * threads that no longer exist.
+       */
+      final WeakReference<Thread> owner;
+
+      StatisticsData(WeakReference<Thread> owner) {
+        this.owner = owner;
+      }
+
+      /**
+       * Add another StatisticsData object to this one.
+       */
+      void add(StatisticsData other) {
+        this.bytesRead += other.bytesRead;
+        this.bytesWritten += other.bytesWritten;
+        this.readOps += other.readOps;
+        this.largeReadOps += other.largeReadOps;
+        this.writeOps += other.writeOps;
+      }
+
+      /**
+       * Negate the values of all statistics.
+       */
+      void negate() {
+        this.bytesRead = -this.bytesRead;
+        this.bytesWritten = -this.bytesWritten;
+        this.readOps = -this.readOps;
+        this.largeReadOps = -this.largeReadOps;
+        this.writeOps = -this.writeOps;
+      }
+
+      @Override
+      public String toString() {
+        return bytesRead + " bytes read, " + bytesWritten + " bytes written, "
+            + readOps + " read ops, " + largeReadOps + " large read ops, "
+            + writeOps + " write ops";
+      }
+    }
+
+    private interface StatisticsAggregator<T> {
+      void accept(StatisticsData data);
+      T aggregate();
+    }
+
     private final String scheme;
-    private AtomicLong bytesRead = new AtomicLong();
-    private AtomicLong bytesWritten = new AtomicLong();
-    private AtomicInteger readOps = new AtomicInteger();
-    private AtomicInteger largeReadOps = new AtomicInteger();
-    private AtomicInteger writeOps = new AtomicInteger();
+
+    /**
+     * rootData is data that doesn't belong to any thread, but will be added
+     * to the totals.  This is useful for making copies of Statistics objects,
+     * and for storing data that pertains to threads that have been garbage
+     * collected.  Protected by the Statistics lock.
+     */
+    private final StatisticsData rootData;
+
+    /**
+     * Thread-local data.
+     */
+    private final ThreadLocal<StatisticsData> threadData;
     
+    /**
+     * List of all thread-local data areas.  Protected by the Statistics lock.
+     */
+    private LinkedList<StatisticsData> allData;
+
     public Statistics(String scheme) {
       this.scheme = scheme;
+      this.rootData = new StatisticsData(null);
+      this.threadData = new ThreadLocal<StatisticsData>();
+      this.allData = null;
     }
 
     /**
      * Copy constructor.
      * 
-     * @param st
-     *          The input Statistics object which is cloned.
+     * @param other    The input Statistics object which is cloned.
      */
-    public Statistics(Statistics st) {
-      this.scheme = st.scheme;
-      this.bytesRead = new AtomicLong(st.bytesRead.longValue());
-      this.bytesWritten = new AtomicLong(st.bytesWritten.longValue());
+    public Statistics(Statistics other) {
+      this.scheme = other.scheme;
+      this.rootData = new StatisticsData(null);
+      other.visitAll(new StatisticsAggregator<Void>() {
+        @Override
+        public void accept(StatisticsData data) {
+          rootData.add(data);
+        }
+
+        public Void aggregate() {
+          return null;
+        }
+      });
+      this.threadData = new ThreadLocal<StatisticsData>();
+    }
+
+    /**
+     * Get or create the thread-local data associated with the current thread.
+     */
+    private StatisticsData getThreadData() {
+      StatisticsData data = threadData.get();
+      if (data == null) {
+        data = new StatisticsData(
+            new WeakReference<Thread>(Thread.currentThread()));
+        threadData.set(data);
+        synchronized(this) {
+          if (allData == null) {
+            allData = new LinkedList<StatisticsData>();
+          }
+          allData.add(data);
+        }
+      }
+      return data;
     }
 
     /**
@@ -2531,7 +2654,7 @@ public abstract class FileSystem extends
      * @param newBytes the additional bytes read
      */
     public void incrementBytesRead(long newBytes) {
-      bytesRead.getAndAdd(newBytes);
+      getThreadData().bytesRead += newBytes;
     }
     
     /**
@@ -2539,7 +2662,7 @@ public abstract class FileSystem extends
      * @param newBytes the additional bytes written
      */
     public void incrementBytesWritten(long newBytes) {
-      bytesWritten.getAndAdd(newBytes);
+      getThreadData().bytesWritten += newBytes;
     }
     
     /**
@@ -2547,7 +2670,7 @@ public abstract class FileSystem extends
      * @param count number of read operations
      */
     public void incrementReadOps(int count) {
-      readOps.getAndAdd(count);
+      getThreadData().readOps += count;
     }
 
     /**
@@ -2555,7 +2678,7 @@ public abstract class FileSystem extends
      * @param count number of large read operations
      */
     public void incrementLargeReadOps(int count) {
-      largeReadOps.getAndAdd(count);
+      getThreadData().largeReadOps += count;
     }
 
     /**
@@ -2563,7 +2686,38 @@ public abstract class FileSystem extends
      * @param count number of write operations
      */
     public void incrementWriteOps(int count) {
-      writeOps.getAndAdd(count);
+      getThreadData().writeOps += count;
+    }
+
+    /**
+     * Apply the given aggregator to all StatisticsData objects associated with
+     * this Statistics object.
+     *
+     * For each StatisticsData object, we will call accept on the visitor.
+     * Finally, at the end, we will call aggregate to get the final total. 
+     *
+   * @param visitor The visitor to use.
+     * @return        The total.
+     */
+    private synchronized <T> T visitAll(StatisticsAggregator<T> visitor) {
+      visitor.accept(rootData);
+      if (allData != null) {
+        for (Iterator<StatisticsData> iter = allData.iterator();
+            iter.hasNext(); ) {
+          StatisticsData data = iter.next();
+          visitor.accept(data);
+          if (data.owner.get() == null) {
+            /*
+             * If the thread that created this thread-local data no
+             * longer exists, remove the StatisticsData from our list
+             * and fold the values into rootData.
+             */
+            rootData.add(data);
+            iter.remove();
+          }
+        }
+      }
+      return visitor.aggregate();
     }
 
     /**
@@ -2571,7 +2725,18 @@ public abstract class FileSystem extends
      * @return the number of bytes
      */
     public long getBytesRead() {
-      return bytesRead.get();
+      return visitAll(new StatisticsAggregator<Long>() {
+        private long bytesRead = 0;
+
+        @Override
+        public void accept(StatisticsData data) {
+          bytesRead += data.bytesRead;
+        }
+
+        public Long aggregate() {
+          return bytesRead;
+        }
+      });
     }
     
     /**
@@ -2579,7 +2744,18 @@ public abstract class FileSystem extends
      * @return the number of bytes
      */
     public long getBytesWritten() {
-      return bytesWritten.get();
+      return visitAll(new StatisticsAggregator<Long>() {
+        private long bytesWritten = 0;
+
+        @Override
+        public void accept(StatisticsData data) {
+          bytesWritten += data.bytesWritten;
+        }
+
+        public Long aggregate() {
+          return bytesWritten;
+        }
+      });
     }
     
     /**
@@ -2587,7 +2763,19 @@ public abstract class FileSystem extends
      * @return number of read operations
      */
     public int getReadOps() {
-      return readOps.get() + largeReadOps.get();
+      return visitAll(new StatisticsAggregator<Integer>() {
+        private int readOps = 0;
+
+        @Override
+        public void accept(StatisticsData data) {
+          readOps += data.readOps;
+          readOps += data.largeReadOps;
+        }
+
+        public Integer aggregate() {
+          return readOps;
+        }
+      });
     }
 
     /**
@@ -2596,7 +2784,18 @@ public abstract class FileSystem extends
      * @return number of large read operations
      */
     public int getLargeReadOps() {
-      return largeReadOps.get();
+      return visitAll(new StatisticsAggregator<Integer>() {
+        private int largeReadOps = 0;
+
+        @Override
+        public void accept(StatisticsData data) {
+          largeReadOps += data.largeReadOps;
+        }
+
+        public Integer aggregate() {
+          return largeReadOps;
+        }
+      });
     }
 
     /**
@@ -2605,22 +2804,70 @@ public abstract class FileSystem extends
      * @return number of write operations
      */
     public int getWriteOps() {
-      return writeOps.get();
+      return visitAll(new StatisticsAggregator<Integer>() {
+        private int writeOps = 0;
+
+        @Override
+        public void accept(StatisticsData data) {
+          writeOps += data.writeOps;
+        }
+
+        public Integer aggregate() {
+          return writeOps;
+        }
+      });
     }
 
+
     @Override
     public String toString() {
-      return bytesRead + " bytes read, " + bytesWritten + " bytes written, "
-          + readOps + " read ops, " + largeReadOps + " large read ops, "
-          + writeOps + " write ops";
+      return visitAll(new StatisticsAggregator<String>() {
+        private StatisticsData total = new StatisticsData(null);
+
+        @Override
+        public void accept(StatisticsData data) {
+          total.add(data);
+        }
+
+        public String aggregate() {
+          return total.toString();
+        }
+      });
     }
-    
+
     /**
-     * Reset the counts of bytes to 0.
+     * Resets all statistics to 0.
+     *
+     * In order to reset, we add up all the thread-local statistics data, and
+     * set rootData to the negative of that.
+     *
+     * This may seem like a counterintuitive way to reset the statistics.  Why
+     * can't we just zero out all the thread-local data?  Well, thread-local
+     * data can only be modified by the thread that owns it.  If we tried to
+     * modify the thread-local data from this thread, our modification might get
+     * interleaved with a read-modify-write operation done by the thread that
+     * owns the data.  That would result in our update getting lost.
+     *
+     * The approach used here avoids this problem because it only ever reads
+     * (not writes) the thread-local data.  Both reads and writes to rootData
+     * are done under the lock, so we're free to modify rootData from any thread
+     * that holds the lock.
      */
     public void reset() {
-      bytesWritten.set(0);
-      bytesRead.set(0);
+      visitAll(new StatisticsAggregator<Void>() {
+        private StatisticsData total = new StatisticsData(null);
+
+        @Override
+        public void accept(StatisticsData data) {
+          total.add(data);
+        }
+
+        public Void aggregate() {
+          total.negate();
+          rootData.add(total);
+          return null;
+        }
+      });
     }
     
     /**

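As a quick illustration of the reader side described in the javadoc above,
a hedged sketch (the Configuration instance and scheme are assumptions):

    FileSystem fs = FileSystem.get(conf);
    FileSystem.Statistics stats = FileSystem.getStatistics(
        fs.getUri().getScheme(), fs.getClass());
    // Each getter runs a StatisticsAggregator over rootData plus every
    // live thread-local StatisticsData, under the Statistics lock.
    long bytesRead = stats.getBytesRead();
    long bytesWritten = stats.getBytesWritten();
    stats.reset();  // folds the negated totals into rootData

Writers never block each other: each thread increments only its own
thread-local StatisticsData, and only readers pay the aggregation cost.
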
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java Wed Oct 30 22:21:59 2013
@@ -1239,6 +1239,9 @@ public class FileUtil {
     List<String> classPathEntryList = new ArrayList<String>(
       classPathEntries.length);
     for (String classPathEntry: classPathEntries) {
+      if (classPathEntry.length() == 0) {
+        continue;
+      }
       if (classPathEntry.endsWith("*")) {
         // Append all jars that match the wildcard
         Path globPath = new Path(classPathEntry).suffix("{.jar,.JAR}");
@@ -1252,7 +1255,14 @@ public class FileUtil {
         }
       } else {
         // Append just this entry
-        String classPathEntryUrl = new File(classPathEntry).toURI().toURL()
+        File fileCpEntry = null;
+        if(!new Path(classPathEntry).isAbsolute()) {
+          fileCpEntry = new File(workingDir, classPathEntry);
+        }
+        else {
+          fileCpEntry = new File(classPathEntry);
+        }
+        String classPathEntryUrl = fileCpEntry.toURI().toURL()
           .toExternalForm();
 
         // File.toURI only appends trailing '/' if it can determine that it is a
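
A small sketch of the entry handling above (the entries and workingDir are
hypothetical): empty classpath entries are now skipped, and relative entries
are resolved against the working directory before being turned into URLs:

    File workingDir = new File("/job/work");  // hypothetical
    String[] entries = { "", "lib/a.jar", "/opt/b.jar" };
    for (String entry : entries) {
      if (entry.length() == 0) {
        continue;  // skip empty entries instead of resolving ""
      }
      File f = new Path(entry).isAbsolute()
          ? new File(entry)
          : new File(workingDir, entry);  // relative: resolve under workingDir
      System.out.println(f.toURI().toURL().toExternalForm());
    }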

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Globber.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Globber.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Globber.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Globber.java Wed Oct 30 22:21:59 2013
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.fs;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -50,38 +51,26 @@ class Globber {
     this.filter = filter;
   }
 
-  private FileStatus getFileStatus(Path path) {
+  private FileStatus getFileStatus(Path path) throws IOException {
     try {
       if (fs != null) {
         return fs.getFileStatus(path);
       } else {
         return fc.getFileStatus(path);
       }
-    } catch (IOException e) {
+    } catch (FileNotFoundException e) {
       return null;
     }
   }
 
-  private FileStatus getFileLinkStatus(Path path) {
-    try {
-      if (fs != null) {
-        return fs.getFileLinkStatus(path);
-      } else {
-        return fc.getFileLinkStatus(path);
-      }
-    } catch (IOException e) {
-      return null;
-    }
-  }
-
-  private FileStatus[] listStatus(Path path) {
+  private FileStatus[] listStatus(Path path) throws IOException {
     try {
       if (fs != null) {
         return fs.listStatus(path);
       } else {
         return fc.util().listStatus(path);
       }
-    } catch (IOException e) {
+    } catch (FileNotFoundException e) {
       return new FileStatus[0];
     }
   }
@@ -95,6 +84,15 @@ class Globber {
   }
 
   /**
+   * Convert a path component that contains backslash escape sequences to a
+   * literal string.  This is necessary when you want to explicitly refer to a
+   * path that contains globber metacharacters.
+   */
+  private static String unescapePathComponent(String name) {
+    return name.replaceAll("\\\\(.)", "$1");
+  }
+
+  /**
    * Translate an absolute path into a list of path components.
    * We merge double slashes into a single slash here.
    * POSIX root path, i.e. '/', does not get an entry in the list.
@@ -134,18 +132,6 @@ class Globber {
     return authority ;
   }
 
-  /**
-   * The glob filter builds a regexp per path component.  If the component
-   * does not contain a shell metachar, then it falls back to appending the
-   * raw string to the list of built up paths.  This raw path needs to have
-   * the quoting removed.  Ie. convert all occurrences of "\X" to "X"
-   * @param name of the path component
-   * @return the unquoted path component
-   */
-  private static String unquotePathComponent(String name) {
-    return name.replaceAll("\\\\(.)", "$1");
-  }
-
   public FileStatus[] glob() throws IOException {
     // First we get the scheme and authority of the pattern that was passed
     // in.
@@ -189,39 +175,54 @@ class Globber {
             new Path(scheme, authority, Path.SEPARATOR)));
       }
       
-      for (String component : components) {
+      for (int componentIdx = 0; componentIdx < components.size();
+          componentIdx++) {
         ArrayList<FileStatus> newCandidates =
             new ArrayList<FileStatus>(candidates.size());
-        GlobFilter globFilter = new GlobFilter(component);
+        GlobFilter globFilter = new GlobFilter(components.get(componentIdx));
+        String component = unescapePathComponent(components.get(componentIdx));
         if (globFilter.hasPattern()) {
           sawWildcard = true;
         }
         if (candidates.isEmpty() && sawWildcard) {
+          // Optimization: if there are no more candidates left, stop examining 
+          // the path components.  We can only do this if we've already seen
+          // a wildcard component-- otherwise, we still need to visit all path 
+          // components in case one of them is a wildcard.
           break;
         }
-        for (FileStatus candidate : candidates) {
-          FileStatus resolvedCandidate = candidate;
-          if (candidate.isSymlink()) {
-            // We have to resolve symlinks, because otherwise we don't know
-            // whether they are directories.
-            resolvedCandidate = getFileStatus(candidate.getPath());
-          }
-          if (resolvedCandidate == null ||
-              resolvedCandidate.isDirectory() == false) {
-            continue;
+        if ((componentIdx < components.size() - 1) &&
+            (!globFilter.hasPattern())) {
+          // Optimization: if this is not the terminal path component, and we 
+          // are not matching against a glob, assume that it exists.  If it 
+          // doesn't exist, we'll find out later when resolving a later glob
+          // or the terminal path component.
+          for (FileStatus candidate : candidates) {
+            candidate.setPath(new Path(candidate.getPath(), component));
           }
-          // For components without pattern, we get its FileStatus directly
-          // using getFileLinkStatus for two reasons:
-          // 1. It should be faster to only get FileStatus needed rather than
-          //    get all children.
-          // 2. Some special filesystem directories (e.g. HDFS snapshot
-          //    directories) are not returned by listStatus, but do exist if
-          //    checked explicitly via getFileLinkStatus.
+          continue;
+        }
+        for (FileStatus candidate : candidates) {
           if (globFilter.hasPattern()) {
             FileStatus[] children = listStatus(candidate.getPath());
+            if (children.length == 1) {
+              // If we get back only one result, this could be either a listing
+              // of a directory with one entry, or it could reflect the fact
+              // that what we listed resolved to a file.
+              //
+              // Unfortunately, we can't just compare the returned paths to
+              // figure this out.  Consider the case where you have /a/b, where
+              // b is a symlink to "..".  In that case, listing /a/b will give
+              // back "/a/b" again.  If we just went by returned pathname, we'd
+              // incorrectly conclude that /a/b was a file and should not match
+              // /a/*/*.  So we use getFileStatus of the path we just listed to
+              // disambiguate.
+              if (!getFileStatus(candidate.getPath()).isDirectory()) {
+                continue;
+              }
+            }
             for (FileStatus child : children) {
               // Set the child path based on the parent path.
-              // This keeps the symlinks in our path.
               child.setPath(new Path(candidate.getPath(),
                       child.getPath().getName()));
               if (globFilter.accept(child.getPath())) {
@@ -229,13 +230,17 @@ class Globber {
               }
             }
           } else {
-            Path p = new Path(candidate.getPath(), unquotePathComponent(component));
-            FileStatus s = getFileLinkStatus(p);
-            if (s != null) {
-              s.setPath(p);
-              newCandidates.add(s);
-            }
-          }
+            // When dealing with non-glob components, use getFileStatus 
+            // instead of listStatus.  This is an optimization, but it also
+            // is necessary for correctness in HDFS, since there are some
+            // special HDFS directories like .reserved and .snapshot that are
+            // not visible to listStatus, but which do exist.  (See HADOOP-9877)
+            FileStatus childStatus = getFileStatus(
+                new Path(candidate.getPath(), component));
+            if (childStatus != null) {
+              newCandidates.add(childStatus);
+            }
+          }
         }
         candidates = newCandidates;
       }
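
To see the escaping behavior that unescapePathComponent enables, a hedged
sketch (paths hypothetical): a backslash makes a glob metacharacter literal,
and the component is unescaped before the direct getFileStatus lookup:

    // Matches part-0, part-1, ... under any dated directory.
    FileStatus[] wild = fs.globStatus(new Path("/logs/2013-*/part-*"));
    // "\*" keeps the '*' literal; the component is unescaped back to "a*b"
    // and resolved with getFileStatus rather than listStatus (HADOOP-9877).
    FileStatus[] literal = fs.globStatus(new Path("/logs/a\\*b/part-*"));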

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java Wed Oct 30 22:21:59 2013
@@ -17,20 +17,6 @@
  */
 package org.apache.hadoop.fs;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URLDecoder;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.HashMap;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -40,6 +26,14 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.util.LineReader;
 import org.apache.hadoop.util.Progressable;
 
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLDecoder;
+import java.util.*;
+
 /**
  * This is an implementation of the Hadoop Archive 
  * Filesystem. This archive Filesystem has index files
@@ -53,7 +47,7 @@ import org.apache.hadoop.util.Progressab
  * index for ranges of hashcodes.
  */
 
-public class HarFileSystem extends FilterFileSystem {
+public class HarFileSystem extends FileSystem {
 
   private static final Log LOG = LogFactory.getLog(HarFileSystem.class);
 
@@ -75,11 +69,13 @@ public class HarFileSystem extends Filte
   // pointer into the static metadata cache
   private HarMetaData metadata;
 
+  private FileSystem fs;
+
   /**
    * public construction of harfilesystem
-   *
    */
   public HarFileSystem() {
+    // Must call #initialize() method to set the underlying file system
   }
 
   /**
@@ -96,10 +92,11 @@ public class HarFileSystem extends Filte
   /**
    * Constructor to create a HarFileSystem with an
    * underlying filesystem.
-   * @param fs
+   * @param fs underlying file system
    */
   public HarFileSystem(FileSystem fs) {
-    super(fs);
+    this.fs = fs;
+    this.statistics = fs.statistics;
   }
  
   private synchronized void initializeMetadataCache(Configuration conf) {
@@ -171,6 +168,11 @@ public class HarFileSystem extends Filte
     }
   }
 
+  @Override
+  public Configuration getConf() {
+    return fs.getConf();
+  }
+
   // get the version of the filesystem from the masterindex file
   // the version is currently not useful since its the first version
   // of archives
@@ -236,8 +238,7 @@ public class HarFileSystem extends Filte
       throw new IOException("query component in Path not supported  " + rawURI);
     }
  
-    URI tmp = null;
- 
+    URI tmp;
     try {
       // convert <scheme>-<host> to <scheme>://<host>
       URI baseUri = new URI(authority.replaceFirst("-", "://"));
@@ -256,7 +257,7 @@ public class HarFileSystem extends Filte
     return URLDecoder.decode(str, "UTF-8");
   }
 
-  private String decodeFileName(String fname) 
+  private String decodeFileName(String fname)
     throws UnsupportedEncodingException {
     int version = metadata.getVersion();
     if (version == 2 || version == 3){
@@ -272,19 +273,30 @@ public class HarFileSystem extends Filte
   public Path getWorkingDirectory() {
     return new Path(uri.toString());
   }
-  
+
+  @Override
+  public Path getInitialWorkingDirectory() {
+    return getWorkingDirectory();
+  }
+
+  @Override
+  public FsStatus getStatus(Path p) throws IOException {
+    return fs.getStatus(p);
+  }
+
   /**
    * Create a har specific auth 
    * har-underlyingfs:port
-   * @param underLyingURI the uri of underlying
+   * @param underLyingUri the uri of underlying
    * filesystem
    * @return har specific auth
    */
   private String getHarAuth(URI underLyingUri) {
     String auth = underLyingUri.getScheme() + "-";
     if (underLyingUri.getHost() != null) {
-      auth += underLyingUri.getHost() + ":";
+      auth += underLyingUri.getHost();
       if (underLyingUri.getPort() != -1) {
+        auth += ":";
         auth +=  underLyingUri.getPort();
       }
     }
@@ -293,7 +305,21 @@ public class HarFileSystem extends Filte
     }
     return auth;
   }
-  
+
+  /**
+   * Used for delegation token related functionality. Must delegate to
+   * underlying file system.
+   */
+  @Override
+  protected URI getCanonicalUri() {
+    return fs.getCanonicalUri();
+  }
+
+  @Override
+  protected URI canonicalizeUri(URI uri) {
+    return fs.canonicalizeUri(uri);
+  }
+
   /**
    * Returns the uri of this filesystem.
    * The uri is of the form 
@@ -304,6 +330,16 @@ public class HarFileSystem extends Filte
     return this.uri;
   }
   
+  @Override
+  protected void checkPath(Path path) {
+    fs.checkPath(path);
+  }
+
+  @Override
+  public Path resolvePath(Path p) throws IOException {
+    return fs.resolvePath(p);
+  }
+
   /**
    * this method returns the path 
    * inside the har filesystem.
@@ -418,7 +454,7 @@ public class HarFileSystem extends Filte
   /**
    * Get block locations from the underlying fs and fix their
    * offsets and lengths.
-   * @param file the input filestatus to get block locations
+   * @param file the input file status to get block locations
    * @param start the start of the desired range in the contained file
    * @param len the length of the desired range
    * @return block locations for this segment of file
@@ -440,8 +476,7 @@ public class HarFileSystem extends Filte
   }
   
   /**
-   * the hash of the path p inside iniside
-   * the filesystem
+   * the hash of the path p inside the filesystem
    * @param p the path in the harfilesystem
    * @return the hash code of the path.
    */
@@ -474,13 +509,9 @@ public class HarFileSystem extends Filte
    *          the parent path directory
    * @param statuses
    *          the list to add the children filestatuses to
-   * @param children
-   *          the string list of children for this parent
-   * @param archiveIndexStat
-   *          the archive index filestatus
    */
-  private void fileStatusesInIndex(HarStatus parent, List<FileStatus> statuses,
-      List<String> children) throws IOException {
+  private void fileStatusesInIndex(HarStatus parent, List<FileStatus> statuses)
+          throws IOException {
     String parentString = parent.getName();
     if (!parentString.endsWith(Path.SEPARATOR)){
         parentString += Path.SEPARATOR;
@@ -546,7 +577,7 @@ public class HarFileSystem extends Filte
   // stored in a single line in the index files 
   // the format is of the form 
   // filename "dir"/"file" partFileName startIndex length 
-  // <space seperated children>
+  // <space separated children>
   private class HarStatus {
     boolean isDir;
     String name;
@@ -665,7 +696,6 @@ public class HarFileSystem extends Filte
   public FSDataInputStream open(Path f, int bufferSize) throws IOException {
     // get the fs DataInputStream for the underlying file
     HarStatus hstatus = getFileHarStatus(f);
-    // we got it.. woo hooo!!! 
     if (hstatus.isDir()) {
       throw new FileNotFoundException(f + " : not a file in " +
                 archivePath);
@@ -674,20 +704,39 @@ public class HarFileSystem extends Filte
         hstatus.getPartName()),
         hstatus.getStartIndex(), hstatus.getLength(), bufferSize);
   }
- 
+
+  /**
+   * Used for delegation token related functionality. Must delegate to
+   * underlying file system.
+   */
+  @Override
+  public FileSystem[] getChildFileSystems() {
+    return new FileSystem[]{fs};
+  }
+
   @Override
-  public FSDataOutputStream create(Path f,
-      FsPermission permission,
-      boolean overwrite,
-      int bufferSize,
-      short replication,
-      long blockSize,
+  public FSDataOutputStream create(Path f, FsPermission permission,
+      boolean overwrite, int bufferSize, short replication, long blockSize,
       Progressable progress) throws IOException {
     throw new IOException("Har: create not allowed.");
   }
-  
+
+  @SuppressWarnings("deprecation")
+  @Override
+  public FSDataOutputStream createNonRecursive(Path f, boolean overwrite,
+      int bufferSize, short replication, long blockSize, Progressable progress)
+      throws IOException {
+    throw new IOException("Har: create not allowed.");
+  }
+
+  @Override
+  public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
+    throw new IOException("Har: append not allowed.");
+  }
+
   @Override
   public void close() throws IOException {
+    super.close();
     if (fs != null) {
       try {
         fs.close();
@@ -703,9 +752,19 @@ public class HarFileSystem extends Filte
    */
   @Override
   public boolean setReplication(Path src, short replication) throws IOException{
-    throw new IOException("Har: setreplication not allowed");
+    throw new IOException("Har: setReplication not allowed");
   }
-  
+
+  @Override
+  public boolean rename(Path src, Path dst) throws IOException {
+    throw new IOException("Har: rename not allowed");
+  }
+
+  @Override
+  public FSDataOutputStream append(Path f) throws IOException {
+    throw new IOException("Har: append not allowed");
+  }
+
   /**
    * Not implemented.
    */
@@ -713,7 +772,7 @@ public class HarFileSystem extends Filte
   public boolean delete(Path f, boolean recursive) throws IOException { 
     throw new IOException("Har: delete not allowed");
   }
-  
+
   /**
    * liststatus returns the children of a directory 
    * after looking up the index files.
@@ -732,7 +791,7 @@ public class HarFileSystem extends Filte
       throw new FileNotFoundException("File " + f + " not found in " + archivePath);
     }
     if (hstatus.isDir()) {
-      fileStatusesInIndex(hstatus, statuses, hstatus.children);
+      fileStatusesInIndex(hstatus, statuses);
     } else {
       statuses.add(toFileStatus(hstatus, null));
     }
@@ -747,7 +806,7 @@ public class HarFileSystem extends Filte
   public Path getHomeDirectory() {
     return new Path(uri.toString());
   }
-  
+
   @Override
   public void setWorkingDirectory(Path newDir) {
     //does nothing.
@@ -765,11 +824,17 @@ public class HarFileSystem extends Filte
    * not implemented.
    */
   @Override
-  public void copyFromLocalFile(boolean delSrc, Path src, Path dst) throws 
-        IOException {
+  public void copyFromLocalFile(boolean delSrc, boolean overwrite,
+      Path src, Path dst) throws IOException {
     throw new IOException("Har: copyfromlocalfile not allowed");
   }
-  
+
+  @Override
+  public void copyFromLocalFile(boolean delSrc, boolean overwrite,
+      Path[] srcs, Path dst) throws IOException {
+    throw new IOException("Har: copyfromlocalfile not allowed");
+  }
+
   /**
    * copies the file in the har filesystem to a local file.
    */
@@ -806,11 +871,16 @@ public class HarFileSystem extends Filte
     throw new IOException("Har: setowner not allowed");
   }
 
+  @Override
+  public void setTimes(Path p, long mtime, long atime) throws IOException {
+    throw new IOException("Har: setTimes not allowed");
+  }
+
   /**
    * Not implemented.
    */
   @Override
-  public void setPermission(Path p, FsPermission permisssion) 
+  public void setPermission(Path p, FsPermission permission)
     throws IOException {
     throw new IOException("Har: setPermission not allowed");
   }
@@ -828,11 +898,15 @@ public class HarFileSystem extends Filte
       private long position, start, end;
       //The underlying data input stream that the
       // underlying filesystem will return.
-      private FSDataInputStream underLyingStream;
+      private final FSDataInputStream underLyingStream;
       //one byte buffer
-      private byte[] oneBytebuff = new byte[1];
+      private final byte[] oneBytebuff = new byte[1];
+      
       HarFsInputStream(FileSystem fs, Path path, long start,
           long length, int bufferSize) throws IOException {
+        if (length < 0) {
+          throw new IllegalArgumentException("Negative length [" + length + "]");
+        }
         underLyingStream = fs.open(path, bufferSize);
         underLyingStream.seek(start);
         // the start of this file in the part file
@@ -846,7 +920,7 @@ public class HarFileSystem extends Filte
       @Override
       public synchronized int available() throws IOException {
         long remaining = end - underLyingStream.getPos();
-        if (remaining > (long)Integer.MAX_VALUE) {
+        if (remaining > Integer.MAX_VALUE) {
           return Integer.MAX_VALUE;
         }
         return (int) remaining;
@@ -878,10 +952,14 @@ public class HarFileSystem extends Filte
         return (ret <= 0) ? -1: (oneBytebuff[0] & 0xff);
       }
       
+      // NB: currently this method is never actually executed, because
+      // java.io.DataInputStream.read(byte[]) delegates directly to
+      // java.io.InputStream.read(byte[], int, int).
+      // However, it can potentially be invoked, so leave it intact for now.
       @Override
       public synchronized int read(byte[] b) throws IOException {
-        int ret = read(b, 0, b.length);
-        if (ret != -1) {
+        final int ret = read(b, 0, b.length);
+        if (ret > 0) {
           position += ret;
         }
         return ret;
@@ -899,7 +977,7 @@ public class HarFileSystem extends Filte
           newlen = (int) (end - position);
         }
         // end case
-        if (newlen == 0) 
+        if (newlen == 0)
           return ret;
         ret = underLyingStream.read(b, offset, newlen);
         position += ret;
@@ -910,15 +988,19 @@ public class HarFileSystem extends Filte
       public synchronized long skip(long n) throws IOException {
         long tmpN = n;
         if (tmpN > 0) {
-          if (position + tmpN > end) {
-            tmpN = end - position;
-          }
+          final long actualRemaining = end - position;
+          if (tmpN > actualRemaining) {
+            tmpN = actualRemaining;
+          }
           underLyingStream.seek(tmpN + position);
           position += tmpN;
           return tmpN;
-        }
-        return (tmpN < 0)? -1 : 0;
-      }
+        }
+        // NB: the contract is described in java.io.InputStream.skip(long):
+        // this method returns the number of bytes actually skipped, so
+        // the return value should never be negative.
+        return 0;
+      }
       
       @Override
       public synchronized long getPos() throws IOException {
@@ -926,18 +1008,27 @@ public class HarFileSystem extends Filte
       }
       
       @Override
-      public synchronized void seek(long pos) throws IOException {
-        if (pos < 0 || (start + pos > end)) {
-          throw new IOException("Failed to seek: EOF");
-        }
+      public synchronized void seek(final long pos) throws IOException {
+        validatePosition(pos);
         position = start + pos;
         underLyingStream.seek(position);
       }
 
+      private void validatePosition(final long pos) throws IOException {
+        if (pos < 0) {
+          throw new IOException("Negative position: " + pos);
+        }
+        final long length = end - start;
+        if (pos > length) {
+          throw new IOException("Position behind the end " +
+              "of the stream (length = " + length + "): " + pos);
+        }
+      }
+
       @Override
       public boolean seekToNewSource(long targetPos) throws IOException {
-        //do not need to implement this
-        // hdfs in itself does seektonewsource 
+        // do not need to implement this
+        // hdfs in itself does seektonewsource
         // while reading.
         return false;
       }
@@ -950,7 +1041,12 @@ public class HarFileSystem extends Filte
       throws IOException {
         int nlength = length;
         if (start + nlength + pos > end) {
-          nlength = (int) (end - (start + pos));
+          // length corrected to the real remaining length:
+          nlength = (int) (end - start - pos);
+        }
+        if (nlength <= 0) {
+          // EOS:
+          return -1;
         }
         return underLyingStream.read(pos + start , b, offset, nlength);
       }
@@ -973,14 +1069,12 @@ public class HarFileSystem extends Filte
       }
 
       @Override
-      public void setReadahead(Long readahead)
-          throws IOException, UnsupportedEncodingException {
+      public void setReadahead(Long readahead) throws IOException {
         underLyingStream.setReadahead(readahead);
       }
 
       @Override
-      public void setDropBehind(Boolean dropBehind)
-          throws IOException, UnsupportedEncodingException {
+      public void setDropBehind(Boolean dropBehind) throws IOException {
         underLyingStream.setDropBehind(dropBehind);
       }
     }
@@ -998,19 +1092,6 @@ public class HarFileSystem extends Filte
         long length, int bufsize) throws IOException {
         super(new HarFsInputStream(fs, p, start, length, bufsize));
     }
-
-    /**
-     * constructor for har input stream.
-     * @param fs the underlying filesystem
-     * @param p the path in the underlying file system
-     * @param start the start position in the part file
-     * @param length the length of valid data in the part file.
-     * @throws IOException
-     */
-    public HarFSDataInputStream(FileSystem fs, Path  p, long start, long length)
-      throws IOException {
-        super(new HarFsInputStream(fs, p, start, length, 0));
-    }
   }
 
   private class HarMetaData {
@@ -1057,7 +1138,7 @@ public class HarFileSystem extends Filte
     }
 
     private void parseMetaData() throws IOException {
-      Text line;
+      Text line = new Text();
       long read;
       FSDataInputStream in = null;
       LineReader lin = null;
@@ -1067,7 +1148,6 @@ public class HarFileSystem extends Filte
         FileStatus masterStat = fs.getFileStatus(masterIndexPath);
         masterIndexTimestamp = masterStat.getModificationTime();
         lin = new LineReader(in, getConf());
-        line = new Text();
         read = lin.readLine(line);
 
         // the first line contains the version of the index file
@@ -1081,7 +1161,7 @@ public class HarFileSystem extends Filte
         }
 
         // each line contains a hashcode range and the index file name
-        String[] readStr = null;
+        String[] readStr;
         while(read < masterStat.getLen()) {
           int b = lin.readLine(line);
           read += b;
@@ -1093,6 +1173,9 @@ public class HarFileSystem extends Filte
               endHash));
           line.clear();
         }
+      } catch (IOException ioe) {
+        LOG.warn("Encountered exception ", ioe);
+        throw ioe;
       } finally {
         IOUtils.cleanup(LOG, lin, in);
       }
@@ -1144,4 +1227,43 @@ public class HarFileSystem extends Filte
         return size() > MAX_ENTRIES;
     }
   }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  public FsServerDefaults getServerDefaults() throws IOException {
+    return fs.getServerDefaults();
+  }
+
+  @Override
+  public FsServerDefaults getServerDefaults(Path f) throws IOException {
+    return fs.getServerDefaults(f);
+  }
+
+  @Override
+  public long getUsed() throws IOException{
+    return fs.getUsed();
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  public long getDefaultBlockSize() {
+    return fs.getDefaultBlockSize();
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  public long getDefaultBlockSize(Path f) {
+    return fs.getDefaultBlockSize(f);
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  public short getDefaultReplication() {
+    return fs.getDefaultReplication();
+  }
+
+  @Override
+  public short getDefaultReplication(Path f) {
+    return fs.getDefaultReplication(f);
+  }
 }
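
Since HarFileSystem now extends FileSystem directly instead of
FilterFileSystem, every pass-through (getConf, getCanonicalUri, checkPath,
getServerDefaults, and so on) must be delegated explicitly to the wrapped
fs, as the overrides above do. A read-only usage sketch (URI and paths are
hypothetical):

    // "hdfs-nn" in the authority encodes scheme and host, per getHarAuth.
    Path part = new Path("har://hdfs-nn:8020/user/me/data.har/dir/file");
    FileSystem harFs = part.getFileSystem(conf);   // conf assumed
    FSDataInputStream in = harFs.open(part);       // served by HarFsInputStream
    // Mutating calls such as create(), rename(), or setTimes() throw
    // IOException, since archives are immutable.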

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Path.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Path.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Path.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Path.java Wed Oct 30 22:21:59 2013
@@ -218,10 +218,13 @@ public class Path implements Comparable 
    */
   public static Path mergePaths(Path path1, Path path2) {
     String path2Str = path2.toUri().getPath();
-    if(hasWindowsDrive(path2Str)) {
-      path2Str = path2Str.substring(path2Str.indexOf(':')+1);
-    }
-    return new Path(path1 + path2Str);
+    path2Str = path2Str.substring(startPositionWithoutWindowsDrive(path2Str));
+    // Add path components explicitly, because simply concatenating two path
+    // string is not safe, for example:
+    // "/" + "/foo" yields "//foo", which will be parsed as authority in Path
+    return new Path(path1.toUri().getScheme(), 
+        path1.toUri().getAuthority(), 
+        path1.toUri().getPath() + path2Str);
   }
 
   /**
@@ -247,8 +250,8 @@ public class Path implements Comparable 
     }
     
     // trim trailing slash from non-root path (ignoring windows drive)
-    int minLength = hasWindowsDrive(path) ? 4 : 1;
-    if (path.length() > minLength && path.endsWith("/")) {
+    int minLength = startPositionWithoutWindowsDrive(path) + 1;
+    if (path.length() > minLength && path.endsWith(SEPARATOR)) {
       path = path.substring(0, path.length()-1);
     }
     
@@ -259,6 +262,14 @@ public class Path implements Comparable 
     return (WINDOWS && hasDriveLetterSpecifier.matcher(path).find());
   }
 
+  private static int startPositionWithoutWindowsDrive(String path) {
+    if (hasWindowsDrive(path)) {
+      return path.charAt(0) ==  SEPARATOR_CHAR ? 3 : 2;
+    } else {
+      return 0;
+    }
+  }
+  
   /**
    * Determine whether a given path string represents an absolute path on
    * Windows. e.g. "C:/a/b" is an absolute path. "C:a/b" is not.
@@ -270,13 +281,11 @@ public class Path implements Comparable 
    */
   public static boolean isWindowsAbsolutePath(final String pathString,
                                               final boolean slashed) {
-    int start = (slashed ? 1 : 0);
-
-    return
-        hasWindowsDrive(pathString) &&
-        pathString.length() >= (start + 3) &&
-        ((pathString.charAt(start + 2) == SEPARATOR_CHAR) ||
-          (pathString.charAt(start + 2) == '\\'));
+    int start = startPositionWithoutWindowsDrive(pathString);
+    return start > 0
+        && pathString.length() > start
+        && ((pathString.charAt(start) == SEPARATOR_CHAR) ||
+            (pathString.charAt(start) == '\\'));
   }
 
   /** Convert this to a URI. */
@@ -300,7 +309,7 @@ public class Path implements Comparable 
    *  True if the path component (i.e. directory) of this URI is absolute.
    */
   public boolean isUriPathAbsolute() {
-    int start = hasWindowsDrive(uri.getPath()) ? 3 : 0;
+    int start = startPositionWithoutWindowsDrive(uri.getPath());
     return uri.getPath().startsWith(SEPARATOR, start);
    }
   
@@ -334,7 +343,7 @@ public class Path implements Comparable 
   public Path getParent() {
     String path = uri.getPath();
     int lastSlash = path.lastIndexOf('/');
-    int start = hasWindowsDrive(path) ? 3 : 0;
+    int start = startPositionWithoutWindowsDrive(path);
     if ((path.length() == start) ||               // empty path
         (lastSlash == start && path.length() == start+1)) { // at root
       return null;
@@ -343,8 +352,7 @@ public class Path implements Comparable 
     if (lastSlash==-1) {
       parent = CUR_DIR;
     } else {
-      int end = hasWindowsDrive(path) ? 3 : 0;
-      parent = path.substring(0, lastSlash==end?end+1:lastSlash);
+      parent = path.substring(0, lastSlash==start?start+1:lastSlash);
     }
     return new Path(uri.getScheme(), uri.getAuthority(), parent);
   }
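
A small sketch of the mergePaths fix described in the comment above:
composing the result from scheme, authority, and path avoids "/" + "/foo"
re-parsing as an authority:

    Path merged = Path.mergePaths(new Path("/"), new Path("/foo"));
    // merged.toUri().getPath() is "/foo"; the old string concatenation
    // produced "//foo", which the Path constructor parsed as an authority.

The new startPositionWithoutWindowsDrive helper also distinguishes "/C:/a"
(start position 3) from "C:/a" (start position 2), which the old fixed
"hasWindowsDrive ? 3 : 0" logic conflated.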

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Seekable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Seekable.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Seekable.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Seekable.java Wed Oct 30 22:21:59 2013
@@ -22,7 +22,9 @@ import java.io.*;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
-/** Stream that permits seeking. */
+/**
+ *  Stream that permits seeking.
+ */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public interface Seekable {

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Stat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Stat.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Stat.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Stat.java Wed Oct 30 22:21:59 2013
@@ -80,7 +80,7 @@ public class Stat extends Shell {
    * @return
    */
   public static boolean isAvailable() {
-    if (Shell.LINUX || Shell.FREEBSD) {
+    if (Shell.LINUX || Shell.FREEBSD || Shell.MAC) {
       return true;
     }
     return false;
@@ -100,7 +100,7 @@ public class Stat extends Shell {
     if (Shell.LINUX) {
       return new String[] {
           "stat", derefFlag + "c", "%s,%F,%Y,%X,%a,%U,%G,%N", path.toString() };
-    } else if (Shell.FREEBSD) {
+    } else if (Shell.FREEBSD || Shell.MAC) {
       return new String[] {
           "stat", derefFlag + "f", "%z,%HT,%m,%a,%Op,%Su,%Sg,`link' -> `%Y'",
           path.toString() };

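The Stat change works because OS X ships the same BSD userland as FreeBSD, so
the "stat -f" invocation and its format string carry over unchanged. A small
standalone sketch of that invocation, assuming a BSD-style stat(1) on the PATH:

    import java.io.BufferedReader;
    import java.io.InputStreamReader;

    public class BsdStatDemo {
      public static void main(String[] args) throws Exception {
        // Same flag and format string the patched command builder produces
        // for FreeBSD and, after this change, for Mac.
        Process p = new ProcessBuilder("stat", "-f",
            "%z,%HT,%m,%a,%Op,%Su,%Sg,`link' -> `%Y'", "/tmp")
            .redirectErrorStream(true).start();
        try (BufferedReader r = new BufferedReader(
            new InputStreamReader(p.getInputStream()))) {
          // size,type,mtime,atime,mode,owner,group,symlink-target
          System.out.println(r.readLine());
        }
        p.waitFor();
      }
    }
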
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java Wed Oct 30 22:21:59 2013
@@ -84,11 +84,16 @@ abstract class CommandWithDestination ex
    */
   protected void getLocalDestination(LinkedList<String> args)
   throws IOException {
+    String pathString = (args.size() < 2) ? Path.CUR_DIR : args.removeLast();
     try {
-      String pathString = (args.size() < 2) ? Path.CUR_DIR : args.removeLast();
       dst = new PathData(new URI(pathString), getConf());
     } catch (URISyntaxException e) {
-      throw new IOException("unexpected URISyntaxException", e);
+      if (Path.WINDOWS) {
+        // Unlike URI, PathData knows how to parse Windows drive-letter paths.
+        dst = new PathData(pathString, getConf());
+      } else {
+        throw new IOException("unexpected URISyntaxException", e);
+      }
     }
   }
 

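Both this hunk and the CopyCommands hunk below guard the same failure mode:
java.net.URI rejects Windows drive-letter paths that contain backslashes, so
on Windows the code now retries with PathData's own string parser instead of
surfacing the URISyntaxException. A minimal demonstration of the failure:

    import java.net.URI;
    import java.net.URISyntaxException;

    public class WindowsPathUriDemo {
      public static void main(String[] args) {
        try {
          new URI("C:\\tmp\\dst");       // backslashes are illegal in URIs
        } catch (URISyntaxException e) {
          // The shell code above lands here on Windows and falls back to
          // the PathData(String, Configuration) constructor.
          System.out.println("URI rejected the path: " + e.getMessage());
        }
      }
    }
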
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java Wed Oct 30 22:21:59 2013
@@ -204,13 +204,18 @@ class CopyCommands {  
     // commands operating on local paths have no need for glob expansion
     @Override
     protected List<PathData> expandArgument(String arg) throws IOException {
+      List<PathData> items = new LinkedList<PathData>();
       try {
-        List<PathData> items = new LinkedList<PathData>();
         items.add(new PathData(new URI(arg), getConf()));
-        return items;
       } catch (URISyntaxException e) {
-        throw new IOException("unexpected URISyntaxException", e);
+        if (Path.WINDOWS) {
+          // Unlike URI, PathData knows how to parse Windows drive-letter paths.
+          items.add(new PathData(arg, getConf()));
+        } else {
+          throw new IOException("unexpected URISyntaxException", e);
+        }
       }
+      return items;
     }
 
     @Override

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/SetReplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/SetReplication.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/SetReplication.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/SetReplication.java Wed Oct 30 22:21:59 2013
@@ -39,11 +39,14 @@ class SetReplication extends FsCommand {
   }
   
   public static final String NAME = "setrep";
-  public static final String USAGE = "[-R] [-w] <rep> <path/file> ...";
+  public static final String USAGE = "[-R] [-w] <rep> <path> ...";
   public static final String DESCRIPTION =
-    "Set the replication level of a file.\n" +
-    "The -R flag requests a recursive change of replication level\n" +
-    "for an entire tree.";
+    "Set the replication level of a file. If <path> is a directory\n" +
+    "then the command recursively changes the replication factor of\n" +
+    "all files under the directory tree rooted at <path>.\n" +
+    "The -w flag requests that the command wait for the replication\n" +
+    "to complete. This can potentially take a very long time.\n" +
+    "The -R flag is accepted for backwards compatibility. It has no effect.";
   
   protected short newRep = 0;
   protected List<PathData> waitList = new LinkedList<PathData>();
@@ -54,7 +57,7 @@ class SetReplication extends FsCommand {
     CommandFormat cf = new CommandFormat(2, Integer.MAX_VALUE, "R", "w");
     cf.parse(args);
     waitOpt = cf.getOpt("w");
-    setRecursive(cf.getOpt("R"));
+    setRecursive(true);
     
     try {
       newRep = Short.parseShort(args.removeFirst());
@@ -126,4 +129,4 @@ class SetReplication extends FsCommand {
       out.println(" done");
     }
   }
-}
\ No newline at end of file
+}

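Because processOptions() now calls setRecursive(true) unconditionally, -R is a
pure no-op and setrep always descends into directories, matching the rewritten
DESCRIPTION text. A hedged sketch of the resulting behavior in terms of the
public FileSystem API (the real command additionally honors -w to wait for
replication to finish):

    import java.io.IOException;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class SetRepSketch {
      // Apply a replication factor to every file under p, descending into
      // directories unconditionally, as the patched setrep now does.
      static void setRep(FileSystem fs, Path p, short rep) throws IOException {
        FileStatus st = fs.getFileStatus(p);
        if (st.isDirectory()) {
          for (FileStatus child : fs.listStatus(p)) {
            setRep(fs, child.getPath(), rep);
          }
        } else {
          fs.setReplication(p, rep);     // returns false where unsupported
        }
      }
    }
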
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/SnapshotCommands.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/SnapshotCommands.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/SnapshotCommands.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/SnapshotCommands.java Wed Oct 30 22:21:59 2013
@@ -68,7 +68,7 @@ class SnapshotCommands extends FsCommand
         throw new IllegalArgumentException("<snapshotDir> is missing.");
       } 
       if (args.size() > 2) {
-        throw new IllegalArgumentException("Too many arguements.");
+        throw new IllegalArgumentException("Too many arguments.");
       }
       if (args.size() == 2) {
         snapshotName = args.removeLast();
@@ -110,7 +110,7 @@ class SnapshotCommands extends FsCommand
     @Override
     protected void processOptions(LinkedList<String> args) throws IOException {
       if (args.size() != 2) {
-        throw new IOException("args number not 2: " + args.size());
+        throw new IllegalArgumentException("Incorrect number of arguments.");
       }
       snapshotName = args.removeLast();
     }
@@ -150,7 +150,7 @@ class SnapshotCommands extends FsCommand
     @Override
     protected void processOptions(LinkedList<String> args) throws IOException {
       if (args.size() != 3) {
-        throw new IOException("args number not 3: " + args.size());
+        throw new IllegalArgumentException("Incorrect number of arguments.");
       }
       newName = args.removeLast();
       oldName = args.removeLast();

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java Wed Oct 30 22:21:59 2013
@@ -568,6 +568,9 @@ public class ActiveStandbyElector implem
         enterNeutralMode();
         reJoinElection(0);
         break;
+      case SaslAuthenticated:
+        LOG.info("Successfully authenticated to ZooKeeper using SASL.");
+        break;
       default:
         fatalError("Unexpected Zookeeper watch event state: "
             + event.getState());

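SaslAuthenticated is a KeeperState the ZooKeeper client delivers to watchers
once its SASL handshake succeeds; before this hunk it fell through to the
default branch and triggered fatalError(). A minimal watcher showing where the
event arrives, assuming the standard org.apache.zookeeper client (3.4+):

    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;

    public class SaslAwareWatcher implements Watcher {
      @Override
      public void process(WatchedEvent event) {
        switch (event.getState()) {
          case SyncConnected:
            break;                       // normal session traffic
          case SaslAuthenticated:
            System.out.println("SASL handshake complete");  // info only
            break;
          default:
            break;                       // other states elided in this sketch
        }
      }
    }
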
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java Wed Oct 30 22:21:59 2013
@@ -63,7 +63,7 @@ public abstract class HAAdmin extends Co
 
   private int rpcTimeoutForChecks = -1;
   
-  private static Map<String, UsageInfo> USAGE =
+  protected final static Map<String, UsageInfo> USAGE =
     ImmutableMap.<String, UsageInfo>builder()
     .put("-transitionToActive",
         new UsageInfo("<serviceId>", "Transitions the service into Active state"))
@@ -91,6 +91,14 @@ public abstract class HAAdmin extends Co
   protected PrintStream out = System.out;
   private RequestSource requestSource = RequestSource.REQUEST_BY_USER;
 
+  protected HAAdmin() {
+    super();
+  }
+
+  protected HAAdmin(Configuration conf) {
+    super(conf);
+  }
+
   protected abstract HAServiceTarget resolveTarget(String string);
 
   protected String getUsageString() {
@@ -461,9 +469,9 @@ public abstract class HAAdmin extends Co
     return 0;
   }
   
-  private static class UsageInfo {
-    private final String args;
-    private final String help;
+  protected static class UsageInfo {
+    public final String args;
+    public final String help;
     
     public UsageInfo(String args, String help) {
       this.args = args;

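Widening USAGE, UsageInfo, and the constructors from private to protected lets
subclasses assemble their own command tables and reuse the built-in help text.
A hypothetical subclass sketch; MyHAAdmin and its stubbed resolveTarget() are
illustrative, not part of this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ha.HAAdmin;
    import org.apache.hadoop.ha.HAServiceTarget;

    public class MyHAAdmin extends HAAdmin {
      public MyHAAdmin(Configuration conf) {
        super(conf);                     // protected ctor, now reachable
      }

      @Override
      protected HAServiceTarget resolveTarget(String serviceId) {
        throw new UnsupportedOperationException("illustrative stub");
      }

      String helpFor(String cmd) {
        UsageInfo u = USAGE.get(cmd);    // protected static map
        return u == null ? cmd : u.args + " : " + u.help;  // public fields
      }
    }
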
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java Wed Oct 30 22:21:59 2013
@@ -43,13 +43,15 @@ public interface HAServiceProtocol {
   public static final long versionID = 1L;
 
   /**
-   * An HA service may be in active or standby state. During
-   * startup, it is in an unknown INITIALIZING state.
+   * An HA service may be in active or standby state. During startup, it is in
+   * an unknown INITIALIZING state. During shutdown, it is in the STOPPING state
+   * and can no longer return to active/standby states.
    */
   public enum HAServiceState {
     INITIALIZING("initializing"),
     ACTIVE("active"),
-    STANDBY("standby");
+    STANDBY("standby"),
+    STOPPING("stopping");
 
     private String name;
 

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpConfig.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpConfig.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpConfig.java Wed Oct 30 22:21:59 2013
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.http;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -29,26 +28,41 @@ import org.apache.hadoop.fs.CommonConfig
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class HttpConfig {
-  private static boolean sslEnabled;
+  private static Policy policy;
+  public enum Policy {
+    HTTP_ONLY,
+    HTTPS_ONLY;
+
+    public static Policy fromString(String value) {
+      if (value.equalsIgnoreCase(CommonConfigurationKeysPublic
+              .HTTP_POLICY_HTTPS_ONLY)) {
+        return HTTPS_ONLY;
+      }
+      return HTTP_ONLY;
+    }
+  }
 
   static {
     Configuration conf = new Configuration();
-    sslEnabled = conf.getBoolean(
-        CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_KEY,
-        CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_DEFAULT);
+    boolean sslEnabled = conf.getBoolean(
+            CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_KEY,
+            CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_DEFAULT);
+    policy = sslEnabled ? Policy.HTTPS_ONLY : Policy.HTTP_ONLY;
   }
 
-  @VisibleForTesting
-  static void setSecure(boolean secure) {
-    sslEnabled = secure;
+  public static void setPolicy(Policy policy) {
+    HttpConfig.policy = policy;
   }
 
   public static boolean isSecure() {
-    return sslEnabled;
+    return policy == Policy.HTTPS_ONLY;
   }
 
   public static String getSchemePrefix() {
     return (isSecure()) ? "https://" : "http://";
   }
 
+  public static String getScheme(Policy policy) {
+    return policy == Policy.HTTPS_ONLY ? "https://" : "http://";
+  }
 }

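The boolean sslEnabled flag becomes a two-value Policy, and fromString() is
deliberately lenient: anything that does not match HTTPS_ONLY
case-insensitively falls back to HTTP_ONLY. Usage sketch:

    import org.apache.hadoop.http.HttpConfig;

    public class HttpPolicyDemo {
      public static void main(String[] args) {
        HttpConfig.Policy p = HttpConfig.Policy.fromString("https_only");
        System.out.println(p);                        // HTTPS_ONLY
        System.out.println(HttpConfig.getScheme(p));  // https://
        // Unrecognized values quietly map to the non-SSL default:
        System.out.println(HttpConfig.Policy.fromString("bogus")); // HTTP_ONLY
      }
    }
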
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer.java Wed Oct 30 22:21:59 2013
@@ -66,9 +66,12 @@ import org.mortbay.io.Buffer;
 import org.mortbay.jetty.Connector;
 import org.mortbay.jetty.Handler;
 import org.mortbay.jetty.MimeTypes;
+import org.mortbay.jetty.RequestLog;
 import org.mortbay.jetty.Server;
 import org.mortbay.jetty.handler.ContextHandler;
 import org.mortbay.jetty.handler.ContextHandlerCollection;
+import org.mortbay.jetty.handler.RequestLogHandler;
+import org.mortbay.jetty.handler.HandlerCollection;
 import org.mortbay.jetty.nio.SelectChannelConnector;
 import org.mortbay.jetty.security.SslSocketConnector;
 import org.mortbay.jetty.servlet.Context;
@@ -337,6 +340,7 @@ public class HttpServer implements Filte
       }
       listener.setHost(bindAddress);
       listener.setPort(port);
+      LOG.info("SSL is enabled on " + toString());
     } else {
       listenerStartedExternally = true;
       listener = connector;
@@ -354,7 +358,18 @@ public class HttpServer implements Filte
 
     final String appDir = getWebAppsPath(name);
     ContextHandlerCollection contexts = new ContextHandlerCollection();
-    webServer.setHandler(contexts);
+    RequestLog requestLog = HttpRequestLog.getRequestLog(name);
+
+    if (requestLog != null) {
+      RequestLogHandler requestLogHandler = new RequestLogHandler();
+      requestLogHandler.setRequestLog(requestLog);
+      HandlerCollection handlers = new HandlerCollection();
+      handlers.setHandlers(new Handler[] {requestLogHandler, contexts});
+      webServer.setHandler(handlers);
+    }
+    else {
+      webServer.setHandler(contexts);
+    }
 
     webAppContext = new WebAppContext();
     webAppContext.setDisplayName(name);

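When HttpRequestLog supplies a RequestLog for the server, the hunk above
chains a RequestLogHandler in front of the regular contexts, so every request
is logged before being dispatched. The same wiring as a standalone Jetty 6
(org.mortbay) sketch, assuming the stock NCSARequestLog implementation:

    import org.mortbay.jetty.Handler;
    import org.mortbay.jetty.NCSARequestLog;
    import org.mortbay.jetty.Server;
    import org.mortbay.jetty.handler.ContextHandlerCollection;
    import org.mortbay.jetty.handler.HandlerCollection;
    import org.mortbay.jetty.handler.RequestLogHandler;

    public class RequestLogWiring {
      public static void main(String[] args) throws Exception {
        Server server = new Server(0);
        ContextHandlerCollection contexts = new ContextHandlerCollection();
        RequestLogHandler logHandler = new RequestLogHandler();
        logHandler.setRequestLog(
            new NCSARequestLog("/tmp/access_yyyy_mm_dd.log"));
        HandlerCollection handlers = new HandlerCollection();
        handlers.setHandlers(new Handler[] { logHandler, contexts });
        server.setHandler(handlers);     // log handler sees requests first
        server.start();
      }
    }
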
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java Wed Oct 30 22:21:59 2013
@@ -64,7 +64,7 @@ public class RetryInvocationHandler<T> i
     this(proxyProvider, retryPolicy, Collections.<String, RetryPolicy>emptyMap());
   }
 
-  RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider,
+  protected RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider,
       RetryPolicy defaultPolicy,
       Map<String, RetryPolicy> methodNameToPolicyMap) {
     this.proxyProvider = proxyProvider;

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java Wed Oct 30 22:21:59 2013
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.net.ConnectTimeoutException;
 
@@ -531,6 +532,15 @@ public class RetryPolicies {
       this.maxDelayBase = maxDelayBase;
     }
 
+    /**
+     * @return 0 if this is our first failover/retry (i.e., retry immediately);
+     *         otherwise an exponentially increasing sleep time
+     */
+    private long getFailoverOrRetrySleepTime(int times) {
+      return times == 0 ? 0 : 
+        calculateExponentialTime(delayMillis, times, maxDelayBase);
+    }
+    
     @Override
     public RetryAction shouldRetry(Exception e, int retries,
         int failovers, boolean isIdempotentOrAtMostOnce) throws Exception {
@@ -546,11 +556,8 @@ public class RetryPolicies {
           e instanceof StandbyException ||
           e instanceof ConnectTimeoutException ||
           isWrappedStandbyException(e)) {
-        return new RetryAction(
-            RetryAction.RetryDecision.FAILOVER_AND_RETRY,
-            // retry immediately if this is our first failover, sleep otherwise
-            failovers == 0 ? 0 :
-                calculateExponentialTime(delayMillis, failovers, maxDelayBase));
+        return new RetryAction(RetryAction.RetryDecision.FAILOVER_AND_RETRY,
+            getFailoverOrRetrySleepTime(failovers));
       } else if (e instanceof SocketException ||
                  (e instanceof IOException && !(e instanceof RemoteException))) {
         if (isIdempotentOrAtMostOnce) {
@@ -561,8 +568,14 @@ public class RetryPolicies {
               "whether it was invoked");
         }
       } else {
-        return fallbackPolicy.shouldRetry(e, retries, failovers,
-            isIdempotentOrAtMostOnce);
+        RetriableException re = getWrappedRetriableException(e);
+        if (re != null) {
+          return new RetryAction(RetryAction.RetryDecision.RETRY,
+              getFailoverOrRetrySleepTime(retries));
+        } else {
+          return fallbackPolicy.shouldRetry(e, retries, failovers,
+              isIdempotentOrAtMostOnce);
+        }
       }
     }
     
@@ -596,4 +609,14 @@ public class RetryPolicies {
         StandbyException.class);
     return unwrapped instanceof StandbyException;
   }
+  
+  private static RetriableException getWrappedRetriableException(Exception e) {
+    if (!(e instanceof RemoteException)) {
+      return null;
+    }
+    Exception unwrapped = ((RemoteException)e).unwrapRemoteException(
+        RetriableException.class);
+    return unwrapped instanceof RetriableException ? 
+        (RetriableException) unwrapped : null;
+  }
 }

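The refactor routes both failover sleeps and the new RetriableException
retries through getFailoverOrRetrySleepTime(): attempt 0 retries immediately,
later attempts back off exponentially. A simplified sketch of that schedule;
the real calculateExponentialTime() also caps the delay at maxDelayBase and
randomizes it, details simplified here:

    public class BackoffSketch {
      static long sleepMillis(long delayMillis, int attempt, long capMillis) {
        if (attempt == 0) {
          return 0;                              // retry immediately
        }
        long exp = delayMillis * (1L << Math.min(attempt, 30));  // 2^attempt
        return Math.min(exp, capMillis);         // never exceed the cap
      }

      public static void main(String[] args) {
        for (int i = 0; i < 6; i++) {
          System.out.println(i + " -> " + sleepMillis(500, i, 15000) + " ms");
        }
      }
    }
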
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Wed Oct 30 22:21:59 2013
@@ -1063,8 +1063,8 @@ public class Client {
         if (status == RpcStatusProto.SUCCESS) {
           Writable value = ReflectionUtils.newInstance(valueClass, conf);
           value.readFields(in);                 // read value
-          call.setRpcResponse(value);
           calls.remove(callId);
+          call.setRpcResponse(value);
           
           // verify that length was correct
           // only for ProtobufEngine where len can be verified easily
@@ -1098,8 +1098,8 @@ public class Client {
                   new RemoteException(exceptionClassName, errorMsg) :
               new RemoteException(exceptionClassName, errorMsg, erCode));
           if (status == RpcStatusProto.ERROR) {
-            call.setException(re);
             calls.remove(callId);
+            call.setException(re);
           } else if (status == RpcStatusProto.FATAL) {
             // Close the connection
             markClosed(re);
@@ -1166,8 +1166,8 @@ public class Client {
       Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator() ;
       while (itor.hasNext()) {
         Call c = itor.next().getValue(); 
+        itor.remove();
         c.setException(closeException); // local exception
-        itor.remove();         
       }
     }
   }

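All three Client hunks converge on one ordering: unregister the Call from the
pending-calls table first, complete it second, so a caller woken by
setRpcResponse() or setException() can never observe its finished call still
registered. The pattern in isolation; Call here is a stand-in for the real
ipc.Client.Call:

    import java.util.Iterator;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class RemoveThenComplete {
      static class Call {
        synchronized void setDone() { notifyAll(); }  // wakes the caller
      }

      static final Map<Integer, Call> calls = new ConcurrentHashMap<>();

      static void receiveResponse(int callId) {
        Call c = calls.remove(callId);   // unregister first ...
        if (c != null) {
          c.setDone();                   // ... then wake the caller
        }
      }

      static void cleanupCalls() {
        for (Iterator<Call> it = calls.values().iterator(); it.hasNext(); ) {
          Call c = it.next();
          it.remove();                   // drop the entry before delivering
          c.setDone();                   // the local close exception
        }
      }
    }
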
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java Wed Oct 30 22:21:59 2013
@@ -76,6 +76,12 @@ public class RetryCache {
       this.expirationTime = expirationTime;
     }
 
+    CacheEntry(byte[] clientId, int callId, long expirationTime,
+        boolean success) {
+      this(clientId, callId, expirationTime);
+      this.state = success ? SUCCESS : FAILED;
+    }
+
     private static int hashCode(long value) {
       return (int)(value ^ (value >>> 32));
     }
@@ -147,6 +153,12 @@ public class RetryCache {
       this.payload = payload;
     }
 
+    CacheEntryWithPayload(byte[] clientId, int callId, Object payload,
+        long expirationTime, boolean success) {
+      super(clientId, callId, expirationTime, success);
+      this.payload = payload;
+    }
+
     /** Override equals to avoid findbugs warnings */
     @Override
     public boolean equals(Object obj) {
@@ -253,18 +265,20 @@ public class RetryCache {
    */
   public void addCacheEntry(byte[] clientId, int callId) {
     CacheEntry newEntry = new CacheEntry(clientId, callId, System.nanoTime()
-        + expirationTime);
-    newEntry.completed(true);
-    set.put(newEntry);
+        + expirationTime, true);
+    synchronized(this) {
+      set.put(newEntry);
+    }
   }
   
   public void addCacheEntryWithPayload(byte[] clientId, int callId,
       Object payload) {
-    CacheEntry newEntry = new CacheEntryWithPayload(clientId, callId, payload,
-        System.nanoTime() + expirationTime);
     // since the entry is loaded from editlog, we can assume it succeeded.    
-    newEntry.completed(true);
-    set.put(newEntry);
+    CacheEntry newEntry = new CacheEntryWithPayload(clientId, callId, payload,
+        System.nanoTime() + expirationTime, true);
+    synchronized(this) {
+      set.put(newEntry);
+    }
   }
 
   private static CacheEntry newEntry(long expirationTime) {

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Wed Oct 30 22:21:59 2013
@@ -1292,6 +1292,29 @@ public abstract class Server {
       }
     }
 
+    private Throwable getCauseForInvalidToken(IOException e) {
+      Throwable cause = e;
+      while (cause != null) {
+        if (cause instanceof RetriableException) {
+          return (RetriableException) cause;
+        } else if (cause instanceof StandbyException) {
+          return (StandbyException) cause;
+        } else if (cause instanceof InvalidToken) {
+          // FIXME: hadoop method signatures are restricting the SASL
+          // callbacks to only returning InvalidToken, but some services
+          // need to throw other exceptions (ex. NN + StandbyException),
+          // so for now we'll tunnel the real exceptions via an
+          // InvalidToken's cause which normally is not set 
+          if (cause.getCause() != null) {
+            cause = cause.getCause();
+          }
+          return cause;
+        }
+        cause = cause.getCause();
+      }
+      return e;
+    }
+    
     private void saslProcess(RpcSaslProto saslMessage)
         throws WrappedRpcServerException, IOException, InterruptedException {
       if (saslContextEstablished) {
@@ -1304,29 +1327,11 @@ public abstract class Server {
         try {
           saslResponse = processSaslMessage(saslMessage);
         } catch (IOException e) {
-          IOException sendToClient = e;
-          Throwable cause = e;
-          while (cause != null) {
-            if (cause instanceof InvalidToken) {
-              // FIXME: hadoop method signatures are restricting the SASL
-              // callbacks to only returning InvalidToken, but some services
-              // need to throw other exceptions (ex. NN + StandyException),
-              // so for now we'll tunnel the real exceptions via an
-              // InvalidToken's cause which normally is not set 
-              if (cause.getCause() != null) {
-                cause = cause.getCause();
-              }
-              sendToClient = (IOException) cause;
-              break;
-            }
-            cause = cause.getCause();
-          }
           rpcMetrics.incrAuthenticationFailures();
-          String clientIP = this.toString();
           // attempting user could be null
-          AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser +
-            " (" + e.getLocalizedMessage() + ")");
-          throw sendToClient;
+          AUDITLOG.warn(AUTH_FAILED_FOR + this.toString() + ":"
+              + attemptingUser + " (" + e.getLocalizedMessage() + ")");
+          throw (IOException) getCauseForInvalidToken(e);
         }
         
         if (saslServer != null && saslServer.isComplete()) {

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/filter/AbstractPatternFilter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/filter/AbstractPatternFilter.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/filter/AbstractPatternFilter.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/filter/AbstractPatternFilter.java Wed Oct 30 22:21:59 2013
@@ -112,7 +112,7 @@ public abstract class AbstractPatternFil
       return false;
     }
     // Reject if no match in whitelist only mode
-    if (ipat != null && epat == null) {
+    if (!includeTagPatterns.isEmpty() && excludeTagPatterns.isEmpty()) {
       return false;
     }
     return true;

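The one-line fix matters because the old condition consulted only the
single-pattern fields (ipat and epat), so whitelist-only configurations
expressed through the pattern lists never took effect. A minimal sketch of
the rule the fix restores; when include patterns exist and no exclude
patterns do, anything left unmatched is rejected:

    import java.util.List;
    import java.util.regex.Pattern;

    public class WhitelistOnlySketch {
      static boolean accepts(List<Pattern> includes, List<Pattern> excludes,
                             String value) {
        for (Pattern p : includes) {
          if (p.matcher(value).matches()) {
            return true;                 // explicitly whitelisted
          }
        }
        for (Pattern p : excludes) {
          if (p.matcher(value).matches()) {
            return false;                // explicitly blacklisted
          }
        }
        // Whitelist-only mode: includes configured, no excludes -> reject.
        return includes.isEmpty() || !excludes.isEmpty();
      }
    }
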
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/package-info.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/package-info.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/package-info.java Wed Oct 30 22:21:59 2013
@@ -234,7 +234,7 @@
     patterns.
   </p>
   <p>Similarly, you can specify the <code>record.filter</code> and
-    <code>metrics.filter</code> options, which operate at record and metric
+    <code>metric.filter</code> options, which operate at record and metric
     level, respectively. Filters can be combined to optimize
     the filtering efficiency.</p>
 

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/CachedDNSToSwitchMapping.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/CachedDNSToSwitchMapping.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/CachedDNSToSwitchMapping.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/CachedDNSToSwitchMapping.java Wed Oct 30 22:21:59 2013
@@ -154,4 +154,11 @@ public class CachedDNSToSwitchMapping ex
   public void reloadCachedMappings() {
     cache.clear();
   }
+
+  @Override
+  public void reloadCachedMappings(List<String> names) {
+    for (String name : names) {
+      cache.remove(name);
+    }
+  }
 }
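
The new override invalidates only the named hosts instead of clearing the
whole rack-mapping cache, which keeps warm entries intact on large clusters.
A standalone sketch of the same selective invalidation:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class SelectiveInvalidation {
      static final Map<String, String> cache = new ConcurrentHashMap<>();

      static void reloadAll() {
        cache.clear();                   // old behavior: drop everything
      }

      static void reload(List<String> names) {
        for (String name : names) {
          cache.remove(name);            // new behavior: drop only these
        }
      }

      public static void main(String[] args) {
        cache.put("host1", "/rack1");
        cache.put("host2", "/rack2");
        reload(Arrays.asList("host1"));
        System.out.println(cache);       // {host2=/rack2}
      }
    }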