You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/06/19 23:48:08 UTC
svn commit: r415438 - in /lucene/hadoop/trunk: ./
src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
src/webapps/task/
Author: cutting
Date: Mon Jun 19 14:48:07 2006
New Revision: 415438
URL: http://svn.apache.org/viewvc?rev=415438&view=rev
Log:
HADOOP-123. Add MapReduce unit tests that run a jobtracker and tasktracker, greatly increasing code coverage. Contributed by Milind.
Added:
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/build.xml
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
lucene/hadoop/trunk/src/webapps/task/getMapOutput.jsp
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=415438&r1=415437&r2=415438&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Jun 19 14:48:07 2006
@@ -12,6 +12,10 @@
3. HADOOP-250. Add an HTTP user interface to the namenode, running
on port 50070. (Devaraj Das via cutting)
+ 4. HADOOP-123. Add MapReduce unit tests that run a jobtracker and
+ tasktracker, greatly increasing code coverage.
+ (Milind Bhandarkar via cutting)
+
Release 0.3.2 - 2006-06-09
Modified: lucene/hadoop/trunk/build.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/build.xml?rev=415438&r1=415437&r2=415438&view=diff
==============================================================================
--- lucene/hadoop/trunk/build.xml (original)
+++ lucene/hadoop/trunk/build.xml Mon Jun 19 14:48:07 2006
@@ -28,12 +28,14 @@
<property name="build.examples" value="${build.dir}/examples"/>
<property name="build.libhdfs" value="${build.dir}/libhdfs"/>
<property name="build.docs" value="${build.dir}/docs"/>
+ <property name="build.minimr" value="${build.dir}/minimr"/>
<property name="build.javadoc" value="${build.docs}/api"/>
<property name="build.encoding" value="ISO-8859-1"/>
<property name="test.src.dir" value="${basedir}/src/test"/>
<property name="test.build.dir" value="${build.dir}/test"/>
<property name="test.build.data" value="${test.build.dir}/data"/>
+ <property name="hadoop.log.dir" value="${test.build.dir}/logs"/>
<property name="test.build.classes" value="${test.build.dir}/classes"/>
<property name="test.build.javadoc" value="${test.build.dir}/docs/api"/>
<property name="test.include" value="Test*"/>
@@ -65,6 +67,7 @@
<!-- the unit test classpath: uses test.src.dir for configuration -->
<path id="test.classpath">
<pathelement location="${test.build.classes}" />
+ <pathelement location="${build.minimr}" />
<pathelement location="${test.src.dir}"/>
<pathelement location="${build.dir}"/>
<path refid="classpath"/>
@@ -89,6 +92,7 @@
<mkdir dir="${build.webapps}/job/WEB-INF"/>
<mkdir dir="${build.webapps}/dfs/WEB-INF"/>
<mkdir dir="${build.examples}"/>
+ <mkdir dir="${build.minimr}"/>
<mkdir dir="${test.build.dir}"/>
<mkdir dir="${test.build.classes}"/>
@@ -258,10 +262,13 @@
<delete dir="${test.build.data}"/>
<mkdir dir="${test.build.data}"/>
+ <delete dir="${hadoop.log.dir}"/>
+ <mkdir dir="${hadoop.log.dir}"/>
<junit printsummary="yes" haltonfailure="no" fork="yes" dir="${basedir}"
errorProperty="tests.failed" failureProperty="tests.failed">
<sysproperty key="test.build.data" value="${test.build.data}"/>
+ <sysproperty key="hadoop.log.dir" value="${hadoop.log.dir}"/>
<sysproperty key="test.src.dir" value="${test.src.dir}"/>
<sysproperty key="hadoop.log.dir" value="."/>
<classpath refid="${test.classpath.id}"/>
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=415438&r1=415437&r2=415438&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Mon Jun 19 14:48:07 2006
@@ -58,10 +58,12 @@
public static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobTracker");
private static JobTracker tracker = null;
+ private static boolean runTracker = true;
public static void startTracker(Configuration conf) throws IOException {
if (tracker != null)
throw new IOException("JobTracker already running.");
- while (true) {
+ runTracker = true;
+ while (runTracker) {
try {
tracker = new JobTracker(conf);
break;
@@ -73,13 +75,21 @@
} catch (InterruptedException e) {
}
}
- tracker.offerService();
+ if (runTracker) { tracker.offerService(); }
}
public static JobTracker getTracker() {
return tracker;
}
+  /**
+   * Stops the JobTracker started by startTracker().
+   * The run flag is cleared before anything else so that a startTracker()
+   * call still inside its construction loop (tracker not yet assigned) exits
+   * that loop instead of retrying forever.
+   * NOTE(review): runTracker is a plain static field; consider making it
+   * volatile since it is written here and read by the startTracker thread.
+   * @throws IOException if no tracker has been constructed, or if close() fails
+   */
+  public static void stopTracker() throws IOException {
+    runTracker = false;
+    if (tracker == null) {
+      throw new IOException("Trying to stop JobTracker that is not running.");
+    }
+    try {
+      tracker.close();
+    } finally {
+      // Drop the reference even if close() failed so a new tracker can start.
+      tracker = null;
+    }
+  }
+
/**
* A thread to timeout tasks that have been assigned to task trackers,
* but that haven't reported back yet.
@@ -353,8 +363,11 @@
private TreeMap taskTrackers = new TreeMap();
Vector jobInitQueue = new Vector();
ExpireTrackers expireTrackers = new ExpireTrackers();
+ Thread expireTrackersThread = null;
RetireJobs retireJobs = new RetireJobs();
+ Thread retireJobsThread = null;
JobInitThread initJobs = new JobInitThread();
+ Thread initJobsThread = null;
ExpireLaunchingTasks expireLaunchingTasks = new ExpireLaunchingTasks();
Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks);
@@ -439,9 +452,12 @@
this.startTime = System.currentTimeMillis();
- new Thread(this.expireTrackers).start();
- new Thread(this.retireJobs).start();
- new Thread(this.initJobs).start();
+ this.expireTrackersThread = new Thread(this.expireTrackers);
+ this.expireTrackersThread.start();
+ this.retireJobsThread = new Thread(this.retireJobs);
+ this.retireJobsThread.start();
+ this.initJobsThread = new Thread(this.initJobs);
+ this.initJobsThread.start();
expireLaunchingTaskThread.start();
}
@@ -466,8 +482,66 @@
this.interTrackerServer.join();
} catch (InterruptedException ie) {
}
+ LOG.info("Stopped interTrackerServer");
}
+  /**
+   * Stops this JobTracker's servers and background service threads.
+   * The info server and inter-tracker server are stopped first. Then each
+   * service runnable is told to stop, and its thread is interrupted and joined.
+   * An interrupt received while waiting is logged and the remaining services
+   * are still stopped. The caller's interrupt status is restored before
+   * returning.
+   */
+  void close() throws IOException {
+    boolean interrupted = false;
+    if (this.infoServer != null) {
+      LOG.info("Stopping infoServer");
+      try {
+        this.infoServer.stop();
+      } catch (InterruptedException ex) {
+        LOG.warn("Interrupted while stopping infoServer", ex);
+        interrupted = true;
+      }
+    }
+    if (this.interTrackerServer != null) {
+      LOG.info("Stopping interTrackerServer");
+      this.interTrackerServer.stop();
+    }
+    if (this.expireTrackers != null) {
+      LOG.info("Stopping expireTrackers");
+      this.expireTrackers.stopTracker();
+      interrupted |= stopServiceThread(this.expireTrackersThread);
+    }
+    if (this.retireJobs != null) {
+      LOG.info("Stopping retirer");
+      this.retireJobs.stopRetirer();
+      interrupted |= stopServiceThread(this.retireJobsThread);
+    }
+    if (this.initJobs != null) {
+      LOG.info("Stopping initer");
+      this.initJobs.stopIniter();
+      interrupted |= stopServiceThread(this.initJobsThread);
+    }
+    if (this.expireLaunchingTaskThread != null) {
+      LOG.info("Stopping expireLaunchingTasks");
+      this.expireLaunchingTasks.stop();
+      interrupted |= stopServiceThread(this.expireLaunchingTaskThread);
+    }
+    LOG.info("stopped all jobtracker services");
+    if (interrupted) {
+      // Re-assert only now, so the joins above were not cut short.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Interrupts a service thread and waits for it to terminate.
+   * @param thread the thread to stop; null (never started) is ignored
+   * @return true if the calling thread was interrupted while waiting
+   */
+  private static boolean stopServiceThread(Thread thread) {
+    if (thread == null) {
+      return false;
+    }
+    thread.interrupt();
+    try {
+      thread.join();
+      return false;
+    } catch (InterruptedException ex) {
+      LOG.warn("Interrupted while waiting for " + thread.getName() + " to exit", ex);
+      return true;
+    }
+  }
+
///////////////////////////////////////////////////////
// Maintain lookup tables; called by JobInProgress
// and TaskInProgress
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=415438&r1=415437&r2=415438&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Mon Jun 19 14:48:07 2006
@@ -60,6 +60,10 @@
Server mapOutputServer = null;
InterTrackerProtocol jobClient;
+ StatusHttpServer server = null;
+
+ boolean shuttingDown = false;
+
TreeMap tasks = null;
TreeMap runningTasks = null;
int mapTotal = 0;
@@ -145,8 +149,22 @@
this.justStarted = true;
this.jobClient = (InterTrackerProtocol) RPC.getProxy(InterTrackerProtocol.class, jobTrackAddr, this.fConf);
+
+ this.running = true;
}
+  /**
+   * Permanently shuts down this TaskTracker.
+   * It sets shuttingDown so the run() loop returns instead of reinitializing,
+   * closes the tracker, and then stops the HTTP status server. The server is
+   * stopped even if close() throws, so its port is released.
+   * @throws IOException if close() fails
+   */
+  public synchronized void shutdown() throws IOException {
+    shuttingDown = true;
+    try {
+      close();
+    } finally {
+      if (this.server != null) {
+        LOG.info("Shutting down StatusHttpServer");
+        try {
+          this.server.stop();
+        } catch (InterruptedException ex) {
+          LOG.warn("Interrupted while stopping StatusHttpServer", ex);
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+  }
/**
* Close down the TaskTracker and all its components. We must also shutdown
* any running tasks or threads, and cleanup disk space. A new TaskTracker
@@ -191,6 +209,8 @@
mapOutputServer = null;
}
+ this.running = false;
+
// Clear local storage
this.mapOutputFile.cleanupStorage();
}
@@ -206,7 +226,7 @@
this.mapOutputFile = new MapOutputFile();
this.mapOutputFile.setConf(conf);
int httpPort = conf.getInt("tasktracker.http.port", 50060);
- StatusHttpServer server = new StatusHttpServer("task", httpPort, true);
+ this.server = new StatusHttpServer("task", httpPort, true);
int workerThreads = conf.getInt("tasktracker.http.threads", 40);
server.setThreads(1, workerThreads);
server.start();
@@ -236,7 +256,7 @@
long lastHeartbeat = 0;
this.fs = FileSystem.getNamed(jobClient.getFilesystemName(), this.fConf);
- while (running) {
+ while (running && !shuttingDown) {
long now = System.currentTimeMillis();
long waitTime = HEARTBEAT_INTERVAL - (now - lastHeartbeat);
@@ -407,26 +427,30 @@
*/
public void run() {
try {
- while (running) {
+ while (running && !shuttingDown) {
boolean staleState = false;
try {
// This while-loop attempts reconnects if we get network errors
- while (running && ! staleState) {
+ while (running && ! staleState && !shuttingDown ) {
try {
if (offerService() == STALE_STATE) {
staleState = true;
}
} catch (Exception ex) {
- LOG.info("Lost connection to JobTracker [" + jobTrackAddr + "]. Retrying...", ex);
- try {
- Thread.sleep(5000);
- } catch (InterruptedException ie) {
+ if (!shuttingDown) {
+ LOG.info("Lost connection to JobTracker [" +
+ jobTrackAddr + "]. Retrying...", ex);
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException ie) {
+ }
}
}
}
} finally {
close();
}
+ if (shuttingDown) { return; }
LOG.info("Reinitializing local state");
initialize();
}
@@ -529,18 +553,20 @@
localJobConf = new JobConf(localJobFile);
localJobConf.set("mapred.task.id", task.getTaskId());
+ localJobConf.set("mapred.local.dir",
+ this.defaultJobConf.get("mapred.local.dir"));
String jarFile = localJobConf.getJar();
if (jarFile != null) {
fs.copyToLocalFile(new Path(jarFile), localJarFile);
localJobConf.setJar(localJarFile.toString());
+ }
- FileSystem localFs = FileSystem.getNamed("local", fConf);
- OutputStream out = localFs.create(localJobFile);
- try {
- localJobConf.write(out);
- } finally {
- out.close();
- }
+ FileSystem localFs = FileSystem.getNamed("local", fConf);
+ OutputStream out = localFs.create(localJobFile);
+ try {
+ localJobConf.write(out);
+ } finally {
+ out.close();
}
// set the task's configuration to the local job conf
// rather than the default.
@@ -836,7 +862,7 @@
Task task = umbilical.getTask(taskid);
JobConf job = new JobConf(task.getJobFile());
-
+
defaultConf.addFinalResource(new Path(task.getJobFile()));
startPinging(umbilical, taskid); // start pinging parent
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=415438&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Mon Jun 19 14:48:07 2006
@@ -0,0 +1,210 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.*;
+import java.util.ArrayList;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * This class creates a single-process Map-Reduce cluster for junit testing.
+ * One thread is created for each server.
+ * @author Milind Bhandarkar
+ */
+public class MiniMRCluster {
+
+ private Thread jobTrackerThread;
+ private JobTrackerRunner jobTracker;
+ private TaskTrackerRunner taskTracker;
+
+ private int jobTrackerPort = 0;
+ private int taskTrackerPort = 0;
+
+ private int numTaskTrackers;
+
+ private ArrayList taskTrackerList = new ArrayList();
+ private ArrayList taskTrackerThreadList = new ArrayList();
+
+ private String namenode;
+
+ /**
+ * An inner class that runs a job tracker.
+ */
+ class JobTrackerRunner implements Runnable {
+ /**
+ * Create the job tracker and run it.
+ */
+ public void run() {
+ try {
+ JobConf jc = new JobConf();
+ jc.set("fs.name.node", namenode);
+ jc.set("mapred.job.tracker", "localhost:"+jobTrackerPort);
+ // this timeout seems to control the minimum time for the test, so
+ // set it down at 2 seconds.
+ jc.setInt("ipc.client.timeout", 1000);
+ jc.set("mapred.local.dir","build/test/mapred/local");
+ JobTracker.startTracker(jc);
+ } catch (Throwable e) {
+ System.err.println("Job tracker crashed:");
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Shutdown the job tracker and wait for it to finish.
+ */
+ public void shutdown() {
+ try {
+ JobTracker.stopTracker();
+ } catch (Throwable e) {
+ System.err.println("Unable to shut down job tracker:");
+ e.printStackTrace();
+ }
+ }
+ }
+
+ /**
+ * An inner class to run the task tracker.
+ */
+ class TaskTrackerRunner implements Runnable {
+ TaskTracker tt;
+
+ /**
+ * Create and run the task tracker.
+ */
+ public void run() {
+ try {
+ JobConf jc = new JobConf();
+ jc.set("fs.name.node", namenode);
+ jc.set("mapred.job.tracker", "localhost:"+jobTrackerPort);
+ // this timeout seems to control the minimum time for the test, so
+ // set it down at 2 seconds.
+ jc.setInt("ipc.client.timeout", 1000);
+ jc.setInt("mapred.task.tracker.info.port", taskTrackerPort++);
+ jc.setInt("mapred.task.tracker.output.port", taskTrackerPort++);
+ jc.setInt("mapred.task.tracker.report.port", taskTrackerPort++);
+ File localDir = new File(jc.get("mapred.local.dir"));
+ File ttDir = new File(localDir, Integer.toString(taskTrackerPort));
+ ttDir.mkdirs();
+ jc.set("mapred.local.dir", ttDir.getAbsolutePath());
+ tt = new TaskTracker(jc);
+ tt.run();
+ } catch (Throwable e) {
+ tt = null;
+ System.err.println("Task tracker crashed:");
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Shut down the server and wait for it to finish.
+ */
+ public void shutdown() {
+ if (tt != null) {
+ try {
+ tt.shutdown();
+ } catch (Throwable e) {
+ System.err.println("Unable to shut down task tracker:");
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ /**
+ * Create the config and start up the servers.
+ */
+ public MiniMRCluster(int jobTrackerPort,
+ int taskTrackerPort,
+ int numTaskTrackers,
+ String namenode) throws IOException {
+ this.jobTrackerPort = jobTrackerPort;
+ this.taskTrackerPort = taskTrackerPort;
+ this.numTaskTrackers = numTaskTrackers;
+ this.namenode = namenode;
+
+ File configDir = new File("build", "minimr");
+ configDir.mkdirs();
+ File siteFile = new File(configDir, "hadoop-site.xml");
+ PrintWriter pw = new PrintWriter(siteFile);
+ pw.print("<?xml version=\"1.0\"?>\n"+
+ "<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>\n"+
+ "<configuration>\n"+
+ " <property>\n"+
+ " <name>mapred.system.dir</name>\n"+
+ " <value>build/test/mapred/system</value>\n"+
+ " </property>\n"+
+ "</configuration>\n");
+ pw.close();
+ jobTracker = new JobTrackerRunner();
+ jobTrackerThread = new Thread(jobTracker);
+ jobTrackerThread.start();
+ try { // let jobTracker get started
+ Thread.sleep(2000);
+ } catch(InterruptedException e) {
+ }
+ for (int idx = 0; idx < numTaskTrackers; idx++) {
+ TaskTrackerRunner taskTracker = new TaskTrackerRunner();
+ Thread taskTrackerThread = new Thread(taskTracker);
+ taskTrackerThread.start();
+ taskTrackerList.add(taskTracker);
+ taskTrackerThreadList.add(taskTrackerThread);
+ }
+ try { // let taskTrackers get started
+ Thread.sleep(2000);
+ } catch(InterruptedException e) {
+ }
+ }
+
+ /**
+ * Shut down the servers.
+ */
+ public void shutdown() {
+ try {
+ for (int idx = 0; idx < numTaskTrackers; idx++) {
+ TaskTrackerRunner taskTracker = (TaskTrackerRunner) taskTrackerList.get(idx);
+ Thread taskTrackerThread = (Thread) taskTrackerThreadList.get(idx);
+ taskTracker.shutdown();
+ taskTrackerThread.interrupt();
+ try {
+ taskTrackerThread.join();
+ } catch (InterruptedException ex) {
+ ex.printStackTrace();
+ }
+ }
+ jobTracker.shutdown();
+ jobTrackerThread.interrupt();
+ try {
+ jobTrackerThread.join();
+ } catch (InterruptedException ex) {
+ ex.printStackTrace();
+ }
+ } finally {
+ File configDir = new File("build", "minimr");
+ File siteFile = new File(configDir, "hadoop-site.xml");
+ siteFile.delete();
+ }
+ }
+
+ public static void main(String[] args) throws IOException {
+ System.out.println("Bringing up Jobtracker and tasktrackers.");
+ MiniMRCluster mr = new MiniMRCluster(50000, 50002, 4, "local");
+ System.out.println("JobTracker and TaskTrackers are up.");
+ mr.shutdown();
+ System.out.println("JobTracker and TaskTrackers brought down.");
+ }
+}
+
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java?rev=415438&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java Mon Jun 19 14:48:07 2006
@@ -0,0 +1,212 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * A Map-reduce program to estimaate the valu eof Pi using monte-carlo
+ * method.
+ *
+ * @author Milind Bhandarkar
+ */
+public class PiEstimator {
+
+  /** Job name, and working directory on the job's file system for input and output. */
+  private static final String TMP_DIR_NAME = "test-mini-mr";
+
+  /** Name of the file, inside the output directory, holding the reducer's counts. */
+  private static final String REDUCE_OUT_NAME = "reduce-out";
+
+  /**
+   * Mapper class for Pi estimation.
+   */
+  public static class PiMapper extends MapReduceBase implements Mapper {
+
+    /** Mapper configuration; nothing to configure. */
+    public void configure(JobConf job) {
+    }
+
+    static Random r = new Random();
+
+    /**
+     * Generates random points in the unit square and classifies each one.
+     * It emits (1, 1) for a point inside the circle of radius 0.5 centred at
+     * (0.5, 0.5), and (0, 1) for a point outside it.
+     * @param key number of samples to generate (an IntWritable)
+     * @param val not used
+     * @param out collector receiving one (inside-flag, 1) pair per sample
+     * @param reporter receives a status update once every 100 samples
+     */
+    public void map(WritableComparable key,
+                    Writable val,
+                    OutputCollector out,
+                    Reporter reporter) throws IOException {
+      int nSamples = ((IntWritable) key).get();
+      for (int idx = 0; idx < nSamples; idx++) {
+        double x = r.nextDouble();
+        double y = r.nextDouble();
+        // squared distance from the centre of the unit square
+        double d = (x-0.5)*(x-0.5)+(y-0.5)*(y-0.5);
+        if (d > 0.25) {
+          out.collect(new IntWritable(0), new IntWritable(1));
+        } else {
+          out.collect(new IntWritable(1), new IntWritable(1));
+        }
+        if (idx%100 == 1) {
+          reporter.setStatus("Generated "+idx+" samples.");
+        }
+      }
+    }
+
+    public void close() {
+      // nothing
+    }
+  }
+
+  /**
+   * Reducer that totals the inside (key 1) and outside (all other keys)
+   * counts. On close it writes them as one (numInside, numOutside) record
+   * directly to the output directory. This relies on launch() configuring
+   * exactly one reduce task with speculative execution off.
+   */
+  public static class PiReducer extends MapReduceBase implements Reducer {
+    int numInside = 0;
+    int numOutside = 0;
+    JobConf conf;
+
+    /** Reducer configuration; keeps the job conf for use in close(). */
+    public void configure(JobConf job) {
+      conf = job;
+    }
+
+    /**
+     * Adds the values for one key to the inside or outside total.
+     * @param key 1 for points inside the circle, anything else for outside
+     * @param values IntWritable counts to add
+     * @param output not used; results are written in close()
+     * @param reporter not used
+     */
+    public void reduce(WritableComparable key,
+                       Iterator values,
+                       OutputCollector output,
+                       Reporter reporter) throws IOException {
+      if (((IntWritable)key).get() == 1) {
+        while (values.hasNext()) {
+          int num = ((IntWritable)values.next()).get();
+          numInside += num;
+        }
+      } else {
+        while (values.hasNext()) {
+          int num = ((IntWritable)values.next()).get();
+          numOutside += num;
+        }
+      }
+    }
+
+    public void close() throws IOException {
+      Path outDir = new Path(new Path(TMP_DIR_NAME), "out");
+      Path outFile = new Path(outDir, REDUCE_OUT_NAME);
+      FileSystem fileSys = FileSystem.get(conf);
+      SequenceFile.Writer writer = new SequenceFile.Writer(fileSys, outFile,
+          IntWritable.class, IntWritable.class);
+      try {
+        writer.append(new IntWritable(numInside), new IntWritable(numOutside));
+      } finally {
+        writer.close();
+      }
+    }
+  }
+
+  /**
+   * This is the main driver for computing the value of Pi using the
+   * Monte-Carlo method. It writes one input file per map (each holding the
+   * sample count), runs the job, and reads back the reducer's counts. The
+   * working directory is deleted afterwards.
+   * @param numMaps number of map tasks; must be positive
+   * @param numPoints samples generated by each map; must be positive
+   * @param jt job tracker address, or null to use the configured default
+   * @param dfs file system name, or null to use the configured default
+   * @return the estimate of Pi
+   * @throws IllegalArgumentException if numMaps or numPoints is not positive
+   * @throws IOException if the job fails or its result cannot be read
+   */
+  static double launch(int numMaps, int numPoints, String jt, String dfs)
+    throws IOException {
+    if (numMaps <= 0 || numPoints <= 0) {
+      throw new IllegalArgumentException("numMaps (" + numMaps + ") and numPoints ("
+          + numPoints + ") must be positive");
+    }
+
+    Configuration conf = new Configuration();
+    JobConf jobConf = new JobConf(conf, PiEstimator.class);
+    if (jt != null) { jobConf.set("mapred.job.tracker", jt); }
+    if (dfs != null) { jobConf.set("fs.default.name", dfs); }
+    jobConf.setJobName(TMP_DIR_NAME);
+
+    // turn off speculative execution, because DFS doesn't handle
+    // multiple writers to the same file.
+    jobConf.setSpeculativeExecution(false);
+    jobConf.setInputKeyClass(IntWritable.class);
+    jobConf.setInputValueClass(IntWritable.class);
+    jobConf.setInputFormat(SequenceFileInputFormat.class);
+
+    jobConf.setOutputKeyClass(IntWritable.class);
+    jobConf.setOutputValueClass(IntWritable.class);
+    jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+
+    jobConf.setMapperClass(PiMapper.class);
+    jobConf.setReducerClass(PiReducer.class);
+
+    jobConf.setNumReduceTasks(1);
+
+    Path tmpDir = new Path(TMP_DIR_NAME);
+    Path inDir = new Path(tmpDir, "in");
+    Path outDir = new Path(tmpDir, "out");
+    FileSystem fileSys = FileSystem.get(jobConf);
+    fileSys.delete(tmpDir);
+    fileSys.mkdirs(inDir);
+
+    jobConf.setInputPath(inDir);
+    jobConf.setOutputPath(outDir);
+
+    jobConf.setNumMapTasks(numMaps);
+
+    // one input file per map, each holding a single record keyed by the sample count
+    for (int idx = 0; idx < numMaps; ++idx) {
+      Path file = new Path(inDir, "part"+idx);
+      SequenceFile.Writer writer = new SequenceFile.Writer(fileSys, file,
+          IntWritable.class, IntWritable.class);
+      try {
+        writer.append(new IntWritable(numPoints), new IntWritable(0));
+      } finally {
+        writer.close();
+      }
+    }
+
+    try {
+      JobClient.runJob(jobConf);
+      Path inFile = new Path(outDir, REDUCE_OUT_NAME);
+      SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile,
+          jobConf);
+      IntWritable numInside = new IntWritable();
+      IntWritable numOutside = new IntWritable();
+      try {
+        if (!reader.next(numInside, numOutside)) {
+          throw new IOException("No result record in " + inFile);
+        }
+      } finally {
+        reader.close();
+      }
+      // widen before multiplying: numMaps * numPoints can overflow an int
+      return numInside.get() * 4.0 / ((double) numMaps * numPoints);
+    } finally {
+      fileSys.delete(tmpDir);
+    }
+  }
+
+  /**
+   * Command-line entry point: estimates Pi with the given number of maps
+   * and samples per map, using the default job tracker and file system.
+   */
+  public static void main(String[] argv) throws Exception {
+    if (argv.length < 2) {
+      System.err.println("Usage: PiEstimator <nMaps> <nSamples>");
+      return;
+    }
+
+    int nMaps = Integer.parseInt(argv[0]);
+    int nSamples = Integer.parseInt(argv[1]);
+
+    System.out.println("Estimated value of PI is "+
+                       launch(nMaps, nSamples, null, null));
+  }
+}
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java?rev=415438&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java Mon Jun 19 14:48:07 2006
@@ -0,0 +1,38 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import junit.framework.TestCase;
+
+/**
+ * A Unit-test to test bringup and shutdown of Mini Map-Reduce Cluster.
+ *
+ * @author Milind Bhandarkar
+ */
+public class TestMiniMRBringup extends TestCase {
+
+  /**
+   * Starts a mini cluster with one task tracker on the local file system,
+   * then shuts it down again.
+   */
+  public void testBringUp() throws IOException {
+    MiniMRCluster cluster = null;
+    try {
+      cluster = new MiniMRCluster(50000, 50010, 1, "local");
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+}
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?rev=415438&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Mon Jun 19 14:48:07 2006
@@ -0,0 +1,43 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import junit.framework.TestCase;
+
+/**
+ * A JUnit test to test min map-reduce cluster with local file system.
+ *
+ * @author Milind Bhandarkar
+ */
+public class TestMiniMRLocalFS extends TestCase {
+
+ static final int NUM_MAPS = 10;
+ static final int NUM_SAMPLES = 100000;
+
+ public void testWithLocal() throws IOException {
+ MiniMRCluster mr = null;
+ try {
+ mr = new MiniMRCluster(60030, 60040, 2, "local");
+ double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, "localhost:60030", "local");
+ double error = Math.abs(Math.PI - estimate);
+ assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
+ } finally {
+ if (mr != null) { mr.shutdown(); }
+ }
+ }
+}
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=415438&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Mon Jun 19 14:48:07 2006
@@ -0,0 +1,57 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+
+/**
+ * A JUnit test to test Mini Map-Reduce Cluster with Mini-DFS.
+ *
+ * @author Milind Bhandarkar
+ */
+public class TestMiniMRWithDFS extends TestCase {
+
+  static final int NUM_MAPS = 10;
+  static final int NUM_SAMPLES = 100000;
+
+  /**
+   * Runs the Pi estimator on a four-task-tracker mini MR cluster whose
+   * namenode is a mini DFS. Passes if the estimate is within 0.01 of Math.PI.
+   */
+  public void testWithDFS() throws IOException {
+    String namenode = null;
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    FileSystem fileSys = null;
+    try {
+      Configuration conf = new Configuration();
+      dfs = new MiniDFSCluster(65314, conf);
+      fileSys = dfs.getFileSystem();
+      namenode = fileSys.getName();
+      mr = new MiniMRCluster(50050, 50060, 4, namenode);
+      double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, "localhost:50050", namenode);
+      double error = Math.abs(Math.PI - estimate);
+      assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
+    } finally {
+      // Stop MapReduce first: its trackers were configured with this DFS namenode.
+      if (mr != null) {
+        mr.shutdown();
+      }
+      try {
+        if (fileSys != null) {
+          fileSys.close();
+        }
+      } finally {
+        // always release the DFS, even if closing the client failed
+        if (dfs != null) {
+          dfs.shutdown();
+        }
+      }
+    }
+  }
+
+}
Modified: lucene/hadoop/trunk/src/webapps/task/getMapOutput.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/task/getMapOutput.jsp?rev=415438&r1=415437&r2=415438&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/webapps/task/getMapOutput.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/task/getMapOutput.jsp Mon Jun 19 14:48:07 2006
@@ -6,10 +6,10 @@
import="javax.servlet.http.*"
import="java.io.*"
import="java.util.*"
- import="java.util.logging.Logger"
import="org.apache.hadoop.fs.*"
import="org.apache.hadoop.mapred.*"
import="org.apache.hadoop.util.*"
+ import="org.apache.commons.logging.*"
%><%
String mapId = request.getParameter("map");
String reduceId = request.getParameter("reduce");
@@ -35,8 +35,8 @@
} catch (IOException ie) {
TaskTracker tracker =
(TaskTracker) application.getAttribute("task.tracker");
- Logger log = (Logger) application.getAttribute("log");
- log.warning("Http server (getMapOutput.jsp): " +
+ Log log = (Log) application.getAttribute("log");
+ log.warn("Http server (getMapOutput.jsp): " +
StringUtils.stringifyException(ie));
tracker.mapOutputLost(mapId);
throw ie;