You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by tu...@apache.org on 2011/11/17 22:13:30 UTC

svn commit: r1203371 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/

Author: tucu
Date: Thu Nov 17 21:13:29 2011
New Revision: 1203371

URL: http://svn.apache.org/viewvc?rev=1203371&view=rev
Log:
MAPREDUCE-3169. Create a new MiniMRCluster equivalent which only provides client APIs cross MR1 and MR2. (Ahmed via tucu)

Added:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientCluster.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRCluster.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRYarnClusterAdapter.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRClientCluster.java
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1203371&r1=1203370&r2=1203371&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Thu Nov 17 21:13:29 2011
@@ -34,6 +34,9 @@ Trunk (unreleased changes)
     MAPREDUCE-3149. Add a test to verify that TokenCache handles file system 
     uri with no authority. (John George via jitendra)
 
+    MAPREDUCE-3169. Create a new MiniMRCluster equivalent which only provides 
+    client APIs cross MR1 and MR2. (Ahmed via tucu)
+
   BUG FIXES
     MAPREDUCE-3346. [Rumen] LoggedTaskAttempt#getHostName() returns null. 
                     (amarrk)

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientCluster.java?rev=1203371&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientCluster.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientCluster.java Thu Nov 17 21:13:29 2011
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+/*
+ * A simple interface for a client MR cluster used for testing. This interface
+ * provides basic methods which are independent of the underlying Mini Cluster (
+ * either through MR1 or MR2).
+ */
+public interface MiniMRClientCluster {
+
+  public void start() throws IOException;
+
+  public void stop() throws IOException;
+
+  public Configuration getConfig() throws IOException;
+
+}

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java?rev=1203371&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java Thu Nov 17 21:13:29 2011
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+
+/**
+ * A MiniMRCluster factory. In MR2, it provides a wrapper MiniMRClientCluster
+ * interface around the MiniMRYarnCluster. While in MR1, it provides such
+ * wrapper around MiniMRCluster. This factory should be used in tests to provide
+ * an easy migration of tests across MR1 and MR2.
+ */
+public class MiniMRClientClusterFactory {
+
+  public static MiniMRClientCluster create(Class<?> caller, int noOfNMs,
+      Configuration conf) throws IOException {
+
+    if (conf == null) {
+      conf = new Configuration();
+    }
+
+    FileSystem fs = FileSystem.get(conf);
+
+    Path testRootDir = new Path("target", caller.getName() + "-tmpDir")
+        .makeQualified(fs);
+    Path appJar = new Path(testRootDir, "MRAppJar.jar");
+
+    // Copy MRAppJar and make it private.
+    Path appMasterJar = new Path(MiniMRYarnCluster.APPJAR);
+
+    fs.copyFromLocalFile(appMasterJar, appJar);
+    fs.setPermission(appJar, new FsPermission("700"));
+
+    Job job = Job.getInstance(conf);
+
+    job.addFileToClassPath(appJar);
+    job.setJarByClass(caller);
+
+    MiniMRYarnCluster miniMRYarnCluster = new MiniMRYarnCluster(caller
+        .getName(), noOfNMs);
+    miniMRYarnCluster.init(job.getConfiguration());
+    miniMRYarnCluster.start();
+
+    return new MiniMRYarnClusterAdapter(miniMRYarnCluster);
+  }
+
+}

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRCluster.java?rev=1203371&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRCluster.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRCluster.java Thu Nov 17 21:13:29 2011
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * This class is an MR2 replacement for older MR1 MiniMRCluster, that was used
+ * by tests prior to MR2. This replacement class uses the new MiniMRYarnCluster
+ * in MR2 but provides the same old MR1 interface, so tests can be migrated from
+ * MR1 to MR2 with minimal changes.
+ *
+ * Due to major differences between MR1 and MR2, a number of methods are either
+ * unimplemented/unsupported or were re-implemented to provide wrappers around
+ * MR2 functionality.
+ */
+public class MiniMRCluster {
+  private static final Log LOG = LogFactory.getLog(MiniMRCluster.class);
+
+  private MiniMRClientCluster mrClientCluster;
+
+  public String getTaskTrackerLocalDir(int taskTracker) {
+    throw new UnsupportedOperationException();
+  }
+
+  public String[] getTaskTrackerLocalDirs(int taskTracker) {
+    throw new UnsupportedOperationException();
+  }
+
+  class JobTrackerRunner {
+    // Mock class
+  }
+
+  class TaskTrackerRunner {
+    // Mock class
+  }
+
+  public JobTrackerRunner getJobTrackerRunner() {
+    throw new UnsupportedOperationException();
+  }
+
+  TaskTrackerRunner getTaskTrackerRunner(int id) {
+    throw new UnsupportedOperationException();
+  }
+
+  public int getNumTaskTrackers() {
+    throw new UnsupportedOperationException();
+  }
+
+  public void setInlineCleanupThreads() {
+    throw new UnsupportedOperationException();
+  }
+
+  public void waitUntilIdle() {
+    throw new UnsupportedOperationException();
+  }
+
+  private void waitTaskTrackers() {
+    throw new UnsupportedOperationException();
+  }
+
+  public int getJobTrackerPort() {
+    throw new UnsupportedOperationException();
+  }
+
+  public JobConf createJobConf() {
+    JobConf jobConf = null;
+    try {
+      jobConf = new JobConf(mrClientCluster.getConfig());
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+    return jobConf;
+  }
+
+  public JobConf createJobConf(JobConf conf) {
+    JobConf jobConf = null;
+    try {
+      jobConf = new JobConf(mrClientCluster.getConfig());
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+    return jobConf;
+  }
+
+  static JobConf configureJobConf(JobConf conf, String namenode,
+      int jobTrackerPort, int jobTrackerInfoPort, UserGroupInformation ugi) {
+    throw new UnsupportedOperationException();
+  }
+
+  public MiniMRCluster(int numTaskTrackers, String namenode, int numDir,
+      String[] racks, String[] hosts) throws IOException {
+    this(0, 0, numTaskTrackers, namenode, numDir, racks, hosts);
+  }
+
+  public MiniMRCluster(int numTaskTrackers, String namenode, int numDir,
+      String[] racks, String[] hosts, JobConf conf) throws IOException {
+    this(0, 0, numTaskTrackers, namenode, numDir, racks, hosts, null, conf);
+  }
+
+  public MiniMRCluster(int numTaskTrackers, String namenode, int numDir)
+      throws IOException {
+    this(0, 0, numTaskTrackers, namenode, numDir);
+  }
+
+  public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
+      int numTaskTrackers, String namenode, int numDir) throws IOException {
+    this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir,
+        null);
+  }
+
+  public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
+      int numTaskTrackers, String namenode, int numDir, String[] racks)
+      throws IOException {
+    this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir,
+        racks, null);
+  }
+
+  public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
+      int numTaskTrackers, String namenode, int numDir, String[] racks,
+      String[] hosts) throws IOException {
+    this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir,
+        racks, hosts, null);
+  }
+
+  public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
+      int numTaskTrackers, String namenode, int numDir, String[] racks,
+      String[] hosts, UserGroupInformation ugi) throws IOException {
+    this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir,
+        racks, hosts, ugi, null);
+  }
+
+  public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
+      int numTaskTrackers, String namenode, int numDir, String[] racks,
+      String[] hosts, UserGroupInformation ugi, JobConf conf)
+      throws IOException {
+    this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir,
+        racks, hosts, ugi, conf, 0);
+  }
+
+  public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
+      int numTaskTrackers, String namenode, int numDir, String[] racks,
+      String[] hosts, UserGroupInformation ugi, JobConf conf,
+      int numTrackerToExclude) throws IOException {
+    this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir,
+        racks, hosts, ugi, conf, numTrackerToExclude, new Clock());
+  }
+
+  public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
+      int numTaskTrackers, String namenode, int numDir, String[] racks,
+      String[] hosts, UserGroupInformation ugi, JobConf conf,
+      int numTrackerToExclude, Clock clock) throws IOException {
+    if (conf == null) conf = new JobConf();
+    FileSystem.setDefaultUri(conf, namenode);
+    mrClientCluster = MiniMRClientClusterFactory.create(this.getClass(),
+        numTaskTrackers, conf);
+  }
+
+  public UserGroupInformation getUgi() {
+    throw new UnsupportedOperationException();
+  }
+
+  public TaskCompletionEvent[] getTaskCompletionEvents(JobID id, int from,
+      int max) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  public void setJobPriority(JobID jobId, JobPriority priority)
+      throws AccessControlException, IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  public JobPriority getJobPriority(JobID jobId) {
+    throw new UnsupportedOperationException();
+  }
+
+  public long getJobFinishTime(JobID jobId) {
+    throw new UnsupportedOperationException();
+  }
+
+  public void initializeJob(JobID jobId) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  public MapTaskCompletionEventsUpdate getMapTaskCompletionEventsUpdates(
+      int index, JobID jobId, int max) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  public JobConf getJobTrackerConf() {
+    JobConf jobConf = null;
+    try {
+      jobConf = new JobConf(mrClientCluster.getConfig());
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+    return jobConf;
+  }
+
+  public int getFaultCount(String hostName) {
+    throw new UnsupportedOperationException();
+  }
+
+  public void startJobTracker() {
+    // Do nothing
+  }
+
+  public void startJobTracker(boolean wait) {
+    // Do nothing
+  }
+
+  public void stopJobTracker() {
+    // Do nothing
+  }
+
+  public void stopTaskTracker(int id) {
+    // Do nothing
+  }
+
+  public void startTaskTracker(String host, String rack, int idx, int numDir)
+      throws IOException {
+    // Do nothing
+  }
+
+  void addTaskTracker(TaskTrackerRunner taskTracker) {
+    throw new UnsupportedOperationException();
+  }
+
+  int getTaskTrackerID(String trackerName) {
+    throw new UnsupportedOperationException();
+  }
+
+  public void shutdown() {
+    try {
+      mrClientCluster.stop();
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+  }
+
+}

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRYarnClusterAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRYarnClusterAdapter.java?rev=1203371&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRYarnClusterAdapter.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRYarnClusterAdapter.java Thu Nov 17 21:13:29 2011
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+
+/**
+ * An adapter for MiniMRYarnCluster providing a MiniMRClientCluster interface.
+ * This interface could be used by tests across both MR1 and MR2.
+ */
+public class MiniMRYarnClusterAdapter implements MiniMRClientCluster {
+
+  private MiniMRYarnCluster miniMRYarnCluster;
+
+  public MiniMRYarnClusterAdapter(MiniMRYarnCluster miniMRYarnCluster) {
+    this.miniMRYarnCluster = miniMRYarnCluster;
+  }
+
+  @Override
+  public Configuration getConfig() {
+    return miniMRYarnCluster.getConfig();
+  }
+
+  @Override
+  public void start() {
+    miniMRYarnCluster.start();
+  }
+
+  @Override
+  public void stop() {
+    miniMRYarnCluster.stop();
+  }
+
+}

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRClientCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRClientCluster.java?rev=1203371&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRClientCluster.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRClientCluster.java Thu Nov 17 21:13:29 2011
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Basic testing for the MiniMRClientCluster. This test shows an example class
+ * that can be used in MR1 or MR2, without any change to the test. The test will
+ * use MiniMRYarnCluster in MR2, and MiniMRCluster in MR1.
+ */
+public class TestMiniMRClientCluster {
+
+  private static Path inDir = null;
+  private static Path outDir = null;
+  private static Path testdir = null;
+  private static Path[] inFiles = new Path[5];
+  private static MiniMRClientCluster mrCluster;
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    final Configuration conf = new Configuration();
+    final Path TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
+        "/tmp"));
+    testdir = new Path(TEST_ROOT_DIR, "TestMiniMRClientCluster");
+    inDir = new Path(testdir, "in");
+    outDir = new Path(testdir, "out");
+
+    FileSystem fs = FileSystem.getLocal(conf);
+    if (fs.exists(testdir) && !fs.delete(testdir, true)) {
+      throw new IOException("Could not delete " + testdir);
+    }
+    if (!fs.mkdirs(inDir)) {
+      throw new IOException("Mkdirs failed to create " + inDir);
+    }
+
+    for (int i = 0; i < inFiles.length; i++) {
+      inFiles[i] = new Path(inDir, "part_" + i);
+      createFile(inFiles[i], conf);
+    }
+
+    // create the mini cluster to be used for the tests
+    mrCluster = MiniMRClientClusterFactory.create(
+        TestMiniMRClientCluster.class, 1, new Configuration());
+  }
+
+  @AfterClass
+  public static void cleanup() throws IOException {
+    // clean up the input and output files
+    final Configuration conf = new Configuration();
+    final FileSystem fs = testdir.getFileSystem(conf);
+    if (fs.exists(testdir)) {
+      fs.delete(testdir, true);
+    }
+    // stopping the mini cluster
+    mrCluster.stop();
+  }
+
+  @Test
+  public void testJob() throws Exception {
+    final Job job = createJob();
+    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job,
+        inDir);
+    org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job,
+        new Path(outDir, "testJob"));
+    assertTrue(job.waitForCompletion(true));
+    validateCounters(job.getCounters(), 5, 25, 5, 5);
+  }
+
+  private void validateCounters(Counters counters, long mapInputRecords,
+      long mapOutputRecords, long reduceInputGroups, long reduceOutputRecords) {
+    assertEquals("MapInputRecords", mapInputRecords, counters.findCounter(
+        "MyCounterGroup", "MAP_INPUT_RECORDS").getValue());
+    assertEquals("MapOutputRecords", mapOutputRecords, counters.findCounter(
+        "MyCounterGroup", "MAP_OUTPUT_RECORDS").getValue());
+    assertEquals("ReduceInputGroups", reduceInputGroups, counters.findCounter(
+        "MyCounterGroup", "REDUCE_INPUT_GROUPS").getValue());
+    assertEquals("ReduceOutputRecords", reduceOutputRecords, counters
+        .findCounter("MyCounterGroup", "REDUCE_OUTPUT_RECORDS").getValue());
+  }
+
+  private static void createFile(Path inFile, Configuration conf)
+      throws IOException {
+    final FileSystem fs = inFile.getFileSystem(conf);
+    if (fs.exists(inFile)) {
+      return;
+    }
+    FSDataOutputStream out = fs.create(inFile);
+    out.writeBytes("This is a test file");
+    out.close();
+  }
+
+  public static Job createJob() throws IOException {
+    final Job baseJob = new Job(mrCluster.getConfig());
+    baseJob.setOutputKeyClass(Text.class);
+    baseJob.setOutputValueClass(IntWritable.class);
+    baseJob.setMapperClass(MyMapper.class);
+    baseJob.setReducerClass(MyReducer.class);
+    baseJob.setNumReduceTasks(1);
+    return baseJob;
+  }
+
+  public static class MyMapper extends
+      org.apache.hadoop.mapreduce.Mapper<Object, Text, Text, IntWritable> {
+    private final static IntWritable one = new IntWritable(1);
+    private Text word = new Text();
+
+    public void map(Object key, Text value, Context context)
+        throws IOException, InterruptedException {
+      context.getCounter("MyCounterGroup", "MAP_INPUT_RECORDS").increment(1);
+      StringTokenizer iter = new StringTokenizer(value.toString());
+      while (iter.hasMoreTokens()) {
+        word.set(iter.nextToken());
+        context.write(word, one);
+        context.getCounter("MyCounterGroup", "MAP_OUTPUT_RECORDS").increment(1);
+      }
+    }
+  }
+
+  public static class MyReducer extends
+      org.apache.hadoop.mapreduce.Reducer<Text, IntWritable, Text, IntWritable> {
+    private IntWritable result = new IntWritable();
+
+    public void reduce(Text key, Iterable<IntWritable> values, Context context)
+        throws IOException, InterruptedException {
+      context.getCounter("MyCounterGroup", "REDUCE_INPUT_GROUPS").increment(1);
+      int sum = 0;
+      for (IntWritable val : values) {
+        sum += val.get();
+      }
+      result.set(sum);
+      context.write(key, result);
+      context.getCounter("MyCounterGroup", "REDUCE_OUTPUT_RECORDS")
+          .increment(1);
+    }
+  }
+
+}