You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/10/19 05:56:35 UTC
svn commit: r465465 - in /lucene/hadoop/trunk: ./
src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
src/test/testjar/
Author: cutting
Date: Wed Oct 18 20:56:34 2006
New Revision: 465465
URL: http://svn.apache.org/viewvc?view=rev&rev=465465
Log:
HADOOP-607 & HADOOP-609. Fix a critical bug where job jars were not found by tasks. Also add unit tests to check for this and other conditions. Contributed by Mahadev.
Added:
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java
lucene/hadoop/trunk/src/test/testjar/
lucene/hadoop/trunk/src/test/testjar/ClassWordCount.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/build.xml
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=465465&r1=465464&r2=465465
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Oct 18 20:56:34 2006
@@ -28,6 +28,16 @@
than ArrayOutOfBoundsException. (Dhruba Borthakur via cutting)
+Release 0.7.2 - 2006-10-18
+
+ 1. HADOOP-607. Fix a bug where classes included in job jars were not
+ found by tasks. (Mahadev Konar via cutting)
+
+ 2. HADOOP-609. Add a unit test that checks that classes in job jars
+ can be found by tasks. Also modify unit tests to specify multiple
+ local directories. (Mahadev Konar via cutting)
+
+
Release 0.7.1 - 2006-10-11
1. HADOOP-593. Fix a NullPointerException in the JobTracker.
Modified: lucene/hadoop/trunk/build.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/build.xml?view=diff&rev=465465&r1=465464&r2=465465
==============================================================================
--- lucene/hadoop/trunk/build.xml (original)
+++ lucene/hadoop/trunk/build.xml Wed Oct 18 20:56:34 2006
@@ -39,6 +39,7 @@
<property name="test.cache.data" value="${test.build.dir}/cache"/>
<property name="hadoop.log.dir" value="${test.build.dir}/logs"/>
<property name="test.build.classes" value="${test.build.dir}/classes"/>
+ <property name="test.build.testjar" value="${test.build.dir}/testjar"/>
<property name="test.build.javadoc" value="${test.build.dir}/docs/api"/>
<property name="test.include" value="Test*"/>
<property name="test.classpath.id" value="test.classpath"/>
@@ -104,7 +105,7 @@
<mkdir dir="${test.build.dir}"/>
<mkdir dir="${test.build.classes}"/>
-
+ <mkdir dir="${test.build.testjar}"/>
<touch datetime="01/25/1971 2:00 pm">
<fileset dir="${conf.dir}" includes="**/*.template"/>
</touch>
@@ -282,7 +283,24 @@
source="${javac.version}"
deprecation="${javac.deprecation}">
<classpath refid="test.classpath"/>
- </javac>
+ </javac>
+ <javac
+ encoding="${build.encoding}"
+ srcdir="${test.src.dir}/testjar"
+ includes="*.java"
+ destdir="${test.build.testjar}"
+ debug="${javac.debug}"
+ optimize="${javac.optimize}"
+ target="${javac.version}"
+ source="${javac.version}"
+ deprecation="${javac.deprecation}">
+ <classpath refid="test.classpath"/>
+ </javac>
+ <delete file="${test.build.testjar}/testjob.jar"/>
+ <jar jarfile="${test.build.testjar}/testjob.jar"
+ basedir="${test.build.testjar}">
+ </jar>
+
<jar jarfile="${build.dir}/${final.name}-test.jar"
basedir="${test.build.classes}">
<manifest>
@@ -294,7 +312,7 @@
<mkdir dir="${test.cache.data}"/>
<copy file="${test.src.dir}/org/apache/hadoop/mapred/test.txt" todir="${test.cache.data}"/>
<copy file="${test.src.dir}/org/apache/hadoop/mapred/test.jar" todir="${test.cache.data}"/>
- <copy file="${test.src.dir}/org/apache/hadoop/mapred/test.zip" todir="${test.cache.data}"/>
+ <copy file="${test.src.dir}/org/apache/hadoop/mapred/test.zip" todir="${test.cache.data}"/>
</target>
<!-- ================================================================== -->
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=465465&r1=465464&r2=465465
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Oct 18 20:56:34 2006
@@ -817,9 +817,8 @@
private void localizeTask(Task task) throws IOException{
Path localTaskDir =
- new Path(this.defaultJobConf.getLocalPath(SUBDIR+ Path.SEPARATOR
- + JOBCACHE + Path.SEPARATOR
- + task.getJobId()), task.getTaskId());
+ new Path(this.defaultJobConf.getLocalPath(TaskTracker.getJobCacheSubdir()),
+ (task.getJobId() + Path.SEPARATOR + task.getTaskId()));
FileSystem localFs = FileSystem.getNamed("local", fConf);
localFs.mkdirs(localTaskDir);
Path localTaskFile = new Path(localTaskDir, "job.xml");
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?view=diff&rev=465465&r1=465464&r2=465465
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Wed Oct 18 20:56:34 2006
@@ -79,9 +79,16 @@
*/
class TaskTrackerRunner implements Runnable {
TaskTracker tt;
- String localDir;
+ // the localDirs for this taskTracker
+ String[] localDir;
boolean isInitialized = false;
boolean isDead = false;
+ int numDir;
+ TaskTrackerRunner(int numDir) {
+ this.numDir = numDir;
+ // a maximum of 10 local dirs can be specified in MiniMRCluster
+ localDir = new String[10];
+ }
/**
* Create and run the task tracker.
@@ -97,10 +104,19 @@
jc.setInt("mapred.task.tracker.info.port", taskTrackerPort++);
jc.setInt("mapred.task.tracker.report.port", taskTrackerPort++);
File localDir = new File(jc.get("mapred.local.dir"));
- File ttDir = new File(localDir, Integer.toString(taskTrackerPort));
+ String mapredDir = "";
+ File ttDir = new File(localDir, Integer.toString(taskTrackerPort) + "_" + 0);
ttDir.mkdirs();
- this.localDir = ttDir.getAbsolutePath();
- jc.set("mapred.local.dir", ttDir.getAbsolutePath());
+ this.localDir[0] = ttDir.getAbsolutePath();
+ mapredDir = ttDir.getAbsolutePath();
+ for (int i = 1; i < numDir; i++){
+ ttDir = new File(localDir, Integer.toString(taskTrackerPort) + "_" + i);
+ ttDir.mkdirs();
+ this.localDir[i] = ttDir.getAbsolutePath();
+ mapredDir = mapredDir + "," + ttDir.getAbsolutePath();
+ }
+ jc.set("mapred.local.dir", mapredDir);
+ System.out.println("mapred.local.dir is " + mapredDir);
tt = new TaskTracker(jc);
isInitialized = true;
tt.run();
@@ -114,12 +130,17 @@
/**
* Get the local dir for this TaskTracker.
+ * This is there so that we do not break
+ * previous tests.
* @return the absolute pathname
*/
public String getLocalDir() {
- return localDir;
+ return localDir[0];
}
-
+
+ public String[] getLocalDirs(){
+ return localDir;
+ }
/**
* Shut down the server and wait for it to finish.
*/
@@ -176,10 +197,18 @@
* Create the config and start up the servers.
*/
public MiniMRCluster(int jobTrackerPort,
+ int taskTrackerPort,
+ int numTaskTrackers,
+ String namenode,
+ boolean taskTrackerFirst) throws IOException {
+ this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, taskTrackerFirst, 1);
+ }
+
+ public MiniMRCluster(int jobTrackerPort,
int taskTrackerPort,
int numTaskTrackers,
String namenode,
- boolean taskTrackerFirst) throws IOException {
+ boolean taskTrackerFirst, int numDir) throws IOException {
this.jobTrackerPort = jobTrackerPort;
this.taskTrackerPort = taskTrackerPort;
this.numTaskTrackers = numTaskTrackers;
@@ -204,7 +233,7 @@
jobTrackerThread.start();
}
for (int idx = 0; idx < numTaskTrackers; idx++) {
- TaskTrackerRunner taskTracker = new TaskTrackerRunner();
+ TaskTrackerRunner taskTracker = new TaskTrackerRunner(numDir);
Thread taskTrackerThread = new Thread(taskTracker);
taskTrackerThread.start();
taskTrackerList.add(taskTracker);
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java?view=diff&rev=465465&r1=465464&r2=465465
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java Wed Oct 18 20:56:34 2006
@@ -106,7 +106,7 @@
fileSys = dfs.getFileSystem();
namenode = fileSys.getName();
mr = new MiniMRCluster(jobTrackerPort, 50060, taskTrackers,
- namenode, true);
+ namenode, true, 2);
JobConf jobConf = new JobConf();
boolean result;
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java?view=auto&rev=465465
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java Wed Oct 18 20:56:34 2006
@@ -0,0 +1,122 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.*;
+import java.util.*;
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+
+/**
+ * A JUnit test to test Mini Map-Reduce Cluster with multiple directories
+ * and check for correct classpath
+ */
+public class TestMiniMRClasspath extends TestCase {
+
+
+ static String launchWordCount(String fileSys,
+ String jobTracker,
+ JobConf conf,
+ String input,
+ int numMaps,
+ int numReduces) throws IOException {
+ final Path inDir = new Path("/testing/wc/input");
+ final Path outDir = new Path("/testing/wc/output");
+ FileSystem fs = FileSystem.getNamed(fileSys, conf);
+ fs.delete(outDir);
+ fs.mkdirs(inDir);
+ {
+ DataOutputStream file = fs.create(new Path(inDir, "part-0"));
+ file.writeBytes(input);
+ file.close();
+ }
+ conf.set("fs.default.name", fileSys);
+ conf.set("mapred.job.tracker", jobTracker);
+ conf.setJobName("wordcount");
+ conf.setInputFormat(TextInputFormat.class);
+
+ // the keys are words (strings)
+ conf.setOutputKeyClass(Text.class);
+ // the values are counts (ints)
+ conf.setOutputValueClass(IntWritable.class);
+
+ conf.set("mapred.mapper.class", "ClassWordCount$MapClass");
+ conf.set("mapred.combine.class", "ClassWordCount$Reduce");
+ conf.set("mapred.reducer.class", "ClassWordCount$Reduce");
+ conf.setInputPath(inDir);
+ conf.setOutputPath(outDir);
+ conf.setNumMapTasks(numMaps);
+ conf.setNumReduceTasks(numReduces);
+ //pass a job.jar already included in the hadoop build
+ conf.setJar("build/test/testjar/testjob.jar");
+ JobClient.runJob(conf);
+ StringBuffer result = new StringBuffer();
+ {
+ Path[] fileList = fs.listPaths(outDir);
+ for(int i=0; i < fileList.length; ++i) {
+ BufferedReader file =
+ new BufferedReader(new InputStreamReader(fs.open(fileList[i])));
+ String line = file.readLine();
+ while (line != null) {
+ result.append(line);
+ result.append("\n");
+ line = file.readLine();
+ }
+ file.close();
+ }
+ }
+ return result.toString();
+ }
+
+ public void testClassPath() throws IOException {
+ String namenode = null;
+ MiniDFSCluster dfs = null;
+ MiniMRCluster mr = null;
+ FileSystem fileSys = null;
+ try {
+ final int taskTrackers = 4;
+ final int jobTrackerPort = 50050;
+ final String jobTrackerName = "localhost:" + jobTrackerPort;
+ Configuration conf = new Configuration();
+ dfs = new MiniDFSCluster(65314, conf, true);
+ fileSys = dfs.getFileSystem();
+ namenode = fileSys.getName();
+ mr = new MiniMRCluster(jobTrackerPort, 50060, taskTrackers,
+ namenode, true, 3);
+ JobConf jobConf = new JobConf();
+ String result;
+ result = launchWordCount(namenode, jobTrackerName, jobConf,
+ "The quick brown fox\nhas many silly\n" +
+ "red fox sox\n",
+ 3, 1);
+ assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
+ "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result);
+
+ } finally {
+ if (fileSys != null) { fileSys.close(); }
+ if (dfs != null) { dfs.shutdown(); }
+ if (mr != null) { mr.shutdown();
+ }
+ }
+ }
+
+}
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java?view=diff&rev=465465&r1=465464&r2=465465
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java Wed Oct 18 20:56:34 2006
@@ -42,7 +42,7 @@
dfs = new MiniDFSCluster(65314, conf, true);
fileSys = dfs.getFileSystem();
namenode = fileSys.getName();
- mr = new MiniMRCluster(50050, 50060, 2, namenode, true);
+ mr = new MiniMRCluster(50050, 50060, 2, namenode, true, 4);
// run the wordcount example with caching
boolean ret = MRCaching.launchMRCache("localhost:50050",
"/testing/wc/input",
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?view=diff&rev=465465&r1=465464&r2=465465
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Wed Oct 18 20:56:34 2006
@@ -32,7 +32,7 @@
public void testWithLocal() throws IOException {
MiniMRCluster mr = null;
try {
- mr = new MiniMRCluster(60030, 60040, 2, "local", false);
+ mr = new MiniMRCluster(60030, 60040, 2, "local", false, 3);
double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, "localhost:60030", "local");
double error = Math.abs(Math.PI - estimate);
assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
Added: lucene/hadoop/trunk/src/test/testjar/ClassWordCount.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/testjar/ClassWordCount.java?view=auto&rev=465465
==============================================================================
--- lucene/hadoop/trunk/src/test/testjar/ClassWordCount.java (added)
+++ lucene/hadoop/trunk/src/test/testjar/ClassWordCount.java Wed Oct 18 20:56:34 2006
@@ -0,0 +1,56 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import java.io.*;
+import java.util.*;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.examples.WordCount;
+
+/**
+ * This is an example Hadoop Map/Reduce application being used for
+ * TestMiniMRClasspath. Uses the WordCount examples in hadoop.
+ * @author Owen O'Malley
+ *
+ */
+public class ClassWordCount {
+
+ /**
+ * Counts the words in each line.
+ * For each line of input, break the line into words and emit them as
+ * (<b>word</b>, <b>1</b>).
+ */
+ public static class MapClass extends WordCount.MapClass implements Mapper {
+ }
+
+ /**
+ * A reducer class that just emits the sum of the input values.
+ */
+ public static class Reduce extends WordCount.Reduce implements Reducer {
+ }
+}