Posted to mapreduce-commits@hadoop.apache.org by tu...@apache.org on 2011/11/19 02:24:34 UTC
svn commit: r1203941 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/
hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/
hadoop-mapreduce-cl...
Author: tucu
Date: Sat Nov 19 01:24:32 2011
New Revision: 1203941
URL: http://svn.apache.org/viewvc?rev=1203941&view=rev
Log:
HADOOP-7590. Mavenize streaming and MR examples. (tucu)
Added:
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-examples/ (with props)
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-examples/pom.xml
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/
- copied from r1203935, hadoop/common/trunk/hadoop-mapreduce-project/src/examples/org/
Removed:
hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/streaming/src/java/
hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/streaming/src/test/
hadoop/common/trunk/hadoop-mapreduce-project/src/examples/org/
hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java
hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/testjar/ClassWithNoPackage.java
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/DistSum.java
hadoop/common/trunk/hadoop-mapreduce-project/pom.xml
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml?rev=1203941&r1=1203940&r2=1203941&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml Sat Nov 19 01:24:32 2011
@@ -82,6 +82,12 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
</dependencies>
<build>
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java?rev=1203941&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java Sat Nov 19 01:24:32 2011
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Test case to run a MapReduce job.
+ * <p/>
+ * It runs a 2-node MapReduce cluster with a 2-node DFS.
+ * <p/>
+ * The JobConf to use must be obtained via the createJobConf() method.
+ * <p/>
+ * It creates a temporary directory (accessible via getTestRootDir())
+ * for both input and output.
+ * <p/>
+ * The input directory is accessible via getInputDir() and the output
+ * directory via getOutputDir().
+ * <p/>
+ * The DFS filesystem is formatted before the testcase starts and after it ends.
+ */
+public abstract class ClusterMapReduceTestCase extends TestCase {
+ private MiniDFSCluster dfsCluster = null;
+ private MiniMRCluster mrCluster = null;
+
+ /**
+ * Creates Hadoop Cluster and DFS before a test case is run.
+ *
+ * @throws Exception
+ */
+ protected void setUp() throws Exception {
+ super.setUp();
+
+ startCluster(true, null);
+ }
+
+ /**
+ * Starts the cluster within a testcase.
+ * <p/>
+ * Note that the cluster is already started when the testcase method
+ * is invoked. This method is useful if, as part of the testcase, the
+ * cluster has to be shut down and restarted.
+ * <p/>
+ * If the cluster is already running this method does nothing.
+ *
+ * @param reformatDFS indicates if DFS has to be reformatted
+ * @param props configuration properties to inject to the mini cluster
+ * @throws Exception if the cluster could not be started
+ */
+ protected synchronized void startCluster(boolean reformatDFS, Properties props)
+ throws Exception {
+ if (dfsCluster == null) {
+ JobConf conf = new JobConf();
+ if (props != null) {
+ for (Map.Entry<Object, Object> entry : props.entrySet()) {
+ conf.set((String) entry.getKey(), (String) entry.getValue());
+ }
+ }
+ dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+ .format(reformatDFS).racks(null).build();
+
+ ConfigurableMiniMRCluster.setConfiguration(props);
+ //noinspection deprecation
+ mrCluster = new ConfigurableMiniMRCluster(2,
+ getFileSystem().getUri().toString(), 1, conf);
+ }
+ }
+
+ private static class ConfigurableMiniMRCluster extends MiniMRCluster {
+ private static Properties config;
+
+ public static void setConfiguration(Properties props) {
+ config = props;
+ }
+
+ public ConfigurableMiniMRCluster(int numTaskTrackers, String namenode,
+ int numDir, JobConf conf)
+ throws Exception {
+ super(0,0, numTaskTrackers, namenode, numDir, null, null, null, conf);
+ }
+
+ public JobConf createJobConf() {
+ JobConf conf = super.createJobConf();
+ if (config != null) {
+ for (Map.Entry<Object, Object> entry : config.entrySet()) {
+ conf.set((String) entry.getKey(), (String) entry.getValue());
+ }
+ }
+ return conf;
+ }
+ }
+
+ /**
+ * Stops the cluster within a testcase.
+ * <p/>
+ * Note that the cluster is already started when the testcase method
+ * is invoked. This method is useful if, as part of the testcase, the
+ * cluster has to be shut down.
+ * <p/>
+ * If the cluster is already stopped this method does nothing.
+ *
+ * @throws Exception if the cluster could not be stopped
+ */
+ protected void stopCluster() throws Exception {
+ if (mrCluster != null) {
+ mrCluster.shutdown();
+ mrCluster = null;
+ }
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ dfsCluster = null;
+ }
+ }
+
+ /**
+ * Destroys Hadoop Cluster and DFS after a test case is run.
+ *
+ * @throws Exception
+ */
+ protected void tearDown() throws Exception {
+ stopCluster();
+ super.tearDown();
+ }
+
+ /**
+ * Returns a preconfigured FileSystem instance for test cases to read
+ * and write files with.
+ * <p/>
+ * TestCases should use this FileSystem instance.
+ *
+ * @return the filesystem used by Hadoop.
+ * @throws IOException
+ */
+ protected FileSystem getFileSystem() throws IOException {
+ return dfsCluster.getFileSystem();
+ }
+
+ protected MiniMRCluster getMRCluster() {
+ return mrCluster;
+ }
+
+ /**
+ * Returns the path to the root directory for the testcase.
+ *
+ * @return path to the root directory for the testcase.
+ */
+ protected Path getTestRootDir() {
+ // Path("x").getParent() resolves to Path("."), the test working directory.
+ return new Path("x").getParent();
+ }
+
+ /**
+ * Returns a path to the input directory for the testcase.
+ *
+ * @return path to the input directory for the testcase.
+ */
+ protected Path getInputDir() {
+ return new Path("target/input");
+ }
+
+ /**
+ * Returns a path to the output directory for the testcase.
+ *
+ * @return path to the output directory for the testcase.
+ */
+ protected Path getOutputDir() {
+ return new Path("target/output");
+ }
+
+ /**
+ * Returns a job configuration preconfigured to run against the Hadoop
+ * cluster managed by the testcase.
+ *
+ * @return configuration that works on the testcase Hadoop instance
+ */
+ protected JobConf createJobConf() {
+ return mrCluster.createJobConf();
+ }
+
+}
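For context, a minimal sketch (not part of this commit) of how a test might
subclass this harness; the class name, input text, and identity job below are
illustrative assumptions:

  import java.io.OutputStreamWriter;
  import java.io.Writer;

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapred.ClusterMapReduceTestCase;
  import org.apache.hadoop.mapred.FileInputFormat;
  import org.apache.hadoop.mapred.FileOutputFormat;
  import org.apache.hadoop.mapred.JobClient;
  import org.apache.hadoop.mapred.JobConf;

  public class ExampleClusterTest extends ClusterMapReduceTestCase {
    public void testIdentityJob() throws Exception {
      // Write one input file into the DFS started by setUp().
      Writer wr = new OutputStreamWriter(
          getFileSystem().create(new Path(getInputDir(), "text.txt")));
      wr.write("hello world\n");
      wr.close();

      // The JobConf must come from createJobConf() so the job is submitted
      // to the mini MR cluster rather than a local runner.
      JobConf conf = createJobConf();
      conf.setJobName("identity");
      FileInputFormat.setInputPaths(conf, getInputDir());
      FileOutputFormat.setOutputPath(conf, getOutputDir());

      // The defaults are the identity mapper/reducer; runJob() blocks
      // until the job completes.
      JobClient.runJob(conf);
    }
  }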
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/MapReduceTestUtil.java?rev=1203941&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/MapReduceTestUtil.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/MapReduceTestUtil.java Sat Nov 19 01:24:32 2011
@@ -0,0 +1,467 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.TaskLog;
+import org.apache.hadoop.mapred.Utils;
+import org.apache.hadoop.mapred.TaskLog.LogName;
+import org.apache.hadoop.mapred.TaskLog.Reader;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Utility methods used in various Job Control unit tests.
+ */
+public class MapReduceTestUtil {
+ public static final Log LOG =
+ LogFactory.getLog(MapReduceTestUtil.class.getName());
+
+ static private Random rand = new Random();
+
+ private static NumberFormat idFormat = NumberFormat.getInstance();
+
+ static {
+ idFormat.setMinimumIntegerDigits(4);
+ idFormat.setGroupingUsed(false);
+ }
+
+ /**
+ * Cleans the data from the passed Path in the passed FileSystem.
+ *
+ * @param fs FileSystem to delete data from.
+ * @param dirPath Path to be deleted.
+ * @throws IOException If an error occurs cleaning the data.
+ */
+ public static void cleanData(FileSystem fs, Path dirPath)
+ throws IOException {
+ fs.delete(dirPath, true);
+ }
+
+ /**
+ * Generates a string of random digits.
+ *
+ * @return A random string.
+ */
+ public static String generateRandomWord() {
+ return idFormat.format(rand.nextLong());
+ }
+
+ /**
+ * Generates a line of random text.
+ *
+ * @return A line of random text.
+ */
+ public static String generateRandomLine() {
+ long r = rand.nextLong() % 7; // in [-6, 6], since % can yield negatives
+ long n = r + 20; // so each line has 14 to 26 words
+ StringBuffer sb = new StringBuffer();
+ for (int i = 0; i < n; i++) {
+ sb.append(generateRandomWord()).append(" ");
+ }
+ sb.append("\n");
+ return sb.toString();
+ }
+
+ /**
+ * Generates random data consisting of 10000 lines.
+ *
+ * @param fs FileSystem to create data in.
+ * @param dirPath Path to create the data in.
+ * @throws IOException If an error occurs creating the data.
+ */
+ public static void generateData(FileSystem fs, Path dirPath)
+ throws IOException {
+ FSDataOutputStream out = fs.create(new Path(dirPath, "data.txt"));
+ for (int i = 0; i < 10000; i++) {
+ String line = generateRandomLine();
+ out.write(line.getBytes("UTF-8"));
+ }
+ out.close();
+ }
+
+ /**
+ * Creates a simple copy job.
+ *
+ * @param conf Configuration object
+ * @param outdir Output directory.
+ * @param indirs Input directories.
+ * @return Job initialized for a data copy job.
+ * @throws Exception If an error occurs creating job configuration.
+ */
+ public static Job createCopyJob(Configuration conf, Path outdir,
+ Path... indirs) throws Exception {
+ conf.setInt(MRJobConfig.NUM_MAPS, 3);
+ Job theJob = Job.getInstance(conf);
+ theJob.setJobName("DataMoveJob");
+
+ FileInputFormat.setInputPaths(theJob, indirs);
+ theJob.setMapperClass(DataCopyMapper.class);
+ FileOutputFormat.setOutputPath(theJob, outdir);
+ theJob.setOutputKeyClass(Text.class);
+ theJob.setOutputValueClass(Text.class);
+ theJob.setReducerClass(DataCopyReducer.class);
+ theJob.setNumReduceTasks(1);
+ return theJob;
+ }
+
+ /**
+ * Creates a simple fail job.
+ *
+ * @param conf Configuration object
+ * @param outdir Output directory.
+ * @param indirs Input directories.
+ * @return Job initialized for a simple fail job.
+ * @throws Exception If an error occurs creating job configuration.
+ */
+ public static Job createFailJob(Configuration conf, Path outdir,
+ Path... indirs) throws Exception {
+ FileSystem fs = outdir.getFileSystem(conf);
+ if (fs.exists(outdir)) {
+ fs.delete(outdir, true);
+ }
+ conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 2);
+ Job theJob = Job.getInstance(conf);
+ theJob.setJobName("Fail-Job");
+
+ FileInputFormat.setInputPaths(theJob, indirs);
+ theJob.setMapperClass(FailMapper.class);
+ theJob.setReducerClass(Reducer.class);
+ theJob.setNumReduceTasks(0);
+ FileOutputFormat.setOutputPath(theJob, outdir);
+ theJob.setOutputKeyClass(Text.class);
+ theJob.setOutputValueClass(Text.class);
+ return theJob;
+ }
+
+ /**
+ * Creates a simple kill job.
+ *
+ * @param conf Configuration object
+ * @param outdir Output directory.
+ * @param indirs Input directories.
+ * @return Job initialized for a simple kill job.
+ * @throws Exception If an error occurs creating job configuration.
+ */
+ public static Job createKillJob(Configuration conf, Path outdir,
+ Path... indirs) throws Exception {
+
+ Job theJob = Job.getInstance(conf);
+ theJob.setJobName("Kill-Job");
+
+ FileInputFormat.setInputPaths(theJob, indirs);
+ theJob.setMapperClass(KillMapper.class);
+ theJob.setReducerClass(Reducer.class);
+ theJob.setNumReduceTasks(0);
+ FileOutputFormat.setOutputPath(theJob, outdir);
+ theJob.setOutputKeyClass(Text.class);
+ theJob.setOutputValueClass(Text.class);
+ return theJob;
+ }
+
+ /**
+ * Simple Mapper and Reducer implementation which copies data it reads in.
+ */
+ public static class DataCopyMapper extends
+ Mapper<LongWritable, Text, Text, Text> {
+ public void map(LongWritable key, Text value, Context context)
+ throws IOException, InterruptedException {
+ context.write(new Text(key.toString()), value);
+ }
+ }
+
+ public static class DataCopyReducer extends Reducer<Text, Text, Text, Text> {
+ // Must take an Iterable (not Iterator) to override Reducer.reduce().
+ public void reduce(Text key, Iterable<Text> values, Context context)
+ throws IOException, InterruptedException {
+ Text dumbKey = new Text("");
+ for (Text data : values) {
+ context.write(dumbKey, data);
+ }
+ }
+ }
+
+ // Mapper that fails
+ public static class FailMapper extends
+ Mapper<WritableComparable<?>, Writable, WritableComparable<?>, Writable> {
+
+ public void map(WritableComparable<?> key, Writable value, Context context)
+ throws IOException {
+ throw new RuntimeException("failing map");
+ }
+ }
+
+ // Mapper that sleeps for a long time.
+ // Used for running a job that will be killed
+ public static class KillMapper extends
+ Mapper<WritableComparable<?>, Writable, WritableComparable<?>, Writable> {
+
+ public void map(WritableComparable<?> key, Writable value, Context context)
+ throws IOException {
+ try {
+ Thread.sleep(1000000);
+ } catch (InterruptedException e) {
+ // Do nothing
+ }
+ }
+ }
+
+ public static class IncomparableKey implements WritableComparable<Object> {
+ public void write(DataOutput out) { }
+ public void readFields(DataInput in) { }
+ public int compareTo(Object o) {
+ throw new RuntimeException("Should never see this.");
+ }
+ }
+
+ public static class FakeSplit extends InputSplit implements Writable {
+ public void write(DataOutput out) throws IOException { }
+ public void readFields(DataInput in) throws IOException { }
+ public long getLength() { return 0L; }
+ public String[] getLocations() { return new String[0]; }
+ }
+
+ public static class Fake_IF<K,V>
+ extends InputFormat<K, V>
+ implements Configurable {
+
+ public Fake_IF() { }
+
+ public List<InputSplit> getSplits(JobContext context) {
+ List<InputSplit> ret = new ArrayList<InputSplit>();
+ ret.add(new FakeSplit());
+ return ret;
+ }
+ public static void setKeyClass(Configuration conf, Class<?> k) {
+ conf.setClass("test.fakeif.keyclass", k, WritableComparable.class);
+ }
+
+ public static void setValClass(Configuration job, Class<?> v) {
+ job.setClass("test.fakeif.valclass", v, Writable.class);
+ }
+
+ protected Class<? extends K> keyclass;
+ protected Class<? extends V> valclass;
+ Configuration conf = null;
+
+ @SuppressWarnings("unchecked")
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ keyclass = (Class<? extends K>) conf.getClass("test.fakeif.keyclass",
+ NullWritable.class, WritableComparable.class);
+ // setValClass() registers values with a Writable bound, so read back
+ // with the same bound.
+ valclass = (Class<? extends V>) conf.getClass("test.fakeif.valclass",
+ NullWritable.class, Writable.class);
+ }
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public RecordReader<K,V> createRecordReader(
+ InputSplit ignored, TaskAttemptContext context) {
+ return new RecordReader<K,V>() {
+ public boolean nextKeyValue() throws IOException { return false; }
+ public void initialize(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {}
+ public K getCurrentKey() {
+ return null;
+ }
+ public V getCurrentValue() {
+ return null;
+ }
+ public void close() throws IOException { }
+ public float getProgress() throws IOException { return 0.0f; }
+ };
+ }
+ }
+
+ public static class Fake_RR<K, V> extends RecordReader<K,V> {
+ private Class<? extends K> keyclass;
+ private Class<? extends V> valclass;
+ public boolean nextKeyValue() throws IOException { return false; }
+ @SuppressWarnings("unchecked")
+ public void initialize(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ keyclass = (Class<? extends K>) conf.getClass("test.fakeif.keyclass",
+ NullWritable.class, WritableComparable.class);
+ valclass = (Class<? extends V>) conf.getClass("test.fakeif.valclass",
+ NullWritable.class, Writable.class);
+ }
+ public K getCurrentKey() {
+ return ReflectionUtils.newInstance(keyclass, null);
+ }
+ public V getCurrentValue() {
+ return ReflectionUtils.newInstance(valclass, null);
+ }
+ public void close() throws IOException { }
+ public float getProgress() throws IOException { return 0.0f; }
+ }
+
+ public static Job createJob(Configuration conf, Path inDir, Path outDir,
+ int numInputFiles, int numReds) throws IOException {
+ String input = "The quick brown fox\n" + "has many silly\n"
+ + "red fox sox\n";
+ return createJob(conf, inDir, outDir, numInputFiles, numReds, input);
+ }
+
+ public static Job createJob(Configuration conf, Path inDir, Path outDir,
+ int numInputFiles, int numReds, String input) throws IOException {
+ Job job = Job.getInstance(conf);
+ FileSystem fs = FileSystem.get(conf);
+ if (fs.exists(outDir)) {
+ fs.delete(outDir, true);
+ }
+ if (fs.exists(inDir)) {
+ fs.delete(inDir, true);
+ }
+ fs.mkdirs(inDir);
+ for (int i = 0; i < numInputFiles; ++i) {
+ DataOutputStream file = fs.create(new Path(inDir, "part-" + i));
+ file.writeBytes(input);
+ file.close();
+ }
+
+ FileInputFormat.setInputPaths(job, inDir);
+ FileOutputFormat.setOutputPath(job, outDir);
+ job.setNumReduceTasks(numReds);
+ return job;
+ }
+
+ public static TaskAttemptContext createDummyMapTaskAttemptContext(
+ Configuration conf) {
+ TaskAttemptID tid = new TaskAttemptID("jt", 1, TaskType.MAP, 0, 0);
+ conf.set(MRJobConfig.TASK_ATTEMPT_ID, tid.toString());
+ return new TaskAttemptContextImpl(conf, tid);
+ }
+
+ public static StatusReporter createDummyReporter() {
+ return new StatusReporter() {
+ public void setStatus(String s) {
+ }
+ public void progress() {
+ }
+ @Override
+ public float getProgress() {
+ return 0;
+ }
+ public Counter getCounter(Enum<?> name) {
+ return new Counters().findCounter(name);
+ }
+ public Counter getCounter(String group, String name) {
+ return new Counters().findCounter(group, name);
+ }
+ };
+ }
+
+ // Return output of MR job by reading from the given output directory
+ public static String readOutput(Path outDir, Configuration conf)
+ throws IOException {
+ FileSystem fs = outDir.getFileSystem(conf);
+ StringBuffer result = new StringBuffer();
+
+ Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
+ new Utils.OutputFileUtils.OutputFilesFilter()));
+ for (Path outputFile : fileList) {
+ LOG.info("Path" + ": "+ outputFile);
+ BufferedReader file =
+ new BufferedReader(new InputStreamReader(fs.open(outputFile)));
+ String line = file.readLine();
+ while (line != null) {
+ result.append(line);
+ result.append("\n");
+ line = file.readLine();
+ }
+ file.close();
+ }
+ return result.toString();
+ }
+
+ /**
+ * Reads tasklog and returns it as string after trimming it.
+ *
+ * @param filter
+ * Task log filter; can be STDOUT, STDERR, SYSLOG, DEBUGOUT, PROFILE
+ * @param taskId
+ * The task id for which the log has to collected
+ * @param isCleanup
+ * whether the task is a cleanup attempt or not.
+ * @return task log as string
+ * @throws IOException
+ */
+ public static String readTaskLog(TaskLog.LogName filter,
+ org.apache.hadoop.mapred.TaskAttemptID taskId, boolean isCleanup)
+ throws IOException {
+ // string buffer to store task log
+ StringBuffer result = new StringBuffer();
+ int res;
+
+ // reads the whole tasklog into inputstream
+ InputStream taskLogReader = new TaskLog.Reader(taskId, filter, 0, -1,
+ isCleanup);
+ // construct string log from inputstream.
+ byte[] b = new byte[65536];
+ while (true) {
+ res = taskLogReader.read(b);
+ if (res > 0) {
+ result.append(new String(b, 0, res)); // only the bytes actually read
+ } else {
+ break;
+ }
+ }
+ taskLogReader.close();
+
+ // trim the string and return it
+ String str = result.toString();
+ str = str.trim();
+ return str;
+ }
+
+}
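A hedged sketch of how these helpers typically compose inside a jobclient
test; it assumes the ClusterMapReduceTestCase harness above supplies
createJobConf(), getFileSystem(), getInputDir() and getOutputDir(), and the
assertions are illustrative:

  Configuration conf = createJobConf();
  FileSystem fs = getFileSystem();
  Path in = getInputDir();
  Path out = getOutputDir();

  // Seed the input directory with 10000 random lines, then copy them.
  MapReduceTestUtil.cleanData(fs, in);
  MapReduceTestUtil.generateData(fs, in);
  MapReduceTestUtil.cleanData(fs, out);

  Job job = MapReduceTestUtil.createCopyJob(conf, out, in);
  assertTrue(job.waitForCompletion(true));

  // readOutput() concatenates every part file under the output directory.
  String output = MapReduceTestUtil.readOutput(out, conf);
  assertTrue(output.length() > 0);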
Propchange: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-examples/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Sat Nov 19 01:24:32 2011
@@ -0,0 +1 @@
+target
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-examples/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-examples/pom.xml?rev=1203941&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-examples/pom.xml (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-examples/pom.xml Sat Nov 19 01:24:32 2011
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+<project>
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-project</artifactId>
+ <version>0.24.0-SNAPSHOT</version>
+ <relativePath>../../hadoop-project</relativePath>
+ </parent>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-examples</artifactId>
+ <version>0.24.0-SNAPSHOT</version>
+ <description>Apache Hadoop MapReduce Examples</description>
+ <name>Apache Hadoop MapReduce Examples</name>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+</project>
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/DistSum.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/DistSum.java?rev=1203941&r1=1203935&r2=1203941&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/DistSum.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/DistSum.java Sat Nov 19 01:24:32 2011
@@ -38,7 +38,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.InputFormat;
@@ -53,6 +52,7 @@ import org.apache.hadoop.mapreduce.Reduc
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -386,8 +386,11 @@ public final class DistSum extends Confi
@Override
public synchronized void init(Job job) throws IOException {
final Configuration conf = job.getConfiguration();
- if (cluster == null)
- cluster = new Cluster(JobTracker.getAddress(conf), conf);
+ if (cluster == null) {
+ String jobTrackerStr = conf.get("mapreduce.jobtracker.address", "localhost:8012");
+ cluster = new Cluster(NetUtils.createSocketAddr(jobTrackerStr), conf);
+ }
chooseMachine(conf).init(job);
}
@@ -604,4 +607,4 @@ public final class DistSum extends Confi
public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(null, new DistSum(), args));
}
-}
\ No newline at end of file
+}
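For reference, the change above resolves the job tracker address from
configuration instead of the removed JobTracker.getAddress() call;
NetUtils.createSocketAddr parses a host:port string into an
InetSocketAddress. A minimal illustration (the demo class is hypothetical):

  import java.net.InetSocketAddress;
  import org.apache.hadoop.net.NetUtils;

  public class SocketAddrDemo {
    public static void main(String[] args) {
      // "localhost:8012" matches the default used in the DistSum change.
      InetSocketAddress addr = NetUtils.createSocketAddr("localhost:8012");
      System.out.println(addr.getHostName() + ":" + addr.getPort());
    }
  }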
Modified: hadoop/common/trunk/hadoop-mapreduce-project/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/pom.xml?rev=1203941&r1=1203940&r2=1203941&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/pom.xml (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/pom.xml Sat Nov 19 01:24:32 2011
@@ -35,12 +35,13 @@
<fork.mode>once</fork.mode>
<mr.basedir>${basedir}</mr.basedir>
</properties>
-
+
<modules>
<module>hadoop-yarn</module>
- <module>hadoop-mapreduce-client</module>
+ <module>hadoop-mapreduce-client</module>
+ <module>hadoop-mapreduce-examples</module>
</modules>
-
+
<dependencies>
<dependency>
<groupId>com.google.protobuf</groupId>
@@ -106,7 +107,7 @@
</exclusion>
</exclusions>
</dependency>
-
+
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
@@ -166,9 +167,9 @@
<artifactId>clover</artifactId>
<version>3.0.2</version>
</dependency>
-
+
</dependencies>
-
+
<build>
<pluginManagement>
<plugins>
@@ -321,7 +322,7 @@
</executions>
</plugin>
</plugins>
- </build>
+ </build>
</profile>
<profile>
<id>dist</id>