You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2015/07/01 23:10:04 UTC

[18/46] hadoop git commit: HADOOP-12107. long running apps may have a huge number of StatisticsData instances under FileSystem (Sangjin Lee via Ming Ma)

HADOOP-12107. long running apps may have a huge number of StatisticsData instances under FileSystem (Sangjin Lee via Ming Ma)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8e1bdc17
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8e1bdc17
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8e1bdc17

Branch: refs/heads/HDFS-7240
Commit: 8e1bdc17d9134e01115ae7c929503d8ac0325207
Parents: 460e98f
Author: Ming Ma <mi...@apache.org>
Authored: Mon Jun 29 14:37:38 2015 -0700
Committer: Ming Ma <mi...@apache.org>
Committed: Mon Jun 29 14:37:38 2015 -0700

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |   3 +
 .../java/org/apache/hadoop/fs/FileSystem.java   | 140 +++++++++++++------
 .../apache/hadoop/fs/FCStatisticsBaseTest.java  |  56 +++++++-
 3 files changed, 155 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e1bdc17/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index a9b44e3..50192ae 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -490,6 +490,9 @@ Trunk (Unreleased)
     HADOOP-11347. RawLocalFileSystem#mkdir and create should honor umask (Varun
     Saxena via Colin P. McCabe)
 
+    HADOOP-12107. long running apps may have a huge number of StatisticsData
+    instances under FileSystem (Sangjin Lee via Ming Ma)
+
   OPTIMIZATIONS
 
     HADOOP-7761. Improve the performance of raw comparisons. (todd)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e1bdc17/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index 3f9e3bd..1d7bc87 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -20,7 +20,8 @@ package org.apache.hadoop.fs;
 import java.io.Closeable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.lang.ref.WeakReference;
