You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by eb...@apache.org on 2020/06/08 20:18:46 UTC
[hadoop] branch branch-3.1 updated: Revert "MAPREDUCE-7277. IndexCache totalMemoryUsed differs from cache contents. Contributed by Jon Eagles (jeagles)."
This is an automated email from the ASF dual-hosted git repository.
ebadger pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 0799901 Revert "MAPREDUCE-7277. IndexCache totalMemoryUsed differs from cache contents. Contributed by Jon Eagles (jeagles)."
0799901 is described below
commit 0799901d48c625274fb1588eb3a27e9eaff565bd
Author: Eric Badger <eb...@verizonmedia.com>
AuthorDate: Mon Jun 8 20:12:00 2020 +0000
Revert "MAPREDUCE-7277. IndexCache totalMemoryUsed differs from cache contents. Contributed by Jon Eagles (jeagles)."
This reverts commit 7e05577c4755802e24d14d80282edfdbe14304ab.
---
.../java/org/apache/hadoop/mapred/IndexCache.java | 95 ++++++++--------------
.../org/apache/hadoop/mapred/TestIndexCache.java | 56 ++++++-------
2 files changed, 56 insertions(+), 95 deletions(-)
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java
index 80cbcca..0e24bbe 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java
@@ -72,12 +72,11 @@ class IndexCache {
try {
info.wait();
} catch (InterruptedException e) {
- Thread.currentThread().interrupt();
throw new IOException("Interrupted waiting for construction", e);
}
}
}
- LOG.debug("IndexCache HIT: MapId {} found", mapId);
+ LOG.debug("IndexCache HIT: MapId " + mapId + " found");
}
if (info.mapSpillRecord.size() == 0 ||
@@ -107,91 +106,63 @@ class IndexCache {
try {
info.wait();
} catch (InterruptedException e) {
- Thread.currentThread().interrupt();
throw new IOException("Interrupted waiting for construction", e);
}
}
}
- LOG.debug("IndexCache HIT: MapId {} found", mapId);
+ LOG.debug("IndexCache HIT: MapId " + mapId + " found");
return info;
}
- LOG.debug("IndexCache MISS: MapId {} not found", mapId);
+ LOG.debug("IndexCache MISS: MapId " + mapId + " not found") ;
SpillRecord tmp = null;
- boolean success = false;
try {
tmp = new SpillRecord(indexFileName, conf, expectedIndexOwner);
- success = true;
- } catch (Throwable e) {
+ } catch (Throwable e) {
tmp = new SpillRecord(0);
cache.remove(mapId);
- if (e instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
throw new IOException("Error Reading IndexFile", e);
- } finally {
- synchronized (newInd) {
+ } finally {
+ synchronized (newInd) {
newInd.mapSpillRecord = tmp;
- if (success) {
- // Only add mapId to the queue for successful read and after added to
- // the cache. Once in the queue, it is now eligible for removal once
- // construction is finished.
- queue.add(mapId);
- if (totalMemoryUsed.addAndGet(newInd.getSize()) > totalMemoryAllowed) {
- freeIndexInformation();
- }
- }
newInd.notifyAll();
}
}
-
+ queue.add(mapId);
+
+ if (totalMemoryUsed.addAndGet(newInd.getSize()) > totalMemoryAllowed) {
+ freeIndexInformation();
+ }
return newInd;
}
/**
- * This method removes the map from the cache if it is present in the queue.
+ * This method removes the map from the cache if index information for this
+ * map is loaded(size>0), index information entry in cache will not be
+ * removed if it is in the loading phrase(size=0), this prevents corruption
+ * of totalMemoryUsed. It should be called when a map output on this tracker
+ * is discarded.
* @param mapId The taskID of this map.
*/
- public void removeMap(String mapId) throws IOException {
- // Successfully removing the mapId from the queue enters into a contract
- // that this thread will remove the corresponding mapId from the cache.
- if (!queue.remove(mapId)) {
- LOG.debug("Map ID {} not found in queue", mapId);
- return;
- }
- removeMapInternal(mapId);
- }
-
- /** This method should only be called upon successful removal of mapId from
- * the queue. The mapId will be removed from the cache and totalUsedMemory
- * will be decremented.
- * @param mapId the cache item to be removed
- * @throws IOException
- */
- private void removeMapInternal(String mapId) throws IOException {
- IndexInformation info = cache.remove(mapId);
- if (info == null) {
- // Inconsistent state as presence in queue implies presence in cache
- LOG.warn("Map ID " + mapId + " not found in cache");
+ public void removeMap(String mapId) {
+ IndexInformation info = cache.get(mapId);
+ if (info == null || isUnderConstruction(info)) {
return;
}
- try {
- synchronized(info) {
- while (isUnderConstruction(info)) {
- info.wait();
- }
- totalMemoryUsed.getAndAdd(-info.getSize());
+ info = cache.remove(mapId);
+ if (info != null) {
+ totalMemoryUsed.addAndGet(-info.getSize());
+ if (!queue.remove(mapId)) {
+ LOG.warn("Map ID" + mapId + " not found in queue!!");
}
- } catch (InterruptedException e) {
- totalMemoryUsed.getAndAdd(-info.getSize());
- Thread.currentThread().interrupt();
- throw new IOException("Interrupted waiting for construction", e);
+ } else {
+ LOG.info("Map ID " + mapId + " not found in cache");
}
}
/**
- * This method checks if cache and totalMemoryUsed is consistent.
+ * This method checks if cache and totolMemoryUsed is consistent.
* It is only used for unit test.
- * @return True if cache and totalMemoryUsed is consistent
+ * @return True if cache and totolMemoryUsed is consistent
*/
boolean checkTotalMemoryUsed() {
int totalSize = 0;
@@ -204,13 +175,13 @@ class IndexCache {
/**
* Bring memory usage below totalMemoryAllowed.
*/
- private synchronized void freeIndexInformation() throws IOException {
+ private synchronized void freeIndexInformation() {
while (totalMemoryUsed.get() > totalMemoryAllowed) {
- if(queue.isEmpty()) {
- break;
+ String s = queue.remove();
+ IndexInformation info = cache.remove(s);
+ if (info != null) {
+ totalMemoryUsed.addAndGet(-info.getSize());
}
- String mapId = queue.remove();
- removeMapInternal(mapId);
}
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestIndexCache.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestIndexCache.java
index e7b6915..0cc3c66 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestIndexCache.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestIndexCache.java
@@ -21,7 +21,6 @@ import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Random;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.CRC32;
import java.util.zip.CheckedOutputStream;
@@ -217,32 +216,23 @@ public class TestIndexCache {
final String user =
UserGroupInformation.getCurrentUser().getShortUserName();
writeFile(fs, big, bytesPerFile, partsPerMap);
-
- // Capture if any runtime exception occurred
- AtomicBoolean failed = new AtomicBoolean();
-
+
// run multiple times
for (int i = 0; i < 20; ++i) {
Thread getInfoThread = new Thread() {
@Override
public void run() {
try {
- cache.getIndexInformation("bigIndex", 0, big, user);
+ cache.getIndexInformation("bigIndex", partsPerMap, big, user);
} catch (Exception e) {
// should not be here
- failed.set(true);
}
}
};
Thread removeMapThread = new Thread() {
@Override
public void run() {
- try {
- cache.removeMap("bigIndex");
- } catch (Exception e) {
- // should not be here
- failed.set(true);
- }
+ cache.removeMap("bigIndex");
}
};
if (i%2==0) {
@@ -255,7 +245,6 @@ public class TestIndexCache {
getInfoThread.join();
removeMapThread.join();
assertEquals(true, cache.checkTotalMemoryUsed());
- assertFalse("An unexpected exception", failed.get());
}
}
@@ -272,9 +261,6 @@ public class TestIndexCache {
UserGroupInformation.getCurrentUser().getShortUserName();
writeFile(fs, racy, bytesPerFile, partsPerMap);
- // Capture if any runtime exception occurred
- AtomicBoolean failed = new AtomicBoolean();
-
// run multiple instances
Thread[] getInfoThreads = new Thread[50];
for (int i = 0; i < 50; i++) {
@@ -282,15 +268,10 @@ public class TestIndexCache {
@Override
public void run() {
try {
- while (!Thread.currentThread().isInterrupted()) {
- cache.getIndexInformation("racyIndex", 0, racy, user);
- cache.removeMap("racyIndex");
- }
+ cache.getIndexInformation("racyIndex", partsPerMap, racy, user);
+ cache.removeMap("racyIndex");
} catch (Exception e) {
- if (!Thread.currentThread().isInterrupted()) {
- // should not be here
- failed.set(true);
- }
+ // should not be here
}
}
};
@@ -300,12 +281,20 @@ public class TestIndexCache {
getInfoThreads[i].start();
}
- // The duration to keep the threads testing
- Thread.sleep(5000);
+ final Thread mainTestThread = Thread.currentThread();
+
+ Thread timeoutThread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(15000);
+ mainTestThread.interrupt();
+ } catch (InterruptedException ie) {
+ // we are done;
+ }
+ }
+ };
- for (int i = 0; i < 50; i++) {
- getInfoThreads[i].interrupt();
- }
for (int i = 0; i < 50; i++) {
try {
getInfoThreads[i].join();
@@ -314,9 +303,10 @@ public class TestIndexCache {
fail("Unexpectedly long delay during concurrent cache entry creations");
}
}
- assertFalse("An unexpected exception", failed.get());
- assertTrue("Total memory used does not represent contents of the cache",
- cache.checkTotalMemoryUsed());
+ // stop the timeoutThread. If we get interrupted before stopping, there
+ // must be something wrong, although it wasn't a deadlock. No need to
+ // catch and swallow.
+ timeoutThread.interrupt();
}
private static void checkRecord(IndexRecord rec, long fill) {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org