You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by tu...@apache.org on 2011/11/17 22:13:30 UTC
svn commit: r1203371 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/
Author: tucu
Date: Thu Nov 17 21:13:29 2011
New Revision: 1203371
URL: http://svn.apache.org/viewvc?rev=1203371&view=rev
Log:
MAPREDUCE-3169. Create a new MiniMRCluster equivalent which only provides client APIs across MR1 and MR2. (Ahmed via tucu)
Added:
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientCluster.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRCluster.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRYarnClusterAdapter.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRClientCluster.java
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1203371&r1=1203370&r2=1203371&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Thu Nov 17 21:13:29 2011
@@ -34,6 +34,9 @@ Trunk (unreleased changes)
MAPREDUCE-3149. Add a test to verify that TokenCache handles file system
uri with no authority. (John George via jitendra)
+ MAPREDUCE-3169. Create a new MiniMRCluster equivalent which only provides
+ client APIs across MR1 and MR2. (Ahmed via tucu)
+
BUG FIXES
MAPREDUCE-3346. [Rumen] LoggedTaskAttempt#getHostName() returns null.
(amarrk)
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientCluster.java?rev=1203371&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientCluster.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientCluster.java Thu Nov 17 21:13:29 2011
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Minimal client-side contract for a MapReduce mini cluster used in tests.
+ * Only operations that do not depend on the concrete mini cluster behind it
+ * (MR1 or MR2) are exposed.
+ */
+public interface MiniMRClientCluster {
+
+  /**
+   * Starts the mini cluster.
+   *
+   * @throws IOException if the cluster cannot be started
+   */
+  void start() throws IOException;
+
+  /**
+   * Stops the mini cluster.
+   *
+   * @throws IOException if the cluster cannot be stopped
+   */
+  void stop() throws IOException;
+
+  /**
+   * Returns the configuration of the mini cluster.
+   *
+   * @return the cluster configuration
+   * @throws IOException if the configuration cannot be obtained
+   */
+  Configuration getConfig() throws IOException;
+
+}
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java?rev=1203371&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java Thu Nov 17 21:13:29 2011
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+
+/**
+ * Factory for {@link MiniMRClientCluster} instances. In MR2 the returned
+ * object wraps a MiniMRYarnCluster; in MR1 the equivalent factory wraps
+ * MiniMRCluster. Tests that obtain their cluster through this factory can be
+ * moved between MR1 and MR2 without modification.
+ */
+public class MiniMRClientClusterFactory {
+
+  /**
+   * Builds, initializes and starts a mini cluster on behalf of a test class.
+   *
+   * @param caller the test class; its name labels the cluster and the
+   *          temporary directory, and its jar is set as the job jar
+   * @param noOfNMs value handed to the MiniMRYarnCluster constructor as its
+   *          second argument
+   * @param conf base configuration; a fresh {@link Configuration} is used
+   *          when this is {@code null}
+   * @return an adapter around the already started cluster
+   * @throws IOException if file system access or job setup fails
+   */
+  public static MiniMRClientCluster create(Class<?> caller, int noOfNMs,
+      Configuration conf) throws IOException {
+
+    final Configuration baseConf = (conf != null) ? conf : new Configuration();
+    final FileSystem fileSystem = FileSystem.get(baseConf);
+    final String callerName = caller.getName();
+
+    // Stage the MR application jar under a per-caller directory and restrict
+    // it to its owner (mode 700).
+    final Path stagingDir = new Path("target", callerName + "-tmpDir")
+        .makeQualified(fileSystem);
+    final Path stagedJar = new Path(stagingDir, "MRAppJar.jar");
+    final Path sourceJar = new Path(MiniMRYarnCluster.APPJAR);
+    fileSystem.copyFromLocalFile(sourceJar, stagedJar);
+    fileSystem.setPermission(stagedJar, new FsPermission("700"));
+
+    // A Job is used only as a convenient way to derive the configuration
+    // (classpath entry and job jar) that the cluster is initialized with.
+    final Job job = Job.getInstance(baseConf);
+    job.addFileToClassPath(stagedJar);
+    job.setJarByClass(caller);
+
+    final MiniMRYarnCluster yarnCluster = new MiniMRYarnCluster(callerName,
+        noOfNMs);
+    yarnCluster.init(job.getConfiguration());
+    yarnCluster.start();
+
+    return new MiniMRYarnClusterAdapter(yarnCluster);
+  }
+
+}
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRCluster.java?rev=1203371&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRCluster.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRCluster.java Thu Nov 17 21:13:29 2011
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * This class is an MR2 replacement for the older MR1 MiniMRCluster that was
+ * used by tests prior to MR2. It is backed by a {@link MiniMRClientCluster}
+ * obtained from {@link MiniMRClientClusterFactory} but keeps the old MR1
+ * interface, so tests can be migrated from MR1 to MR2 with minimal changes.
+ *
+ * Due to major differences between MR1 and MR2, a number of methods are
+ * unsupported (they throw {@link UnsupportedOperationException}), some are
+ * no-ops, and the remainder are thin wrappers around the client cluster.
+ */
+public class MiniMRCluster {
+  private static final Log LOG = LogFactory.getLog(MiniMRCluster.class);
+
+  /** The client cluster every supported operation delegates to. */
+  private final MiniMRClientCluster mrClientCluster;
+
+  /** Unsupported in this implementation. */
+  public String getTaskTrackerLocalDir(int taskTracker) {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Unsupported in this implementation. */
+  public String[] getTaskTrackerLocalDirs(int taskTracker) {
+    throw new UnsupportedOperationException();
+  }
+
+  class JobTrackerRunner {
+    // Mock class
+  }
+
+  class TaskTrackerRunner {
+    // Mock class
+  }
+
+  /** Unsupported in this implementation. */
+  public JobTrackerRunner getJobTrackerRunner() {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Unsupported in this implementation. */
+  TaskTrackerRunner getTaskTrackerRunner(int id) {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Unsupported in this implementation. */
+  public int getNumTaskTrackers() {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Unsupported in this implementation. */
+  public void setInlineCleanupThreads() {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Unsupported in this implementation. */
+  public void waitUntilIdle() {
+    throw new UnsupportedOperationException();
+  }
+
+  private void waitTaskTrackers() {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Unsupported in this implementation. */
+  public int getJobTrackerPort() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Creates a new {@link JobConf} from the client cluster's configuration.
+   *
+   * @return the new job configuration, or {@code null} if the cluster
+   *         configuration could not be obtained (the failure is logged)
+   */
+  public JobConf createJobConf() {
+    return newClusterJobConf();
+  }
+
+  /**
+   * Creates a new {@link JobConf} from the client cluster's configuration.
+   * The supplied {@code conf} is not consulted; the result is identical to
+   * {@link #createJobConf()}.
+   *
+   * @param conf ignored
+   * @return the new job configuration, or {@code null} if the cluster
+   *         configuration could not be obtained (the failure is logged)
+   */
+  public JobConf createJobConf(JobConf conf) {
+    return newClusterJobConf();
+  }
+
+  /** Unsupported in this implementation. */
+  static JobConf configureJobConf(JobConf conf, String namenode,
+      int jobTrackerPort, int jobTrackerInfoPort, UserGroupInformation ugi) {
+    throw new UnsupportedOperationException();
+  }
+
+  public MiniMRCluster(int numTaskTrackers, String namenode, int numDir,
+      String[] racks, String[] hosts) throws IOException {
+    this(0, 0, numTaskTrackers, namenode, numDir, racks, hosts);
+  }
+
+  public MiniMRCluster(int numTaskTrackers, String namenode, int numDir,
+      String[] racks, String[] hosts, JobConf conf) throws IOException {
+    this(0, 0, numTaskTrackers, namenode, numDir, racks, hosts, null, conf);
+  }
+
+  public MiniMRCluster(int numTaskTrackers, String namenode, int numDir)
+      throws IOException {
+    this(0, 0, numTaskTrackers, namenode, numDir);
+  }
+
+  public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
+      int numTaskTrackers, String namenode, int numDir) throws IOException {
+    this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir,
+        null);
+  }
+
+  public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
+      int numTaskTrackers, String namenode, int numDir, String[] racks)
+      throws IOException {
+    this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir,
+        racks, null);
+  }
+
+  public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
+      int numTaskTrackers, String namenode, int numDir, String[] racks,
+      String[] hosts) throws IOException {
+    this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir,
+        racks, hosts, null);
+  }
+
+  public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
+      int numTaskTrackers, String namenode, int numDir, String[] racks,
+      String[] hosts, UserGroupInformation ugi) throws IOException {
+    this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir,
+        racks, hosts, ugi, null);
+  }
+
+  public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
+      int numTaskTrackers, String namenode, int numDir, String[] racks,
+      String[] hosts, UserGroupInformation ugi, JobConf conf)
+      throws IOException {
+    this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir,
+        racks, hosts, ugi, conf, 0);
+  }
+
+  public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
+      int numTaskTrackers, String namenode, int numDir, String[] racks,
+      String[] hosts, UserGroupInformation ugi, JobConf conf,
+      int numTrackerToExclude) throws IOException {
+    this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir,
+        racks, hosts, ugi, conf, numTrackerToExclude, new Clock());
+  }
+
+  /**
+   * Creates the cluster. All other constructors funnel into this one. Only
+   * {@code numTaskTrackers}, {@code namenode} and {@code conf} are used here;
+   * the remaining parameters exist for MR1 signature compatibility and are
+   * ignored.
+   *
+   * @param numTaskTrackers passed to the factory as the node count
+   * @param namenode set as the default file system URI on {@code conf}
+   * @param conf base configuration; a new {@link JobConf} is used when
+   *          {@code null}
+   * @throws IOException if the client cluster cannot be created
+   */
+  public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
+      int numTaskTrackers, String namenode, int numDir, String[] racks,
+      String[] hosts, UserGroupInformation ugi, JobConf conf,
+      int numTrackerToExclude, Clock clock) throws IOException {
+    if (conf == null) {
+      conf = new JobConf();
+    }
+    FileSystem.setDefaultUri(conf, namenode);
+    mrClientCluster = MiniMRClientClusterFactory.create(this.getClass(),
+        numTaskTrackers, conf);
+  }
+
+  /** Unsupported in this implementation. */
+  public UserGroupInformation getUgi() {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Unsupported in this implementation. */
+  public TaskCompletionEvent[] getTaskCompletionEvents(JobID id, int from,
+      int max) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Unsupported in this implementation. */
+  public void setJobPriority(JobID jobId, JobPriority priority)
+      throws AccessControlException, IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Unsupported in this implementation. */
+  public JobPriority getJobPriority(JobID jobId) {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Unsupported in this implementation. */
+  public long getJobFinishTime(JobID jobId) {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Unsupported in this implementation. */
+  public void initializeJob(JobID jobId) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Unsupported in this implementation. */
+  public MapTaskCompletionEventsUpdate getMapTaskCompletionEventsUpdates(
+      int index, JobID jobId, int max) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Creates a new {@link JobConf} from the client cluster's configuration.
+   *
+   * @return the new job configuration, or {@code null} if the cluster
+   *         configuration could not be obtained (the failure is logged)
+   */
+  public JobConf getJobTrackerConf() {
+    return newClusterJobConf();
+  }
+
+  /** Unsupported in this implementation. */
+  public int getFaultCount(String hostName) {
+    throw new UnsupportedOperationException();
+  }
+
+  public void startJobTracker() {
+    // Do nothing
+  }
+
+  public void startJobTracker(boolean wait) {
+    // Do nothing
+  }
+
+  public void stopJobTracker() {
+    // Do nothing
+  }
+
+  public void stopTaskTracker(int id) {
+    // Do nothing
+  }
+
+  public void startTaskTracker(String host, String rack, int idx, int numDir)
+      throws IOException {
+    // Do nothing
+  }
+
+  /** Unsupported in this implementation. */
+  void addTaskTracker(TaskTrackerRunner taskTracker) {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Unsupported in this implementation. */
+  int getTaskTrackerID(String trackerName) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Stops the client cluster. An {@link IOException} raised while stopping is
+   * logged and not propagated.
+   */
+  public void shutdown() {
+    try {
+      mrClientCluster.stop();
+    } catch (IOException e) {
+      // Pass the throwable separately so the stack trace is logged.
+      LOG.error("Failed to stop the mini MR client cluster", e);
+    }
+  }
+
+  /**
+   * Shared implementation behind the JobConf-returning accessors: copies the
+   * client cluster's configuration into a new {@link JobConf}.
+   *
+   * @return the new job configuration, or {@code null} on failure
+   */
+  private JobConf newClusterJobConf() {
+    try {
+      return new JobConf(mrClientCluster.getConfig());
+    } catch (IOException e) {
+      // Kept best-effort (callers receive null), but the stack trace is now
+      // logged instead of only the exception's toString().
+      LOG.error("Failed to read the mini MR client cluster configuration", e);
+      return null;
+    }
+  }
+
+}
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRYarnClusterAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRYarnClusterAdapter.java?rev=1203371&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRYarnClusterAdapter.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRYarnClusterAdapter.java Thu Nov 17 21:13:29 2011
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+
+/**
+ * Exposes a MiniMRYarnCluster through the {@link MiniMRClientCluster}
+ * interface, so tests written against that interface can be used across both
+ * MR1 and MR2. Every call is forwarded unchanged to the wrapped cluster.
+ */
+public class MiniMRYarnClusterAdapter implements MiniMRClientCluster {
+
+  /** The wrapped cluster that receives every call. */
+  private final MiniMRYarnCluster delegate;
+
+  /**
+   * @param miniMRYarnCluster the cluster to wrap
+   */
+  public MiniMRYarnClusterAdapter(MiniMRYarnCluster miniMRYarnCluster) {
+    delegate = miniMRYarnCluster;
+  }
+
+  /** Forwards to the wrapped cluster's {@code start()}. */
+  @Override
+  public void start() {
+    delegate.start();
+  }
+
+  /** Forwards to the wrapped cluster's {@code stop()}. */
+  @Override
+  public void stop() {
+    delegate.stop();
+  }
+
+  /** Returns the wrapped cluster's configuration. */
+  @Override
+  public Configuration getConfig() {
+    return delegate.getConfig();
+  }
+
+}
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRClientCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRClientCluster.java?rev=1203371&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRClientCluster.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRClientCluster.java Thu Nov 17 21:13:29 2011
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Basic testing for the MiniMRClientCluster. This test shows an example class
+ * that can be used in MR1 or MR2, without any change to the test. The test will
+ * use MiniMRYarnCluster in MR2, and MiniMRCluster in MR1.
+ */
+public class TestMiniMRClientCluster {
+
+  /** Counter group shared by the mapper, the reducer and the assertions. */
+  private static final String COUNTER_GROUP = "MyCounterGroup";
+
+  /** Number of input files written by {@link #setup()}. */
+  private static final int NUM_INPUT_FILES = 5;
+
+  private static Path inDir = null;
+  private static Path outDir = null;
+  private static Path testdir = null;
+  private static Path[] inFiles = new Path[NUM_INPUT_FILES];
+  private static MiniMRClientCluster mrCluster;
+
+  /**
+   * Recreates the local test directory, writes the input files and starts the
+   * mini cluster shared by all tests of this class.
+   */
+  @BeforeClass
+  public static void setup() throws IOException {
+    final Configuration conf = new Configuration();
+    final Path TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
+        "/tmp"));
+    testdir = new Path(TEST_ROOT_DIR, "TestMiniMRClientCluster");
+    inDir = new Path(testdir, "in");
+    outDir = new Path(testdir, "out");
+
+    FileSystem fs = FileSystem.getLocal(conf);
+    if (fs.exists(testdir) && !fs.delete(testdir, true)) {
+      throw new IOException("Could not delete " + testdir);
+    }
+    if (!fs.mkdirs(inDir)) {
+      throw new IOException("Mkdirs failed to create " + inDir);
+    }
+
+    for (int i = 0; i < inFiles.length; i++) {
+      inFiles[i] = new Path(inDir, "part_" + i);
+      createFile(inFiles[i], conf);
+    }
+
+    // create the mini cluster to be used for the tests
+    mrCluster = MiniMRClientClusterFactory.create(
+        TestMiniMRClientCluster.class, 1, new Configuration());
+  }
+
+  /**
+   * Removes the test directory and stops the mini cluster. The cluster is
+   * stopped even when the directory cleanup fails, and both steps tolerate a
+   * {@link #setup()} that failed before the corresponding field was assigned.
+   */
+  @AfterClass
+  public static void cleanup() throws IOException {
+    try {
+      // clean up the input and output files
+      if (testdir != null) {
+        final Configuration conf = new Configuration();
+        final FileSystem fs = testdir.getFileSystem(conf);
+        if (fs.exists(testdir)) {
+          fs.delete(testdir, true);
+        }
+      }
+    } finally {
+      // stopping the mini cluster
+      if (mrCluster != null) {
+        mrCluster.stop();
+      }
+    }
+  }
+
+  /**
+   * Runs a word-count style job over the input files and checks the custom
+   * counters. Each of the 5 input files holds the single line
+   * "This is a test file" (5 tokens), which with line-oriented input gives
+   * 5 map input records, 25 map output records and 5 distinct words.
+   */
+  @Test
+  public void testJob() throws Exception {
+    final Job job = createJob();
+    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job,
+        inDir);
+    org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job,
+        new Path(outDir, "testJob"));
+    assertTrue(job.waitForCompletion(true));
+    validateCounters(job.getCounters(), 5, 25, 5, 5);
+  }
+
+  /** Asserts the four custom counters against the expected values. */
+  private void validateCounters(Counters counters, long mapInputRecords,
+      long mapOutputRecords, long reduceInputGroups, long reduceOutputRecords) {
+    assertEquals("MapInputRecords", mapInputRecords, counters.findCounter(
+        COUNTER_GROUP, "MAP_INPUT_RECORDS").getValue());
+    assertEquals("MapOutputRecords", mapOutputRecords, counters.findCounter(
+        COUNTER_GROUP, "MAP_OUTPUT_RECORDS").getValue());
+    assertEquals("ReduceInputGroups", reduceInputGroups, counters.findCounter(
+        COUNTER_GROUP, "REDUCE_INPUT_GROUPS").getValue());
+    assertEquals("ReduceOutputRecords", reduceOutputRecords, counters
+        .findCounter(COUNTER_GROUP, "REDUCE_OUTPUT_RECORDS").getValue());
+  }
+
+  /**
+   * Writes the fixed test line to {@code inFile} unless the file already
+   * exists. The stream is closed even if the write fails.
+   */
+  private static void createFile(Path inFile, Configuration conf)
+      throws IOException {
+    final FileSystem fs = inFile.getFileSystem(conf);
+    if (fs.exists(inFile)) {
+      return;
+    }
+    FSDataOutputStream out = fs.create(inFile);
+    try {
+      out.writeBytes("This is a test file");
+    } finally {
+      out.close();
+    }
+  }
+
+  /**
+   * Creates a job on top of the mini cluster's configuration, wired to
+   * {@link MyMapper} and {@link MyReducer} with a single reduce task.
+   */
+  public static Job createJob() throws IOException {
+    // Job.getInstance is used instead of the deprecated Job constructor,
+    // matching MiniMRClientClusterFactory.
+    final Job baseJob = Job.getInstance(mrCluster.getConfig());
+    baseJob.setOutputKeyClass(Text.class);
+    baseJob.setOutputValueClass(IntWritable.class);
+    baseJob.setMapperClass(MyMapper.class);
+    baseJob.setReducerClass(MyReducer.class);
+    baseJob.setNumReduceTasks(1);
+    return baseJob;
+  }
+
+  /**
+   * Emits (token, 1) for every whitespace-separated token of the input value
+   * and counts input records and emitted records in custom counters.
+   */
+  public static class MyMapper extends
+      org.apache.hadoop.mapreduce.Mapper<Object, Text, Text, IntWritable> {
+    private final static IntWritable one = new IntWritable(1);
+    private Text word = new Text();
+
+    @Override
+    public void map(Object key, Text value, Context context)
+        throws IOException, InterruptedException {
+      context.getCounter(COUNTER_GROUP, "MAP_INPUT_RECORDS").increment(1);
+      StringTokenizer iter = new StringTokenizer(value.toString());
+      while (iter.hasMoreTokens()) {
+        word.set(iter.nextToken());
+        context.write(word, one);
+        context.getCounter(COUNTER_GROUP, "MAP_OUTPUT_RECORDS").increment(1);
+      }
+    }
+  }
+
+  /**
+   * Sums the values of each key, emits (key, sum) and counts input groups and
+   * output records in custom counters.
+   */
+  public static class MyReducer extends
+      org.apache.hadoop.mapreduce.Reducer<Text, IntWritable, Text, IntWritable> {
+    private IntWritable result = new IntWritable();
+
+    @Override
+    public void reduce(Text key, Iterable<IntWritable> values, Context context)
+        throws IOException, InterruptedException {
+      context.getCounter(COUNTER_GROUP, "REDUCE_INPUT_GROUPS").increment(1);
+      int sum = 0;
+      for (IntWritable val : values) {
+        sum += val.get();
+      }
+      result.set(sum);
+      context.write(key, result);
+      context.getCounter(COUNTER_GROUP, "REDUCE_OUTPUT_RECORDS")
+          .increment(1);
+    }
+  }
+
+}