You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by jl...@apache.org on 2013/06/03 16:46:31 UTC
svn commit: r1489012 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/
hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/...
Author: jlowe
Date: Mon Jun 3 14:46:30 2013
New Revision: 1489012
URL: http://svn.apache.org/r1489012
Log:
MAPREDUCE-5268. Improve history server startup performance. Contributed by Karthik Kambatla
Added:
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobIdHistoryFileInfoMap.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobListCache.java
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1489012&r1=1489011&r2=1489012&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Mon Jun 3 14:46:30 2013
@@ -281,6 +281,9 @@ Release 2.1.0-beta - UNRELEASED
MAPREDUCE-4974. Optimising the LineRecordReader initialize() method
(Gelesh via bobby)
+ MAPREDUCE-5268. Improve history server startup performance (Karthik
+ Kambatla via jlowe)
+
BUG FIXES
MAPREDUCE-4671. AM does not tell the RM about container requests which are
@@ -1064,6 +1067,21 @@ Release 2.0.0-alpha - 05-23-2012
MAPREDUCE-4444. nodemanager fails to start when one of the local-dirs is
bad (Jason Lowe via bobby)
+Release 0.23.9 - UNRELEASED
+
+ INCOMPATIBLE CHANGES
+
+ NEW FEATURES
+
+ IMPROVEMENTS
+
+ OPTIMIZATIONS
+
+ MAPREDUCE-5268. Improve history server startup performance (Karthik
+ Kambatla via jlowe)
+
+ BUG FIXES
+
Release 0.23.8 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java?rev=1489012&r1=1489011&r2=1489012&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java Mon Jun 3 14:46:30 2013
@@ -26,6 +26,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -36,6 +37,7 @@ import java.util.concurrent.LinkedBlocki
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -131,19 +133,73 @@ public class HistoryFileManager extends
}
}
- static class JobListCache {
+  /**
+   * Wrapper around {@link ConcurrentSkipListMap} that keeps a separate entry
+   * count alongside the map so that size() is O(1), for use in JobListCache.
+   * ({@link ConcurrentSkipListMap#size()} itself is not a constant-time
+   * operation.)
+   *
+   * Note: the count is not updated atomically with additions/removals to the
+   * underlying map, so size() can briefly disagree with the map's actual size
+   * while other threads are adding or removing entries.
+   */
+  static class JobIdHistoryFileInfoMap {
+    // Backing map, kept in JobId key order by ConcurrentSkipListMap.
+    private final ConcurrentSkipListMap<JobId, HistoryFileInfo> cache;
+    // Entry count, maintained only by putIfAbsent() and remove().
+    private final AtomicInteger mapSize;
+
+    JobIdHistoryFileInfoMap() {
+      cache = new ConcurrentSkipListMap<JobId, HistoryFileInfo>();
+      mapSize = new AtomicInteger();
+    }
+
+    /**
+     * Maps {@code key} to {@code value} unless {@code key} is already present.
+     *
+     * @param key the job id
+     * @param value the history file info to associate with {@code key}
+     * @return the existing value for {@code key}, or {@code null} if the new
+     *         entry was inserted (in which case the recorded size is
+     *         incremented)
+     */
+    public HistoryFileInfo putIfAbsent(JobId key, HistoryFileInfo value) {
+      HistoryFileInfo ret = cache.putIfAbsent(key, value);
+      // Only a successful insertion changes the size.
+      if (ret == null) {
+        mapSize.incrementAndGet();
+      }
+      return ret;
+    }
+
+    /**
+     * Removes the entry for {@code key}, if any.
+     *
+     * @param key the job id to remove
+     * @return the removed value, or {@code null} if {@code key} was absent (in
+     *         which case the recorded size is left unchanged)
+     */
+    public HistoryFileInfo remove(JobId key) {
+      HistoryFileInfo ret = cache.remove(key);
+      // Only decrement when this call actually removed an entry, so repeated
+      // or racing removals of the same key cannot drive the count too low.
+      if (ret != null) {
+        mapSize.decrementAndGet();
+      }
+      return ret;
+    }
+
+    /**
+     * Returns the recorded size of the internal map. Note that this could be
+     * momentarily out of sync with the actual size of the map.
+     *
+     * @return "recorded" size
+     */
+    public int size() {
+      return mapSize.get();
+    }
+
+    /**
+     * @param key the job id to look up
+     * @return the value mapped to {@code key}, or {@code null} if absent
+     */
+    public HistoryFileInfo get(JobId key) {
+      return cache.get(key);
+    }
+
+    /** @return a view of the internal map's keys, backed by the map */
+    public NavigableSet<JobId> navigableKeySet() {
+      return cache.navigableKeySet();
+    }
+
+    /** @return a view of the internal map's values, backed by the map */
+    public Collection<HistoryFileInfo> values() {
+      return cache.values();
+    }
+  }
+
+ static class JobListCache {
+ private JobIdHistoryFileInfoMap cache;
private int maxSize;
private long maxAge;
public JobListCache(int maxSize, long maxAge) {
this.maxSize = maxSize;
this.maxAge = maxAge;
- this.cache = new ConcurrentSkipListMap<JobId, HistoryFileInfo>();
+ this.cache = new JobIdHistoryFileInfoMap();
}
public HistoryFileInfo addIfAbsent(HistoryFileInfo fileInfo) {
- JobId jobId = fileInfo.getJobIndexInfo().getJobId();
+ JobId jobId = fileInfo.getJobId();
if (LOG.isDebugEnabled()) {
LOG.debug("Adding " + jobId + " to job list cache with "
+ fileInfo.getJobIndexInfo());
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobIdHistoryFileInfoMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobIdHistoryFileInfoMap.java?rev=1489012&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobIdHistoryFileInfoMap.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobIdHistoryFileInfoMap.java Mon Jun 3 14:46:30 2013
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2.hs;
+
+import java.util.Collection;
+import java.util.NavigableSet;
+
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.JobIdHistoryFileInfoMap;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestJobIdHistoryFileInfoMap {
+
+  /**
+   * Polls {@link JobIdHistoryFileInfoMap#size()} until it reports
+   * {@code size}, because the recorded size is not updated atomically with
+   * the underlying map.
+   *
+   * @return true if the expected size was observed within the polling budget
+   *         (100 attempts, 20 ms apart, i.e. up to ~2 s), false otherwise
+   */
+  private boolean checkSize(JobIdHistoryFileInfoMap map, int size)
+      throws InterruptedException {
+    for (int i = 0; i < 100; i++) {
+      if (map.size() == size) {
+        return true;
+      }
+      Thread.sleep(20);
+    }
+    return false;
+  }
+
+  /**
+   * Trivial test case that verifies basic functionality of {@link
+   * JobIdHistoryFileInfoMap}, including that remove() keeps the recorded
+   * size in step with the map.
+   *
+   * The timeout is well above the combined worst-case polling time of the
+   * checkSize() calls, so a size mismatch is reported through its assertion
+   * message rather than as a test timeout.
+   */
+  @Test(timeout = 10000)
+  public void testWithSingleElement() throws InterruptedException {
+    JobIdHistoryFileInfoMap mapWithSize = new JobIdHistoryFileInfoMap();
+
+    JobId jobId = MRBuilderUtils.newJobId(1, 1, 1);
+    HistoryFileInfo fileInfo1 = Mockito.mock(HistoryFileInfo.class);
+    Mockito.when(fileInfo1.getJobId()).thenReturn(jobId);
+
+    // add it twice; the second add must not replace or re-count the entry
+    assertEquals("Incorrect return on putIfAbsent()",
+        null, mapWithSize.putIfAbsent(jobId, fileInfo1));
+    assertEquals("Incorrect return on putIfAbsent()",
+        fileInfo1, mapWithSize.putIfAbsent(jobId, fileInfo1));
+
+    // check get()
+    assertEquals("Incorrect get()", fileInfo1, mapWithSize.get(jobId));
+    assertTrue("Incorrect size()", checkSize(mapWithSize, 1));
+
+    // check navigableKeySet()
+    NavigableSet<JobId> set = mapWithSize.navigableKeySet();
+    assertEquals("Incorrect navigableKeySet()", 1, set.size());
+    assertTrue("Incorrect navigableKeySet()", set.contains(jobId));
+
+    // check values()
+    Collection<HistoryFileInfo> values = mapWithSize.values();
+    assertEquals("Incorrect values()", 1, values.size());
+    assertTrue("Incorrect values()", values.contains(fileInfo1));
+
+    // check remove(): removing the key returns its value and shrinks the
+    // recorded size; removing it again returns null and must not decrement
+    // the size a second time
+    assertEquals("Incorrect return on remove()",
+        fileInfo1, mapWithSize.remove(jobId));
+    assertEquals("Incorrect get() after remove()",
+        null, mapWithSize.get(jobId));
+    assertTrue("Incorrect size() after remove()", checkSize(mapWithSize, 0));
+    assertEquals("Incorrect return on remove() of absent key",
+        null, mapWithSize.remove(jobId));
+    assertTrue("Incorrect size() after remove() of absent key",
+        checkSize(mapWithSize, 0));
+  }
+}
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobListCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobListCache.java?rev=1489012&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobListCache.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobListCache.java Mon Jun 3 14:46:30 2013
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2.hs;
+
+import java.lang.InterruptedException;
+import java.util.Collection;
+
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.JobListCache;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.*;
+
+public class TestJobListCache {
+
+  /** Adding the same HistoryFileInfo twice must leave a single cache entry. */
+  @Test(timeout = 1000)
+  public void testAddExisting() {
+    JobListCache cache = new JobListCache(2, 1000);
+
+    JobId jobId = MRBuilderUtils.newJobId(1, 1, 1);
+    HistoryFileInfo fileInfo = Mockito.mock(HistoryFileInfo.class);
+    Mockito.when(fileInfo.getJobId()).thenReturn(jobId);
+
+    cache.addIfAbsent(fileInfo);
+    cache.addIfAbsent(fileInfo);
+    assertEquals("Incorrect number of cache entries", 1,
+        cache.values().size());
+  }
+
+  /**
+   * Adding one entry more than maxSize must evict fileInfo1, the first entry
+   * added. The cache is polled (up to 9 x 100 ms) until it has shrunk to
+   * maxSize.
+   *
+   * The timeout is set well above that polling budget plus mock setup, so a
+   * cache that never shrinks is reported by fail() rather than by a test
+   * timeout.
+   */
+  @Test(timeout = 5000)
+  public void testEviction() throws InterruptedException {
+    int maxSize = 2;
+    JobListCache cache = new JobListCache(maxSize, 1000);
+
+    JobId jobId1 = MRBuilderUtils.newJobId(1, 1, 1);
+    HistoryFileInfo fileInfo1 = Mockito.mock(HistoryFileInfo.class);
+    Mockito.when(fileInfo1.getJobId()).thenReturn(jobId1);
+
+    JobId jobId2 = MRBuilderUtils.newJobId(2, 2, 2);
+    HistoryFileInfo fileInfo2 = Mockito.mock(HistoryFileInfo.class);
+    Mockito.when(fileInfo2.getJobId()).thenReturn(jobId2);
+
+    JobId jobId3 = MRBuilderUtils.newJobId(3, 3, 3);
+    HistoryFileInfo fileInfo3 = Mockito.mock(HistoryFileInfo.class);
+    Mockito.when(fileInfo3.getJobId()).thenReturn(jobId3);
+
+    cache.addIfAbsent(fileInfo1);
+    cache.addIfAbsent(fileInfo2);
+    cache.addIfAbsent(fileInfo3);
+
+    for (int i = 0; i < 9; i++) {
+      Collection<HistoryFileInfo> values = cache.values();
+      if (values.size() > maxSize) {
+        Thread.sleep(100);
+      } else {
+        assertFalse("fileInfo1 should have been evicted",
+            values.contains(fileInfo1));
+        return;
+      }
+    }
+    fail("JobListCache didn't delete the extra entry");
+  }
+}