You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yh...@apache.org on 2009/05/26 15:30:38 UTC
svn commit: r778696 [2/2] - in /hadoop/core/trunk: ./ conf/
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
src/mapred/ src/mapred/org/apache/hadoop/mapred/ src/test/ma...
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=778696&r1=778695&r2=778696&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Tue May 26 13:30:37 2009
@@ -200,58 +200,10 @@
private TaskMemoryManagerThread taskMemoryManager;
private boolean taskMemoryManagerEnabled = true;
private long totalVirtualMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
- private long totalPmemOnTT = JobConf.DISABLED_MEMORY_LIMIT;
- private long reservedVirtualMemory = JobConf.DISABLED_MEMORY_LIMIT;
- private long reservedPmem = JobConf.DISABLED_MEMORY_LIMIT;
-
- // Cluster wide default value for max-vm per task
- private long defaultMaxVmPerTask = JobConf.DISABLED_MEMORY_LIMIT;
- // Cluster wide upper limit on max-vm per task
- private long limitMaxVmPerTask = JobConf.DISABLED_MEMORY_LIMIT;
-
- /**
- * Configuration property to specify the amount of virtual memory that has to
- * be reserved by the TaskTracker for system usage (OS, TT etc). The reserved
- * virtual memory should be a part of the total virtual memory available on
- * the TaskTracker. TaskTracker obtains the total virtual memory available on
- * the system by using a {@link MemoryCalculatorPlugin}. The total physical
- * memory is set to {@link JobConf#DISABLED_MEMORY_LIMIT} on systems lacking a
- * MemoryCalculatorPlugin implementation.
- *
- * <p>
- *
- * The reserved virtual memory and the total virtual memory values are
- * reported by the TaskTracker as part of heart-beat so that they can
- * considered by a scheduler.
- *
- * <p>
- *
- * These two values are also used by the TaskTracker for tracking tasks'
- * memory usage. Memory management functionality on a TaskTracker is disabled
- * if this property is not set, if it more than the total virtual memory
- * reported by MemoryCalculatorPlugin, or if either of the values is negative.
- */
- static final String MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY =
- "mapred.tasktracker.vmem.reserved";
-
- /**
- * Configuration property to specify the amount of physical memory that has to
- * be reserved by the TaskTracker for system usage (OS, TT etc). The reserved
- * physical memory should be a part of the total physical memory available on
- * the TaskTracker. TaskTracker obtains the total physical memory available on
- * the system by using a {@link MemoryCalculatorPlugin}. The total physical
- * memory is set to {@link JobConf#DISABLED_MEMORY_LIMIT} on systems lacking a
- * MemoryCalculatorPlugin implementation.
- *
- * <p>
- *
- * The reserved virtual memory and the total virtual memory values are
- * reported by the TaskTracker as part of heart-beat so that they can
- * considered by a scheduler.
- *
- */
- static final String MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY =
- "mapred.tasktracker.pmem.reserved";
+ private long totalPhysicalMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
+ private long mapSlotMemorySizeOnTT = JobConf.DISABLED_MEMORY_LIMIT;
+ private long reduceSlotSizeMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
+ private long totalMemoryAllottedForTasks = JobConf.DISABLED_MEMORY_LIMIT;
static final String MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY =
"mapred.tasktracker.memory_calculator_plugin";
@@ -1247,14 +1199,14 @@
long freeDiskSpace = getFreeSpace();
long totVmem = getTotalVirtualMemoryOnTT();
long totPmem = getTotalPhysicalMemoryOnTT();
- long rsrvdVmem = getReservedVirtualMemory();
- long rsrvdPmem = getReservedPhysicalMemory();
status.getResourceStatus().setAvailableSpace(freeDiskSpace);
status.getResourceStatus().setTotalVirtualMemory(totVmem);
status.getResourceStatus().setTotalPhysicalMemory(totPmem);
- status.getResourceStatus().setReservedVirtualMemory(rsrvdVmem);
- status.getResourceStatus().setReservedPhysicalMemory(rsrvdPmem);
+ status.getResourceStatus().setMapSlotMemorySizeOnTT(
+ mapSlotMemorySizeOnTT);
+ status.getResourceStatus().setReduceSlotMemorySizeOnTT(
+ reduceSlotSizeMemoryOnTT);
}
//
@@ -1317,53 +1269,11 @@
* @return total size of physical memory.
*/
long getTotalPhysicalMemoryOnTT() {
- return totalPmemOnTT;
- }
-
- /**
- * Return the amount of virtual memory reserved on the TaskTracker for system
- * usage (OS, TT etc).
- */
- long getReservedVirtualMemory() {
- return reservedVirtualMemory;
- }
-
- /**
- * Return the amount of physical memory reserved on the TaskTracker for system
- * usage (OS, TT etc).
- */
- long getReservedPhysicalMemory() {
- return reservedPmem;
+ return totalPhysicalMemoryOnTT;
}
- /**
- * Return the limit on the maxVMemPerTask on this TaskTracker
- * @return limitMaxVmPerTask
- */
- long getLimitMaxVMemPerTask() {
- return limitMaxVmPerTask;
- }
-
- /**
- * Obtain the virtual memory allocated for a TIP.
- *
- * If the TIP's job has a configured value for the max-virtual memory, that
- * will be returned. Else, the cluster-wide default maxvirtual memory for
- * tasks is returned.
- *
- * @param conf
- * @return the virtual memory allocated for the TIP.
- */
- long getVirtualMemoryForTask(JobConf conf) {
- long vMemForTask =
- normalizeMemoryConfigValue(conf.getMaxVirtualMemoryForTask());
- if (vMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
- vMemForTask =
- normalizeMemoryConfigValue(fConf.getLong(
- JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
- JobConf.DISABLED_MEMORY_LIMIT));
- }
- return vMemForTask;
+ long getTotalMemoryAllottedForTasksOnTT() {
+ return totalMemoryAllottedForTasks;
}
/**
@@ -1637,7 +1547,6 @@
private TaskLauncher mapLauncher;
private TaskLauncher reduceLauncher;
-
public JvmManager getJvmManagerInstance() {
return jvmManager;
}
@@ -1775,10 +1684,12 @@
}
}
- void addToMemoryManager(TaskAttemptID attemptId,
+ void addToMemoryManager(TaskAttemptID attemptId, boolean isMap,
JobConf conf) {
if (isTaskMemoryManagerEnabled()) {
- taskMemoryManager.addTask(attemptId, getVirtualMemoryForTask(conf));
+ taskMemoryManager.addTask(attemptId, isMap ? conf
+ .getMemoryForMapTask() * 1024 * 1024L : conf
+ .getMemoryForReduceTask() * 1024 * 1024L);
}
}
@@ -3119,33 +3030,35 @@
+ "Setting it to " + JobConf.DISABLED_MEMORY_LIMIT);
totalVirtualMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
}
- totalPmemOnTT = memoryCalculatorPlugin.getPhysicalMemorySize();
- if (totalPmemOnTT <= 0) {
+ totalPhysicalMemoryOnTT = memoryCalculatorPlugin.getPhysicalMemorySize();
+ if (totalPhysicalMemoryOnTT <= 0) {
LOG.warn("TaskTracker's totalPmem could not be calculated. "
+ "Setting it to " + JobConf.DISABLED_MEMORY_LIMIT);
- totalPmemOnTT = JobConf.DISABLED_MEMORY_LIMIT;
+ totalPhysicalMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
}
}
- reservedVirtualMemory =
- normalizeMemoryConfigValue(fConf.getLong(
- TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY,
- JobConf.DISABLED_MEMORY_LIMIT));
-
- reservedPmem =
- normalizeMemoryConfigValue(fConf.getLong(
- TaskTracker.MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY,
- JobConf.DISABLED_MEMORY_LIMIT));
-
- defaultMaxVmPerTask =
- normalizeMemoryConfigValue(fConf.getLong(
- JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
- JobConf.DISABLED_MEMORY_LIMIT));
-
- limitMaxVmPerTask =
- normalizeMemoryConfigValue(fConf.getLong(
- JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
- JobConf.DISABLED_MEMORY_LIMIT));
+ mapSlotMemorySizeOnTT =
+ fConf.getLong(
+ JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
+ JobConf.DISABLED_MEMORY_LIMIT);
+ reduceSlotSizeMemoryOnTT =
+ fConf.getLong(
+ JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
+ JobConf.DISABLED_MEMORY_LIMIT);
+ totalMemoryAllottedForTasks =
+ maxCurrentMapTasks * mapSlotMemorySizeOnTT + maxCurrentReduceTasks
+ * reduceSlotSizeMemoryOnTT;
+ if (totalMemoryAllottedForTasks < 0) {
+ totalMemoryAllottedForTasks = JobConf.DISABLED_MEMORY_LIMIT;
+ }
+ if (totalMemoryAllottedForTasks > totalPhysicalMemoryOnTT) {
+ LOG.info("totalMemoryAllottedForTasks > totalPhysicalMemoryOnTT."
+ + " Thrashing might happen.");
+ } else if (totalMemoryAllottedForTasks > totalVirtualMemoryOnTT) {
+ LOG.info("totalMemoryAllottedForTasks > totalVirtualMemoryOnTT."
+ + " Thrashing might happen.");
+ }
// start the taskMemoryManager thread only if enabled
setTaskMemoryManagerEnabledFlag();
@@ -3164,55 +3077,12 @@
return;
}
- // /// Missing configuration
- StringBuilder mesg = new StringBuilder();
-
- long totalVmemOnTT = getTotalVirtualMemoryOnTT();
- if (totalVmemOnTT == JobConf.DISABLED_MEMORY_LIMIT) {
- mesg.append("TaskTracker's totalVmem could not be calculated.\n");
+ if (totalMemoryAllottedForTasks == JobConf.DISABLED_MEMORY_LIMIT) {
taskMemoryManagerEnabled = false;
- }
-
- long reservedVmem = getReservedVirtualMemory();
- if (reservedVmem == JobConf.DISABLED_MEMORY_LIMIT) {
- mesg.append("TaskTracker's reservedVmem is not configured.\n");
- taskMemoryManagerEnabled = false;
- }
-
- if (defaultMaxVmPerTask == JobConf.DISABLED_MEMORY_LIMIT) {
- mesg.append("TaskTracker's defaultMaxVmPerTask is not configured.\n");
- taskMemoryManagerEnabled = false;
- }
-
- if (limitMaxVmPerTask == JobConf.DISABLED_MEMORY_LIMIT) {
- mesg.append("TaskTracker's limitMaxVmPerTask is not configured.\n");
- taskMemoryManagerEnabled = false;
- }
-
- if (!taskMemoryManagerEnabled) {
- LOG.warn(mesg.toString() + "TaskMemoryManager is disabled.");
- return;
- }
- // ///// End of missing configuration
-
- // ///// Mis-configuration
- if (defaultMaxVmPerTask > limitMaxVmPerTask) {
- mesg.append("defaultMaxVmPerTask is mis-configured. "
- + "It shouldn't be greater than limitMaxVmPerTask. ");
- taskMemoryManagerEnabled = false;
- }
-
- if (reservedVmem > totalVmemOnTT) {
- mesg.append("reservedVmemOnTT is mis-configured. "
- + "It shouldn't be greater than totalVmemOnTT");
- taskMemoryManagerEnabled = false;
- }
-
- if (!taskMemoryManagerEnabled) {
- LOG.warn(mesg.toString() + "TaskMemoryManager is disabled.");
+ LOG.warn("TaskTracker's totalMemoryAllottedForTasks is -1."
+ + " TaskMemoryManager is disabled.");
return;
}
- // ///// End of mis-configuration
taskMemoryManagerEnabled = true;
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=778696&r1=778695&r2=778696&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Tue May 26 13:30:37 2009
@@ -55,16 +55,16 @@
static class ResourceStatus implements Writable {
private long totalVirtualMemory;
- private long reservedVirtualMemory;
private long totalPhysicalMemory;
- private long reservedPhysicalMemory;
+ private long mapSlotMemorySizeOnTT;
+ private long reduceSlotMemorySizeOnTT;
private long availableSpace;
ResourceStatus() {
totalVirtualMemory = JobConf.DISABLED_MEMORY_LIMIT;
- reservedVirtualMemory = JobConf.DISABLED_MEMORY_LIMIT;
totalPhysicalMemory = JobConf.DISABLED_MEMORY_LIMIT;
- reservedPhysicalMemory = JobConf.DISABLED_MEMORY_LIMIT;
+ mapSlotMemorySizeOnTT = JobConf.DISABLED_MEMORY_LIMIT;
+ reduceSlotMemorySizeOnTT = JobConf.DISABLED_MEMORY_LIMIT;
availableSpace = Long.MAX_VALUE;
}
@@ -90,24 +90,6 @@
}
/**
- * Set the amount of virtual memory reserved on the TaskTracker for system
- * usage (OS, TT etc).
- *
- * @param reservedVmem amount of virtual memory reserved in bytes.
- */
- void setReservedVirtualMemory(long reservedVmem) {
- reservedVirtualMemory = reservedVmem;
- }
-
- /**
- * Get the amount of virtual memory reserved on the TaskTracker for system
- * usage (OS, TT etc).
- */
- long getReservedTotalMemory() {
- return reservedVirtualMemory;
- }
-
- /**
* Set the maximum amount of physical memory on the tasktracker.
*
* @param totalRAM maximum amount of physical memory on the tasktracker in
@@ -130,23 +112,49 @@
}
/**
- * Set the amount of physical memory reserved on the TaskTracker for system
- * usage (OS, TT etc).
+ * Set the memory size of each map slot on this TT. This will be used by JT
+ * for accounting more slots for jobs that use more memory.
+ *
+ * @param mem the memory size of each map slot on this TT
+ */
+ void setMapSlotMemorySizeOnTT(long mem) {
+ mapSlotMemorySizeOnTT = mem;
+ }
+
+ /**
+ * Get the memory size of each map slot on this TT. See
+ * {@link #setMapSlotMemorySizeOnTT(long)}
*
- * @param reservedPmem amount of physical memory reserved in bytes.
+ * @return the memory size of each map slot on this TT
*/
- void setReservedPhysicalMemory(long reservedPmem) {
- reservedPhysicalMemory = reservedPmem;
+ long getMapSlotMemorySizeOnTT() {
+ return mapSlotMemorySizeOnTT;
}
/**
- * Get the amount of physical memory reserved on the TaskTracker for system
- * usage (OS, TT etc).
+ * Set the memory size of each reduce slot on this TT. This will be used by
+ * JT for accounting more slots for jobs that use more memory.
+ *
+ * @param mem the memory size of each reduce slot on this TT
*/
- long getReservedPhysicalMemory() {
- return reservedPhysicalMemory;
+ void setReduceSlotMemorySizeOnTT(long mem) {
+ reduceSlotMemorySizeOnTT = mem;
}
+ /**
+ * Get the memory size of each reduce slot on this TT. See
+ * {@link #setReduceSlotMemorySizeOnTT(long)}
+ *
+ * @return the memory size of each reduce slot on this TT
+ */
+ long getReduceSlotMemorySizeOnTT() {
+ return reduceSlotMemorySizeOnTT;
+ }
+
+ /**
+ * Set the available disk space on the TT
+ * @param availSpace
+ */
void setAvailableSpace(long availSpace) {
availableSpace = availSpace;
}
@@ -161,17 +169,17 @@
public void write(DataOutput out) throws IOException {
WritableUtils.writeVLong(out, totalVirtualMemory);
- WritableUtils.writeVLong(out, reservedVirtualMemory);
WritableUtils.writeVLong(out, totalPhysicalMemory);
- WritableUtils.writeVLong(out, reservedPhysicalMemory);
+ WritableUtils.writeVLong(out, mapSlotMemorySizeOnTT);
+ WritableUtils.writeVLong(out, reduceSlotMemorySizeOnTT);
WritableUtils.writeVLong(out, availableSpace);
}
public void readFields(DataInput in) throws IOException {
totalVirtualMemory = WritableUtils.readVLong(in);
- reservedVirtualMemory = WritableUtils.readVLong(in);
totalPhysicalMemory = WritableUtils.readVLong(in);
- reservedPhysicalMemory = WritableUtils.readVLong(in);
+ mapSlotMemorySizeOnTT = WritableUtils.readVLong(in);
+ reduceSlotMemorySizeOnTT = WritableUtils.readVLong(in);
availableSpace = WritableUtils.readVLong(in);
}
}
Added: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java?rev=778696&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java (added)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java Tue May 26 13:30:37 2009
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.util.ToolRunner;
+
+import junit.framework.TestCase;
+
+public class TestSubmitJob extends TestCase {
+ private MiniMRCluster miniMRCluster;
+
+ @Override
+ protected void tearDown()
+ throws Exception {
+ if (miniMRCluster != null) {
+ miniMRCluster.shutdown();
+ }
+ }
+
+ /**
+ * Test to verify that jobs with invalid memory requirements are killed at the
+ * JT.
+ *
+ * @throws Exception
+ */
+ public void testJobWithInvalidMemoryReqs()
+ throws Exception {
+ JobConf jtConf = new JobConf();
+ jtConf
+ .setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024L);
+ jtConf.setLong(JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
+ 2 * 1024L);
+ jtConf.setLong(JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+ 3 * 1024L);
+ jtConf.setLong(JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+ 4 * 1024L);
+
+ miniMRCluster = new MiniMRCluster(0, "file:///", 0, null, null, jtConf);
+
+ JobConf clusterConf = miniMRCluster.createJobConf();
+
+ // No map-memory configuration
+ JobConf jobConf = new JobConf(clusterConf);
+ jobConf.setMemoryForReduceTask(1 * 1024L);
+ runJobAndVerifyFailure(jobConf, JobConf.DISABLED_MEMORY_LIMIT, 1 * 1024L,
+ "Invalid job requirements.");
+
+ // No reduce-memory configuration
+ jobConf = new JobConf(clusterConf);
+ jobConf.setMemoryForMapTask(1 * 1024L);
+ runJobAndVerifyFailure(jobConf, 1 * 1024L, JobConf.DISABLED_MEMORY_LIMIT,
+ "Invalid job requirements.");
+
+ // Invalid map-memory configuration
+ jobConf = new JobConf(clusterConf);
+ jobConf.setMemoryForMapTask(4 * 1024L);
+ jobConf.setMemoryForReduceTask(1 * 1024L);
+ runJobAndVerifyFailure(jobConf, 4 * 1024L, 1 * 1024L,
+ "Exceeds the cluster's max-memory-limit.");
+
+ // Invalid reduce-memory configuration
+ jobConf = new JobConf(clusterConf);
+ jobConf.setMemoryForMapTask(1 * 1024L);
+ jobConf.setMemoryForReduceTask(5 * 1024L);
+ runJobAndVerifyFailure(jobConf, 1 * 1024L, 5 * 1024L,
+ "Exceeds the cluster's max-memory-limit.");
+ }
+
+ private void runJobAndVerifyFailure(JobConf jobConf, long memForMapTasks,
+ long memForReduceTasks, String expectedMsg)
+ throws Exception,
+ IOException {
+ String[] args = { "-m", "0", "-r", "0", "-mt", "0", "-rt", "0" };
+ boolean throwsException = false;
+ String msg = null;
+ try {
+ ToolRunner.run(jobConf, new SleepJob(), args);
+ } catch (RemoteException re) {
+ throwsException = true;
+ msg = re.unwrapRemoteException().getMessage();
+ }
+ assertTrue(throwsException);
+ assertNotNull(msg);
+
+ String overallExpectedMsg =
+ "(" + memForMapTasks + " memForMapTasks " + memForReduceTasks
+ + " memForReduceTasks): " + expectedMsg;
+ assertTrue("Observed message - " + msg
+ + " - doesn't contain expected message - " + overallExpectedMsg, msg
+ .contains(overallExpectedMsg));
+ }
+}
Modified: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java?rev=778696&r1=778695&r2=778696&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java (original)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java Tue May 26 13:30:37 2009
@@ -22,10 +22,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.SleepJob;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.util.LinuxMemoryCalculatorPlugin;
import org.apache.hadoop.util.MemoryCalculatorPlugin;
import org.apache.hadoop.util.ToolRunner;
@@ -46,7 +43,6 @@
static final Log LOG = LogFactory.getLog(TestTTMemoryReporting.class);
- private MiniDFSCluster miniDFSCluster;
private MiniMRCluster miniMRCluster;
/**
@@ -77,41 +73,42 @@
getConf().getLong("totalVmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
long totalPhysicalMemoryOnTT =
getConf().getLong("totalPmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
- long virtualMemoryReservedOnTT =
- getConf().getLong("reservedVmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
- long physicalMemoryReservedOnTT =
- getConf().getLong("reservedPmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
+ long mapSlotMemorySize =
+ getConf().getLong("mapSlotMemorySize", JobConf.DISABLED_MEMORY_LIMIT);
+ long reduceSlotMemorySize =
+ getConf()
+ .getLong("reduceSlotMemorySize", JobConf.DISABLED_MEMORY_LIMIT);
long reportedTotalVirtualMemoryOnTT =
status.getResourceStatus().getTotalVirtualMemory();
long reportedTotalPhysicalMemoryOnTT =
status.getResourceStatus().getTotalPhysicalMemory();
- long reportedVirtualMemoryReservedOnTT =
- status.getResourceStatus().getReservedTotalMemory();
- long reportedPhysicalMemoryReservedOnTT =
- status.getResourceStatus().getReservedPhysicalMemory();
+ long reportedMapSlotMemorySize =
+ status.getResourceStatus().getMapSlotMemorySizeOnTT();
+ long reportedReduceSlotMemorySize =
+ status.getResourceStatus().getReduceSlotMemorySizeOnTT();
message =
"expected memory values : (totalVirtualMemoryOnTT, totalPhysicalMemoryOnTT, "
- + "virtualMemoryReservedOnTT, physicalMemoryReservedOnTT) = ("
- + totalVirtualMemoryOnTT + ", " + totalPhysicalMemoryOnTT + ", "
- + virtualMemoryReservedOnTT + ", " + physicalMemoryReservedOnTT
- + ")";
+ + "mapSlotMemSize, reduceSlotMemorySize) = ("
+ + totalVirtualMemoryOnTT + ", " + totalPhysicalMemoryOnTT + ","
+ + mapSlotMemorySize + "," + reduceSlotMemorySize + ")";
message +=
"\nreported memory values : (totalVirtualMemoryOnTT, totalPhysicalMemoryOnTT, "
- + "virtualMemoryReservedOnTT, physicalMemoryReservedOnTT) = ("
+ + "reportedMapSlotMemorySize, reportedReduceSlotMemorySize) = ("
+ reportedTotalVirtualMemoryOnTT
+ ", "
+ reportedTotalPhysicalMemoryOnTT
- + ", "
- + reportedVirtualMemoryReservedOnTT
- + ", "
- + reportedPhysicalMemoryReservedOnTT + ")";
+ + ","
+ + reportedMapSlotMemorySize
+ + ","
+ + reportedReduceSlotMemorySize
+ + ")";
LOG.info(message);
if (totalVirtualMemoryOnTT != reportedTotalVirtualMemoryOnTT
|| totalPhysicalMemoryOnTT != reportedTotalPhysicalMemoryOnTT
- || virtualMemoryReservedOnTT != reportedVirtualMemoryReservedOnTT
- || physicalMemoryReservedOnTT != reportedPhysicalMemoryReservedOnTT) {
+ || mapSlotMemorySize != reportedMapSlotMemorySize
+ || reduceSlotMemorySize != reportedReduceSlotMemorySize) {
hasPassed = false;
}
return super.assignTasks(status);
@@ -132,7 +129,7 @@
TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
setUpCluster(conf);
- runSleepJob();
+ runSleepJob(miniMRCluster.createJobConf());
verifyTestResults();
} finally {
tearDownCluster();
@@ -149,8 +146,9 @@
JobConf conf = new JobConf();
conf.setLong("totalVmemOnTT", 4 * 1024 * 1024 * 1024L);
conf.setLong("totalPmemOnTT", 2 * 1024 * 1024 * 1024L);
- conf.setLong("reservedVmemOnTT", 1 * 1024 * 1024 * 1024L);
- conf.setLong("reservedPmemOnTT", 512 * 1024 * 1024L);
+ conf.setLong("mapSlotMemorySize", 1 * 512L);
+ conf.setLong("reduceSlotMemorySize", 1 * 1024L);
+
conf.setClass(
TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
@@ -158,15 +156,17 @@
4 * 1024 * 1024 * 1024L);
conf.setLong(DummyMemoryCalculatorPlugin.MAXPMEM_TESTING_PROPERTY,
2 * 1024 * 1024 * 1024L);
+ conf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
+ 512L);
conf.setLong(
- TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY,
- 1 * 1024 * 1024 * 1024L);
- conf.setLong(
- TaskTracker.MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY,
- 512 * 1024 * 1024L);
+ JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1024L);
+
try {
setUpCluster(conf);
- runSleepJob();
+ JobConf jobConf = miniMRCluster.createJobConf();
+ jobConf.setMemoryForMapTask(1 * 1024L);
+ jobConf.setMemoryForReduceTask(2 * 1024L);
+ runSleepJob(jobConf);
verifyTestResults();
} finally {
tearDownCluster();
@@ -189,17 +189,10 @@
LinuxMemoryCalculatorPlugin plugin = new LinuxMemoryCalculatorPlugin();
conf.setLong("totalVmemOnTT", plugin.getVirtualMemorySize());
conf.setLong("totalPmemOnTT", plugin.getPhysicalMemorySize());
- conf.setLong("reservedVmemOnTT", 1 * 1024 * 1024 * 1024L);
- conf.setLong("reservedPmemOnTT", 512 * 1024 * 1024L);
- conf.setLong(
- TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY,
- 1 * 1024 * 1024 * 1024L);
- conf.setLong(
- TaskTracker.MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY,
- 512 * 1024 * 1024L);
+
try {
setUpCluster(conf);
- runSleepJob();
+ runSleepJob(miniMRCluster.createJobConf());
verifyTestResults();
} finally {
tearDownCluster();
@@ -208,22 +201,15 @@
private void setUpCluster(JobConf conf)
throws Exception {
- conf.setClass("mapred.jobtracker.taskScheduler",
- TestTTMemoryReporting.FakeTaskScheduler.class,
- TaskScheduler.class);
- miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
- FileSystem fileSys = miniDFSCluster.getFileSystem();
- String namenode = fileSys.getUri().toString();
- miniMRCluster = new MiniMRCluster(1, namenode, 3,
- null, null, conf);
+ conf.setClass("mapred.jobtracker.taskScheduler",
+ TestTTMemoryReporting.FakeTaskScheduler.class, TaskScheduler.class);
+ conf.set("mapred.job.tracker.handler.count", "1");
+ miniMRCluster = new MiniMRCluster(1, "file:///", 3, null, null, conf);
}
- private void runSleepJob() throws Exception {
- Configuration conf = new Configuration();
- conf.set("mapred.job.tracker", "localhost:"
- + miniMRCluster.getJobTrackerPort());
+ private void runSleepJob(JobConf conf) throws Exception {
String[] args = { "-m", "1", "-r", "1",
- "-mt", "1000", "-rt", "1000" };
+ "-mt", "10", "-rt", "10" };
ToolRunner.run(conf, new SleepJob(), args);
}
@@ -235,7 +221,8 @@
}
private void tearDownCluster() {
- if (miniMRCluster != null) { miniMRCluster.shutdown(); }
- if (miniDFSCluster != null) { miniDFSCluster.shutdown(); }
+ if (miniMRCluster != null) {
+ miniMRCluster.shutdown();
+ }
}
-}
\ No newline at end of file
+}
Modified: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java?rev=778696&r1=778695&r2=778696&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java (original)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java Tue May 26 13:30:37 2009
@@ -19,20 +19,19 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import java.util.regex.Pattern;
import java.util.regex.Matcher;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.examples.SleepJob;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.util.MemoryCalculatorPlugin;
import org.apache.hadoop.util.ProcfsBasedProcessTree;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.fs.FileSystem;
import junit.framework.TestCase;
@@ -43,18 +42,19 @@
private static final Log LOG =
LogFactory.getLog(TestTaskTrackerMemoryManager.class);
- private MiniDFSCluster miniDFSCluster;
private MiniMRCluster miniMRCluster;
private String taskOverLimitPatternString =
"TaskTree \\[pid=[0-9]*,tipID=.*\\] is running beyond memory-limits. "
+ "Current usage : [0-9]*bytes. Limit : %sbytes. Killing task.";
- private void startCluster(JobConf conf) throws Exception {
- miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
- FileSystem fileSys = miniDFSCluster.getFileSystem();
- String namenode = fileSys.getUri().toString();
- miniMRCluster = new MiniMRCluster(1, namenode, 1, null, null, conf);
+ private void startCluster(JobConf conf)
+ throws Exception {
+ conf.set("mapred.job.tracker.handler.count", "1");
+ conf.set("mapred.tasktracker.map.tasks.maximum", "1");
+ conf.set("mapred.tasktracker.reduce.tasks.maximum", "1");
+ conf.set("mapred.tasktracker.tasks.sleeptime-before-sigkill", "0");
+ miniMRCluster = new MiniMRCluster(1, "file:///", 1, null, null, conf);
}
@Override
@@ -62,9 +62,6 @@
if (miniMRCluster != null) {
miniMRCluster.shutdown();
}
- if (miniDFSCluster != null) {
- miniDFSCluster.shutdown();
- }
}
private int runSleepJob(JobConf conf) throws Exception {
@@ -74,15 +71,6 @@
private void runAndCheckSuccessfulJob(JobConf conf)
throws IOException {
- // Set up job.
- JobTracker jt = miniMRCluster.getJobTrackerRunner().getJobTracker();
- conf.set("mapred.job.tracker", jt.getJobTrackerMachine() + ":"
- + jt.getTrackerPort());
- NameNode nn = miniDFSCluster.getNameNode();
- conf.set("fs.default.name", "hdfs://"
- + nn.getNameNodeAddress().getHostName() + ":"
- + nn.getNameNodeAddress().getPort());
-
Pattern taskOverLimitPattern =
Pattern.compile(String.format(taskOverLimitPatternString, "[0-9]*"));
Matcher mat = null;
@@ -148,43 +136,12 @@
return;
}
- JobConf conf = new JobConf();
// Task-memory management disabled by default.
- startCluster(conf);
- long PER_TASK_LIMIT = 100L; // Doesn't matter how low.
- conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT);
- runAndCheckSuccessfulJob(conf);
- }
-
- /**
- * Test for verifying that tasks with no limits, with the cumulative usage
- * still under TT's limits, succeed.
- *
- * @throws Exception
- */
- public void testTasksWithNoLimits()
- throws Exception {
- // Run the test only if memory management is enabled
- if (!isProcfsBasedTreeAvailable()) {
- return;
- }
-
- // Fairly large value for sleepJob to succeed
- long ttLimit = 4 * 1024 * 1024 * 1024L;
- // Start cluster with proper configuration.
- JobConf fConf = new JobConf();
-
- fConf.setClass(
- TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
- DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
- fConf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY,
- ttLimit);
- fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY, ttLimit);
- fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY, ttLimit);
- fConf.setLong(
- TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, 0);
- startCluster(fConf);
- JobConf conf = new JobConf();
+ startCluster(new JobConf());
+ long PER_TASK_LIMIT = 1L; // Doesn't matter how low.
+ JobConf conf = miniMRCluster.createJobConf();
+ conf.setMemoryForMapTask(PER_TASK_LIMIT);
+ conf.setMemoryForReduceTask(PER_TASK_LIMIT);
runAndCheckSuccessfulJob(conf);
}
@@ -202,33 +159,25 @@
}
// Large so that sleepjob goes through and fits total TT usage
- long PER_TASK_LIMIT = 2 * 1024 * 1024 * 1024L;
- long TASK_TRACKER_LIMIT = 4 * 1024 * 1024 * 1024L;
+ long PER_TASK_LIMIT = 2 * 1024L;
// Start cluster with proper configuration.
JobConf fConf = new JobConf();
-
- fConf.setClass(
- TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
- DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
- fConf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY,
- TASK_TRACKER_LIMIT);
- fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
- TASK_TRACKER_LIMIT);
- fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
- TASK_TRACKER_LIMIT);
+ fConf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
+ 2 * 1024L);
fConf.setLong(
- TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, 0);
- startCluster(fConf);
- JobConf conf = new JobConf();
- conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT);
+ JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
+ 2 * 1024L);
+ startCluster(fConf); // pass fConf so the cluster map/reduce memory settings configured above take effect
+
+ JobConf conf = new JobConf(miniMRCluster.createJobConf());
+ conf.setMemoryForMapTask(PER_TASK_LIMIT);
+ conf.setMemoryForReduceTask(PER_TASK_LIMIT);
runAndCheckSuccessfulJob(conf);
-
}
/**
- * Test for verifying that tasks that go beyond limits, though the cumulative
- * usage is under TT's limits, get killed.
+ * Test for verifying that tasks that go beyond limits get killed.
*
* @throws Exception
*/
@@ -240,43 +189,32 @@
return;
}
- long PER_TASK_LIMIT = 444; // Low enough to kill off sleepJob tasks.
- long TASK_TRACKER_LIMIT = 4 * 1024 * 1024 * 1024L; // Large so as to fit
- // total usage
+ long PER_TASK_LIMIT = 1L; // Low enough to kill off sleepJob tasks.
+
Pattern taskOverLimitPattern =
Pattern.compile(String.format(taskOverLimitPatternString, String
- .valueOf(PER_TASK_LIMIT)));
+ .valueOf(PER_TASK_LIMIT*1024*1024L)));
Matcher mat = null;
// Start cluster with proper configuration.
JobConf fConf = new JobConf();
- fConf.setClass(
- TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
- DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
- fConf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY,
- TASK_TRACKER_LIMIT);
- fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
- TASK_TRACKER_LIMIT);
- fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
- TASK_TRACKER_LIMIT);
- fConf.setLong(
- TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, 0);
// very small value, so that no task escapes to successful completion.
fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval",
String.valueOf(300));
+ fConf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
+ 2 * 1024);
+ fConf.setLong(
+ JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
+ 2 * 1024);
startCluster(fConf);
// Set up job.
- JobConf conf = new JobConf();
- conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT);
- JobTracker jt = miniMRCluster.getJobTrackerRunner().getJobTracker();
- conf.set("mapred.job.tracker", jt.getJobTrackerMachine() + ":"
- + jt.getTrackerPort());
- NameNode nn = miniDFSCluster.getNameNode();
- conf.set("fs.default.name", "hdfs://"
- + nn.getNameNodeAddress().getHostName() + ":"
- + nn.getNameNodeAddress().getPort());
+ JobConf conf = new JobConf(miniMRCluster.createJobConf());
+ conf.setMemoryForMapTask(PER_TASK_LIMIT);
+ conf.setMemoryForReduceTask(PER_TASK_LIMIT);
+ conf.setMaxMapAttempts(1);
+ conf.setMaxReduceAttempts(1);
// Start the job.
int ret = 0;
@@ -334,48 +272,39 @@
}
// Large enough for SleepJob Tasks.
- long PER_TASK_LIMIT = 100000000000L;
- // Very Limited TT. All tasks will be killed.
- long TASK_TRACKER_LIMIT = 100L;
- Pattern taskOverLimitPattern =
- Pattern.compile(String.format(taskOverLimitPatternString, String
- .valueOf(PER_TASK_LIMIT)));
- Pattern trackerOverLimitPattern =
- Pattern
- .compile("Killing one of the least progress tasks - .*, as "
- + "the cumulative memory usage of all the tasks on the TaskTracker"
- + " exceeds virtual memory limit " + TASK_TRACKER_LIMIT + ".");
- Matcher mat = null;
+ long PER_TASK_LIMIT = 100 * 1024L;
// Start cluster with proper configuration.
JobConf fConf = new JobConf();
- fConf.setClass(
- TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
- DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
- fConf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY,
- TASK_TRACKER_LIMIT);
- fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
- TASK_TRACKER_LIMIT);
- fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
- TASK_TRACKER_LIMIT);
+ fConf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
+ 1L);
fConf.setLong(
- TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, 0);
+ JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1L);
+
+ // Because of the above, the total TT limit is 2 MB
+ long TASK_TRACKER_LIMIT = 2 * 1024 * 1024L;
+
// very small value, so that no task escapes to successful completion.
fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval",
String.valueOf(300));
startCluster(fConf);
+ Pattern taskOverLimitPattern =
+ Pattern.compile(String.format(taskOverLimitPatternString, String
+ .valueOf(PER_TASK_LIMIT * 1024 * 1024L))); // PER_TASK_LIMIT is in MB; convert to bytes as the per-task-limit test does
+
+ Pattern trackerOverLimitPattern =
+ Pattern
+ .compile("Killing one of the least progress tasks - .*, as "
+ + "the cumulative memory usage of all the tasks on the TaskTracker"
+ + " exceeds virtual memory limit " + TASK_TRACKER_LIMIT + ".");
+ Matcher mat = null;
+
// Set up job.
- JobConf conf = new JobConf();
- conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT);
- JobTracker jt = miniMRCluster.getJobTrackerRunner().getJobTracker();
- conf.set("mapred.job.tracker", jt.getJobTrackerMachine() + ":"
- + jt.getTrackerPort());
- NameNode nn = miniDFSCluster.getNameNode();
- conf.set("fs.default.name", "hdfs://"
- + nn.getNameNodeAddress().getHostName() + ":"
- + nn.getNameNodeAddress().getPort());
+ JobConf conf = new JobConf(miniMRCluster.createJobConf());
+ conf.setMemoryForMapTask(PER_TASK_LIMIT);
+ conf.setMemoryForReduceTask(PER_TASK_LIMIT);
JobClient jClient = new JobClient(conf);
SleepJob sleepJob = new SleepJob();
@@ -385,10 +314,12 @@
job.submit();
boolean TTOverFlowMsgPresent = false;
while (true) {
- // Set-up tasks are the first to be launched.
- TaskReport[] setUpReports = jClient.getSetupTaskReports(
- (org.apache.hadoop.mapred.JobID)job.getID());
- for (TaskReport tr : setUpReports) {
+ List<TaskReport> allTaskReports = new ArrayList<TaskReport>();
+ allTaskReports.addAll(Arrays.asList(jClient
+ .getSetupTaskReports((org.apache.hadoop.mapred.JobID) job.getID())));
+ allTaskReports.addAll(Arrays.asList(jClient
+ .getMapTaskReports((org.apache.hadoop.mapred.JobID) job.getID())));
+ for (TaskReport tr : allTaskReports) {
String[] diag = tr.getDiagnostics();
for (String str : diag) {
mat = taskOverLimitPattern.matcher(str);