+import java.lang.ref.PhantomReference;
+import java.lang.ref.ReferenceQueue;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;
@@ -32,7 +33,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.IdentityHashMap;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
@@ -2920,16 +2920,6 @@ public abstract class FileSystem extends Configured implements Closeable {
       volatile int readOps;
       volatile int largeReadOps;
       volatile int writeOps;
-      /**
-       * Stores a weak reference to the thread owning this StatisticsData.
-       * This allows us to remove StatisticsData objects that pertain to
-       * threads that no longer exist.
-       */
-      final WeakReference<Thread> owner;
-
-      StatisticsData(WeakReference<Thread> owner) {
-        this.owner = owner;
-      }
 
       /**
        * Add another StatisticsData object to this one.
@@ -3000,17 +2990,37 @@ public abstract class FileSystem extends Configured implements Closeable {
      * Thread-local data.
      */
     private final ThreadLocal<StatisticsData> threadData;
-    
+
     /**
-     * List of all thread-local data areas.  Protected by the Statistics lock.
+     * Set of all thread-local data areas.  Protected by the Statistics lock.
+     * The references to the statistics data are kept using phantom references
+     * to the associated threads. Proper clean-up is performed by the cleaner
+     * thread when the threads are garbage collected.
      */
-    private LinkedList<StatisticsData> allData;
+    private final Set<StatisticsDataReference> allData;
+
+    /**
+     * Global reference queue and a cleaner thread that manage statistics data
+     * references from all filesystem instances.
+     */
+    private static final ReferenceQueue<Thread> STATS_DATA_REF_QUEUE;
+    private static final Thread STATS_DATA_CLEANER;
+
+    static {
+      STATS_DATA_REF_QUEUE = new ReferenceQueue<Thread>();
+      // start a single daemon cleaner thread
+      STATS_DATA_CLEANER = new Thread(new StatisticsDataReferenceCleaner());
+      STATS_DATA_CLEANER.
+          setName(StatisticsDataReferenceCleaner.class.getName());
+      STATS_DATA_CLEANER.setDaemon(true);
+      STATS_DATA_CLEANER.start();
+    }
 
     public Statistics(String scheme) {
       this.scheme = scheme;
-      this.rootData = new StatisticsData(null);
+      this.rootData = new StatisticsData();
       this.threadData = new ThreadLocal<StatisticsData>();
-      this.allData = null;
+      this.allData = new HashSet<StatisticsDataReference>();
     }
 
     /**
@@ -3020,7 +3030,7 @@ public abstract class FileSystem extends Configured implements Closeable {
      */
     public Statistics(Statistics other) {
       this.scheme = other.scheme;
-      this.rootData = new StatisticsData(null);
+      this.rootData = new StatisticsData();
       other.visitAll(new StatisticsAggregator<Void>() {
         @Override
         public void accept(StatisticsData data) {
@@ -3032,6 +3042,63 @@ public abstract class FileSystem extends Configured implements Closeable {
         }
       });
       this.threadData = new ThreadLocal<StatisticsData>();
+      this.allData = new HashSet<StatisticsDataReference>();
+    }
+
+    /**
+     * A phantom reference to a thread that also includes the data associated
+     * with that thread. When the thread is garbage collected, this reference
+     * is enqueued on the reference queue for clean-up.
+     */
+    private class StatisticsDataReference extends PhantomReference<Thread> {
+      private final StatisticsData data;
+
+      public StatisticsDataReference(StatisticsData data, Thread thread) {
+        super(thread, STATS_DATA_REF_QUEUE);
+        this.data = data;
+      }
+
+      public StatisticsData getData() {
+        return data;
+      }
+
+      /**
+       * Performs clean-up action when the associated thread is garbage
+       * collected.
+       */
+      public void cleanUp() {
+        // use the statistics lock for safety
+        synchronized (Statistics.this) {
+          /*
+           * The thread that created this thread-local data has been garbage
+           * collected, so fold its values into rootData and remove this
+           * reference from the allData set.
+           */
+          rootData.add(data);
+          allData.remove(this);
+        }
+      }
+    }
+
+    /**
+     * Background task that cleans up references as they are dequeued.
+     */
+    private static class StatisticsDataReferenceCleaner implements Runnable {
+      @Override
+      public void run() {
+        while (true) {
+          try {
+            StatisticsDataReference ref =
+                (StatisticsDataReference)STATS_DATA_REF_QUEUE.remove();
+            ref.cleanUp();
+          } catch (Throwable th) {
+            // the cleaner thread should continue to run even if there are
+            // exceptions, including InterruptedException
+            LOG.warn("exception in the cleaner thread but it will continue to "
+                + "run", th);
+          }
+        }
+      }
     }
 
     /**
@@ -3040,14 +3107,12 @@ public abstract class FileSystem extends Configured implements Closeable {
     public StatisticsData getThreadStatistics() {
       StatisticsData data = threadData.get();
       if (data == null) {
-        data = new StatisticsData(
-            new WeakReference<Thread>(Thread.currentThread()));
+        data = new StatisticsData();
         threadData.set(data);
+        StatisticsDataReference ref =
+            new StatisticsDataReference(data, Thread.currentThread());
         synchronized(this) {
-          if (allData == null) {
-            allData = new LinkedList<StatisticsData>();
-          }
-          allData.add(data);
+          allData.add(ref);
         }
       }
       return data;
@@ -3105,21 +3170,9 @@ public abstract class FileSystem extends Configured implements Closeable {
      */
     private synchronized <T> T visitAll(StatisticsAggregator<T> visitor) {
       visitor.accept(rootData);
-      if (allData != null) {
-        for (Iterator<StatisticsData> iter = allData.iterator();
-            iter.hasNext(); ) {
-          StatisticsData data = iter.next();
-          visitor.accept(data);
-          if (data.owner.get() == null) {
-            /*
-             * If the thread that created this thread-local data no
-             * longer exists, remove the StatisticsData from our list
-             * and fold the values into rootData.
-             */
-            rootData.add(data);
-            iter.remove();
-          }
-        }
+      for (StatisticsDataReference ref: allData) {
+        StatisticsData data = ref.getData();
+        visitor.accept(data);
       }
       return visitor.aggregate();
     }
@@ -3226,7 +3279,7 @@ public abstract class FileSystem extends Configured implements Closeable {
     @Override
     public String toString() {
       return visitAll(new StatisticsAggregator<String>() {
-        private StatisticsData total = new StatisticsData(null);
+        private StatisticsData total = new StatisticsData();
 
         @Override
         public void accept(StatisticsData data) {
@@ -3259,7 +3312,7 @@ public abstract class FileSystem extends Configured implements Closeable {
      */
     public void reset() {
       visitAll(new StatisticsAggregator<Void>() {
-        private StatisticsData total = new StatisticsData(null);
+        private StatisticsData total = new StatisticsData();
 
         @Override
         public void accept(StatisticsData data) {
@@ -3281,6 +3334,11 @@ public abstract class FileSystem extends Configured implements Closeable {
     public String getScheme() {
       return scheme;
     }
+
+    @VisibleForTesting
+    synchronized int getAllThreadLocalDataSize() {
+      return allData.size();
+    }
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e1bdc17/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FCStatisticsBaseTest.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FCStatisticsBaseTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FCStatisticsBaseTest.java
index 90337a6..3e33362 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FCStatisticsBaseTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FCStatisticsBaseTest.java
@@ -18,26 +18,34 @@
 
 package org.apache.hadoop.fs;
 
+import static org.apache.hadoop.fs.FileContextTestHelper.createFile;
+
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.google.common.base.Supplier;
 import com.google.common.util.concurrent.Uninterruptibles;
 
-import static org.apache.hadoop.fs.FileContextTestHelper.*;
-
 /**
  * <p>
  *   Base class to test {@link FileContext} Statistics.
  * </p>
  */
 public abstract class FCStatisticsBaseTest {
-  
   static protected int blockSize = 512;
   static protected int numBlocks = 1;
   
@@ -102,6 +110,48 @@ public abstract class FCStatisticsBaseTest {
     fc.delete(filePath, true);
   }
 
+  @Test(timeout=60000)
+  public void testStatisticsThreadLocalDataCleanUp() throws Exception {
+    final Statistics stats = new Statistics("test");
+    // create a small thread pool to test the statistics
+    final int size = 2;
+    ExecutorService es = Executors.newFixedThreadPool(size);
+    List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>(size);
+    for (int i = 0; i < size; i++) {
+      tasks.add(new Callable<Boolean>() {
+        public Boolean call() {
+          // this populates the data set in statistics
+          stats.incrementReadOps(1);
+          return true;
+        }
+      });
+    }
+    // run the threads
+    es.invokeAll(tasks);
+    // assert that the data size is exactly the number of threads
+    final AtomicInteger allDataSize = new AtomicInteger(0);
+    allDataSize.set(stats.getAllThreadLocalDataSize());
+    Assert.assertEquals(size, allDataSize.get());
+    Assert.assertEquals(size, stats.getReadOps());
+    // force the GC to collect the threads by shutting down the thread pool
+    es.shutdownNow();
+    es.awaitTermination(1, TimeUnit.MINUTES);
+    es = null;
+    System.gc();
+
+    // wait for up to 10 seconds
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+          @Override
+          public Boolean get() {
+            int size = stats.getAllThreadLocalDataSize();
+            allDataSize.set(size);
+            return size == 0;
+          }
+        }, 1000, 10*1000);
+    Assert.assertEquals(0, allDataSize.get());
+    Assert.assertEquals(size, stats.getReadOps());
+  }
+
   /**
    * Bytes read may be different for different file systems. This method should
    * throw assertion error if bytes read are incorrect.