You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by mi...@apache.org on 2015/06/29 23:38:54 UTC
[1/2] hadoop git commit: HADOOP-12107. long running apps may have a
huge number of StatisticsData instances under FileSystem (Sangjin Lee via
Ming Ma)
Repository: hadoop
Updated Branches:
refs/heads/trunk 4672315e2 -> 34ee0b9b4
HADOOP-12107. long running apps may have a huge number of StatisticsData instances under FileSystem (Sangjin Lee via Ming Ma)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8e1bdc17
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8e1bdc17
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8e1bdc17
Branch: refs/heads/trunk
Commit: 8e1bdc17d9134e01115ae7c929503d8ac0325207
Parents: 460e98f
Author: Ming Ma <mi...@apache.org>
Authored: Mon Jun 29 14:37:38 2015 -0700
Committer: Ming Ma <mi...@apache.org>
Committed: Mon Jun 29 14:37:38 2015 -0700
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 3 +
.../java/org/apache/hadoop/fs/FileSystem.java | 140 +++++++++++++------
.../apache/hadoop/fs/FCStatisticsBaseTest.java | 56 +++++++-
3 files changed, 155 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e1bdc17/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index a9b44e3..50192ae 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -490,6 +490,9 @@ Trunk (Unreleased)
HADOOP-11347. RawLocalFileSystem#mkdir and create should honor umask (Varun
Saxena via Colin P. McCabe)
+ HADOOP-12107. long running apps may have a huge number of StatisticsData
+ instances under FileSystem (Sangjin Lee via Ming Ma)
+
OPTIMIZATIONS
HADOOP-7761. Improve the performance of raw comparisons. (todd)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e1bdc17/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index 3f9e3bd..1d7bc87 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -20,7 +20,8 @@ package org.apache.hadoop.fs;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.lang.ref.WeakReference;
+import java.lang.ref.PhantomReference;
+import java.lang.ref.ReferenceQueue;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
@@ -32,7 +33,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
@@ -2920,16 +2920,6 @@ public abstract class FileSystem extends Configured implements Closeable {
volatile int readOps;
volatile int largeReadOps;
volatile int writeOps;
- /**
- * Stores a weak reference to the thread owning this StatisticsData.
- * This allows us to remove StatisticsData objects that pertain to
- * threads that no longer exist.
- */
- final WeakReference<Thread> owner;
-
- StatisticsData(WeakReference<Thread> owner) {
- this.owner = owner;
- }
/**
* Add another StatisticsData object to this one.
@@ -3000,17 +2990,37 @@ public abstract class FileSystem extends Configured implements Closeable {
* Thread-local data.
*/
private final ThreadLocal<StatisticsData> threadData;
-
+
/**
- * List of all thread-local data areas. Protected by the Statistics lock.
+ * Set of all thread-local data areas. Protected by the Statistics lock.
+ * The references to the statistics data are kept using phantom references
+ * to the associated threads. Proper clean-up is performed by the cleaner
+ * thread when the threads are garbage collected.
*/
- private LinkedList<StatisticsData> allData;
+ private final Set<StatisticsDataReference> allData;
+
+ /**
+ * Global reference queue and a cleaner thread that manage statistics data
+ * references from all filesystem instances.
+ */
+ private static final ReferenceQueue<Thread> STATS_DATA_REF_QUEUE;
+ private static final Thread STATS_DATA_CLEANER;
+
+ static {
+ STATS_DATA_REF_QUEUE = new ReferenceQueue<Thread>();
+ // start a single daemon cleaner thread
+ STATS_DATA_CLEANER = new Thread(new StatisticsDataReferenceCleaner());
+ STATS_DATA_CLEANER.
+ setName(StatisticsDataReferenceCleaner.class.getName());
+ STATS_DATA_CLEANER.setDaemon(true);
+ STATS_DATA_CLEANER.start();
+ }
public Statistics(String scheme) {
this.scheme = scheme;
- this.rootData = new StatisticsData(null);
+ this.rootData = new StatisticsData();
this.threadData = new ThreadLocal<StatisticsData>();
- this.allData = null;
+ this.allData = new HashSet<StatisticsDataReference>();
}
/**
@@ -3020,7 +3030,7 @@ public abstract class FileSystem extends Configured implements Closeable {
*/
public Statistics(Statistics other) {
this.scheme = other.scheme;
- this.rootData = new StatisticsData(null);
+ this.rootData = new StatisticsData();
other.visitAll(new StatisticsAggregator<Void>() {
@Override
public void accept(StatisticsData data) {
@@ -3032,6 +3042,63 @@ public abstract class FileSystem extends Configured implements Closeable {
}
});
this.threadData = new ThreadLocal<StatisticsData>();
+ this.allData = new HashSet<StatisticsDataReference>();
+ }
+
+ /**
+ * A phantom reference to a thread that also includes the data associated
+ * with that thread. On the thread being garbage collected, it is enqueued
+ * to the reference queue for clean-up.
+ */
+ private class StatisticsDataReference extends PhantomReference<Thread> {
+ private final StatisticsData data;
+
+ public StatisticsDataReference(StatisticsData data, Thread thread) {
+ super(thread, STATS_DATA_REF_QUEUE);
+ this.data = data;
+ }
+
+ public StatisticsData getData() {
+ return data;
+ }
+
+ /**
+ * Performs clean-up action when the associated thread is garbage
+ * collected.
+ */
+ public void cleanUp() {
+ // use the statistics lock for safety
+ synchronized (Statistics.this) {
+ /*
+ * If the thread that created this thread-local data no longer exists,
+ * remove the StatisticsData from our set and fold the values into
+ * rootData.
+ */
+ rootData.add(data);
+ allData.remove(this);
+ }
+ }
+ }
+
+ /**
+ * Background action to act on references being removed.
+ */
+ private static class StatisticsDataReferenceCleaner implements Runnable {
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ StatisticsDataReference ref =
+ (StatisticsDataReference)STATS_DATA_REF_QUEUE.remove();
+ ref.cleanUp();
+ } catch (Throwable th) {
+ // the cleaner thread should continue to run even if there are
+ // exceptions, including InterruptedException
+ LOG.warn("exception in the cleaner thread but it will continue to "
+ + "run", th);
+ }
+ }
+ }
}
/**
@@ -3040,14 +3107,12 @@ public abstract class FileSystem extends Configured implements Closeable {
public StatisticsData getThreadStatistics() {
StatisticsData data = threadData.get();
if (data == null) {
- data = new StatisticsData(
- new WeakReference<Thread>(Thread.currentThread()));
+ data = new StatisticsData();
threadData.set(data);
+ StatisticsDataReference ref =
+ new StatisticsDataReference(data, Thread.currentThread());
synchronized(this) {
- if (allData == null) {
- allData = new LinkedList<StatisticsData>();
- }
- allData.add(data);
+ allData.add(ref);
}
}
return data;
@@ -3105,21 +3170,9 @@ public abstract class FileSystem extends Configured implements Closeable {
*/
private synchronized <T> T visitAll(StatisticsAggregator<T> visitor) {
visitor.accept(rootData);
- if (allData != null) {
- for (Iterator<StatisticsData> iter = allData.iterator();
- iter.hasNext(); ) {
- StatisticsData data = iter.next();
- visitor.accept(data);
- if (data.owner.get() == null) {
- /*
- * If the thread that created this thread-local data no
- * longer exists, remove the StatisticsData from our list
- * and fold the values into rootData.
- */
- rootData.add(data);
- iter.remove();
- }
- }
+ for (StatisticsDataReference ref: allData) {
+ StatisticsData data = ref.getData();
+ visitor.accept(data);
}
return visitor.aggregate();
}
@@ -3226,7 +3279,7 @@ public abstract class FileSystem extends Configured implements Closeable {
@Override
public String toString() {
return visitAll(new StatisticsAggregator<String>() {
- private StatisticsData total = new StatisticsData(null);
+ private StatisticsData total = new StatisticsData();
@Override
public void accept(StatisticsData data) {
@@ -3259,7 +3312,7 @@ public abstract class FileSystem extends Configured implements Closeable {
*/
public void reset() {
visitAll(new StatisticsAggregator<Void>() {
- private StatisticsData total = new StatisticsData(null);
+ private StatisticsData total = new StatisticsData();
@Override
public void accept(StatisticsData data) {
@@ -3281,6 +3334,11 @@ public abstract class FileSystem extends Configured implements Closeable {
public String getScheme() {
return scheme;
}
+
+ @VisibleForTesting
+ synchronized int getAllThreadLocalDataSize() {
+ return allData.size();
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e1bdc17/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FCStatisticsBaseTest.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FCStatisticsBaseTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FCStatisticsBaseTest.java
index 90337a6..3e33362 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FCStatisticsBaseTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FCStatisticsBaseTest.java
@@ -18,26 +18,34 @@
package org.apache.hadoop.fs;
+import static org.apache.hadoop.fs.FileContextTestHelper.createFile;
+
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test;
+import com.google.common.base.Supplier;
import com.google.common.util.concurrent.Uninterruptibles;
-import static org.apache.hadoop.fs.FileContextTestHelper.*;
-
/**
* <p>
* Base class to test {@link FileContext} Statistics.
* </p>
*/
public abstract class FCStatisticsBaseTest {
-
static protected int blockSize = 512;
static protected int numBlocks = 1;
@@ -102,6 +110,48 @@ public abstract class FCStatisticsBaseTest {
fc.delete(filePath, true);
}
+ @Test(timeout=60000)
+ public void testStatisticsThreadLocalDataCleanUp() throws Exception {
+ final Statistics stats = new Statistics("test");
+ // create a small thread pool to test the statistics
+ final int size = 2;
+ ExecutorService es = Executors.newFixedThreadPool(size);
+ List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>(size);
+ for (int i = 0; i < size; i++) {
+ tasks.add(new Callable<Boolean>() {
+ public Boolean call() {
+ // this populates the data set in statistics
+ stats.incrementReadOps(1);
+ return true;
+ }
+ });
+ }
+ // run the threads
+ es.invokeAll(tasks);
+ // assert that the data size is exactly the number of threads
+ final AtomicInteger allDataSize = new AtomicInteger(0);
+ allDataSize.set(stats.getAllThreadLocalDataSize());
+ Assert.assertEquals(size, allDataSize.get());
+ Assert.assertEquals(size, stats.getReadOps());
+ // force the GC to collect the threads by shutting down the thread pool
+ es.shutdownNow();
+ es.awaitTermination(1, TimeUnit.MINUTES);
+ es = null;
+ System.gc();
+
+ // wait for up to 10 seconds
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ int size = stats.getAllThreadLocalDataSize();
+ allDataSize.set(size);
+ return size == 0;
+ }
+ }, 1000, 10*1000);
+ Assert.assertEquals(0, allDataSize.get());
+ Assert.assertEquals(size, stats.getReadOps());
+ }
+
/**
* Bytes read may be different for different file systems. This method should
* throw assertion error if bytes read are incorrect.
[2/2] hadoop git commit: Merge branch 'trunk' of
https://git-wip-us.apache.org/repos/asf/hadoop into trunk
Posted by mi...@apache.org.
Merge branch 'trunk' of https://git-wip-us.apache.org/repos/asf/hadoop into trunk
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/34ee0b9b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/34ee0b9b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/34ee0b9b
Branch: refs/heads/trunk
Commit: 34ee0b9b4797093f5aeb0d55f32cf1b74b02e1c2
Parents: 8e1bdc1 4672315
Author: Ming Ma <mi...@apache.org>
Authored: Mon Jun 29 14:37:44 2015 -0700
Committer: Ming Ma <mi...@apache.org>
Committed: Mon Jun 29 14:37:44 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +++
.../api/records/impl/pb/SerializedExceptionPBImpl.java | 2 +-
.../records/impl/pb/TestSerializedExceptionPBImpl.java | 12 ++++++++++--
3 files changed, 14 insertions(+), 3 deletions(-)
----------------------------------------------------------------------