Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2013/10/24 23:38:28 UTC

svn commit: r1535563 - in /hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common: ./ src/main/docs/ src/main/java/ src/main/java/org/apache/hadoop/fs/ src/main/java/org/apache/hadoop/metrics2/ src/main/java/org/apache/hadoop/metrics2/fil...

Author: wang
Date: Thu Oct 24 21:38:25 2013
New Revision: 1535563

URL: http://svn.apache.org/r1535563
Log:
merge trunk into HDFS-4949 branch

Added:
    hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestFileSink.java
      - copied unchanged from r1535559, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestFileSink.java
Modified:
    hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/docs/   (props changed)
    hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/   (props changed)
    hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
    hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
    hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/filter/AbstractPatternFilter.java
    hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/package-info.java
    hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/site/apt/FileSystemShell.apt.vm
    hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/test/core/   (props changed)
    hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FCStatisticsBaseTest.java
    hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/filter/TestPatternFilter.java

Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1535563&r1=1535562&r2=1535563&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/CHANGES.txt Thu Oct 24 21:38:25 2013
@@ -363,6 +363,15 @@ Release 2.3.0 - UNRELEASED
     HADOOP-9078. enhance unit-test coverage of class
     org.apache.hadoop.fs.FileContext (Ivan A. Veselovsky via jeagles)
 
+    HDFS-5276. FileSystem.Statistics should use thread-local counters to avoid
+    multi-threaded performance issues on read/write.  (Colin Patrick McCabe)
+
+    HADOOP-9291. enhance unit-test coverage of package o.a.h.metrics2 (Ivan A.
+    Veselovsky via jeagles)
+
+    HADOOP-10064. Upgrade to maven antrun plugin version 1.7 (Arpit Agarwal via
+    jeagles)
+
   OPTIMIZATIONS
 
     HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn)
@@ -405,6 +414,9 @@ Release 2.3.0 - UNRELEASED
     HADOOP-9981. globStatus should minimize its listStatus and getFileStatus
     calls.  (Contributed by Colin Patrick McCabe)
 
+    HADOOP-9016. HarFsInputStream.skip(long) must never return negative value.
+    (Ivan A. Veselovsky via jeagles)
+
 Release 2.2.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -434,6 +446,9 @@ Release 2.2.1 - UNRELEASED
     HADOOP-10040. hadoop.cmd in UNIX format and would not run by default on
     Windows. (cnauroth)
 
+    HADOOP-10055. FileSystemShell.apt.vm doc has typo "numRepicas".
+    (Akira Ajisaka via cnauroth)
+
 Release 2.2.0 - 2013-10-13
 
   INCOMPATIBLE CHANGES

Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt:r1532946-1535559

Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/docs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/docs:r1532946-1535559

Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java:r1532946-1535559

Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java?rev=1535563&r1=1535562&r2=1535563&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java Thu Oct 24 21:38:25 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs;
 import java.io.Closeable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.lang.ref.WeakReference;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;
@@ -31,6 +32,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.IdentityHashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
@@ -2501,28 +2503,149 @@ public abstract class FileSystem extends
     }
   }
   
+  /**
+   * Tracks statistics about how many reads, writes, and so forth have been
+   * done in a FileSystem.
+   * 
+   * Since there is only one of these objects per FileSystem, there will 
+   * typically be many threads writing to this object.  Almost every operation
+   * on an open file will involve a write to this object.  In contrast, reading
+   * statistics is done infrequently by most programs, and not at all by others.
+   * Hence, this is optimized for writes.
+   * 
+   * Each thread writes to its own thread-local area of memory.  This removes 
+   * contention and allows us to scale up to many, many threads.  To read
+   * statistics, the reader thread totals up the contents of all of the 
+   * thread-local data areas.
+   */
   public static final class Statistics {
+    /**
+     * Statistics data.
+     * 
+     * There is only a single writer to thread-local StatisticsData objects.
+     * Hence, volatile is adequate here; we do not need AtomicLong or similar
+     * to prevent lost updates.
+     * The Java specification guarantees that updates to volatile longs will
+     * be perceived as atomic with respect to other threads, which is all we
+     * need.
+     */
+    private static class StatisticsData {
+      volatile long bytesRead;
+      volatile long bytesWritten;
+      volatile int readOps;
+      volatile int largeReadOps;
+      volatile int writeOps;
+      /**
+       * Stores a weak reference to the thread owning this StatisticsData.
+       * This allows us to remove StatisticsData objects that pertain to
+       * threads that no longer exist.
+       */
+      final WeakReference<Thread> owner;
+
+      StatisticsData(WeakReference<Thread> owner) {
+        this.owner = owner;
+      }
+
+      /**
+       * Add another StatisticsData object to this one.
+       */
+      void add(StatisticsData other) {
+        this.bytesRead += other.bytesRead;
+        this.bytesWritten += other.bytesWritten;
+        this.readOps += other.readOps;
+        this.largeReadOps += other.largeReadOps;
+        this.writeOps += other.writeOps;
+      }
+
+      /**
+       * Negate the values of all statistics.
+       */
+      void negate() {
+        this.bytesRead = -this.bytesRead;
+        this.bytesWritten = -this.bytesWritten;
+        this.readOps = -this.readOps;
+        this.largeReadOps = -this.largeReadOps;
+        this.writeOps = -this.writeOps;
+      }
+
+      @Override
+      public String toString() {
+        return bytesRead + " bytes read, " + bytesWritten + " bytes written, "
+            + readOps + " read ops, " + largeReadOps + " large read ops, "
+            + writeOps + " write ops";
+      }
+    }
+
+    private interface StatisticsAggregator<T> {
+      void accept(StatisticsData data);
+      T aggregate();
+    }
+
     private final String scheme;
-    private AtomicLong bytesRead = new AtomicLong();
-    private AtomicLong bytesWritten = new AtomicLong();
-    private AtomicInteger readOps = new AtomicInteger();
-    private AtomicInteger largeReadOps = new AtomicInteger();
-    private AtomicInteger writeOps = new AtomicInteger();
+
+    /**
+     * rootData is data that doesn't belong to any thread, but will be added
+     * to the totals.  This is useful for making copies of Statistics objects,
+     * and for storing data that pertains to threads that have been garbage
+     * collected.  Protected by the Statistics lock.
+     */
+    private final StatisticsData rootData;
+
+    /**
+     * Thread-local data.
+     */
+    private final ThreadLocal<StatisticsData> threadData;
     
+    /**
+     * List of all thread-local data areas.  Protected by the Statistics lock.
+     */
+    private LinkedList<StatisticsData> allData;
+
     public Statistics(String scheme) {
       this.scheme = scheme;
+      this.rootData = new StatisticsData(null);
+      this.threadData = new ThreadLocal<StatisticsData>();
+      this.allData = null;
     }
 
     /**
      * Copy constructor.
      * 
-     * @param st
-     *          The input Statistics object which is cloned.
+     * @param other    The input Statistics object which is cloned.
      */
-    public Statistics(Statistics st) {
-      this.scheme = st.scheme;
-      this.bytesRead = new AtomicLong(st.bytesRead.longValue());
-      this.bytesWritten = new AtomicLong(st.bytesWritten.longValue());
+    public Statistics(Statistics other) {
+      this.scheme = other.scheme;
+      this.rootData = new StatisticsData(null);
+      other.visitAll(new StatisticsAggregator<Void>() {
+        @Override
+        public void accept(StatisticsData data) {
+          rootData.add(data);
+        }
+
+        public Void aggregate() {
+          return null;
+        }
+      });
+      this.threadData = new ThreadLocal<StatisticsData>();
+    }
+
+    /**
+     * Get or create the thread-local data associated with the current thread.
+     */
+    private StatisticsData getThreadData() {
+      StatisticsData data = threadData.get();
+      if (data == null) {
+        data = new StatisticsData(
+            new WeakReference<Thread>(Thread.currentThread()));
+        threadData.set(data);
+        synchronized(this) {
+          if (allData == null) {
+            allData = new LinkedList<StatisticsData>();
+          }
+          allData.add(data);
+        }
+      }
+      return data;
     }
 
     /**
@@ -2530,7 +2653,7 @@ public abstract class FileSystem extends
      * @param newBytes the additional bytes read
      */
     public void incrementBytesRead(long newBytes) {
-      bytesRead.getAndAdd(newBytes);
+      getThreadData().bytesRead += newBytes;
     }
     
     /**
@@ -2538,7 +2661,7 @@ public abstract class FileSystem extends
      * @param newBytes the additional bytes written
      */
     public void incrementBytesWritten(long newBytes) {
-      bytesWritten.getAndAdd(newBytes);
+      getThreadData().bytesWritten += newBytes;
     }
     
     /**
@@ -2546,7 +2669,7 @@ public abstract class FileSystem extends
      * @param count number of read operations
      */
     public void incrementReadOps(int count) {
-      readOps.getAndAdd(count);
+      getThreadData().readOps += count;
     }
 
     /**
@@ -2554,7 +2677,7 @@ public abstract class FileSystem extends
      * @param count number of large read operations
      */
     public void incrementLargeReadOps(int count) {
-      largeReadOps.getAndAdd(count);
+      getThreadData().largeReadOps += count;
     }
 
     /**
@@ -2562,7 +2685,38 @@ public abstract class FileSystem extends
      * @param count number of write operations
      */
     public void incrementWriteOps(int count) {
-      writeOps.getAndAdd(count);
+      getThreadData().writeOps += count;
+    }
+
+    /**
+     * Apply the given aggregator to all StatisticsData objects associated with
+     * this Statistics object.
+     *
+     * For each StatisticsData object, we will call accept on the visitor.
+     * Finally, we will call aggregate to get the final total.
+     *
+     * @param visitor   The visitor to use.
+     * @return          The total.
+     */
+    private synchronized <T> T visitAll(StatisticsAggregator<T> visitor) {
+      visitor.accept(rootData);
+      if (allData != null) {
+        for (Iterator<StatisticsData> iter = allData.iterator();
+            iter.hasNext(); ) {
+          StatisticsData data = iter.next();
+          visitor.accept(data);
+          if (data.owner.get() == null) {
+            /*
+             * If the thread that created this thread-local data no
+             * longer exists, remove the StatisticsData from our list
+             * and fold the values into rootData.
+             */
+            rootData.add(data);
+            iter.remove();
+          }
+        }
+      }
+      return visitor.aggregate();
     }
 
     /**
@@ -2570,7 +2724,18 @@ public abstract class FileSystem extends
      * @return the number of bytes
      */
     public long getBytesRead() {
-      return bytesRead.get();
+      return visitAll(new StatisticsAggregator<Long>() {
+        private long bytesRead = 0;
+
+        @Override
+        public void accept(StatisticsData data) {
+          bytesRead += data.bytesRead;
+        }
+
+        public Long aggregate() {
+          return bytesRead;
+        }
+      });
     }
     
     /**
@@ -2578,7 +2743,18 @@ public abstract class FileSystem extends
      * @return the number of bytes
      */
     public long getBytesWritten() {
-      return bytesWritten.get();
+      return visitAll(new StatisticsAggregator<Long>() {
+        private long bytesWritten = 0;
+
+        @Override
+        public void accept(StatisticsData data) {
+          bytesWritten += data.bytesWritten;
+        }
+
+        public Long aggregate() {
+          return bytesWritten;
+        }
+      });
     }
     
     /**
@@ -2586,7 +2762,19 @@ public abstract class FileSystem extends
      * @return number of read operations
      */
     public int getReadOps() {
-      return readOps.get() + largeReadOps.get();
+      return visitAll(new StatisticsAggregator<Integer>() {
+        private int readOps = 0;
+
+        @Override
+        public void accept(StatisticsData data) {
+          readOps += data.readOps;
+          readOps += data.largeReadOps;
+        }
+
+        public Integer aggregate() {
+          return readOps;
+        }
+      });
     }
 
     /**
@@ -2595,7 +2783,18 @@ public abstract class FileSystem extends
      * @return number of large read operations
      */
     public int getLargeReadOps() {
-      return largeReadOps.get();
+      return visitAll(new StatisticsAggregator<Integer>() {
+        private int largeReadOps = 0;
+
+        @Override
+        public void accept(StatisticsData data) {
+          largeReadOps += data.largeReadOps;
+        }
+
+        public Integer aggregate() {
+          return largeReadOps;
+        }
+      });
     }
 
     /**
@@ -2604,22 +2803,70 @@ public abstract class FileSystem extends
      * @return number of write operations
      */
     public int getWriteOps() {
-      return writeOps.get();
+      return visitAll(new StatisticsAggregator<Integer>() {
+        private int writeOps = 0;
+
+        @Override
+        public void accept(StatisticsData data) {
+          writeOps += data.writeOps;
+        }
+
+        public Integer aggregate() {
+          return writeOps;
+        }
+      });
     }
 
+
     @Override
     public String toString() {
-      return bytesRead + " bytes read, " + bytesWritten + " bytes written, "
-          + readOps + " read ops, " + largeReadOps + " large read ops, "
-          + writeOps + " write ops";
+      return visitAll(new StatisticsAggregator<String>() {
+        private StatisticsData total = new StatisticsData(null);
+
+        @Override
+        public void accept(StatisticsData data) {
+          total.add(data);
+        }
+
+        public String aggregate() {
+          return total.toString();
+        }
+      });
     }
-    
+
     /**
-     * Reset the counts of bytes to 0.
+     * Resets all statistics to 0.
+     *
+     * In order to reset, we add up all the thread-local statistics data, and
+     * set rootData to the negative of that.
+     *
+     * This may seem like a counterintuitive way to reset the statistics.  Why
+     * can't we just zero out all the thread-local data?  Well, thread-local
+     * data can only be modified by the thread that owns it.  If we tried to
+     * modify the thread-local data from this thread, our modification might get
+     * interleaved with a read-modify-write operation done by the thread that
+     * owns the data.  That would result in our update getting lost.
+     *
+     * The approach used here avoids this problem because it only ever reads
+     * (not writes) the thread-local data.  Both reads and writes to rootData
+     * are done under the lock, so we're free to modify rootData from any thread
+     * that holds the lock.
      */
     public void reset() {
-      bytesWritten.set(0);
-      bytesRead.set(0);
+      visitAll(new StatisticsAggregator<Void>() {
+        private StatisticsData total = new StatisticsData(null);
+
+        @Override
+        public void accept(StatisticsData data) {
+          total.add(data);
+        }
+
+        public Void aggregate() {
+          total.negate();
+          rootData.add(total);
+          return null;
+        }
+      });
     }
     
     /**

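The HDFS-5276 change above reduces to one pattern: every thread increments a private single-writer cell, readers total all cells under the Statistics lock, and reset() folds a negated total into a root record instead of writing other threads' data. A minimal, self-contained sketch of that pattern follows; every name in it is invented for illustration, and it is not the FileSystem code itself:

    import java.lang.ref.WeakReference;
    import java.util.Iterator;
    import java.util.LinkedList;

    /** Sketch of the thread-local counter pattern behind FileSystem.Statistics. */
    class ThreadLocalCounter {
      /** Per-thread cell.  Only the owning thread writes it, so a volatile
       *  long suffices; no AtomicLong is needed to prevent lost updates. */
      private static class Cell {
        volatile long value;
        final WeakReference<Thread> owner =
            new WeakReference<Thread>(Thread.currentThread());
      }

      private long root;  // data from dead threads and resets; guarded by the lock
      private final LinkedList<Cell> allCells = new LinkedList<Cell>();
      private final ThreadLocal<Cell> cell = new ThreadLocal<Cell>();

      /** Fast path: contention-free, each thread touches only its own cell. */
      public void add(long delta) {
        Cell c = cell.get();
        if (c == null) {
          c = new Cell();
          cell.set(c);
          synchronized (this) { allCells.add(c); }
        }
        c.value += delta;  // single writer, so this read-modify-write is safe
      }

      /** Slow path: total the root and every cell, folding cells whose owning
       *  thread has been garbage-collected into the root, as visitAll() does. */
      public synchronized long get() {
        long sum = root;
        for (Iterator<Cell> it = allCells.iterator(); it.hasNext(); ) {
          Cell c = it.next();
          sum += c.value;
          if (c.owner.get() == null) {
            root += c.value;
            it.remove();
          }
        }
        return sum;
      }

      /** Reset without writing other threads' cells: shift the root so the
       *  grand total becomes zero, exactly as Statistics.reset() does. */
      public synchronized void reset() {
        long total = get();  // also folds dead threads' cells into root
        root -= total;       // afterwards root + sum(live cells) == 0
      }
    }

The payoff relative to the replaced AtomicLong fields is that the hot increment path never performs a compare-and-swap or takes a lock; only the infrequent read path pays for synchronization.
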
Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java?rev=1535563&r1=1535562&r2=1535563&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java Thu Oct 24 21:38:25 2013
@@ -898,11 +898,15 @@ public class HarFileSystem extends FileS
       private long position, start, end;
       //The underlying data input stream that the
       // underlying filesystem will return.
-      private FSDataInputStream underLyingStream;
+      private final FSDataInputStream underLyingStream;
       //one byte buffer
-      private byte[] oneBytebuff = new byte[1];
+      private final byte[] oneBytebuff = new byte[1];
+      
       HarFsInputStream(FileSystem fs, Path path, long start,
           long length, int bufferSize) throws IOException {
+        if (length < 0) {
+          throw new IllegalArgumentException("Negative length ["+length+"]");
+        }
         underLyingStream = fs.open(path, bufferSize);
         underLyingStream.seek(start);
         // the start of this file in the part file
@@ -916,7 +920,7 @@ public class HarFileSystem extends FileS
       @Override
       public synchronized int available() throws IOException {
         long remaining = end - underLyingStream.getPos();
-        if (remaining > (long)Integer.MAX_VALUE) {
+        if (remaining > Integer.MAX_VALUE) {
           return Integer.MAX_VALUE;
         }
         return (int) remaining;
@@ -948,10 +952,14 @@ public class HarFileSystem extends FileS
         return (ret <= 0) ? -1: (oneBytebuff[0] & 0xff);
       }
       
+      // NB: this method is currently never executed, because
+      // java.io.DataInputStream.read(byte[]) delegates directly to
+      // java.io.InputStream.read(byte[], int, int).
+      // However, it could potentially be invoked, so it is kept intact.
       @Override
       public synchronized int read(byte[] b) throws IOException {
-        int ret = read(b, 0, b.length);
-        if (ret != -1) {
+        final int ret = read(b, 0, b.length);
+        if (ret > 0) {
           position += ret;
         }
         return ret;
@@ -980,15 +988,19 @@ public class HarFileSystem extends FileS
       public synchronized long skip(long n) throws IOException {
         long tmpN = n;
         if (tmpN > 0) {
-          if (position + tmpN > end) {
-            tmpN = end - position;
-          }
+          final long actualRemaining = end - position; 
+          if (tmpN > actualRemaining) {
+            tmpN = actualRemaining;
+          }   
           underLyingStream.seek(tmpN + position);
           position += tmpN;
           return tmpN;
-        }
-        return (tmpN < 0)? -1 : 0;
-      }
+        }   
+        // NB: the contract is described in java.io.InputStream.skip(long):
+        // this method returns the number of bytes actually skipped, so,
+        // the return value should never be negative. 
+        return 0;
+      }   
       
       @Override
       public synchronized long getPos() throws IOException {
@@ -996,14 +1008,23 @@ public class HarFileSystem extends FileS
       }
       
       @Override
-      public synchronized void seek(long pos) throws IOException {
-        if (pos < 0 || (start + pos > end)) {
-          throw new IOException("Failed to seek: EOF");
-        }
+      public synchronized void seek(final long pos) throws IOException {
+        validatePosition(pos);
         position = start + pos;
         underLyingStream.seek(position);
       }
 
+      private void validatePosition(final long pos) throws IOException {
+        if (pos < 0) {
+          throw new IOException("Negative position: " + pos);
+        }
+        final long length = end - start;
+        if (pos > length) {
+          throw new IOException("Position beyond the end " +
+              "of the stream (length = " + length + "): " + pos);
+        }
+      }
+
       @Override
       public boolean seekToNewSource(long targetPos) throws IOException {
         // do not need to implement this
@@ -1020,7 +1041,12 @@ public class HarFileSystem extends FileS
       throws IOException {
         int nlength = length;
         if (start + nlength + pos > end) {
-          nlength = (int) (end - (start + pos));
+          // length corrected to the real remaining length:
+          nlength = (int) (end - start - pos);
+        }
+        if (nlength <= 0) {
+          // EOS:
+          return -1;
         }
         return underLyingStream.read(pos + start , b, offset, nlength);
       }

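The skip() rewrite above pins HarFsInputStream to the java.io.InputStream contract: the return value is the number of bytes actually skipped and is never negative, even for non-positive arguments or at end of stream. A tiny demonstration of that contract against a stock stream (ByteArrayInputStream is used purely for illustration):

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    public class SkipContractDemo {
      public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream(new byte[10]);
        // Skipping within the stream returns the requested count.
        System.out.println(in.skip(4));    // 4
        // Skipping past the end is clamped to what remains.
        System.out.println(in.skip(100));  // 6
        // At EOF, and for non-positive arguments, the result is 0; a
        // negative return, as the old HarFsInputStream.skip could
        // produce, would violate the contract.
        System.out.println(in.skip(100));  // 0
        System.out.println(in.skip(-5));   // 0
      }
    }
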
Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/filter/AbstractPatternFilter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/filter/AbstractPatternFilter.java?rev=1535563&r1=1535562&r2=1535563&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/filter/AbstractPatternFilter.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/filter/AbstractPatternFilter.java Thu Oct 24 21:38:25 2013
@@ -112,7 +112,7 @@ public abstract class AbstractPatternFil
       return false;
     }
     // Reject if no match in whitelist only mode
-    if (ipat != null && epat == null) {
+    if (!includeTagPatterns.isEmpty() && excludeTagPatterns.isEmpty()) {
       return false;
     }
     return true;

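The corrected condition keys off the configured tag-pattern collections rather than the raw ipat/epat name-pattern strings, so whitelist-only mode is now detected correctly for tag lists. Whitelist-only semantics are easiest to see at the name level, where the same rule applies; here is a sketch using the ConfigBuilder idiom from TestPatternFilter below (assuming GlobFilter is initialized through the standard MetricsPlugin init, as the tests do):

    import org.apache.commons.configuration.SubsetConfiguration;
    import org.apache.hadoop.metrics2.filter.GlobFilter;
    import org.apache.hadoop.metrics2.impl.ConfigBuilder;

    public class WhitelistFilterDemo {
      public static void main(String[] args) {
        // Whitelist-only mode: include patterns configured, no excludes.
        SubsetConfiguration wl = new ConfigBuilder()
            .add("p.include", "foo")
            .subset("p");
        GlobFilter filter = new GlobFilter();
        filter.init(wl);
        // "foo" matches the include pattern and is accepted; "bar"
        // matches nothing and is rejected by the whitelist-only branch.
        System.out.println(filter.accepts("foo"));  // true
        System.out.println(filter.accepts("bar"));  // false
      }
    }
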
Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/package-info.java?rev=1535563&r1=1535562&r2=1535563&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/package-info.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/package-info.java Thu Oct 24 21:38:25 2013
@@ -234,7 +234,7 @@
     patterns.
   </p>
   <p>Similarly, you can specify the <code>record.filter</code> and
-    <code>metrics.filter</code> options, which operate at record and metric
+    <code>metric.filter</code> options, which operate at record and metric
     level, respectively. Filters can be combined to optimize
     the filtering efficiency.</p>
 

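For context on the corrected option name: filters are configured per metrics prefix in hadoop-metrics2.properties, with record.filter and metric.filter sitting alongside source.filter at their respective granularities. A hypothetical fragment in the shape the package documentation describes (the "test" prefix and all patterns here are invented):

    *.source.filter.class=org.apache.hadoop.metrics2.filter.GlobFilter
    test.*.source.filter.include=foo*
    test.*.record.filter.exclude=*Details
    # Note: the option is "metric.filter", not "metrics.filter".
    test.*.metric.filter.include=*Count
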
Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/site/apt/FileSystemShell.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/site/apt/FileSystemShell.apt.vm?rev=1535563&r1=1535562&r2=1535563&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/site/apt/FileSystemShell.apt.vm (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/site/apt/FileSystemShell.apt.vm Thu Oct 24 21:38:25 2013
@@ -381,7 +381,7 @@ rmr
 
 setrep
 
-   Usage: <<<hdfs dfs -setrep [-R] [-w] <numRepicas> <path> >>>
+   Usage: <<<hdfs dfs -setrep [-R] [-w] <numReplicas> <path> >>>
 
    Changes the replication factor of a file. If <path> is a directory then
    the command recursively changes the replication factor of all files under

Propchange: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/test/core/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/core:r1532946-1535559

Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FCStatisticsBaseTest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FCStatisticsBaseTest.java?rev=1535563&r1=1535562&r2=1535563&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FCStatisticsBaseTest.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FCStatisticsBaseTest.java Thu Oct 24 21:38:25 2013
@@ -27,6 +27,8 @@ import org.apache.hadoop.fs.FileSystem.S
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.google.common.util.concurrent.Uninterruptibles;
+
 import static org.apache.hadoop.fs.FileContextTestHelper.*;
 
 /**
@@ -44,6 +46,38 @@ public abstract class FCStatisticsBaseTe
   //fc should be set appropriately by the deriving test.
   protected static FileContext fc = null;
   
+  @Test(timeout=60000)
+  public void testStatisticsOperations() throws Exception {
+    final Statistics stats = new Statistics("file");
+    Assert.assertEquals(0L, stats.getBytesRead());
+    Assert.assertEquals(0L, stats.getBytesWritten());
+    Assert.assertEquals(0, stats.getWriteOps());
+    stats.incrementBytesWritten(1000);
+    Assert.assertEquals(1000L, stats.getBytesWritten());
+    Assert.assertEquals(0, stats.getWriteOps());
+    stats.incrementWriteOps(123);
+    Assert.assertEquals(123, stats.getWriteOps());
+    
+    Thread thread = new Thread() {
+      @Override
+      public void run() {
+        stats.incrementWriteOps(1);
+      }
+    };
+    thread.start();
+    Uninterruptibles.joinUninterruptibly(thread);
+    Assert.assertEquals(124, stats.getWriteOps());
+    // Test copy constructor and reset function
+    Statistics stats2 = new Statistics(stats);
+    stats.reset();
+    Assert.assertEquals(0, stats.getWriteOps());
+    Assert.assertEquals(0L, stats.getBytesWritten());
+    Assert.assertEquals(0L, stats.getBytesRead());
+    Assert.assertEquals(124, stats2.getWriteOps());
+    Assert.assertEquals(1000L, stats2.getBytesWritten());
+    Assert.assertEquals(0L, stats2.getBytesRead());
+  }
+
   @Test
   public void testStatistics() throws IOException, URISyntaxException {
     URI fsUri = getFsUri();

Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/filter/TestPatternFilter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/filter/TestPatternFilter.java?rev=1535563&r1=1535562&r2=1535563&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/filter/TestPatternFilter.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/filter/TestPatternFilter.java Thu Oct 24 21:38:25 2013
@@ -23,9 +23,11 @@ import java.util.List;
 
 import org.apache.commons.configuration.SubsetConfiguration;
 import org.junit.Test;
+
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
+import org.apache.hadoop.metrics2.MetricsFilter;
 import org.apache.hadoop.metrics2.MetricsRecord;
 import org.apache.hadoop.metrics2.MetricsTag;
 import org.apache.hadoop.metrics2.impl.ConfigBuilder;
@@ -53,7 +55,7 @@ public class TestPatternFilter {
         .add("p.include.tags", "foo:f").subset("p");
     shouldAccept(wl, "foo");
     shouldAccept(wl, Arrays.asList(tag("bar", "", ""),
-                                   tag("foo", "", "f")));
+                                   tag("foo", "", "f")), new boolean[] {false, true});
     shouldAccept(wl, mockMetricsRecord("foo", Arrays.asList(
       tag("bar", "", ""), tag("foo", "", "f"))));
     shouldReject(wl, "bar");
@@ -78,7 +80,7 @@ public class TestPatternFilter {
       tag("bar", "", ""))));
     shouldReject(bl, "foo");
     shouldReject(bl, Arrays.asList(tag("bar", "", ""),
-                                   tag("foo", "", "f")));
+                                   tag("foo", "", "f")), new boolean[] {true, false});
     shouldReject(bl, mockMetricsRecord("foo", Arrays.asList(
       tag("bar", "", ""))));
     shouldReject(bl, mockMetricsRecord("bar", Arrays.asList(
@@ -125,15 +127,61 @@ public class TestPatternFilter {
     shouldAccept(c, mockMetricsRecord("foo", Arrays.asList(
       tag("foo", "", "f"))));
   }
-
+  
   static void shouldAccept(SubsetConfiguration conf, String s) {
     assertTrue("accepts "+ s, newGlobFilter(conf).accepts(s));
     assertTrue("accepts "+ s, newRegexFilter(conf).accepts(s));
   }
 
+  // Version for one tag:
   static void shouldAccept(SubsetConfiguration conf, List<MetricsTag> tags) {
-    assertTrue("accepts "+ tags, newGlobFilter(conf).accepts(tags));
-    assertTrue("accepts "+ tags, newRegexFilter(conf).accepts(tags));
+    shouldAcceptImpl(true, conf, tags, new boolean[] {true});
+  }
+  // Version for multiple tags: 
+  static void shouldAccept(SubsetConfiguration conf, List<MetricsTag> tags, 
+      boolean[] expectedAcceptedSpec) {
+    shouldAcceptImpl(true, conf, tags, expectedAcceptedSpec);
+  }
+
+  // Version for one tag:
+  static void shouldReject(SubsetConfiguration conf, List<MetricsTag> tags) {
+    shouldAcceptImpl(false, conf, tags, new boolean[] {false});
+  }
+  // Version for multiple tags: 
+  static void shouldReject(SubsetConfiguration conf, List<MetricsTag> tags, 
+      boolean[] expectedAcceptedSpec) {
+    shouldAcceptImpl(false, conf, tags, expectedAcceptedSpec);
+  }
+  
+  private static void shouldAcceptImpl(final boolean expectAcceptList,  
+      SubsetConfiguration conf, List<MetricsTag> tags, boolean[] expectedAcceptedSpec) {
+    final MetricsFilter globFilter = newGlobFilter(conf);
+    final MetricsFilter regexFilter = newRegexFilter(conf);
+    
+    // Test acceptance of the tag list:  
+    assertEquals("accepts "+ tags, expectAcceptList, globFilter.accepts(tags));
+    assertEquals("accepts "+ tags, expectAcceptList, regexFilter.accepts(tags));
+    
+    // Test results on each of the individual tags:
+    int acceptedCount = 0;
+    for (int i=0; i<tags.size(); i++) {
+      MetricsTag tag = tags.get(i);
+      boolean actGlob = globFilter.accepts(tag);
+      boolean actRegex = regexFilter.accepts(tag);
+      assertEquals("accepts "+tag, expectedAcceptedSpec[i], actGlob);
+      // Both the filters should give the same result:
+      assertEquals(actGlob, actRegex);
+      if (actGlob) {
+        acceptedCount++;
+      }
+    }
+    if (expectAcceptList) {
+      // At least one individual tag should be accepted:
+      assertTrue("None of the following tags were accepted: " + tags, acceptedCount > 0);
+    } else {
+      // At least one individual tag should be rejected:
+      assertTrue("None of the following tags were rejected: " + tags, acceptedCount < tags.size());
+    }
   }
 
   /**
@@ -152,11 +200,6 @@ public class TestPatternFilter {
     assertTrue("rejects "+ s, !newRegexFilter(conf).accepts(s));
   }
 
-  static void shouldReject(SubsetConfiguration conf, List<MetricsTag> tags) {
-    assertTrue("rejects "+ tags, !newGlobFilter(conf).accepts(tags));
-    assertTrue("rejects "+ tags, !newRegexFilter(conf).accepts(tags));
-  }
-
   /**
    * Asserts that filters with the given configuration reject the given record.
    *